proxystore.store.dim.margo

MargoStore implementation.

Protocol

Bases: Enum

Available Mercury plugins and transports.

OFI_TCP class-attribute

OFI_TCP = 'ofi+tcp'

libfabric tcp provider (TCP/IP)

OFI_VERBS class-attribute

OFI_VERBS = 'ofi+verbs'

libfabric Verbs provider (InfiniBand or RoCE)

OFI_GNI class-attribute

OFI_GNI = 'ofi+gni'

libfabric GNI provider (Cray Aries)

UCX_TCP class-attribute

UCX_TCP = 'ucx+tcp'

UCX TCP/IP

UCX_VERBS class-attribute

UCX_VERBS = 'ucx+verbs'

UCX Verbs

SM_SHM class-attribute

SM_SHM = 'sm+shm'

Shared memory (shm)

BMI_TCP class-attribute

BMI_TCP = 'bmi+tcp'

BMI tcp module (TCP/IP)
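
The enum values are the literal Mercury address prefixes used to build the server address. A minimal sketch:

from proxystore.store.dim.margo import Protocol

# OFI_TCP is the most portable choice on commodity Ethernet networks.
protocol = Protocol.OFI_TCP

# The value is the Mercury plugin+transport prefix, e.g. 'ofi+tcp'.
print(protocol.value)  # 'ofi+tcp'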

MargoStoreKey

Bases: NamedTuple

Key to objects in a MargoStore.

margo_key class-attribute

margo_key: str

Unique object key.

obj_size class-attribute

obj_size: int

Object size in bytes.

peer class-attribute

peer: str

Peer where object is located.
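
MargoStoreKey is an ordinary NamedTuple, so keys can be constructed and unpacked like any tuple. An illustrative sketch (the values below are made up):

from proxystore.store.dim.margo import MargoStoreKey

key = MargoStoreKey(
    margo_key='object-123',           # hypothetical unique identifier
    obj_size=1024,                    # payload size in bytes
    peer='ofi+tcp://10.0.0.1:6000',   # address of the peer holding the data
)

print(key.peer, key.obj_size)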

MargoStore

MargoStore(
    name: str,
    *,
    interface: str,
    port: int,
    protocol: Protocol = Protocol.OFI_VERBS,
    cache_size: int = 16,
    stats: bool = False
) -> None

Bases: Store[MargoStoreKey]

MargoStore implementation for intrasite communication.

This client initializes a local Margo server (peer service) to which it will store data.

Parameters:

  • name (str) –

    Name of the store instance.

  • interface (str) –

    The network interface to use.

  • port (int) –

    The desired port for the Margo server.

  • protocol (Protocol) –

    The communication protocol to use.

  • cache_size (int) –

    Size of LRU cache (in # of objects). If 0, the cache is disabled. The cache is local to the Python process.

  • stats (bool) –

    Collect stats on store operations.
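
A usage sketch, assuming the base Store methods set/get/exists/evict (the interface name and port below are illustrative for the local machine, and pymargo with a Mercury transport must be installed):

from proxystore.store.dim.margo import MargoStore, Protocol

store = MargoStore(
    'margo-example',
    interface='eth0',       # illustrative network interface
    port=6000,              # illustrative port for the peer server
    protocol=Protocol.OFI_TCP,
)

key = store.set(b'hello margo')   # data is stored on the local peer server
assert store.exists(key)
print(store.get(key))             # b'hello margo'
store.evict(key)

store.close()                     # terminate the peer server process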

Source code in proxystore/store/dim/margo.py
def __init__(
    self,
    name: str,
    *,
    interface: str,
    port: int,
    protocol: Protocol = Protocol.OFI_VERBS,
    cache_size: int = 16,
    stats: bool = False,
) -> None:
    global server_process
    global client_pids
    global engine
    global _rpcs

    # raise error if modules not properly loaded
    if pymargo_import_error is not None:  # pragma: no cover
        raise pymargo_import_error

    self.protocol = protocol

    self.host = get_ip_address(interface)
    self.port = port

    self.addr = f'{self.protocol}://{self.host}:{port}'

    if server_process is None:
        server_process = Process(target=self._start_server)
        server_process.start()

    if engine is None:
        # start client
        engine = Engine(
            self.protocol,
            mode=pymargo.client,
            use_progress_thread=True,
        )

        _rpcs = {
            'set': engine.register('set'),
            'get': engine.register('get'),
            'exists': engine.register('exists'),
            'evict': engine.register('evict'),
        }

    self.engine = engine
    self._rpcs = _rpcs

    self.server_started()

    self._pid = getpid()
    client_pids.add(self._pid)

    super().__init__(
        name,
        cache_size=cache_size,
        stats=stats,
        kwargs={
            'interface': interface,
            'port': self.port,
            'protocol': self.protocol,
        },
    )

server_started

server_started() -> None

Loop until server has started.

Source code in proxystore/store/dim/margo.py
def server_started(self) -> None:  # pragma: no cover
    """Loop until server has started."""
    logger.debug('Checking if server has started')
    while True:
        assert engine is not None
        try:
            self._mochi_addr = engine.lookup(self.addr)
            break
        except MargoException:
            pass

close

close() -> None

Terminate Peer server process.

Source code in proxystore/store/dim/margo.py
def close(self) -> None:
    """Terminate Peer server process."""
    global server_process
    global client_pids
    global engine

    client_pids.discard(self._pid)

    logger.info('Clean up requested')

    if len(client_pids) == 0 and server_process is not None:
        engine = None
        self._mochi_addr.shutdown()
        self.engine.finalize()
        server_process.join()
        server_process = None

call_rpc_on staticmethod

call_rpc_on(
    engine: Engine,
    addr: str,
    rpc: RemoteFunction,
    array_str: Bulk,
    key: str,
    size: int,
) -> Status

Initiate the desired RPC call on the specified provider.

Parameters:

  • engine (Engine) –

    The client-side engine.

  • addr (str) –

    The address of the Margo provider to access (e.g., tcp://172.21.2.203:6367).

  • rpc (RemoteFunction) –

    The rpc to issue to the server.

  • array_str (Bulk) –

    The serialized data/buffer to send to the server.

  • key (str) –

    The identifier of the data stored on the server.

  • size (int) –

    The size of the data.

Returns:

  • Status

    A Status object indicating whether the communication was successful.
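
A sketch of invoking this helper from a standalone client, mirroring the engine and RPC registration performed in __init__ above; the Engine import path, peer address, and key are assumptions for illustration:

import pymargo
from pymargo import bulk
from pymargo.core import Engine  # import path assumed

from proxystore.serialize import serialize
from proxystore.store.dim.margo import MargoStore

# Client-side engine and RPC handle, as in MargoStore.__init__.
engine = Engine('ofi+tcp', mode=pymargo.client, use_progress_thread=True)
set_rpc = engine.register('set')

# Expose the serialized payload as a bulk region the server can pull from.
payload = bytearray(serialize('some object'))
local_bulk = engine.create_bulk(payload, bulk.read_only)

status = MargoStore.call_rpc_on(
    engine,
    'ofi+tcp://10.0.0.1:6000',   # hypothetical address of a running peer server
    set_rpc,
    local_bulk,
    'my-key',                    # hypothetical object key
    len(payload),
)
print(status)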

Source code in proxystore/store/dim/margo.py
@staticmethod
def call_rpc_on(
    engine: Engine,
    addr: str,
    rpc: RemoteFunction,
    array_str: Bulk,
    key: str,
    size: int,
) -> Status:
    """Initiate the desired RPC call on the specified provider.

    Arguments:
        engine: The client-side engine.
        addr: The address of Margo provider to access
            (e.g. tcp://172.21.2.203:6367).
        rpc: The rpc to issue to the server.
        array_str: The serialized data/buffer to send to the server.
        key: The identifier of the data stored on the server.
        size: The size of the data.

    Returns:
        A Status object indicating whether the communication was successful.
    """
    server_addr = engine.lookup(addr)
    return deserialize(rpc.on(server_addr)(array_str, size, key))

MargoServer

MargoServer(engine: Engine) -> None

MargoServer implementation.

Parameters:

  • engine (Engine) –

    The server engine created at the specified network address.

Source code in proxystore/store/dim/margo.py
def __init__(self, engine: Engine) -> None:
    self.data = {}

    self.engine = engine

    logger.debug('Server initialized')
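
For context, a hedged sketch of how a peer server process might wire a server-mode engine to a MargoServer (the register-with-callable signature and the pymargo.server constant are assumptions; MargoStore launches such a process for you via its _start_server target):

import pymargo
from pymargo.core import Engine  # import path assumed

from proxystore.store.dim.margo import MargoServer, when_finalize

addr = 'ofi+tcp://10.0.0.1:6000'          # illustrative server address
server_engine = Engine(addr, mode=pymargo.server)
server_engine.on_finalize(when_finalize)  # log when finalization is triggered
server_engine.enable_remote_shutdown()    # lets MargoStore.close() stop the server

receiver = MargoServer(server_engine)
server_engine.register('set', receiver.set)        # signature assumed
server_engine.register('get', receiver.get)
server_engine.register('exists', receiver.exists)
server_engine.register('evict', receiver.evict)

server_engine.wait_for_finalize()         # block until a client shuts the server down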

set

set(
    handle: Handle, bulk_str: Bulk, bulk_size: int, key: str
) -> None

Obtain data from the client and store it in local dictionary.

Parameters:

  • handle (Handle) –

    The client handle.

  • bulk_str (Bulk) –

    The buffer containing the data to be shared.

  • bulk_size (int) –

    The size of the data being transferred.

  • key (str) –

    The data key.

Source code in proxystore/store/dim/margo.py
def set(
    self,
    handle: Handle,
    bulk_str: Bulk,
    bulk_size: int,
    key: str,
) -> None:
    """Obtain data from the client and store it in local dictionary.

    Args:
        handle: The client handle.
        bulk_str: The buffer containing the data to be shared.
        bulk_size: The size of the data being transferred.
        key: The data key.
    """
    logger.debug(f'Received set RPC for key {key}.')

    s = Status(True, None)

    local_buffer = bytearray(bulk_size)
    local_bulk = self.engine.create_bulk(local_buffer, bulk.write_only)
    self.engine.transfer(
        bulk.pull,
        handle.get_addr(),
        bulk_str,
        0,
        local_bulk,
        0,
        bulk_size,
    )
    self.data[key] = local_buffer

    handle.respond(serialize(s))

get

get(
    handle: Handle, bulk_str: Bulk, bulk_size: int, key: str
) -> None

Return data at a given key back to the client.

Parameters:

  • handle (Handle) –

    The client handle.

  • bulk_str (Bulk) –

    The buffer that will store shared data.

  • bulk_size (int) –

    The size of the data to be received.

  • key (str) –

    The data's key.

Source code in proxystore/store/dim/margo.py
def get(
    self,
    handle: Handle,
    bulk_str: Bulk,
    bulk_size: int,
    key: str,
) -> None:
    """Return data at a given key back to the client.

    Args:
        handle: The client handle.
        bulk_str: The buffer that will store shared data.
        bulk_size: The size of the data to be received.
        key: The data's key.
    """
    logger.debug(f'Received get RPC for key {key}.')

    s = Status(True, None)

    try:
        local_array = self.data[key]
        local_bulk = self.engine.create_bulk(local_array, bulk.read_only)
        self.engine.transfer(
            bulk.push,
            handle.get_addr(),
            bulk_str,
            0,
            local_bulk,
            0,
            bulk_size,
        )
    except KeyError as error:
        logger.error(f'key {error} not found.')
        s = Status(False, error)

    handle.respond(serialize(s))

evict

evict(
    handle: Handle, bulk_str: str, bulk_size: int, key: str
) -> None

Remove key from local dictionary.

Parameters:

  • handle (Handle) –

    The client handle.

  • bulk_str (str) –

    The buffer that will store shared data.

  • bulk_size (int) –

    The size of the data to be received.

  • key (str) –

    The data's key.

Source code in proxystore/store/dim/margo.py
def evict(
    self,
    handle: Handle,
    bulk_str: str,
    bulk_size: int,
    key: str,
) -> None:
    """Remove key from local dictionary.

    Args:
        handle: The client handle.
        bulk_str: The buffer that will store shared data.
        bulk_size: The size of the data to be received.
        key: The data's key.
    """
    logger.debug(f'Received evict RPC for key {key}')

    self.data.pop(key, None)
    s = Status(True, None)

    handle.respond(serialize(s))

exists

exists(
    handle: Handle, bulk_str: str, bulk_size: int, key: str
) -> None

Check if key exists within local dictionary.

Parameters:

  • handle (Handle) –

    The client handle.

  • bulk_str (str) –

    The buffer that will store shared data.

  • bulk_size (int) –

    The size of the data to be received.

  • key (str) –

    The data's key.

Source code in proxystore/store/dim/margo.py
def exists(
    self,
    handle: Handle,
    bulk_str: str,
    bulk_size: int,
    key: str,
) -> None:
    """Check if key exists within local dictionary.

    Args:
        handle: The client handle.
        bulk_str: The buffer that will store shared data.
        bulk_size: The size of the data to be received.
        key: The data's key.
    """
    logger.debug(f'Received exists RPC for key {key}')

    s = Status(True, None)

    # Convert to int and then str because the serialized length of True
    # appears to be 7 bytes with pickle protocol 4, and we cannot always
    # guarantee that that protocol will be selected.
    local_array = serialize(str(int(key in self.data)))
    local_bulk = self.engine.create_bulk(local_array, bulk.read_only)
    size = len(local_array)
    self.engine.transfer(
        bulk.push,
        handle.get_addr(),
        bulk_str,
        0,
        local_bulk,
        0,
        size,
    )

    handle.respond(serialize(s))

when_finalize

when_finalize() -> None

Print a statement advising that engine finalization was triggered.

Source code in proxystore/store/dim/margo.py
def when_finalize() -> None:
    """Print a statement advising that engine finalization was triggered."""
    logger.info('Finalize was called. Cleaning up.')