Skip to content

proxystore.endpoint.endpoint

Endpoint implementation.

EndpointMode

Bases: Enum

Endpoint mode.

PEERING class-attribute instance-attribute

PEERING = 1

Endpoint will establish peer connections with other endpoints.

SOLO class-attribute instance-attribute

SOLO = 2

Endpoint is operating in isolation and will ignore peer requests.

Endpoint

Endpoint(
    name: str | None = None,
    uuid: UUID | None = None,
    *,
    peer_manager: PeerManager | None = None,
    storage: Storage | None = None
)

ProxyStore Endpoint.

An endpoint is an object store with get/set functionality.

By default, an endpoint operates in EndpointMode.SOLO mode where the endpoint acts just as an isolated object store. Endpoints can also be configured in EndpointMode.PEERING mode by initializing the endpoint with a PeerManager. The PeerManager is connected to a relay server which is used to establish peer-to-peer connections with other endpoints connected to the same relay server. After peer connections are established, endpoints can forward operations between each other. Peering is available even when endpoints are behind separate NATs. See the proxystore.p2p module to learn more about peering.

Warning

Requests made to remote endpoints will only invoke the request on the remote and return the result. I.e., invoking GET on a remote will return the value but will not store it on the local endpoint.

Example

Solo Mode Usage

async with Endpoint('ep1', uuid.uuid4()) as endpoint:
    serialized_data = b'data string'
    await endpoint.set('key', serialized_data)
    assert await endpoint.get('key') == serialized_data
    await endpoint.evict('key')
    assert not await endpoint.exists('key')
Example

Peering Mode Usage

pm1 = await PeerManager(RelayClient(...))
pm2 = await PeerManager(RelayClient(...))
ep1 = await Endpoint(peer_manager=pm1)
ep2 = await Endpoint(peer_manager=pm2)

serialized_data = b'data string'
await ep1.set('key', serialized_data)
assert await ep2.get('key', endpoint=ep1.uuid) == serialized_data
assert await ep1.exists('key')
assert not await ep1.exists('key', endpoint=ep2.uuid)

await ep1.close()
await ep2.close()
Note

Endpoints can be configured and started via the proxystore-endpoint command-line interface.

Note

If the endpoint is being used in peering mode, the endpoint should be used as a context manager or initialized with await. This will ensure Endpoint.async_init() is called which initializes the background task that listens for incoming peer messages.

endpoint = await Endpoint(...)
await endpoint.close()
async with Endpoint(...) as endpoint:
    ...

Parameters:

  • name (str | None, default: None ) –

    Readable name of the endpoint. Only used if peer_manager is not provided. Otherwise the name will be set to PeerManager.name.

  • uuid (UUID | None, default: None ) –

    UUID of the endpoint. Only used if peer_manager is not provided. Otherwise the UUID will be set to PeerManager.uuid.

  • peer_manager (PeerManager | None, default: None ) –

    Optional peer manager that is connected to a relay server which will be used for establishing peer connections to other endpoints connected to the same relay server.

  • storage (Storage | None, default: None ) –

    Storage interface to use. If None, DictStorage is used.

Raises:

  • ValueError

    if neither name/uuid or peer_manager are set.

Source code in proxystore/endpoint/endpoint.py
def __init__(
    self,
    name: str | None = None,
    uuid: UUID | None = None,
    *,
    peer_manager: PeerManager | None = None,
    storage: Storage | None = None,
) -> None:
    if peer_manager is None and (name is None or uuid is None):
        raise ValueError(
            'The name and uuid parameters must be provided if '
            'a PeerManager is not provided.',
        )

    self._default_name = name
    self._default_uuid = uuid
    self._peer_manager = peer_manager
    self._storage = DictStorage() if storage is None else storage

    self._mode = (
        EndpointMode.SOLO if peer_manager is None else EndpointMode.PEERING
    )
    self._pending_requests: dict[
        str,
        asyncio.Future[EndpointRequest],
    ] = {}
    self._peer_handler_task: asyncio.Task[None] | None = None

    if self._mode is EndpointMode.SOLO:
        # Initialization is not complete for endpoints in peering mode
        # until async_init() is called.
        logger.info(
            f'{self._log_prefix}: initialized endpoint operating '
            f'in {self._mode.name} mode',
        )

name property

name: str

Name of this endpoint.

uuid property

uuid: UUID

UUID of this endpoint.

peer_manager property

peer_manager: PeerManager | None

Peer manager.

Raises:

async_init() async

async_init() -> None

Initialize connections and tasks necessary for peering.

Note

This will also call PeerManager.async_init() if one is provided so that asynchronous resources for both the PeerManager and endpoint can be initialized later after creation.

Source code in proxystore/endpoint/endpoint.py
async def async_init(self) -> None:
    """Initialize connections and tasks necessary for peering.

    Note:
        This will also call
        [`PeerManager.async_init()`][proxystore.p2p.manager.PeerManager.async_init]
        if one is provided so that asynchronous resources for both the
        [`PeerManager`][proxystore.p2p.manager.PeerManager]
        and endpoint can be initialized later after creation.
    """
    if self._peer_manager is not None and self._peer_handler_task is None:
        await self._peer_manager.async_init()
        self._peer_handler_task = spawn_guarded_background_task(
            self._handle_peer_requests,
        )
        self._peer_handler_task.set_name(
            f'endpoint-{self.uuid}-handle-peer-requests',
        )
        logger.info(
            f'{self._log_prefix}: initialized endpoint operating '
            f'in {self._mode.name} mode',
        )

evict() async

evict(key: str, endpoint: UUID | None = None) -> None

Evict key from endpoint.

Parameters:

  • key (str) –

    Key to evict.

  • endpoint (UUID | None, default: None ) –

    Endpoint to perform operation on. If unspecified or if the endpoint is on solo mode, the operation will be performed on the local endpoint.

Raises:

Source code in proxystore/endpoint/endpoint.py
async def evict(self, key: str, endpoint: UUID | None = None) -> None:
    """Evict key from endpoint.

    Args:
        key: Key to evict.
        endpoint: Endpoint to perform operation on. If
            unspecified or if the endpoint is on solo mode, the operation
            will be performed on the local endpoint.

    Raises:
        PeerRequestError: If request to a peer endpoint fails.
    """
    logger.debug(
        f'{self._log_prefix}: EVICT key={key} on endpoint={endpoint}',
    )
    if self._is_peer_request(endpoint):
        assert endpoint is not None
        request = EndpointRequest(
            kind='request',
            op='evict',
            uuid=str(uuid4()),
            key=key,
        )
        request_future = await self._request_from_peer(endpoint, request)
        await request_future
    else:
        await self._storage.evict(key)

exists() async

exists(key: str, endpoint: UUID | None = None) -> bool

Check if key exists on endpoint.

Parameters:

  • key (str) –

    Key to check.

  • endpoint (UUID | None, default: None ) –

    Endpoint to perform operation on. If unspecified or if the endpoint is on solo mode, the operation will be performed on the local endpoint.

Returns:

  • bool

    If the key exists.

Raises:

Source code in proxystore/endpoint/endpoint.py
async def exists(self, key: str, endpoint: UUID | None = None) -> bool:
    """Check if key exists on endpoint.

    Args:
        key: Key to check.
        endpoint: Endpoint to perform operation on. If
            unspecified or if the endpoint is on solo mode, the operation
            will be performed on the local endpoint.

    Returns:
        If the key exists.

    Raises:
        PeerRequestError: If request to a peer endpoint fails.
    """
    logger.debug(
        f'{self._log_prefix}: EXISTS key={key} on endpoint={endpoint}',
    )
    if self._is_peer_request(endpoint):
        assert endpoint is not None
        request = EndpointRequest(
            kind='request',
            op='exists',
            uuid=str(uuid4()),
            key=key,
        )
        request_future = await self._request_from_peer(endpoint, request)
        response = await request_future
        assert isinstance(response.exists, bool)
        return response.exists
    else:
        return await self._storage.exists(key)

get() async

get(key: str, endpoint: UUID | None = None) -> bytes | None

Get value associated with key on endpoint.

Parameters:

  • key (str) –

    Key to get value for.

  • endpoint (UUID | None, default: None ) –

    Endpoint to perform operation on. If unspecified or if the endpoint is on solo mode, the operation will be performed on the local endpoint.

Returns:

  • bytes | None

    Value associated with key.

Raises:

Source code in proxystore/endpoint/endpoint.py
async def get(
    self,
    key: str,
    endpoint: UUID | None = None,
) -> bytes | None:
    """Get value associated with key on endpoint.

    Args:
        key: Key to get value for.
        endpoint: Endpoint to perform operation on. If
            unspecified or if the endpoint is on solo mode, the operation
            will be performed on the local endpoint.

    Returns:
        Value associated with key.

    Raises:
        PeerRequestError: If request to a peer endpoint fails.
    """
    logger.debug(
        f'{self._log_prefix}: GET key={key} on endpoint={endpoint}',
    )
    if self._is_peer_request(endpoint):
        assert endpoint is not None
        request = EndpointRequest(
            kind='request',
            op='get',
            uuid=str(uuid4()),
            key=key,
        )
        request_future = await self._request_from_peer(endpoint, request)
        response = await request_future
        return response.data
    else:
        return await self._storage.get(key, None)

set() async

set(
    key: str, data: bytes, endpoint: UUID | None = None
) -> None

Set key with data on endpoint.

Parameters:

  • key (str) –

    Key to associate with value.

  • data (bytes) –

    Value to associate with key.

  • endpoint (UUID | None, default: None ) –

    Endpoint to perform operation on. If unspecified or if the endpoint is on solo mode, the operation will be performed on the local endpoint.

Raises:

  • ObjectSizeExceededError

    If the max object size is configured and the data exceeds that size.

  • PeerRequestError

    If request to a peer endpoint fails.

Source code in proxystore/endpoint/endpoint.py
async def set(
    self,
    key: str,
    data: bytes,
    endpoint: UUID | None = None,
) -> None:
    """Set key with data on endpoint.

    Args:
        key: Key to associate with value.
        data: Value to associate with key.
        endpoint: Endpoint to perform operation on. If
            unspecified or if the endpoint is on solo mode, the operation
            will be performed on the local endpoint.

    Raises:
        ObjectSizeExceededError: If the max object size is configured and
            the data exceeds that size.
        PeerRequestError: If request to a peer endpoint fails.
    """
    logger.debug(
        f'{self._log_prefix}: SET key={key} on endpoint={endpoint}',
    )

    if self._is_peer_request(endpoint):
        assert endpoint is not None
        request = EndpointRequest(
            kind='request',
            op='set',
            uuid=str(uuid4()),
            key=key,
            data=data,
        )
        request_future = await self._request_from_peer(endpoint, request)
        await request_future
    else:
        await self._storage.set(key, data)

close() async

close() -> None

Close the endpoint and any open connections safely.

Source code in proxystore/endpoint/endpoint.py
async def close(self) -> None:
    """Close the endpoint and any open connections safely."""
    if self._peer_handler_task is not None:
        self._peer_handler_task.cancel()
        try:
            await self._peer_handler_task
        except asyncio.CancelledError:
            pass
    if self._peer_manager is not None:
        await self._peer_manager.close()
    await self._storage.close()
    logger.info(f'{self._log_prefix}: endpoint closed')