Skip to content

proxystore.connectors.endpoint

Endpoint connector implementation.

EndpointConnectorError

Bases: Exception

Exception resulting from request to Endpoint.

EndpointKey

Bases: NamedTuple

Key to object in an Endpoint.

Attributes:

  • object_id (str) –

    Unique object ID.

  • endpoint_id (str | None) –

    Endpoint UUID where object is stored.

EndpointConnector

EndpointConnector(
    endpoints: Sequence[str | UUID],
    proxystore_dir: str | None = None,
)

Connector to ProxyStore Endpoints.

Warning

Specifying a custom proxystore_dir can cause problems if the proxystore_dir is not the same on all systems that a proxy created by this store could end up on. It is recommended to leave the proxystore_dir unspecified so the correct default directory will be used.

Parameters:

  • endpoints (Sequence[str | UUID]) –

    Sequence of valid and running endpoint UUIDs to use. At least one of these endpoints must be accessible by this process.

  • proxystore_dir (str | None, default: None ) –

    Optionally specify the proxystore home directory. Defaults to home_dir().

Raises:

Source code in proxystore/connectors/endpoint.py
def __init__(
    self,
    endpoints: Sequence[str | UUID],
    proxystore_dir: str | None = None,
) -> None:
    if len(endpoints) == 0:
        raise ValueError('At least one endpoint must be specified.')
    self.endpoints: list[UUID] = [
        e if isinstance(e, UUID) else UUID(e, version=4) for e in endpoints
    ]
    self.proxystore_dir = proxystore_dir

    # Maintain single session for connection pooling persistence to
    # speed up repeat requests to same endpoint.
    self._session = requests.Session()

    # Find the first locally accessible endpoint to use as our
    # home endpoint
    available_endpoints = get_configs(
        home_dir() if self.proxystore_dir is None else self.proxystore_dir,
    )
    found_endpoint: EndpointConfig | None = None
    for endpoint in available_endpoints:
        endpoint_uuid = UUID(endpoint.uuid)
        if endpoint_uuid not in self.endpoints:
            continue
        if endpoint.host is None:
            logger.warning(
                'Found valid configuration for endpoint '
                f'"{endpoint.name}" ({endpoint_uuid}), but the endpoint '
                'has not been started',
            )
            continue
        logger.debug(f'Attempting connection to {endpoint_uuid}')
        response = self._session.get(
            f'http://{endpoint.host}:{endpoint.port}/endpoint',
        )
        if response.status_code == 200:
            uuid_ = response.json()['uuid']
            if endpoint_uuid == UUID(uuid_):
                logger.debug(
                    f'Connection to {endpoint_uuid} successful, using '
                    'as local endpoint',
                )
                found_endpoint = endpoint
                break
            else:
                logger.debug(
                    f'Connection to {endpoint_uuid} returned '
                    'different UUID',
                )
        else:
            logger.debug(f'Connection to {endpoint_uuid} failed')

    if found_endpoint is None:
        self._session.close()
        raise EndpointConnectorError(
            'Failed to find an endpoint configuration matching one of the '
            'provided endpoint UUIDs, or an endpoint configuration was '
            'found but the endpoint could not be connected to. '
            'Enable debug level logging for more more details.',
        )
    self.endpoint_uuid: uuid.UUID = uuid.UUID(found_endpoint.uuid)
    self.endpoint_host: str | None = found_endpoint.host
    self.endpoint_port: int = found_endpoint.port

    self.address = f'http://{self.endpoint_host}:{self.endpoint_port}'

close

close() -> None

Close the connector and clean up.

Source code in proxystore/connectors/endpoint.py
def close(self) -> None:
    """Close the connector and clean up."""
    self._session.close()

config

config() -> dict[str, Any]

Get the connector configuration.

The configuration contains all the information needed to reconstruct the connector object.

Source code in proxystore/connectors/endpoint.py
def config(self) -> dict[str, Any]:
    """Get the connector configuration.

    The configuration contains all the information needed to reconstruct
    the connector object.
    """
    return {
        'endpoints': [str(ep) for ep in self.endpoints],
        'proxystore_dir': self.proxystore_dir,
    }

from_config classmethod

from_config(config: dict[str, Any]) -> EndpointConnector

Create a new connector instance from a configuration.

Parameters:

  • config (dict[str, Any]) –

    Configuration returned by .config().

Source code in proxystore/connectors/endpoint.py
@classmethod
def from_config(cls, config: dict[str, Any]) -> EndpointConnector:
    """Create a new connector instance from a configuration.

    Args:
        config: Configuration returned by `#!python .config()`.
    """
    return cls(**config)

evict

evict(key: EndpointKey) -> None

Evict the object associated with the key.

Parameters:

  • key (EndpointKey) –

    Key associated with object to evict.

Source code in proxystore/connectors/endpoint.py
def evict(self, key: EndpointKey) -> None:
    """Evict the object associated with the key.

    Args:
        key: Key associated with object to evict.
    """
    try:
        client.evict(
            self.address,
            key.object_id,
            key.endpoint_id,
            session=self._session,
        )
    except requests.exceptions.RequestException as e:
        assert e.response is not None
        raise EndpointConnectorError(
            f'Evict failed with error code {e.response.status_code}.',
        ) from e

exists

exists(key: EndpointKey) -> bool

Check if an object associated with the key exists.

Parameters:

  • key (EndpointKey) –

    Key potentially associated with stored object.

Returns:

  • bool

    If an object associated with the key exists.

Source code in proxystore/connectors/endpoint.py
def exists(self, key: EndpointKey) -> bool:
    """Check if an object associated with the key exists.

    Args:
        key: Key potentially associated with stored object.

    Returns:
        If an object associated with the key exists.
    """
    try:
        return client.exists(
            self.address,
            key.object_id,
            key.endpoint_id,
            session=self._session,
        )
    except requests.exceptions.RequestException as e:
        assert e.response is not None
        raise EndpointConnectorError(
            f'Exists failed with error code {e.response.status_code}.',
        ) from e

get

get(key: EndpointKey) -> bytes | None

Get the serialized object associated with the key.

Parameters:

  • key (EndpointKey) –

    Key associated with the object to retrieve.

Returns:

  • bytes | None

    Serialized object or None if the object does not exist.

Source code in proxystore/connectors/endpoint.py
def get(self, key: EndpointKey) -> bytes | None:
    """Get the serialized object associated with the key.

    Args:
        key: Key associated with the object to retrieve.

    Returns:
        Serialized object or `None` if the object does not exist.
    """
    try:
        return client.get(
            self.address,
            key.object_id,
            key.endpoint_id,
            session=self._session,
        )
    except requests.exceptions.RequestException as e:
        assert e.response is not None
        raise EndpointConnectorError(
            f'Get failed with error code {e.response.status_code}.',
        ) from e

get_batch

get_batch(
    keys: Sequence[EndpointKey],
) -> list[bytes | None]

Get a batch of serialized objects associated with the keys.

Parameters:

Returns:

  • list[bytes | None]

    List with same order as keys with the serialized objects or None if the corresponding key does not have an associated object.

Source code in proxystore/connectors/endpoint.py
def get_batch(self, keys: Sequence[EndpointKey]) -> list[bytes | None]:
    """Get a batch of serialized objects associated with the keys.

    Args:
        keys: Sequence of keys associated with objects to retrieve.

    Returns:
        List with same order as `keys` with the serialized objects or \
        `None` if the corresponding key does not have an associated object.
    """
    return [self.get(key) for key in keys]

new_key

new_key(obj: bytes | None = None) -> EndpointKey

Create a new key.

Warning

The returned key will be associated with this instance's local endpoint. I.e., when set() is called on this key, the connector must be connected to the same local endpoint.

Parameters:

  • obj (bytes | None, default: None ) –

    Optional object which the key will be associated with. Ignored in this implementation.

Returns:

  • EndpointKey

    Key which can be used to retrieve an object once set() has been called on the key.

Source code in proxystore/connectors/endpoint.py
def new_key(self, obj: bytes | None = None) -> EndpointKey:
    """Create a new key.

    Warning:
        The returned key will be associated with this instance's local
        endpoint. I.e., when
        [`set()`][proxystore.connectors.endpoint.EndpointConnector.set]
        is called on this key, the connector must be connected to the same
        local endpoint.

    Args:
        obj: Optional object which the key will be associated with.
            Ignored in this implementation.

    Returns:
        Key which can be used to retrieve an object once \
        [`set()`][proxystore.connectors.endpoint.EndpointConnector.set] \
        has been called on the key.
    """
    return EndpointKey(
        object_id=str(uuid.uuid4()),
        endpoint_id=str(self.endpoint_uuid),
    )

put

put(obj: bytes) -> EndpointKey

Put a serialized object in the store.

Parameters:

  • obj (bytes) –

    Serialized object to put in the store.

Returns:

  • EndpointKey

    Key which can be used to retrieve the object.

Source code in proxystore/connectors/endpoint.py
def put(self, obj: bytes) -> EndpointKey:
    """Put a serialized object in the store.

    Args:
        obj: Serialized object to put in the store.

    Returns:
        Key which can be used to retrieve the object.
    """
    key = EndpointKey(
        object_id=str(uuid.uuid4()),
        endpoint_id=str(self.endpoint_uuid),
    )
    self.set(key, obj)
    return key

put_batch

put_batch(objs: Sequence[bytes]) -> list[EndpointKey]

Put a batch of serialized objects in the store.

Parameters:

  • objs (Sequence[bytes]) –

    Sequence of serialized objects to put in the store.

Returns:

  • list[EndpointKey]

    List of keys with the same order as objs which can be used to retrieve the objects.

Source code in proxystore/connectors/endpoint.py
def put_batch(self, objs: Sequence[bytes]) -> list[EndpointKey]:
    """Put a batch of serialized objects in the store.

    Args:
        objs: Sequence of serialized objects to put in the store.

    Returns:
        List of keys with the same order as `objs` which can be used to \
        retrieve the objects.
    """
    return [self.put(obj) for obj in objs]

set

set(key: EndpointKey, obj: bytes) -> None

Set the object associated with a key.

Note

The Connector provides write-once, read-many semantics. Thus, set() should only be called once per key, otherwise unexpected behavior can occur.

Parameters:

  • key (EndpointKey) –

    Key that the object will be associated with.

  • obj (bytes) –

    Object to associate with the key.

Source code in proxystore/connectors/endpoint.py
def set(self, key: EndpointKey, obj: bytes) -> None:
    """Set the object associated with a key.

    Note:
        The [`Connector`][proxystore.connectors.protocols.Connector]
        provides write-once, read-many semantics. Thus,
        [`set()`][proxystore.connectors.endpoint.EndpointConnector.set]
        should only be called once per key, otherwise unexpected behavior
        can occur.

    Args:
        key: Key that the object will be associated with.
        obj: Object to associate with the key.
    """
    try:
        client.put(
            self.address,
            key.object_id,
            obj,
            key.endpoint_id,
            session=self._session,
        )
    except requests.exceptions.RequestException as e:
        assert e.response is not None
        raise EndpointConnectorError(
            f'Put failed with error code {e.response.status_code}.',
        ) from e