Skip to content

proxystore.store.dim.zmq

ZeroMQ implementation.

ZeroMQStoreKey

Bases: NamedTuple

Key to objects in a ZeroMQStore.

zmq_key class-attribute

zmq_key: str

Unique object key.

obj_size class-attribute

obj_size: int

Object size in bytes.

peer class-attribute

peer: str

Peer where object is located.

ZeroMQStore

ZeroMQStore(
    name: str,
    *,
    interface: str,
    port: int,
    max_size: int = MAX_SIZE_DEFAULT,
    cache_size: int = 16,
    stats: bool = False
) -> None

Bases: Store[ZeroMQStoreKey]

Distributed in-memory store using Zero MQ.

This client will initialize a local ZeroMQ server (Peer service) that it will store data to.

Parameters:

  • name (str) –

    Name of the store instance.

  • interface (str) –

    The network interface to use.

  • port (int) –

    The desired port for communication.

  • max_size (int) –

    The maximum size to be communicated via zmq.

  • cache_size (int) –

    Size of LRU cache (in # of objects). If 0, the cache is disabled. The cache is local to the Python process.

  • stats (bool) –

    Collect stats on store operations.

Source code in proxystore/store/dim/zmq.py
def __init__(
    self,
    name: str,
    *,
    interface: str,
    port: int,
    max_size: int = MAX_SIZE_DEFAULT,
    cache_size: int = 16,
    stats: bool = False,
) -> None:
    global server_process

    # ZMQ is not a default dependency so we don't want to raise
    # an error unless the user actually tries to use this code
    if zmq_import_error is not None:  # pragma: no cover
        raise zmq_import_error

    logger.debug('Instantiating client and server')

    self.max_size = max_size
    self.chunk_size = MAX_CHUNK_LENGTH

    self.host = get_ip_address(interface)
    self.port = port

    self.addr = f'tcp://{self.host}:{self.port}'

    if server_process is None:
        server_process = Process(target=self._start_server)
        server_process.start()

    self.context = zmq.asyncio.Context()
    self.socket = self.context.socket(zmq.REQ)

    try:
        self._loop = asyncio.get_running_loop()
    except RuntimeError:
        self._loop = asyncio.new_event_loop()
    self._loop.run_until_complete(wait_for_server(self.host, self.port))

    super().__init__(
        name,
        cache_size=cache_size,
        stats=stats,
        kwargs={
            'interface': interface,
            'port': self.port,
            'max_size': self.max_size,
        },
    )

handler async

handler(event: bytes, addr: str) -> bytes

ZeroMQ handler function implementation.

Parameters:

  • event (bytes) –

    A pickled dictionary consisting of the data, its key, and the operation to perform on the data.

  • addr (str) –

    The address of the server to connect to.

Returns:

  • bytes

    The serialized result of the operation on the data.

Source code in proxystore/store/dim/zmq.py
async def handler(self, event: bytes, addr: str) -> bytes:
    """ZeroMQ handler function implementation.

    Args:
        event: A pickled dictionary consisting of the data,
            its key, and the operation to perform on the data.
        addr: The address of the server to connect to.

    Returns:
        The serialized result of the operation on the data.

    """
    with self.socket.connect(addr):
        await self.socket.send_multipart(
            list(utils.chunk_bytes(event, self.chunk_size)),
        )
        res = b''.join(await self.socket.recv_multipart())

    assert isinstance(res, bytes)

    return res

close

close() -> None

Terminate Peer server process.

Source code in proxystore/store/dim/zmq.py
def close(self) -> None:
    """Terminate Peer server process."""
    global server_process

    logger.info('Clean up requested')

    if server_process is not None:  # pragma: no cover
        server_process.terminate()
        server_process.join()
        server_process = None

    logger.debug('Clean up completed')

ZeroMQServer

ZeroMQServer(host: str, port: int, max_size: int) -> None

ZeroMQServer implementation.

Parameters:

  • host (str) –

    IP address of the location to start the server.

  • port (int) –

    The port to initiate communication on.

  • max_size (int) –

    The maximum size allowed for zmq communication.

Source code in proxystore/store/dim/zmq.py
def __init__(self, host: str, port: int, max_size: int) -> None:
    self.host = host
    self.port = port
    self.max_size = max_size
    self.chunk_size = MAX_CHUNK_LENGTH
    self.data = {}

    self.context = zmq.asyncio.Context()
    self.socket = self.context.socket(zmq.REP)
    self.socket.bind(f'tcp://{self.host}:{self.port}')

set

set(key: str, data: bytes) -> Status

Obtain and store locally data from client.

Parameters:

  • key (str) –

    Object key to use.

  • data (bytes) –

    Data to store.

Returns:

  • Status

    Operation status.

Source code in proxystore/store/dim/zmq.py
def set(self, key: str, data: bytes) -> Status:
    """Obtain and store locally data from client.

    Args:
        key: Object key to use.
        data: Data to store.

    Returns:
        Operation status.
    """
    self.data[key] = data
    return Status(success=True, error=None)

get

get(key: str) -> bytes | Status

Return data at a given key back to the client.

Parameters:

  • key (str) –

    The object key.

Returns:

Source code in proxystore/store/dim/zmq.py
def get(self, key: str) -> bytes | Status:
    """Return data at a given key back to the client.

    Args:
        key: The object key.

    Returns:
        Operation status.
    """
    try:
        return self.data[key]
    except KeyError as e:
        return Status(False, e)

evict

evict(key: str) -> Status

Remove key from local dictionary.

Parameters:

  • key (str) –

    The object to evict's key.

Returns:

  • Status

    Operation status.

Source code in proxystore/store/dim/zmq.py
def evict(self, key: str) -> Status:
    """Remove key from local dictionary.

    Args:
        key: The object to evict's key.

    Returns:
        Operation status.
    """
    self.data.pop(key, None)
    return Status(success=True, error=None)

exists

exists(key: str) -> bool

Check if a key exists within local dictionary.

Parameters:

  • key (str) –

    The object's key.

Returns:

  • bool

    If the key exists.

Source code in proxystore/store/dim/zmq.py
def exists(self, key: str) -> bool:
    """Check if a key exists within local dictionary.

    Args:
        key: The object's key.

    Returns:
        If the key exists.
    """
    return key in self.data

handler async

handler() -> None

Handle zmq connection requests.

Source code in proxystore/store/dim/zmq.py
async def handler(self) -> None:
    """Handle zmq connection requests."""
    while not self.socket.closed:  # pragma: no branch
        try:
            for pkv in await self.socket.recv_multipart():
                assert isinstance(pkv, bytes)

                if pkv == b'ping':
                    self.socket.send(b'pong')
                    continue

                kv = deserialize(pkv)

                key = kv['key']
                data = kv['data']
                func = kv['op']

                if func == 'set':
                    res = self.set(key, data)
                else:
                    if func == 'get':
                        func = self.get
                    elif func == 'exists':
                        func = self.exists
                    elif func == 'evict':
                        func = self.evict
                    else:
                        raise AssertionError('Unreachable.')
                    res = func(key)

                if isinstance(res, Status) or isinstance(res, bool):
                    serialized_res = serialize(res)
                else:
                    serialized_res = res

                await self.socket.send_multipart(
                    list(
                        utils.chunk_bytes(serialized_res, self.chunk_size),
                    ),
                )
        except zmq.ZMQError as e:  # pragma: no cover
            logger.exception(e)
            await asyncio.sleep(0.01)
        except asyncio.exceptions.CancelledError:  # pragma: no cover
            logger.debug('loop terminated')

launch async

launch() -> None

Launch the server.

Source code in proxystore/store/dim/zmq.py
async def launch(self) -> None:
    """Launch the server."""
    loop = asyncio.get_running_loop()
    loop.create_future()

    loop.add_signal_handler(signal.SIGINT, self.socket.close, None)
    loop.add_signal_handler(signal.SIGTERM, self.socket.close, None)

    await self.handler()

wait_for_server async

wait_for_server(
    host: str, port: int, timeout: float = 5.0
) -> None

Wait until the ZeroMQServer responds.

Parameters:

  • host (str) –

    The host of the server to ping.

  • port (int) –

    The port of the server to ping.

  • timeout (float) –

    The max time in seconds to wait for server response.

Raises:

  • RuntimeError

    if the server does not respond within the timeout.

Source code in proxystore/store/dim/zmq.py
async def wait_for_server(host: str, port: int, timeout: float = 5.0) -> None:
    """Wait until the ZeroMQServer responds.

    Args:
        host: The host of the server to ping.
        port: The port of the server to ping.
        timeout: The max time in seconds to wait for server response.

    Raises:
        RuntimeError: if the server does not respond within the timeout.
    """
    start = time.time()
    context = zmq.asyncio.Context()
    socket = context.socket(zmq.REQ)
    socket.setsockopt(zmq.LINGER, 0)

    with socket.connect(f'tcp://{host}:{port}'):
        await socket.send(b'ping')

        poller = zmq.asyncio.Poller()
        poller.register(socket, zmq.POLLIN)

        while time.time() - start < timeout:
            event = await poller.poll(timeout)
            if len(event) != 0:
                response = await socket.recv()
                assert response == b'pong'
                socket.close()
                return

    socket.close()

    raise RuntimeError(
        'Failed to connect to server within timeout ({timeout} seconds).',
    )