Skip to content

proxystore.store.dim.websockets

Websockets implementation.

WebsocketStoreKey

Bases: NamedTuple

Key to objects in a WebsocketStore.

websocket_key class-attribute

websocket_key: str

Unique object key.

obj_size class-attribute

obj_size: int

Object size in bytes.

peer class-attribute

peer: str

Peer where object is located.

WebsocketStore

WebsocketStore(
    name: str,
    *,
    interface: str,
    port: int,
    max_size: int = MAX_SIZE_DEFAULT,
    cache_size: int = 16,
    stats: bool = False
) -> None

Bases: Store[WebsocketStoreKey]

Distributed in-memory store using websockets.

This client will initialize a local Websocket server (Peer service) that it will store data to.

Parameters:

  • name (str) –

    Name of the store instance.

  • interface (str) –

    The network interface to use.

  • port (int) –

    The desired port for communication.

  • max_size (int) –

    The maximum size to be communicated via websockets.

  • cache_size (int) –

    Size of LRU cache (in # of objects). If 0, the cache is disabled. The cache is local to the Python process.

  • stats (bool) –

    Collect stats on store operations.

Source code in proxystore/store/dim/websockets.py
def __init__(
    self,
    name: str,
    *,
    interface: str,
    port: int,
    max_size: int = MAX_SIZE_DEFAULT,
    cache_size: int = 16,
    stats: bool = False,
) -> None:
    """Start the peer server if needed and initialize the store.

    Args:
        name: Name of the store instance (forwarded to the base class).
        interface: Network interface name; it is resolved to the host
            address with ``get_ip_address``.
        port: Port used to build the ``ws://host:port`` server address.
        max_size: Maximum message size, kept on the instance and used
            for websocket connections.
        cache_size: Forwarded to the base class as ``cache_size``.
        stats: Forwarded to the base class as ``stats``.

    Raises:
        The module-level ``websockets_import_error`` when it is not None.
    """
    # The server process handle is a module-level global, so it is shared
    # by every instance created in this Python process.
    global server_process

    # Websockets is not a default dependency so we don't want to raise
    # an error unless the user actually tries to use this code
    if websockets_import_error is not None:  # pragma: no cover
        raise websockets_import_error

    logger.debug('Instantiating client and server')

    self.max_size = max_size
    # Outgoing payloads are split into chunks of this size before sending.
    self.chunk_size = MAX_CHUNK_LENGTH

    self.host = get_ip_address(interface)
    self.port = port

    self.addr = f'ws://{self.host}:{self.port}'

    # Only spawn a server when no handle has been recorded yet; later
    # instances reuse the existing process.
    if server_process is None:
        server_process = Process(target=self._start_server)
        server_process.start()

    # Reuse the currently running event loop when there is one, otherwise
    # create a fresh loop, then block until ``server_started()`` finishes
    # (presumably once the server accepts connections -- not visible here).
    # NOTE(review): asyncio raises RuntimeError when run_until_complete()
    # is called on a loop that is already running, so the branch where
    # get_running_loop() succeeds looks like it cannot work -- confirm.
    try:
        self._loop = asyncio.get_running_loop()
    except RuntimeError:
        self._loop = asyncio.new_event_loop()
    self._loop.run_until_complete(self.server_started())

    # The kwargs dict mirrors this constructor's keyword arguments;
    # presumably the base class uses it to re-create the store -- TODO
    # confirm against Store.
    super().__init__(
        name,
        cache_size=cache_size,
        stats=stats,
        kwargs={
            'interface': interface,
            'port': self.port,
            'max_size': self.max_size,
        },
    )

handler async

handler(event: bytes, addr: str) -> bytes

Websocket handler function implementation.

Parameters:

  • event (bytes) –

    A pickled dictionary consisting of the data, its key and the operation to perform on the data

  • addr (str) –

    The address of the server to connect to

Returns:

  • bytes

    The result of the operation on the data.

Source code in proxystore/store/dim/websockets.py
async def handler(self, event: bytes, addr: str) -> bytes:
    """Send one request to a peer server and return its reply.

    A websocket connection to ``addr`` is opened for this single
    exchange: the event is sent in chunks of ``self.chunk_size`` and one
    message is read back before the connection is closed.

    Args:
        event: A pickled dictionary consisting of the data, its key and
            the operation to perform on the data.
        addr: The address of the server to connect to.

    Returns:
        The raw bytes the server sent in response.

    Raises:
        AssertionError: If the reply is not a ``bytes`` object.
    """
    async with connect(addr, max_size=self.max_size) as conn:
        chunks = utils.chunk_bytes(event, self.chunk_size)
        await conn.send(chunks)
        reply = await conn.recv()

    # Narrows the type of the received message for the return annotation.
    assert isinstance(reply, bytes)

    return reply

close

close() -> None

Terminate Peer server process.

Source code in proxystore/store/dim/websockets.py
def close(self) -> None:
    """Stop the module-level peer server process, if one is recorded.

    The process is terminated and joined, and the global handle is then
    reset to ``None``. When no process is recorded, only the two log
    messages are emitted.
    """
    global server_process

    logger.info('Clean up requested')

    running = server_process
    if running is not None:
        running.terminate()
        running.join()
        # Clear the shared handle only after the process has been joined.
        server_process = None

    logger.debug('Clean up completed')

WebsocketServer

WebsocketServer(
    host: str, port: int, max_size: int
) -> None

WebsocketServer implementation.

Parameters:

  • host (str) –

    IP address of the location to start the server.

  • port (int) –

    The port to initiate communication on.

  • max_size (int) –

    The maximum size allowed for websocket communication.

Source code in proxystore/store/dim/websockets.py
def __init__(self, host: str, port: int, max_size: int) -> None:
    """Record the server settings and create the empty object table.

    Args:
        host: IP address of the location to start the server.
        port: The port to initiate communication on.
        max_size: The maximum size allowed for websocket communication.
    """
    self.host, self.port = host, port
    self.max_size = max_size
    # Responses are split into chunks of this size before being sent.
    self.chunk_size = MAX_CHUNK_LENGTH
    # In-memory mapping from object key to the stored data.
    self.data = {}
    super().__init__()

set

set(key: str, data: bytes) -> Status

Store data received from the client locally.

Parameters:

  • key (str) –

    Object key to use.

  • data (bytes) –

    Data to store.

Returns:

  • Status

    Operation status.

Source code in proxystore/store/dim/websockets.py
def set(self, key: str, data: bytes) -> Status:
    """Store data received from a client in the local dictionary.

    Any value previously stored under ``key`` is replaced.

    Args:
        key: Object key to use.
        data: Data to store.

    Returns:
        A successful ``Status`` with no error attached.
    """
    self.data[key] = data
    outcome = Status(success=True, error=None)
    return outcome

get

get(key: str) -> bytes | Status

Return data at a given key back to the client.

Parameters:

  • key (str) –

    The object key.

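Returns:

  • bytes | Status

    The stored data if the key exists, otherwise a failed Status carrying the KeyError.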

Source code in proxystore/store/dim/websockets.py
def get(self, key: str) -> bytes | Status:
    """Look up the data stored under a key for the client.

    Args:
        key: The object key.

    Returns:
        The stored data when the key is present; otherwise a failed
        ``Status`` carrying the ``KeyError`` raised by the lookup.
    """
    try:
        stored = self.data[key]
    except KeyError as missing:
        return Status(False, missing)
    return stored

evict

evict(key: str) -> Status

Remove key from local dictionary.

Parameters:

  • key (str) –

    The key of the object to evict.

Returns:

  • Status

    Operation status.

Source code in proxystore/store/dim/websockets.py
def evict(self, key: str) -> Status:
    """Drop a key from the local dictionary.

    Evicting a key that is not present is not an error.

    Args:
        key: The key of the object to evict.

    Returns:
        A successful ``Status`` with no error attached, whether or not
        the key was present.
    """
    if key in self.data:
        del self.data[key]
    return Status(success=True, error=None)

exists

exists(key: str) -> bool

Check if a key exists within local dictionary.

Parameters:

  • key (str) –

    The object's key.

Returns:

  • bool

    True if the key exists in the local dictionary, False otherwise.

Source code in proxystore/store/dim/websockets.py
def exists(self, key: str) -> bool:
    """Report whether a key is present in the local dictionary.

    Args:
        key: The object's key.

    Returns:
        ``True`` if data is stored under ``key``, ``False`` otherwise.
    """
    found = key in self.data
    return found

handler async

handler(websocket: WebSocketServerProtocol) -> None

Handle websocket connection requests.

Parameters:

  • websocket (WebSocketServerProtocol) –

    The server-side websocket connection to the client.

Source code in proxystore/store/dim/websockets.py
async def handler(self, websocket: WebSocketServerProtocol) -> None:
    """Serve store requests arriving on one websocket connection.

    Each incoming message is deserialized into a mapping from which the
    ``'key'``, ``'data'`` and ``'op'`` entries are read (all three are
    read for every request, whatever the operation). The operation --
    ``'set'``, ``'get'``, ``'exists'`` or ``'evict'`` -- is applied to
    the local dictionary and the outcome is sent back in chunks of
    ``self.chunk_size``.

    Args:
        websocket: The server-side connection that requests are read
            from and responses are written to.

    Raises:
        AssertionError: If a message is not ``bytes`` or names an
            operation other than the four listed above.
    """
    # NOTE(review): the client side documents requests as pickled
    # dictionaries; if ``deserialize`` unpickles them, only trusted peers
    # should be able to reach this server -- confirm.
    async for message in websocket:
        assert isinstance(message, bytes)
        request = deserialize(message)

        key = request['key']
        data = request['data']
        op = request['op']

        if op == 'set':
            result = self.set(key, data)
        elif op == 'get':
            result = self.get(key)
        elif op == 'exists':
            result = self.exists(key)
        elif op == 'evict':
            result = self.evict(key)
        else:
            raise AssertionError('Unreachable.')

        # Status objects and booleans must be serialized before sending;
        # any other result (the stored data from ``get``) is sent as is.
        if isinstance(result, (Status, bool)):
            response = serialize(result)
        else:
            response = result

        await websocket.send(
            utils.chunk_bytes(response, self.chunk_size),
        )

launch async

launch() -> None

Launch the server.

Source code in proxystore/store/dim/websockets.py
async def launch(self) -> None:
    """Run the websocket server until SIGINT or SIGTERM is received.

    A future is created on the running loop and resolved by the signal
    handlers; the server is kept open for as long as that future is
    pending.
    """
    loop = asyncio.get_running_loop()
    stop = loop.create_future()

    # Either signal resolves the future, which ends the wait below.
    for signum in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(signum, stop.set_result, None)

    server = serve(
        self.handler,
        self.host,
        self.port,
        max_size=self.max_size,
    )
    async with server:
        await stop  # run forever