Skip to content

proxystore.connectors.endpoint

Endpoint connector implementation.

EndpointConnectorError

Bases: Exception

Exception resulting from request to Endpoint.

EndpointKey

Bases: NamedTuple

Key to object in an Endpoint.

Attributes:

  • object_id (str) –

    Unique object ID.

  • endpoint_id (str | None) –

    Endpoint UUID where object is stored.

EndpointConnector

EndpointConnector(
    endpoints: Sequence[str | UUID],
    proxystore_dir: str | None = None,
) -> None

Connector to ProxyStore Endpoints.

Warning

Specifying a custom proxystore_dir can cause problems if the proxystore_dir is not the same on all systems that a proxy created by this store could end up on. It is recommended to leave the proxystore_dir unspecified so the correct default directory will be used.

Parameters:

  • endpoints (Sequence[str | UUID]) –

    Sequence of valid and running endpoint UUIDs to use. At least one of these endpoints must be accessible by this process.

  • proxystore_dir (str | None) –

    Optionally specify the proxystore home directory. Defaults to home_dir().

Raises:

Source code in proxystore/connectors/endpoint.py
def __init__(
    self,
    endpoints: Sequence[str | UUID],
    proxystore_dir: str | None = None,
) -> None:
    if len(endpoints) == 0:
        raise ValueError('At least one endpoint must be specified.')
    self.endpoints: list[UUID] = [
        e if isinstance(e, UUID) else UUID(e, version=4) for e in endpoints
    ]
    self.proxystore_dir = proxystore_dir

    # Maintain single session for connection pooling persistence to
    # speed up repeat requests to same endpoint.
    self._session = requests.Session()

    # Find the first locally accessible endpoint to use as our
    # home endpoint
    available_endpoints = get_configs(
        home_dir() if self.proxystore_dir is None else self.proxystore_dir,
    )
    found_endpoint: EndpointConfig | None = None
    for endpoint in available_endpoints:
        if endpoint.uuid in self.endpoints:
            logger.debug(f'Attempting connection to {endpoint.uuid}')
            response = self._session.get(
                f'http://{endpoint.host}:{endpoint.port}/endpoint',
            )
            if response.status_code == 200:
                uuid = response.json()['uuid']
                if str(endpoint.uuid) == uuid:
                    logger.debug(
                        f'Connection to {endpoint.uuid} successful, using '
                        'as local endpoint',
                    )
                    found_endpoint = endpoint
                    break
                else:
                    logger.debug(
                        f'Connection to {endpoint.uuid} returned '
                        'different UUID',
                    )
            else:
                logger.debug(f'Connection to {endpoint.uuid} failed')

    if found_endpoint is None:
        self._session.close()
        raise EndpointConnectorError(
            'Failed to find endpoint configuration matching one of the '
            'provided endpoint UUIDs.',
        )
    self.endpoint_uuid = found_endpoint.uuid
    self.endpoint_host = found_endpoint.host
    self.endpoint_port = found_endpoint.port

    self.address = f'http://{self.endpoint_host}:{self.endpoint_port}'

close()

close() -> None

Close the connector and clean up.

Source code in proxystore/connectors/endpoint.py
def close(self) -> None:
    """Close the connector and clean up."""
    self._session.close()

config()

config() -> dict[str, Any]

Get the connector configuration.

The configuration contains all the information needed to reconstruct the connector object.

Source code in proxystore/connectors/endpoint.py
def config(self) -> dict[str, Any]:
    """Get the connector configuration.

    The configuration contains all the information needed to reconstruct
    the connector object.
    """
    return {
        'endpoints': [str(ep) for ep in self.endpoints],
        'proxystore_dir': self.proxystore_dir,
    }

from_config() classmethod

from_config(config: dict[str, Any]) -> EndpointConnector

Create a new connector instance from a configuration.

Parameters:

  • config (dict[str, Any]) –

    Configuration returned by .config().

Source code in proxystore/connectors/endpoint.py
@classmethod
def from_config(cls, config: dict[str, Any]) -> EndpointConnector:
    """Create a new connector instance from a configuration.

    Args:
        config: Configuration returned by `#!python .config()`.
    """
    return cls(**config)

evict()

evict(key: EndpointKey) -> None

Evict the object associated with the key.

Parameters:

  • key (EndpointKey) –

    Key associated with object to evict.

Source code in proxystore/connectors/endpoint.py
def evict(self, key: EndpointKey) -> None:
    """Evict the object associated with the key.

    Args:
        key: Key associated with object to evict.
    """
    try:
        client.evict(
            self.address,
            key.object_id,
            key.endpoint_id,
            session=self._session,
        )
    except requests.exceptions.RequestException as e:
        raise EndpointConnectorError(
            f'Evict failed with error code {e.response.status_code}.',
        ) from e

exists()

exists(key: EndpointKey) -> bool

Check if an object associated with the key exists.

Parameters:

  • key (EndpointKey) –

    Key potentially associated with stored object.

Returns:

  • bool

    If an object associated with the key exists.

Source code in proxystore/connectors/endpoint.py
def exists(self, key: EndpointKey) -> bool:
    """Check if an object associated with the key exists.

    Args:
        key: Key potentially associated with stored object.

    Returns:
        If an object associated with the key exists.
    """
    try:
        return client.exists(
            self.address,
            key.object_id,
            key.endpoint_id,
            session=self._session,
        )
    except requests.exceptions.RequestException as e:
        raise EndpointConnectorError(
            f'Exists failed with error code {e.response.status_code}.',
        ) from e

get()

get(key: EndpointKey) -> bytes | None

Get the serialized object associated with the key.

Parameters:

  • key (EndpointKey) –

    Key associated with the object to retrieve.

Returns:

  • bytes | None

    Serialized object or None if the object does not exist.

Source code in proxystore/connectors/endpoint.py
def get(self, key: EndpointKey) -> bytes | None:
    """Get the serialized object associated with the key.

    Args:
        key: Key associated with the object to retrieve.

    Returns:
        Serialized object or `None` if the object does not exist.
    """
    try:
        return client.get(
            self.address,
            key.object_id,
            key.endpoint_id,
            session=self._session,
        )
    except requests.exceptions.RequestException as e:
        raise EndpointConnectorError(
            f'Get failed with error code {e.response.status_code}.',
        ) from e

get_batch()

get_batch(
    keys: Sequence[EndpointKey],
) -> list[bytes | None]

Get a batch of serialized objects associated with the keys.

Parameters:

Returns:

  • list[bytes | None]

    List with same order as keys with the serialized objects or None if the corresponding key does not have an associated object.

Source code in proxystore/connectors/endpoint.py
def get_batch(self, keys: Sequence[EndpointKey]) -> list[bytes | None]:
    """Get a batch of serialized objects associated with the keys.

    Args:
        keys: Sequence of keys associated with objects to retrieve.

    Returns:
        List with same order as `keys` with the serialized objects or \
        `None` if the corresponding key does not have an associated object.
    """
    return [self.get(key) for key in keys]

put()

put(obj: bytes) -> EndpointKey

Put a serialized object in the store.

Parameters:

  • obj (bytes) –

    Serialized object to put in the store.

Returns:

  • EndpointKey

    Key which can be used to retrieve the object.

Source code in proxystore/connectors/endpoint.py
def put(self, obj: bytes) -> EndpointKey:
    """Put a serialized object in the store.

    Args:
        obj: Serialized object to put in the store.

    Returns:
        Key which can be used to retrieve the object.
    """
    key = EndpointKey(
        object_id=str(uuid.uuid4()),
        endpoint_id=str(self.endpoint_uuid),
    )
    try:
        client.put(
            self.address,
            key.object_id,
            obj,
            key.endpoint_id,
            session=self._session,
        )
    except requests.exceptions.RequestException as e:
        raise EndpointConnectorError(
            f'Put failed with error code {e.response.status_code}.',
        ) from e

    return key

put_batch()

put_batch(objs: Sequence[bytes]) -> list[EndpointKey]

Put a batch of serialized objects in the store.

Parameters:

  • objs (Sequence[bytes]) –

    Sequence of serialized objects to put in the store.

Returns:

  • list[EndpointKey]

    List of keys with the same order as objs which can be used to retrieve the objects.

Source code in proxystore/connectors/endpoint.py
def put_batch(self, objs: Sequence[bytes]) -> list[EndpointKey]:
    """Put a batch of serialized objects in the store.

    Args:
        objs: Sequence of serialized objects to put in the store.

    Returns:
        List of keys with the same order as `objs` which can be used to \
        retrieve the objects.
    """
    return [self.put(obj) for obj in objs]