Skip to content

proxystore.store.dim.ucx

UCXStore implementation.

UCXStoreKey

Bases: NamedTuple

Key to objects in a UCXStore.

ucx_key class-attribute

ucx_key: str

Unique object key.

obj_size class-attribute

obj_size: int

Object size in bytes.

peer class-attribute

peer: str

Peer where object is located.

UCXStore

UCXStore(
    name: str,
    *,
    interface: str,
    port: int,
    cache_size: int = 16,
    stats: bool = False
) -> None

Bases: Store[UCXStoreKey]

Implementation for the client-facing component of UCXStore.

This client initializes a local UCX server (Peer service) in which to store data.

Parameters:

  • name (str) –

    Name of the store instance.

  • interface (str) –

    The network interface to use.

  • port (int) –

    The desired port for the UCX server.

  • cache_size (int) –

    Size of LRU cache (in # of objects). If 0, the cache is disabled. The cache is local to the Python process.

  • stats (bool) –

    Collect stats on store operations.

Source code in proxystore/store/dim/ucx.py
def __init__(
    self,
    name: str,
    *,
    interface: str,
    port: int,
    cache_size: int = 16,
    stats: bool = False,
) -> None:
    """Initialize the store, launching the local UCX server if needed.

    At most one server process is launched per Python process: it is
    tracked in the module-level `server_process` global, and later
    instances skip launching while that global is set.

    Args:
        name: Name of the store instance.
        interface: The network interface to use.
        port: The desired port for the UCX server.
        cache_size: Size of LRU cache (in # of objects). If 0, the cache
            is disabled.
        stats: Collect stats on store operations.

    Raises:
        RuntimeError: If a newly launched server does not respond in time
            (raised by `wait_for_server()`). The server process is
            terminated before the error propagates.
    """
    global server_process

    # Presumably set when the UCX bindings could not be imported.
    if ucx_import_error is not None:  # pragma: no cover
        raise ucx_import_error

    logger.debug('Instantiating client and server')

    self.host = get_ip_address(interface)
    self.port = port

    self.addr = f'{self.host}:{self.port}'

    # NOTE(review): if this runs inside an already-running loop and a server
    # must be launched, run_until_complete() below raises RuntimeError; this
    # path presumably only works from synchronous code.
    try:
        self._loop = asyncio.get_running_loop()
    except RuntimeError:
        self._loop = asyncio.new_event_loop()

    if server_process is None:
        process = Process(
            target=launch_server,
            args=(self.host, self.port),
        )
        process.start()
        try:
            self._loop.run_until_complete(
                wait_for_server(self.host, self.port),
            )
        except BaseException:
            # Don't leave an unreachable server running. If it were recorded
            # in the global, later instances would skip launching a server
            # and try to talk to this one instead.
            process.terminate()
            process.join()
            raise
        # Only record the process once it has answered a ping.
        server_process = process

    # TODO: Verify if create_endpoint error handling will successfully
    # connect to endpoint or if error handling needs to be done here

    super().__init__(
        name,
        cache_size=cache_size,
        stats=stats,
        kwargs={'interface': interface, 'port': self.port},
    )

close

close() -> None

Terminate Peer server process.

Source code in proxystore/store/dim/ucx.py
def close(self) -> None:
    """Stop the Peer server process launched by this module, if any.

    The process handle lives in the module-level `server_process` global,
    so this shuts down the server shared by every store instance in this
    Python process. The global is cleared afterwards so a later store can
    launch a fresh server.
    """
    global server_process

    logger.info('Clean up requested')

    proc = server_process
    if proc is not None:
        # terminate() sends the signal; join() waits for the exit.
        proc.terminate()
        proc.join()
        server_process = None

    logger.debug('Clean up completed')

UCXServer

UCXServer(host: str, port: int) -> None

UCXServer implementation.

Parameters:

  • host (str) –

    The server host.

  • port (int) –

    The server port.

Source code in proxystore/store/dim/ucx.py
def __init__(self, host: str, port: int) -> None:
    """Record the server address and create an empty local store.

    Args:
        host: The server host.
        port: The server port.
    """
    # No UCP listener exists until run() creates one.
    self.ucp_listener = None
    # Object bytes keyed by object key; filled by set(), read by get().
    self.data = {}
    self.host, self.port = host, port

set

set(key: str, data: bytes) -> Status

Obtain data from the client and store it in local dictionary.

Parameters:

  • key (str) –

    The object key to use.

  • data (bytes) –

    The data to store.

Returns:

  • Status

    Operation status.

Source code in proxystore/store/dim/ucx.py
def set(self, key: str, data: bytes) -> Status:
    """Save client-supplied data under a key in the local dictionary.

    Any existing value stored under ``key`` is overwritten.

    Args:
        key: The object key to use.
        data: The data to store.

    Returns:
        A successful operation status.
    """
    self.data[key] = data
    status = Status(success=True, error=None)
    return status

get

get(key: str) -> bytes | Status

Return data at a given key back to the client.

Parameters:

  • key (str) –

    The object key.

Returns:

  • bytes | Status

    The stored object bytes, or a failed operation status if the key does not exist.

Source code in proxystore/store/dim/ucx.py
def get(self, key: str) -> bytes | Status:
    """Return data at a given key back to the client.

    Args:
        key: The object key.

    Returns:
        The stored object bytes if ``key`` exists. Otherwise, a failed
        operation status whose ``error`` is the ``KeyError`` raised by the
        lookup.
    """
    try:
        return self.data[key]
    except KeyError as e:
        # Report a miss as a Status rather than raising, so the handler can
        # serialize it back to the client.
        return Status(success=False, error=e)

evict

evict(key: str) -> Status

Remove key from local dictionary.

Parameters:

  • key (str) –

    The key of the object to evict.

Returns:

  • Status

    Operation status.

Source code in proxystore/store/dim/ucx.py
def evict(self, key: str) -> Status:
    """Drop a key from the local dictionary.

    Evicting a key that is not present is not an error; the status is
    successful either way.

    Args:
        key: The key of the object to evict.

    Returns:
        A successful operation status.
    """
    if key in self.data:
        del self.data[key]
    return Status(success=True, error=None)

exists

exists(key: str) -> bool

Check if a key exists within local dictionary.

Parameters:

  • key (str) –

    The object's key.

Returns:

  • bool

    If the object exists.

Source code in proxystore/store/dim/ucx.py
def exists(self, key: str) -> bool:
    """Report whether a key is present in the local dictionary.

    Args:
        key: The object's key.

    Returns:
        ``True`` if an object is stored under ``key``, else ``False``.
    """
    found = key in self.data
    return found

handler async

handler(ep: ucp.Endpoint) -> None

Handle endpoint requests.

Parameters:

  • ep (ucp.Endpoint) –

    The endpoint to communicate with.

Source code in proxystore/store/dim/ucx.py
async def handler(self, ep: ucp.Endpoint) -> None:
    """Serve a single request received on an endpoint.

    A message equal to ``bytes(1)`` is treated as a ping and echoed back
    unchanged. Any other message is deserialized into a request with
    ``'key'``, ``'data'`` and ``'op'`` entries. The request is dispatched
    to set/get/exists/evict, and the result is sent back.

    Args:
        ep: The endpoint to communicate with.

    Raises:
        AssertionError: If the request names an unknown operation.
    """
    message = await ep.recv_obj()

    # Liveness probe (sent by wait_for_server()): reply and stop.
    if message == bytes(1):
        await ep.send_obj(bytes(1))
        return

    request = deserialize(bytes(message))

    key = request['key']
    payload = request['data']
    op = request['op']

    if op == 'set':
        result = self.set(key, payload)
    elif op == 'get':
        result = self.get(key)
    elif op == 'exists':
        result = self.exists(key)
    elif op == 'evict':
        result = self.evict(key)
    else:
        raise AssertionError('Unreachable.')

    # Status objects and booleans must be serialized; object bytes returned
    # by a successful get() are sent as-is.
    if isinstance(result, (Status, bool)):
        reply = serialize(result)
    else:
        reply = result

    await ep.send_obj(reply)

run async

run() -> None

Run this UCXServer forever.

Creates a listener for the handler method and waits on SIGINT/TERM events to exit. Also handles cleaning up UCP objects.

Source code in proxystore/store/dim/ucx.py
async def run(self) -> None:
    """Run this UCXServer forever.

    Creates a listener for the handler method and waits on SIGINT/TERM
    events to exit. Also handles cleaning up UCP objects.
    """
    self.ucp_listener = ucp.create_listener(self.handler, self.port)

    # Set the stop condition when receiving SIGINT (ctrl-C) and SIGTERM.
    loop = asyncio.get_running_loop()
    stop = loop.create_future()

    def _request_stop() -> None:
        # A second signal (e.g. SIGTERM after SIGINT) must not call
        # set_result() on an already-resolved future, which would raise
        # InvalidStateError inside the signal callback.
        if not stop.done():
            stop.set_result(None)

    loop.add_signal_handler(signal.SIGINT, _request_stop)
    loop.add_signal_handler(signal.SIGTERM, _request_stop)

    await stop
    # Close the UCP listener directly. UCXServer as shown defines no close()
    # method, so `self.close()` would raise AttributeError and skip the UCP
    # reset below. ucp.reset() also expects listeners to be closed.
    self.ucp_listener.close()
    await reset_ucp_async()

launch_server

launch_server(host: str, port: int) -> None

Launch the UCXServer in asyncio.

Parameters:

  • host (str) –

    The host for server to listen on.

  • port (int) –

    The port for server to listen on.

Source code in proxystore/store/dim/ucx.py
def launch_server(host: str, port: int) -> None:
    """Launch the UCXServer in asyncio.

    Blocks until the server's ``run()`` coroutine returns, i.e. until this
    process receives SIGINT or SIGTERM.

    Args:
        host: The host for server to listen on.
        port: The port for server to listen on.
    """
    # NOTE(review): UCXServer.run() passes only the port to
    # ucp.create_listener(), so `host` appears to be recorded/logged rather
    # than used for binding -- confirm.
    logger.info(f'starting server on host {host} with port {port}')

    ps = UCXServer(host, port)
    # CI occasionally timeouts when starting this server in the
    # store_implementation session fixture. It seems to not happen when
    # debug=True, but this is just a temporary fix.
    asyncio.run(ps.run(), debug=True)

    # asyncio.run() only returns after ps.run() has finished, so the server
    # is no longer running at this point.
    logger.info(f'server stopped at address {host}:{port}')

reset_ucp

reset_ucp() -> None

Hard reset all of UCP.

UCP provides ucp.reset(); however, this function does not correctly shut down all asyncio tasks and readers. This function wraps ucp.reset() and additionally removes all readers on the event loop and cancels/awaits all asyncio tasks.

Source code in proxystore/store/dim/ucx.py
def reset_ucp() -> None:  # pragma: no cover
    """Hard reset all of UCP.

    UCP provides `ucp.reset()`; however, this function does not correctly
    shut down all asyncio tasks and readers. This function wraps
    `ucp.reset()` and additionally removes all readers on the event loop
    and cancels/awaits all asyncio tasks.

    This is the synchronous variant: each cancelled progress task is
    awaited by driving its own event loop with `run_until_complete()`.
    NOTE(review): `run_until_complete()` raises if that loop is already
    running, so this presumably must be called outside any running loop
    (`reset_ucp_async()` is the coroutine variant).
    """

    def inner_context() -> None:
        # Private UCP API: fetch the global UCP context.
        ctx = ucp.core._get_ctx()

        for task in ctx.progress_tasks:
            # Entries in progress_tasks may be None; skip them.
            if task is None:
                continue
            # Unregister the context's epoll fd as a reader on the task's loop.
            task.event_loop.remove_reader(ctx.epoll_fd)
            if task.asyncio_task is not None:
                try:
                    # Cancel the progress task, then run its loop until the
                    # cancellation has been processed.
                    task.asyncio_task.cancel()
                    task.event_loop.run_until_complete(task.asyncio_task)
                except asyncio.CancelledError:
                    # Expected result of waiting on a cancelled task.
                    pass

    # We access ucp.core._get_ctx() inside this nested function so our local
    # reference to the UCP context goes out of scope before calling
    # ucp.reset(). ucp.reset() will fail if there are any weak references to
    # the UCP context because it assumes those may be Listeners or
    # Endpoints that were not properly closed.
    inner_context()

    try:
        ucp.reset()
    except ucp.UCXError:
        # Best-effort reset: UCX errors from ucp.reset() are deliberately ignored.
        pass

reset_ucp_async async

reset_ucp_async() -> None

Hard reset all of UCP.

UCP provides ucp.reset(); however, this function does not correctly shut down all asyncio tasks and readers. This function wraps ucp.reset() and additionally removes all readers on the event loop and cancels/awaits all asyncio tasks.

Source code in proxystore/store/dim/ucx.py
async def reset_ucp_async() -> None:  # pragma: no cover
    """Hard reset all of UCP.

    UCP provides `ucp.reset()`; however, this function does not correctly
    shut down all asyncio tasks and readers. This function wraps
    `ucp.reset()` and additionally removes all readers on the event loop
    and cancels/awaits all asyncio tasks.

    This is the coroutine variant: cancelled progress tasks are awaited
    directly, so it can be used from inside a running event loop (as
    `UCXServer.run()` does).
    """

    async def inner_context() -> None:
        # Private UCP API: fetch the global UCP context.
        ctx = ucp.core._get_ctx()

        for task in ctx.progress_tasks:
            # Entries in progress_tasks may be None; skip them.
            if task is None:
                continue
            # Unregister the context's epoll fd as a reader on the task's loop.
            task.event_loop.remove_reader(ctx.epoll_fd)
            if task.asyncio_task is not None:
                try:
                    # Cancel the progress task and wait for the cancellation.
                    task.asyncio_task.cancel()
                    await task.asyncio_task
                except asyncio.CancelledError:
                    # Expected result of awaiting a cancelled task.
                    pass

    # We access ucp.core._get_ctx() inside this nested function so our local
    # reference to the UCP context goes out of scope before calling
    # ucp.reset(). ucp.reset() will fail if there are any weak references to
    # the UCP context because it assumes those may be Listeners or
    # Endpoints that were not properly closed.
    await inner_context()

    try:
        ucp.reset()
    except ucp.UCXError:
        # Best-effort reset: UCX errors from ucp.reset() are deliberately ignored.
        pass

wait_for_server async

wait_for_server(
    host: str, port: int, timeout: float = 5.0
) -> None

Wait until the UCXServer responds.

Parameters:

  • host (str) –

    The host of UCXServer to ping.

  • port (int) –

    The port of UCXServer to ping.

  • timeout (float) –

    The max time in seconds to wait for server response.

Source code in proxystore/store/dim/ucx.py
async def wait_for_server(host: str, port: int, timeout: float = 5.0) -> None:
    """Wait until the UCXServer responds.

    Repeatedly tries to open an endpoint to the server, sleeping briefly
    between failed attempts. Once connected, it performs a one-byte ping
    exchange to confirm the server's handler is answering.

    Args:
        host: The host of UCXServer to ping.
        port: The port of UCXServer to ping.
        timeout: The max time in seconds to wait for server response.

    Raises:
        RuntimeError: If a connection cannot be established within
            ``timeout`` seconds.
    """
    sleep_time = 0.01
    loop = asyncio.get_running_loop()
    # Track elapsed time with the loop's monotonic clock instead of summing
    # sleep intervals, so time spent inside failed create_endpoint() calls
    # also counts against the timeout.
    deadline = loop.time() + timeout

    while True:
        try:
            ep = await ucp.create_endpoint(host, port)
        except ucp._libs.exceptions.UCXNotConnected as e:  # pragma: no cover
            if loop.time() >= deadline:
                raise RuntimeError(
                    'Failed to connect to server within timeout '
                    f'({timeout} seconds).',
                ) from e
            await asyncio.sleep(sleep_time)
        else:
            break  # pragma: no cover

    try:
        # bytes(1) is the ping message that UCXServer.handler echoes back.
        await ep.send_obj(bytes(1))
        _ = await ep.recv_obj()
    finally:
        # Close the endpoint even if the ping exchange fails.
        await ep.close()
    assert ep.closed()