Skip to content

proxystore.store.endpoint

EndpointStore Implementation.

EndpointStoreError

Bases: Exception

Exception resulting from request to Endpoint.

EndpointStoreKey

Bases: NamedTuple

Key to object in an Endpoint.

object_id class-attribute

object_id: str

Unique object ID.

endpoint_id class-attribute

endpoint_id: str | None

Endpoint UUID where object is stored.

EndpointStore

EndpointStore(
    name: str,
    *,
    endpoints: Sequence[str | UUID],
    proxystore_dir: str | None = None,
    cache_size: int = 16,
    stats: bool = False
) -> None

Bases: Store[EndpointStoreKey]

EndpointStore backend class.

Warning

Specifying a custom proxystore_dir can cause problems if the proxystore_dir is not the same on all systems that a proxy created by this store could end up on. It is recommended to leave the proxystore_dir unspecified so the correct default directory will be used.

Parameters:

  • name (str) –

    Name of the store instance (default: None).

  • endpoints (Sequence[str | UUID]) –

    Sequence of valid and running endpoint UUIDs to use. At least one of these endpoints must be accessible by this process.

  • proxystore_dir (str | None) –

    Optionally specify the proxystore home directory. Defaults to [home_dir()[proxystore.utils.home_dir].

  • cache_size (int) –

    Size of LRU cache (in # of objects). If 0, the cache is disabled. The cache is local to the Python process.

  • stats (bool) –

    Collect stats on store operations.

Raises:

Source code in proxystore/store/endpoint.py
def __init__(
    self,
    name: str,
    *,
    endpoints: Sequence[str | UUID],
    proxystore_dir: str | None = None,
    cache_size: int = 16,
    stats: bool = False,
) -> None:
    if len(endpoints) == 0:
        raise ValueError('At least one endpoint must be specified.')
    self.endpoints: list[UUID] = [
        e if isinstance(e, UUID) else UUID(e, version=4) for e in endpoints
    ]
    self.proxystore_dir = (
        home_dir() if proxystore_dir is None else proxystore_dir
    )

    # Find the first locally accessible endpoint to use as our
    # home endpoint
    available_endpoints = get_configs(self.proxystore_dir)
    found_endpoint: EndpointConfig | None = None
    for endpoint in available_endpoints:
        if endpoint.uuid in self.endpoints:
            logger.debug(f'attempting connection to {endpoint.uuid}')
            response = requests.get(
                f'http://{endpoint.host}:{endpoint.port}/endpoint',
            )
            if response.status_code == 200:
                uuid = response.json()['uuid']
                if str(endpoint.uuid) == uuid:
                    logger.debug(
                        f'connection to {endpoint.uuid} successful, using '
                        'as home endpoint',
                    )
                    found_endpoint = endpoint
                    break
                else:
                    logger.debug(f'{endpoint.uuid} has different UUID')
            else:
                logger.debug(f'connection to {endpoint.uuid} unsuccessful')

    if found_endpoint is None:
        raise EndpointStoreError(
            'Failed to find endpoint configuration matching one of the '
            'provided endpoint UUIDs.',
        )
    self.endpoint_uuid = found_endpoint.uuid
    self.endpoint_host = found_endpoint.host
    self.endpoint_port = found_endpoint.port

    self.address = f'http://{self.endpoint_host}:{self.endpoint_port}'

    super().__init__(
        name,
        cache_size=cache_size,
        stats=stats,
        kwargs={
            'endpoints': self.endpoints,
            # Note: don't pass self.proxystore_dir here because it may
            # change depending on the system we are on (as a proxy could
            # reinitialize this store on a different system).
            'proxystore_dir': proxystore_dir,
        },
    )

evict

evict(key: EndpointStoreKey) -> None

Evict object associated with key.

Parameters:

Raises:

Source code in proxystore/store/endpoint.py
def evict(self, key: EndpointStoreKey) -> None:
    """Evict object associated with key.

    Args:
        key: Key corresponding to object in store to evict.

    Raises:
        EndpointStoreError: If the Endpoint returns a non-200 status code.
    """
    response = requests.post(
        f'{self.address}/evict',
        params={'key': key.object_id, 'endpoint': key.endpoint_id},
    )
    if response.status_code != 200:
        raise EndpointStoreError(f'EVICT returned {response}')

    self._cache.evict(key)
    logger.debug(
        f"EVICT key='{key}' FROM {self.__class__.__name__}"
        f"(name='{self.name}')",
    )

exists

exists(key: EndpointStoreKey) -> bool

Check if key exists.

Parameters:

Raises:

Source code in proxystore/store/endpoint.py
def exists(self, key: EndpointStoreKey) -> bool:
    """Check if key exists.

    Args:
        key: Key to check.

    Raises:
        EndpointStoreError: If the Endpoint returns a non-200 status code.
    """
    response = requests.get(
        f'{self.address}/exists',
        params={'key': key.object_id, 'endpoint': key.endpoint_id},
    )
    if response.status_code == 200:
        return response.json()['exists']
    else:
        raise EndpointStoreError(f'EXISTS returned {response}')

get_bytes

get_bytes(key: EndpointStoreKey) -> bytes | None

Get serialized object from remote store.

Parameters:

Returns:

  • bytes | None

    Serialized object or None if it does not exist on the endpoint.

Raises:

  • EndpointStoreError

    If the Endpoint returns a status code other than 200 (success) or 400 (missing key).

Source code in proxystore/store/endpoint.py
def get_bytes(self, key: EndpointStoreKey) -> bytes | None:
    """Get serialized object from remote store.

    Args:
        key: Key corresponding to object.

    Returns:
        Serialized object or `None` if it does not exist on the endpoint.

    Raises:
        EndpointStoreError: If the Endpoint returns a status code other
            than 200 (success) or 400 (missing key).
    """
    response = requests.get(
        f'{self.address}/get',
        params={'key': key.object_id, 'endpoint': key.endpoint_id},
        stream=True,
    )
    if response.status_code == 200:
        data = bytearray()
        for chunk in response.iter_content(chunk_size=None):
            data += chunk
        return bytes(data)
    elif response.status_code == 400:
        return None
    else:
        raise EndpointStoreError(f'GET returned {response}')

set_bytes

set_bytes(key: EndpointStoreKey, data: bytes) -> None

Set serialized object in remote store with key.

Parameters:

Raises:

Source code in proxystore/store/endpoint.py
def set_bytes(self, key: EndpointStoreKey, data: bytes) -> None:
    """Set serialized object in remote store with key.

    Args:
        key: Key corresponding to object.
        data: Serialized object.

    Raises:
        EndpointStoreError: If the endpoint does not return a 200 status
            code for success.
    """
    response = requests.post(
        f'{self.address}/set',
        headers={'Content-Type': 'application/octet-stream'},
        params={'key': key.object_id, 'endpoint': key.endpoint_id},
        data=chunk_bytes(data, MAX_CHUNK_LENGTH),
        stream=True,
    )
    if response.status_code != 200:
        raise EndpointStoreError(f'SET returned {response}')