proxystore.store

The ProxyStore Store interface.

Store

Store(
    name: str,
    connector: ConnectorT,
    *,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    cache_size: int = 16,
    metrics: bool = False,
    populate_target: bool = True,
    register: bool = False
)

Bases: Generic[ConnectorT]

Key-value store interface for proxies.

Tip

A Store instance can be used as a context manager which will automatically call close() on exit.

with Store('my-store', connector=...) as store:
    key = store.put('value')
    store.get(key)
Warning

The default value of populate_target=True can cause unexpected behavior when providing custom serializer/deserializers because neither the serializer nor deserializer will be applied to the target object being cached in the resulting Proxy.

import pickle
from proxystore.store import Store
from proxystore.connectors.local import LocalConnector

with Store('example', LocalConnector(), register=True) as store:
    data = [1, 2, 3]
    data_bytes = pickle.dumps(data)

    data_proxy = store.proxy(
        data_bytes,
        serializer=lambda s: s,
        deserializer=pickle.loads,
        populate_target=True,
    )

    print(data_proxy)
    # b'\x80\x04\x95\x0b\x00\x00\x00\x00\x00\x00\x00]\x94(K\x01K\x02K\x03e.'

In this example, the serialized data_bytes was populated as the target object in the resulting proxy so the proxy looks like a proxy of bytes rather than the intended list of integers. To fix this, set populate_target=False so the custom deserializer is correctly applied to data_bytes when the proxy is resolved.
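
A short continuation of the example above (inside the same with block) showing the fix described, with populate_target=False so the custom deserializer runs when the proxy is resolved:

    data_proxy = store.proxy(
        data_bytes,
        serializer=lambda s: s,
        deserializer=pickle.loads,
        populate_target=False,
    )

    print(data_proxy)
    # [1, 2, 3]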

Note

This class is generally thread-safe, with cache access and connector operations guarded by a lock that is local to each store instance.

Warning

This class cannot be pickled. If you need to recreate a Store within another process, share a StoreConfig, a serializable and pickle-compatible type, which can be created using Store.config().

To reconstruct the instance from the config, use Store.from_config() or get_or_create_store().
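
A minimal sketch of that pattern, assuming a FileConnector (so the data is reachable from another process) and an arbitrary store_dir path:

from proxystore.connectors.file import FileConnector
from proxystore.store import Store, get_or_create_store

store = Store('config-example', FileConnector(store_dir='/tmp/proxystore-example'))
config = store.config()  # StoreConfig is pickle-compatible and safe to send elsewhere

# In another process, recreate (or fetch an already-registered) instance:
same_store = get_or_create_store(config, register=True)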

Parameters:

  • name (str) –

    Name of the store instance.

  • connector (ConnectorT) –

    Connector instance to use for object storage.

  • serializer (SerializerT | None, default: None ) –

    Optional callable which serializes the object. If None, the default serializer (serialize()) will be used.

  • deserializer (DeserializerT | None, default: None ) –

    Optional callable used by the factory to deserialize the byte string. If None, the default deserializer (deserialize()) will be used.

  • cache_size (int, default: 16 ) –

    Size of LRU cache (in # of objects). If 0, the cache is disabled. The cache is local to the Python process.

  • metrics (bool, default: False ) –

    Enable recording operation metrics.

  • populate_target (bool, default: True ) –

    Set the default value of populate_target for proxy methods of the store.

  • register (bool, default: False ) –

    Register the store instance after initialization.

Raises:

  • ValueError

    If cache_size is negative.

  • StoreExistsError

    If register=True and a store with the same name is already registered.

Source code in proxystore/store/base.py
def __init__(
    self,
    name: str,
    connector: ConnectorT,
    *,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    cache_size: int = 16,
    metrics: bool = False,
    populate_target: bool = True,
    register: bool = False,
) -> None:
    if cache_size < 0:
        raise ValueError(
            f'Cache size cannot be negative. Got {cache_size}.',
        )

    self.connector = connector
    self.cache: LRUCache[ConnectorKeyT, Any] = LRUCache(cache_size)
    self._name = name
    self._metrics = StoreMetrics() if metrics else None
    self._cache_size = cache_size
    self._serializer = serializer
    self._deserializer = deserializer
    self._populate_target = populate_target
    self._register = register

    if self._register:
        try:
            proxystore.store.register_store(self)
        except StoreExistsError as e:
            if sys.version_info >= (3, 11):  # pragma: >=3.11 cover
                e.add_note(
                    'Consider using get_store(name) rather than '
                    'initializing a new instance with register=True.',
                )
            else:  # pragma: <3.11 cover
                pass
            raise

    self._lock = threading.RLock()

    logger.info(f'Initialized {self}')

name property

name: str

Name of this Store instance.

metrics property

metrics: StoreMetrics | None

Optional metrics for this instance.

serializer property

serializer: SerializerT

Serializer for this instance.

deserializer property

deserializer: DeserializerT

Deserializer for this instance.

close

close(*args: Any, **kwargs: Any) -> None

Close the connector associated with the store.

This will (1) close the connector and (2) unregister the store if register=True was set during initialization.

Warning

This method should only be called at the end of the program when the store will no longer be used, for example once all proxies have been resolved.

Parameters:

  • args (Any) –

    Positional arguments to pass to Connector.close().

  • kwargs (Any) –

    Keyword arguments to pass to Connector.close().

Source code in proxystore/store/base.py
def close(self, *args: Any, **kwargs: Any) -> None:
    """Close the connector associated with the store.

    This will (1) close the connector and (2) unregister the store if
    `register=True` was set during initialization.

    Warning:
        This method should only be called at the end of the program
        when the store will no longer be used, for example once all
        proxies have been resolved.

    Args:
        args: Positional arguments to pass to
            [`Connector.close()`][proxystore.connectors.protocols.Connector.close].
        kwargs: Keyword arguments to pass to
            [`Connector.close()`][proxystore.connectors.protocols.Connector.close].
    """
    if self._register:
        proxystore.store.unregister_store(self.name)
    with self._lock:
        self.connector.close(*args, **kwargs)

config

config() -> StoreConfig

Get the store configuration.

Example
>>> store = Store(...)
>>> config = store.config()
>>> store = Store.from_config(config)

Returns:

  • StoreConfig

    Store configuration.

Source code in proxystore/store/base.py
def config(self) -> StoreConfig:
    """Get the store configuration.

    Example:
        ```python
        >>> store = Store(...)
        >>> config = store.config()
        >>> store = Store.from_config(config)
        ```

    Returns:
        Store configuration.
    """
    return StoreConfig(
        name=self.name,
        connector=ConnectorConfig(
            kind=get_object_path(type(self.connector)),
            options=self.connector.config(),
        ),
        serializer=self._serializer,
        deserializer=self._deserializer,
        cache_size=self._cache_size,
        metrics=self.metrics is not None,
        populate_target=self._populate_target,
        auto_register=self._register,
    )

from_config classmethod

from_config(config: StoreConfig) -> Store[Any]

Create a new store instance from a configuration.

Parameters:

  • config (StoreConfig) –

    Configuration returned by .config().

Returns:

  • Store[Any]

    Store instance.

Source code in proxystore/store/base.py
@classmethod
def from_config(cls, config: StoreConfig) -> Store[Any]:
    """Create a new store instance from a configuration.

    Args:
        config: Configuration returned by `#!python .config()`.

    Returns:
        Store instance.
    """
    connector = cast(ConnectorT, config.connector.get_connector())
    return cls(
        name=config.name,
        connector=connector,
        serializer=config.serializer,
        deserializer=config.deserializer,
        cache_size=config.cache_size,
        metrics=config.metrics,
        populate_target=config.populate_target,
        register=config.auto_register,
    )

future

future(
    *,
    evict: bool = False,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    polling_interval: float = 1,
    polling_backoff_factor: float = 1,
    polling_interval_limit: float | None = None,
    polling_timeout: float | None = None
) -> Future[T]

Create a future to an object.

Example
from proxystore.connectors.file import FileConnector
from proxystore.store import Store
from proxystore.store.future import Future

def remote_foo(future: Future) -> None:
    # Computation that generates a result value needed by
    # the remote_bar function.
    future.set_result(...)

def remote_bar(data: Any) -> None:
    # Function uses data, which is a proxy, as normal, blocking
    # until the remote_foo function has called set_result.
    ...

with Store('future-example', FileConnector(...)) as store:
    future = store.future()

    # The invoke_remote function invokes a provided function
    # on a remote process. For example, this could be a serverless
    # function execution.
    foo_result_future = invoke_remote(remote_foo, future)
    bar_result_future = invoke_remote(remote_bar, future.proxy())

    foo_result_future.result()
    bar_result_future.result()
Warning

This method only works if the connector is of type DeferrableConnector.

Warning

This method and Future.proxy() are experimental features and may change in future releases.

Parameters:

  • evict (bool, default: False ) –

    If a proxy returned by Future.proxy() should evict the object once resolved.

  • serializer (SerializerT | None, default: None ) –

    Optionally override the default serializer for the store instance.

  • deserializer (DeserializerT | None, default: None ) –

    Optionally override the default deserializer for the store instance.

  • polling_interval (float, default: 1 ) –

    Initial seconds to sleep between polling the store for the object.

  • polling_backoff_factor (float, default: 1 ) –

    Multiplicative factor applied to polling_interval after each unsuccessful poll.

  • polling_interval_limit (float | None, default: None ) –

    Maximum polling interval allowed. Prevents the backoff factor from increasing the current polling interval to unreasonable values.

  • polling_timeout (float | None, default: None ) –

    Optional maximum number of seconds to poll for. If the timeout is reached an error is raised.

Returns:

  • Future[T]

    Future which can be used to get the result object at a later time or create a proxy which will resolve to the result of the future.

Raises:

  • NotImplementedError

    If the connector is not of type DeferrableConnector.

Source code in proxystore/store/base.py
def future(
    self,
    *,
    evict: bool = False,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    polling_interval: float = 1,
    polling_backoff_factor: float = 1,
    polling_interval_limit: float | None = None,
    polling_timeout: float | None = None,
) -> Future[T]:
    """Create a future to an object.

    Example:
        ```python
        from proxystore.connectors.file import FileConnector
        from proxystore.store import Store
        from proxystore.store.future import Future

        def remote_foo(future: Future) -> None:
            # Computation that generates a result value needed by
            # the remote_bar function.
            future.set_result(...)

        def remote_bar(data: Any) -> None:
            # Function uses data, which is a proxy, as normal, blocking
            # until the remote_foo function has called set_result.
            ...

        with Store('future-example', FileConnector(...)) as store:
            future = store.future()

            # The invoke_remote function invokes a provided function
            # on a remote process. For example, this could be a serverless
            # function execution.
            foo_result_future = invoke_remote(remote_foo, future)
            bar_result_future = invoke_remote(remote_bar, future.proxy())

            foo_result_future.result()
            bar_result_future.result()
        ```

    Warning:
        This method only works if the `connector` is of type
        [`DeferrableConnector`][proxystore.connectors.protocols.DeferrableConnector].

    Warning:
        This method and the
        [`Future.proxy()`][proxystore.store.future.Future.proxy]
        are experimental features and may change in future releases.

    Args:
        evict: If a proxy returned by
            [`Future.proxy()`][proxystore.store.future.Future.proxy]
            should evict the object once resolved.
        serializer: Optionally override the default serializer for the
            store instance.
        deserializer: Optionally override the default deserializer for the
            store instance.
        polling_interval: Initial seconds to sleep between polling the
            store for the object.
        polling_backoff_factor: Multiplicative factor applied to the
            polling_interval applied after each unsuccessful poll.
        polling_interval_limit: Maximum polling interval allowed. Prevents
            the backoff factor from increasing the current polling interval
            to unreasonable values.
        polling_timeout: Optional maximum number of seconds to poll for. If
            the timeout is reached an error is raised.

    Returns:
        Future which can be used to get the result object at a later time \
        or create a proxy which will resolve to the result of the future.

    Raises:
        NotImplementedError: If the `connector` is not of type
            [`DeferrableConnector`][proxystore.connectors.protocols.DeferrableConnector].
    """
    timer = Timer().start()

    if not isinstance(self.connector, DeferrableConnector):
        raise NotImplementedError(
            'The provided connector is type '
            f'{type(self.connector).__name__} which does not implement '
            f'the {DeferrableConnector.__name__} necessary to use the '
            f'{Future.__name__} interface.',
        )

    with Timer() as connector_timer:
        key = self.connector.new_key()

    if self.metrics is not None:
        ctime = connector_timer.elapsed_ms
        self.metrics.add_time('store.future.connector', key, ctime)

    factory: PollingStoreFactory[ConnectorT, T] = PollingStoreFactory(
        key,
        store_config=self.config(),
        deserializer=deserializer,
        evict=evict,
        polling_interval=polling_interval,
        polling_backoff_factor=polling_backoff_factor,
        polling_interval_limit=polling_interval_limit,
        polling_timeout=polling_timeout,
    )
    future = Future(factory, serializer=serializer)

    timer.stop()
    if self.metrics is not None:
        self.metrics.add_time('store.future', key, timer.elapsed_ms)

    logger.debug(
        f'Store(name="{self.name}"): FUTURE {key} in '
        f'{timer.elapsed_ms:.3f} ms',
    )
    return future

evict

evict(key: ConnectorKeyT) -> None

Evict the object associated with the key.

Parameters:

  • key (ConnectorKeyT) –

    Key associated with the object to evict.

Source code in proxystore/store/base.py
def evict(self, key: ConnectorKeyT) -> None:
    """Evict the object associated with the key.

    Args:
        key: Key associated with object to evict.
    """
    timer = Timer().start()

    with self._lock:
        with Timer() as connector_timer:
            self.connector.evict(key)

        if self.metrics is not None:
            ctime = connector_timer.elapsed_ms
            self.metrics.add_time('store.evict.connector', key, ctime)

        self.cache.evict(key)

    timer.stop()
    if self.metrics is not None:
        self.metrics.add_time('store.evict', key, timer.elapsed_ms)

    logger.debug(
        f'Store(name="{self.name}"): EVICT {key} in '
        f'{timer.elapsed_ms:.3f} ms',
    )

exists

exists(key: ConnectorKeyT) -> bool

Check if an object associated with the key exists.

Parameters:

  • key (ConnectorKeyT) –

    Key potentially associated with stored object.

Returns:

  • bool

    If an object associated with the key exists.

Source code in proxystore/store/base.py
def exists(self, key: ConnectorKeyT) -> bool:
    """Check if an object associated with the key exists.

    Args:
        key: Key potentially associated with stored object.

    Returns:
        If an object associated with the key exists.
    """
    timer = Timer().start()

    with self._lock:
        res = self.cache.exists(key)
        if not res:
            with Timer() as connector_timer:
                res = self.connector.exists(key)

            if self.metrics is not None:
                ctime = connector_timer.elapsed_ms
                self.metrics.add_time('store.exists.connector', key, ctime)

    timer.stop()
    if self.metrics is not None:
        self.metrics.add_time('store.exists', key, timer.elapsed_ms)

    logger.debug(
        f'Store(name="{self.name}"): EXISTS {key} in '
        f'{timer.elapsed_ms:.3f} ms',
    )
    return res

get

get(
    key: ConnectorKeyT,
    *,
    deserializer: DeserializerT | None = None,
    default: object | None = None
) -> Any | None

Get the object associated with the key.

Parameters:

  • key (ConnectorKeyT) –

    Key associated with the object to retrieve.

  • deserializer (DeserializerT | None, default: None ) –

    Optionally override the default deserializer for the store instance.

  • default (object | None, default: None ) –

    An optional value to be returned if an object associated with the key does not exist.

Returns:

  • Any | None

    Object or None if the object does not exist.

Raises:

  • SerializationError

    If an exception is caught when deserializing the object associated with the key.
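
To illustrate the key-based workflow around exists(), get(), is_cached(), and evict(), a small sketch using a LocalConnector (store name is arbitrary):

from proxystore.connectors.local import LocalConnector
from proxystore.store import Store

with Store('get-example', LocalConnector()) as store:
    key = store.put({'answer': 42})

    assert store.exists(key)
    assert store.get(key) == {'answer': 42}
    # get() populates the local LRU cache on a connector fetch.
    assert store.is_cached(key)

    store.evict(key)
    assert not store.exists(key)
    assert store.get(key, default='missing') == 'missing'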

Source code in proxystore/store/base.py
def get(
    self,
    key: ConnectorKeyT,
    *,
    deserializer: DeserializerT | None = None,
    default: object | None = None,
) -> Any | None:
    """Get the object associated with the key.

    Args:
        key: Key associated with the object to retrieve.
        deserializer: Optionally override the default deserializer for the
            store instance.
        default: An optional value to be returned if an object
            associated with the key does not exist.

    Returns:
        Object or `None` if the object does not exist.

    Raises:
        SerializationError: If an exception is caught when deserializing
            the object associated with the key.
    """
    timer = Timer().start()

    with self._lock:
        cached = self.cache.get(key, _MISSING_OBJECT)
        if cached is not _MISSING_OBJECT:
            timer.stop()
            if self.metrics is not None:
                self.metrics.add_counter('store.get.cache_hits', key, 1)
                self.metrics.add_time('store.get', key, timer.elapsed_ms)

            logger.debug(
                f'Store(name="{self.name}"): GET {key} in '
                f'{timer.elapsed_ms:.3f} ms (cached=True)',
            )
            return cached

        with Timer() as connector_timer:
            value = self.connector.get(key)

        if self.metrics is not None:
            ctime = connector_timer.elapsed_ms
            self.metrics.add_counter('store.get.cache_misses', key, 1)
            self.metrics.add_time('store.get.connector', key, ctime)

        if value is not None:
            with Timer() as deserializer_timer:
                deserializer = (
                    deserializer
                    if deserializer is not None
                    else self.deserializer
                )
                try:
                    result = deserializer(value)
                except Exception as e:
                    name = get_object_path(deserializer)
                    raise SerializationError(
                        'Failed to deserialize object '
                        f'(deserializer={name}, key={key}).',
                    ) from e

            if self.metrics is not None:
                dtime = deserializer_timer.elapsed_ms
                obj_size = len(value)
                self.metrics.add_time('store.get.deserialize', key, dtime)
                self.metrics.add_attribute(
                    'store.get.object_size',
                    key,
                    obj_size,
                )

            self.cache.set(key, result)
        else:
            result = default

    timer.stop()
    if self.metrics is not None:
        self.metrics.add_time('store.get', key, timer.elapsed_ms)

    logger.debug(
        f'Store(name="{self.name}"): GET {key} in '
        f'{timer.elapsed_ms:.3f} ms (cached=False)',
    )
    return result

is_cached

is_cached(key: ConnectorKeyT) -> bool

Check if an object associated with the key is cached locally.

Parameters:

  • key (ConnectorKeyT) –

    Key potentially associated with stored object.

Returns:

  • bool

    If the object is cached.

Source code in proxystore/store/base.py
def is_cached(self, key: ConnectorKeyT) -> bool:
    """Check if an object associated with the key is cached locally.

    Args:
        key: Key potentially associated with stored object.

    Returns:
        If the object is cached.
    """
    with self._lock:
        return self.cache.exists(key)

proxy

proxy(
    obj: T | NonProxiableT,
    *,
    evict: bool = False,
    lifetime: Lifetime | None = None,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    populate_target: bool | None = None,
    skip_nonproxiable: bool = False,
    **kwargs: Any
) -> Proxy[T] | NonProxiableT

Create a proxy that will resolve to an object in the store.

Parameters:

  • obj (T | NonProxiableT) –

    The object to place in store and return a proxy for.

  • evict (bool, default: False ) –

    If the proxy should evict the object once resolved. Mutually exclusive with the lifetime parameter.

  • lifetime (Lifetime | None, default: None ) –

    Attach the proxy to this lifetime. The object associated with the proxy will be evicted when the lifetime ends. Mutually exclusive with the evict parameter.

  • serializer (SerializerT | None, default: None ) –

    Optionally override the default serializer for the store instance.

  • deserializer (DeserializerT | None, default: None ) –

    Optionally override the default deserializer for the store instance.

  • populate_target (bool | None, default: None ) –

    Pass cache_defaults=True and target=obj to the Proxy constructor. I.e., return a proxy that (1) is already resolved, (2) can be used in isinstance checks without resolving, and (3) is hashable without resolving if obj is a hashable type. Note that the returned proxy will hold a reference to obj, which prevents obj from being garbage collected. If None, defaults to the store-wide setting.

  • skip_nonproxiable (bool, default: False ) –

    Return non-proxiable types (e.g., built-in constants like bool or None) rather than raising a NonProxiableTypeError.

  • kwargs (Any, default: {} ) –

    Additional keyword arguments to pass to Connector.put().

Returns:

  • Proxy[T] | NonProxiableT

    A proxy of the object unless obj is a non-proxiable type and skip_nonproxiable is True, in which case obj is returned directly.

Raises:

  • NonProxiableTypeError

    If obj is a non-proxiable type. This behavior can be overridden by setting skip_nonproxiable=True.

  • ValueError

    If evict is True and lifetime is not None because these parameters are mutually exclusive.
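
A brief sketch of typical usage with a LocalConnector; the proxy behaves like the wrapped object:

from proxystore.connectors.local import LocalConnector
from proxystore.store import Store

with Store('proxy-example', LocalConnector()) as store:
    data = [1, 2, 3]
    proxy = store.proxy(data)

    # With the store default populate_target=True, the proxy is already
    # resolved, so isinstance checks do not require a store access.
    assert isinstance(proxy, list)
    assert sum(proxy) == 6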

Source code in proxystore/store/base.py
def proxy(
    self,
    obj: T | NonProxiableT,
    *,
    evict: bool = False,
    lifetime: Lifetime | None = None,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    populate_target: bool | None = None,
    skip_nonproxiable: bool = False,
    **kwargs: Any,
) -> Proxy[T] | NonProxiableT:
    """Create a proxy that will resolve to an object in the store.

    Args:
        obj: The object to place in store and return a proxy for.
        evict: If the proxy should evict the object once resolved.
            Mutually exclusive with the `lifetime` parameter.
        lifetime: Attach the proxy to this lifetime. The object associated
            with the proxy will be evicted when the lifetime ends.
            Mutually exclusive with the `evict` parameter.
        serializer: Optionally override the default serializer for the
            store instance.
        deserializer: Optionally override the default deserializer for the
            store instance.
        populate_target: Pass `cache_defaults=True` and `target=obj` to
            the [`Proxy`][proxystore.proxy.Proxy] constructor. I.e.,
            return a proxy that (1) is already resolved, (2) can be used
            in [`isinstance`][isinstance] checks without resolving, and (3)
            is hashable without resolving if `obj` is a hashable type.
            This is `False` by default because the returned proxy will
            hold a reference to `obj` which will prevent garbage
            collecting `obj`. If `None`, defaults to the store-wide
            setting.
        skip_nonproxiable: Return non-proxiable types (e.g., built-in
            constants like `bool` or `None`) rather than raising a
            [`NonProxiableTypeError`][proxystore.store.exceptions.NonProxiableTypeError].
        kwargs: Additional keyword arguments to pass to
            [`Connector.put()`][proxystore.connectors.protocols.Connector.put].

    Returns:
        A proxy of the object unless `obj` is a non-proxiable type \
        `#!python skip_nonproxiable is True` in which case `obj` is \
        returned directly.

    Raises:
        NonProxiableTypeError: If `obj` is a non-proxiable type. This
            behavior can be overridden by setting
            `#!python skip_nonproxiable=True`.
        ValueError: If `evict` is `True` and `lifetime` is not `None`
            because these parameters are mutually exclusive.
    """
    if evict and lifetime is not None:
        raise ValueError(
            'The evict and lifetime parameters are mutually exclusive. '
            'Only set one of evict or lifetime.',
        )

    if isinstance(obj, _NON_PROXIABLE_TYPES):
        if skip_nonproxiable:
            # MyPy raises the following error which is not correct:
            #     Incompatible return value type (got "Optional[bool]",
            #     expected "Optional[Proxy[T]]")  [return-value]
            return obj  # type: ignore[return-value]
        else:
            raise NonProxiableTypeError(
                f'Object of {type(obj)} is not proxiable.',
            )

    with Timer() as timer:
        key = self.put(obj, serializer=serializer, **kwargs)
        factory: StoreFactory[ConnectorT, T] = StoreFactory(
            key,
            store_config=self.config(),
            deserializer=deserializer,
            evict=evict,
        )
        populate_target = (
            self._populate_target
            if populate_target is None
            else populate_target
        )
        if populate_target:
            # If obj were None, we would have escaped early when
            # checking _NON_PROXIABLE_TYPES.
            assert obj is not None
            proxy = Proxy(factory, cache_defaults=True, target=obj)
        else:
            proxy = Proxy(factory)

        if lifetime is not None:
            lifetime.add_proxy(proxy)

    if self.metrics is not None:
        self.metrics.add_time('store.proxy', key, timer.elapsed_ms)

    logger.debug(
        f'Store(name="{self.name}"): PROXY {key} in '
        f'{timer.elapsed_ms:.3f} ms',
    )
    return proxy

proxy_batch

proxy_batch(
    objs: Sequence[T | NonProxiableT],
    *,
    evict: bool = False,
    lifetime: Lifetime | None = None,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    populate_target: bool | None = None,
    skip_nonproxiable: bool = False,
    **kwargs: Any
) -> list[Proxy[T] | NonProxiableT]

Create proxies that will resolve to an object in the store.

Parameters:

  • objs (Sequence[T | NonProxiableT]) –

    The objects to place in the store and return proxies for.

  • evict (bool, default: False ) –

    If a proxy should evict its object once resolved. Mutually exclusive with the lifetime parameter.

  • lifetime (Lifetime | None, default: None ) –

    Attach the proxies to this lifetime. The objects associated with each proxy will be evicted when the lifetime ends. Mutually exclusive with the evict parameter.

  • serializer (SerializerT | None, default: None ) –

    Optionally override the default serializer for the store instance.

  • deserializer (DeserializerT | None, default: None ) –

    Optionally override the default deserializer for the store instance.

  • populate_target (bool | None, default: None ) –

    Pass cache_defaults=True and target=obj to the Proxy constructor. I.e., return a proxy that (1) is already resolved, (2) can be used in isinstance checks without resolving, and (3) is hashable without resolving if obj is a hashable type. If None, defaults to the store-wide setting.

  • skip_nonproxiable (bool, default: False ) –

    Return non-proxiable types (e.g., built-in constants like bool or None) rather than raising a NonProxiableTypeError.

  • kwargs (Any, default: {} ) –

    Additional keyword arguments to pass to Connector.put_batch().

Returns:

  • list[Proxy[T] | NonProxiableT]

    A list of proxies of each object or the object itself if said object is not proxiable and skip_nonproxiable is True.

Raises:

  • NonProxiableTypeError

    If obj is a non-proxiable type. This behavior can be overridden by setting skip_nonproxiable=True.

  • ValueError

    If evict is True and lifetime is not None because these parameters are mutually exclusive.
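
A small sketch of batched proxy creation, including a non-proxiable None passed through with skip_nonproxiable=True:

from proxystore.connectors.local import LocalConnector
from proxystore.store import Store

with Store('proxy-batch-example', LocalConnector()) as store:
    values = [[1, 2], None, [3, 4]]
    proxies = store.proxy_batch(values, skip_nonproxiable=True)

    # None is non-proxiable and is returned in place rather than proxied.
    assert proxies[1] is None
    assert sum(proxies[0]) + sum(proxies[2]) == 10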

Source code in proxystore/store/base.py
def proxy_batch(  # type: ignore[misc]
    self,
    objs: Sequence[T | NonProxiableT],
    *,
    evict: bool = False,
    lifetime: Lifetime | None = None,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    populate_target: bool | None = None,
    skip_nonproxiable: bool = False,
    **kwargs: Any,
) -> list[Proxy[T] | NonProxiableT]:
    """Create proxies that will resolve to an object in the store.

    Args:
        objs: The objects to place in store and return a proxies for.
        evict: If a proxy should evict its object once resolved.
            Mutually exclusive with the `lifetime` parameter.
        lifetime: Attach the proxies to this lifetime. The objects
            associated with each proxy will be evicted when the lifetime
            ends. Mutually exclusive with the `evict` parameter.
        serializer: Optionally override the default serializer for the
            store instance.
        deserializer: Optionally override the default deserializer for the
            store instance.
        populate_target: Pass `cache_defaults=True` and `target=obj` to
            the [`Proxy`][proxystore.proxy.Proxy] constructor. I.e.,
            return a proxy that (1) is already resolved, (2) can be used
            in [`isinstance`][isinstance] checks without resolving, and (3)
            is hashable without resolving if `obj` is a hashable type.
            If `None`, defaults to the store-wide setting.
        skip_nonproxiable: Return non-proxiable types (e.g., built-in
            constants like `bool` or `None`) rather than raising a
            [`NonProxiableTypeError`][proxystore.store.exceptions.NonProxiableTypeError].
        kwargs: Additional keyword arguments to pass to
            [`Connector.put_batch()`][proxystore.connectors.protocols.Connector.put_batch].

    Returns:
        A list of proxies of each object or the object itself if said \
        object is not proxiable and `#!python skip_nonproxiable is True`.

    Raises:
        NonProxiableTypeError: If `obj` is a non-proxiable type. This
            behavior can be overridden by setting
            `#!python skip_nonproxiable=True`.
        ValueError: If `evict` is `True` and `lifetime` is not `None`
            because these parameters are mutually exclusive.
    """
    if evict and lifetime is not None:
        raise ValueError(
            'The evict and lifetime parameters are mutually exclusive. '
            'Only set one of evict or lifetime.',
        )

    with Timer() as timer:
        # Find if there are non-proxiable types and if that's okay
        non_proxiable: list[tuple[int, Any]] = []
        for i, obj in enumerate(objs):
            if isinstance(obj, _NON_PROXIABLE_TYPES):
                non_proxiable.append((i, obj))

        if len(non_proxiable) > 0 and not skip_nonproxiable:
            raise NonProxiableTypeError(
                f'Input sequence contains {len(non_proxiable)} '
                'objects that are not proxiable.',
            )

        # Pop non-proxiable types so we can batch proxy the proxiable ones
        non_proxiable_indicies = [i for i, _ in non_proxiable]
        proxiable_objs = [
            obj
            for i, obj in enumerate(objs)
            if i not in non_proxiable_indicies
        ]

        keys = self.put_batch(
            proxiable_objs,
            serializer=serializer,
            **kwargs,
        )
        factories: list[StoreFactory[ConnectorT, T]] = [
            StoreFactory(
                key,
                store_config=self.config(),
                evict=evict,
                deserializer=deserializer,
            )
            for key in keys
        ]

        populate_target = (
            self._populate_target
            if populate_target is None
            else populate_target
        )

        proxies: list[Proxy[T]] = []
        for factory, obj in zip(factories, proxiable_objs):
            if populate_target:
                proxy = Proxy(factory, cache_defaults=True, target=obj)
            else:
                proxy = Proxy(factory)
            proxies.append(proxy)

        if lifetime is not None:
            lifetime.add_proxy(*proxies)

        # Put non-proxiable objects back in their original positions.
        # The indices of non_proxiable must still be sorted
        for original_index, original_object in non_proxiable:
            proxies.insert(original_index, original_object)

    if self.metrics is not None:
        self.metrics.add_time('store.proxy_batch', keys, timer.elapsed_ms)

    logger.debug(
        f'Store(name="{self.name}"): PROXY_BATCH ({len(proxies)} items) '
        f'in {timer.elapsed_ms:.3f} ms',
    )
    return cast(list[Union[Proxy[T], NonProxiableT]], proxies)

proxy_from_key

proxy_from_key(
    key: ConnectorKeyT,
    *,
    evict: bool = False,
    lifetime: Lifetime | None = None,
    deserializer: DeserializerT | None = None
) -> Proxy[T]

Create a proxy that will resolve to an object already in the store.

Parameters:

  • key (ConnectorKeyT) –

    The key associated with an object already in the store.

  • evict (bool, default: False ) –

    If the proxy should evict the object once resolved. Mutually exclusive with the lifetime parameter.

  • lifetime (Lifetime | None, default: None ) –

    Attach the proxy to this lifetime. The object associated with the proxy will be evicted when the lifetime ends. Mutually exclusive with the evict parameter.

  • deserializer (DeserializerT | None, default: None ) –

    Optionally override the default deserializer for the store instance.

Returns:

  • Proxy[T]

    A proxy of the object.

Raises:

  • ValueError

    If evict is True and lifetime is not None because these parameters are mutually exclusive.
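
A short sketch, assuming the object was previously stored with put():

from proxystore.connectors.local import LocalConnector
from proxystore.store import Store

with Store('from-key-example', LocalConnector()) as store:
    key = store.put('hello')

    # The proxy is lazy: it resolves to the stored object on first use.
    proxy = store.proxy_from_key(key)
    assert proxy == 'hello'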

Source code in proxystore/store/base.py
def proxy_from_key(
    self,
    key: ConnectorKeyT,
    *,
    evict: bool = False,
    lifetime: Lifetime | None = None,
    deserializer: DeserializerT | None = None,
) -> Proxy[T]:
    """Create a proxy that will resolve to an object already in the store.

    Args:
        key: The key associated with an object already in the store.
        evict: If the proxy should evict the object once resolved.
            Mutually exclusive with the `lifetime` parameter.
        lifetime: Attach the proxy to this lifetime. The object associated
            with the proxy will be evicted when the lifetime ends.
            Mutually exclusive with the `evict` parameter.
        deserializer: Optionally override the default deserializer for the
            store instance.

    Returns:
        A proxy of the object.

    Raises:
        ValueError: If `evict` is `True` and `lifetime` is not `None`
            because these parameters are mutually exclusive.
    """
    if evict and lifetime is not None:
        raise ValueError(
            'The evict and lifetime parameters are mutually exclusive. '
            'Only set one of evict or lifetime.',
        )

    factory: StoreFactory[ConnectorT, T] = StoreFactory(
        key,
        store_config=self.config(),
        deserializer=deserializer,
        evict=evict,
    )
    proxy = Proxy(factory)

    logger.debug(f'Store(name="{self.name}"): PROXY_FROM_KEY {key}')

    if lifetime is not None:
        lifetime.add_proxy(proxy)

    return proxy

locked_proxy

locked_proxy(
    obj: T | NonProxiableT,
    *,
    evict: bool = False,
    lifetime: Lifetime | None = None,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    populate_target: bool | None = None,
    skip_nonproxiable: bool = True,
    **kwargs: Any
) -> ProxyLocker[T] | NonProxiableT

Proxy an object and return ProxyLocker.

Parameters:

  • obj (T | NonProxiableT) –

    The object to place in store and return a proxy for.

  • evict (bool, default: False ) –

    If the proxy should evict the object once resolved. Mutually exclusive with the lifetime parameter.

  • lifetime (Lifetime | None, default: None ) –

    Attach the proxy to this lifetime. The object associated with the proxy will be evicted when the lifetime ends. Mutually exclusive with the evict parameter.

  • serializer (SerializerT | None, default: None ) –

    Optionally override the default serializer for the store instance.

  • deserializer (DeserializerT | None, default: None ) –

    Optionally override the default deserializer for the store instance.

  • populate_target (bool | None, default: None ) –

    Pass cache_defaults=True and target=obj to the Proxy constructor. I.e., return a proxy that (1) is already resolved, (2) can be used in isinstance checks without resolving, and (3) is hashable without resolving if obj is a hashable type. If None, defaults to the store-wide setting.

  • skip_nonproxiable (bool, default: True ) –

    Return non-proxiable types (e.g., built-in constants like bool or None) rather than raising a NonProxiableTypeError.

  • kwargs (Any, default: {} ) –

    Additional keyword arguments to pass to Connector.put().

Returns:

  • ProxyLocker[T] | NonProxiableT

    A proxy wrapped in a ProxyLocker unless obj is a non-proxiable type and skip_nonproxiable is True, in which case obj is returned directly.

Raises:

  • NonProxiableTypeError

    If obj is a non-proxiable type. This behavior can be overridden by setting skip_nonproxiable=True.

  • ValueError

    If evict is True and lifetime is not None because these parameters are mutually exclusive.
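
A minimal sketch; it assumes the ProxyLocker.unlock() accessor from proxystore.proxy for retrieving the wrapped proxy (see the ProxyLocker documentation for details):

from proxystore.connectors.local import LocalConnector
from proxystore.proxy import ProxyLocker
from proxystore.store import Store

with Store('locker-example', LocalConnector()) as store:
    locked = store.locked_proxy([1, 2, 3])
    assert isinstance(locked, ProxyLocker)

    # The locker prevents accidental resolution; unlock the proxy
    # only where resolving it is intended.
    proxy = locked.unlock()
    assert sum(proxy) == 6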

Source code in proxystore/store/base.py
def locked_proxy(
    self,
    obj: T | NonProxiableT,
    *,
    evict: bool = False,
    lifetime: Lifetime | None = None,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    populate_target: bool | None = None,
    skip_nonproxiable: bool = True,
    **kwargs: Any,
) -> ProxyLocker[T] | NonProxiableT:
    """Proxy an object and return [`ProxyLocker`][proxystore.proxy.ProxyLocker].

    Args:
        obj: The object to place in store and return a proxy for.
        evict: If the proxy should evict the object once resolved.
            Mutually exclusive with the `lifetime` parameter.
        lifetime: Attach the proxy to this lifetime. The object associated
            with the proxy will be evicted when the lifetime ends.
            Mutually exclusive with the `evict` parameter.
        serializer: Optionally override the default serializer for the
            store instance.
        deserializer: Optionally override the default deserializer for the
            store instance.
        populate_target: Pass `cache_defaults=True` and `target=obj` to
            the [`Proxy`][proxystore.proxy.Proxy] constructor. I.e.,
            return a proxy that (1) is already resolved, (2) can be used
            in [`isinstance`][isinstance] checks without resolving, and (3)
            is hashable without resolving if `obj` is a hashable type.
            If `None`, defaults to the store-wide setting.
        skip_nonproxiable: Return non-proxiable types (e.g., built-in
            constants like `bool` or `None`) rather than raising a
            [`NonProxiableTypeError`][proxystore.store.exceptions.NonProxiableTypeError].
        kwargs: Additional keyword arguments to pass to
            [`Connector.put()`][proxystore.connectors.protocols.Connector.put].

    Returns:
        A proxy wrapped in a \
        [`ProxyLocker`][proxystore.proxy.ProxyLocker] unless `obj` is a \
        non-proxiable type `#!python skip_nonproxiable is True` in which \
        case `obj` is returned directly.

    Raises:
        NonProxiableTypeError: If `obj` is a non-proxiable type. This
            behavior can be overridden by setting
            `#!python skip_nonproxiable=True`.
        ValueError: If `evict` is `True` and `lifetime` is not `None`
            because these parameters are mutually exclusive.
    """  # noqa: E501
    possible_proxy = self.proxy(
        obj,
        evict=evict,
        lifetime=lifetime,
        serializer=serializer,
        deserializer=deserializer,
        populate_target=populate_target,
        skip_nonproxiable=skip_nonproxiable,
        **kwargs,
    )

    if isinstance(possible_proxy, Proxy):
        return ProxyLocker(possible_proxy)
    return possible_proxy

owned_proxy

owned_proxy(
    obj: T | NonProxiableT,
    *,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    populate_target: bool | None = None,
    skip_nonproxiable: bool = True,
    **kwargs: Any
) -> OwnedProxy[T] | NonProxiableT

Create a proxy that will enforce ownership rules over the object.

An OwnedProxy will auto-evict the object once it goes out of scope. This proxy type can also be borrowed.

Parameters:

  • obj (T | NonProxiableT) –

    The object to place in store and return a proxy for.

  • serializer (SerializerT | None, default: None ) –

    Optionally override the default serializer for the store instance.

  • deserializer (DeserializerT | None, default: None ) –

    Optionally override the default deserializer for the store instance.

  • populate_target (bool | None, default: None ) –

    Pass cache_defaults=True and target=obj to the Proxy constructor. I.e., return a proxy that (1) is already resolved, (2) can be used in isinstance checks without resolving, and (3) is hashable without resolving if obj is a hashable type. If None, defaults to the store-wide setting.

  • skip_nonproxiable (bool, default: True ) –

    Return non-proxiable types (e.g., built-in constants like bool or None) rather than raising a NonProxiableTypeError.

  • kwargs (Any, default: {} ) –

    Additional keyword arguments to pass to Connector.put().

Returns:

  • OwnedProxy[T] | NonProxiableT

    A proxy of the object unless obj is a non-proxiable type and skip_nonproxiable is True, in which case obj is returned directly.

Raises:

  • NonProxiableTypeError

    If obj is a non-proxiable type. This behavior can be overridden by setting skip_nonproxiable=True.
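
A brief sketch of the ownership behavior, with OwnedProxy imported from proxystore.store.ref as referenced above:

from proxystore.connectors.local import LocalConnector
from proxystore.store import Store
from proxystore.store.ref import OwnedProxy

with Store('owned-example', LocalConnector()) as store:
    owned = store.owned_proxy([1, 2, 3])
    assert isinstance(owned, OwnedProxy)
    assert sum(owned) == 6

    # Once `owned` goes out of scope (or is deleted), the associated
    # object is automatically evicted from the store.
    del owned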

Source code in proxystore/store/base.py
def owned_proxy(
    self,
    obj: T | NonProxiableT,
    *,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    populate_target: bool | None = None,
    skip_nonproxiable: bool = True,
    **kwargs: Any,
) -> OwnedProxy[T] | NonProxiableT:
    """Create a proxy that will enforce ownership rules over the object.

    An [`OwnedProxy`][proxystore.store.ref.OwnedProxy] will auto-evict
    the object once it goes out of scope. This proxy type can also
    be borrowed.

    Args:
        obj: The object to place in store and return a proxy for.
        serializer: Optionally override the default serializer for the
            store instance.
        deserializer: Optionally override the default deserializer for the
            store instance.
        populate_target: Pass `cache_defaults=True` and `target=obj` to
            the [`Proxy`][proxystore.proxy.Proxy] constructor. I.e.,
            return a proxy that (1) is already resolved, (2) can be used
            in [`isinstance`][isinstance] checks without resolving, and (3)
            is hashable without resolving if `obj` is a hashable type.
            If `None`, defaults to the store-wide setting.
        skip_nonproxiable: Return non-proxiable types (e.g., built-in
            constants like `bool` or `None`) rather than raising a
            [`NonProxiableTypeError`][proxystore.store.exceptions.NonProxiableTypeError].
        kwargs: Additional keyword arguments to pass to
            [`Connector.put()`][proxystore.connectors.protocols.Connector.put].

    Returns:
        A proxy of the object unless `obj` is a non-proxiable type \
        `#!python skip_nonproxiable is True` in which case `obj` is \
        returned directly.

    Raises:
        NonProxiableTypeError: If `obj` is a non-proxiable type. This
            behavior can be overridden by setting
            `#!python skip_nonproxiable=True`.
    """
    possible_proxy = self.proxy(
        obj,
        evict=False,
        serializer=serializer,
        deserializer=deserializer,
        populate_target=populate_target,
        skip_nonproxiable=skip_nonproxiable,
        **kwargs,
    )

    if isinstance(possible_proxy, Proxy):
        populate_target = (
            self._populate_target
            if populate_target is None
            else populate_target
        )
        return into_owned(possible_proxy, populate_target=populate_target)
    return possible_proxy

put

put(
    obj: Any,
    *,
    lifetime: Lifetime | None = None,
    serializer: SerializerT | None = None,
    **kwargs: Any
) -> ConnectorKeyT

Put an object in the store.

Parameters:

  • obj (Any) –

    Object to put in the store.

  • serializer (SerializerT | None, default: None ) –

    Optionally override the default serializer for the store instance.

  • lifetime (Lifetime | None, default: None ) –

    Attach the key to this lifetime. The object associated with the key will be evicted when the lifetime ends.

  • kwargs (Any, default: {} ) –

    Additional keyword arguments to pass to Connector.put().

Returns:

  • ConnectorKeyT

    A key which can be used to retrieve the object.

Raises:

  • TypeError

    If the output of serializer is not bytes.
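
A small sketch of put() with a per-call serializer override; the serializer must return bytes, which pickle.dumps satisfies, and the matching deserializer is passed to get():

import pickle

from proxystore.connectors.local import LocalConnector
from proxystore.store import Store

with Store('put-example', LocalConnector()) as store:
    key = store.put({'x': 1}, serializer=pickle.dumps)
    assert store.get(key, deserializer=pickle.loads) == {'x': 1}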

Source code in proxystore/store/base.py
def put(
    self,
    obj: Any,
    *,
    lifetime: Lifetime | None = None,
    serializer: SerializerT | None = None,
    **kwargs: Any,
) -> ConnectorKeyT:
    """Put an object in the store.

    Args:
        obj: Object to put in the store.
        serializer: Optionally override the default serializer for the
            store instance.
        lifetime: Attach the key to this lifetime. The object associated
            with the key will be evicted when the lifetime ends.
        kwargs: Additional keyword arguments to pass to
            [`Connector.put()`][proxystore.connectors.protocols.Connector.put].

    Returns:
        A key which can be used to retrieve the object.

    Raises:
        TypeError: If the output of `serializer` is not bytes.
    """
    timer = Timer().start()

    with Timer() as serialize_timer:
        if serializer is not None:
            obj = serializer(obj)
        else:
            obj = self.serializer(obj)

    if not isinstance(obj, bytes):
        raise TypeError('Serializer must produce bytes.')

    with self._lock:
        with Timer() as connector_timer:
            key = self.connector.put(obj, **kwargs)

    if lifetime is not None:
        lifetime.add_key(key, store=self)

    timer.stop()
    if self.metrics is not None:
        ctime = connector_timer.elapsed_ms
        stime = serialize_timer.elapsed_ms
        self.metrics.add_attribute('store.put.object_size', key, len(obj))
        self.metrics.add_time('store.put.serialize', key, stime)
        self.metrics.add_time('store.put.connector', key, ctime)
        self.metrics.add_time('store.put', key, timer.elapsed_ms)

    logger.debug(
        f'Store(name="{self.name}"): PUT {key} in '
        f'{timer.elapsed_ms:.3f} ms',
    )
    return key

put_batch

put_batch(
    objs: Sequence[Any],
    *,
    lifetime: Lifetime | None = None,
    serializer: SerializerT | None = None,
    **kwargs: Any
) -> list[ConnectorKeyT]

Put multiple objects in the store.

Parameters:

  • objs (Sequence[Any]) –

    Sequence of objects to put in the store.

  • serializer (SerializerT | None, default: None ) –

    Optionally override the default serializer for the store instance.

  • lifetime (Lifetime | None, default: None ) –

    Attach the keys to this lifetime. The objects associated with each key will be evicted when the lifetime ends.

  • kwargs (Any, default: {} ) –

    Additional keyword arguments to pass to Connector.put_batch().

Returns:

  • list[ConnectorKeyT]

    A list of keys which can be used to retrieve the objects.

Raises:

  • TypeError

    If the output of serializer is not bytes.
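
A short sketch of the batch variant; the returned keys correspond to the input objects in order:

from proxystore.connectors.local import LocalConnector
from proxystore.store import Store

with Store('put-batch-example', LocalConnector()) as store:
    keys = store.put_batch(['a', 'b', 'c'])
    assert [store.get(k) for k in keys] == ['a', 'b', 'c']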

Source code in proxystore/store/base.py
def put_batch(
    self,
    objs: Sequence[Any],
    *,
    lifetime: Lifetime | None = None,
    serializer: SerializerT | None = None,
    **kwargs: Any,
) -> list[ConnectorKeyT]:
    """Put multiple objects in the store.

    Args:
        objs: Sequence of objects to put in the store.
        serializer: Optionally override the default serializer for the
            store instance.
        lifetime: Attach the keys to this lifetime. The objects associated
            with each key will be evicted when the lifetime ends.
        kwargs: Additional keyword arguments to pass to
            [`Connector.put_batch()`][proxystore.connectors.protocols.Connector.put_batch].

    Returns:
        A list of keys which can be used to retrieve the objects.

    Raises:
        TypeError: If the output of `serializer` is not bytes.
    """
    timer = Timer().start()

    def _serialize(obj: Any) -> bytes:
        if serializer is not None:
            obj = serializer(obj)
        else:
            obj = self.serializer(obj)

        if not isinstance(obj, bytes):
            raise TypeError('Serializer must produce bytes.')

        return obj

    with Timer() as serialize_timer:
        _objs = list(map(_serialize, objs))

    with self._lock:
        with Timer() as connector_timer:
            keys = self.connector.put_batch(_objs, **kwargs)

    if lifetime is not None:
        lifetime.add_key(*keys, store=self)

    timer.stop()
    if self.metrics is not None:
        ctime = connector_timer.elapsed_ms
        stime = serialize_timer.elapsed_ms
        sizes = sum(len(obj) for obj in _objs)
        self.metrics.add_attribute(
            'store.put_batch.object_sizes',
            keys,
            sizes,
        )
        self.metrics.add_time('store.put_batch.serialize', keys, stime)
        self.metrics.add_time('store.put_batch.connector', keys, ctime)
        self.metrics.add_time('store.put_batch', keys, timer.elapsed_ms)

    logger.debug(
        f'Store(name="{self.name}"): PUT_BATCH ({len(keys)} items) in '
        f'{timer.elapsed_ms:.3f} ms',
    )
    return keys

StoreConfig

Bases: BaseModel

Store configuration.

Tip

See the Store parameters for more information about each configuration option.

Attributes:

  • name (str) –

    Store name.

  • connector (ConnectorConfig) –

    Connector configuration.

  • serializer (Optional[SerializerT]) –

    Optional serializer.

  • deserializer (Optional[DeserializerT]) –

    Optional deserializer.

  • cache_size (int) –

    Cache size.

  • metrics (bool) –

    Enable recording operation metrics.

  • populate_target (bool) –

    Set the default value for the populate_target parameter of proxy methods.

  • auto_register (bool) –

    Auto-register the store.

from_toml classmethod

from_toml(filepath: str | Path) -> Self

Create a configuration from a TOML file.

Example

See write_toml().

Parameters:

  • filepath (str | Path) –

    Path to TOML file to load.

Source code in proxystore/store/config.py
@classmethod
def from_toml(cls, filepath: str | pathlib.Path) -> Self:
    """Create a configuration file from a TOML file.

    Example:
        See
        [`write_toml()`][proxystore.store.config.StoreConfig.write_toml].

    Args:
        filepath: Path to TOML file to load.
    """
    with open(filepath, 'rb') as f:
        return load(cls, f)

write_toml

write_toml(filepath: str | Path) -> None

Write a configuration to a TOML file.

Example

from proxystore.store.config import ConnectorConfig
from proxystore.store.config import StoreConfig

config = StoreConfig(
    name='example',
    connector=ConnectorConfig(
        kind='file',
        options={'store_dir': '/tmp/proxystore-cache'},
    ),
)

config.write_toml('config.toml')
The resulting TOML file contains the full configuration, including default options, and can be loaded again using StoreConfig.from_toml('config.toml').
config.toml
name = "example"
cache_size = 16
metrics = false
populate_target = true
auto_register = false

[connector]
kind = "file"

[connector.options]
store_dir = "/tmp/proxystore-cache"

Parameters:

  • filepath (str | Path) –

    Path to TOML file to write.

Source code in proxystore/store/config.py
def write_toml(self, filepath: str | pathlib.Path) -> None:
    """Write a configuration to a TOML file.

    Example:
        ```python
        from proxystore.store.config import ConnectorConfig
        from proxystore.store.config import StoreConfig

        config = StoreConfig(
            name='example',
            connector=ConnectorConfig(
                kind='file',
                options={'store_dir': '/tmp/proxystore-cache'},
            ),
        )

        config.write_toml('config.toml')
        ```
        The resulting TOML file contains the full configuration,
        including default options, and can be loaded again
        using `#!python StoreConfig.from_toml('config.toml')`.
        ```toml title="config.toml"
        name = "example"
        cache_size = 16
        metrics = false
        populate_target = true
        auto_register = false

        [connector]
        kind = "file"

        [connector.options]
        store_dir = "/tmp/proxystore-cache"
        ```

    Args:
        filepath: Path to TOML file to write.
    """
    filepath = pathlib.Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)
    with open(filepath, 'wb') as f:
        dump(self, f)

StoreFactory

StoreFactory(
    key: ConnectorKeyT,
    store_config: StoreConfig,
    *,
    evict: bool = False,
    deserializer: DeserializerT | None = None
)

Bases: Generic[ConnectorT, T]

Factory that resolves an object from a store.

Adds support for asynchronously retrieving objects from a Store instance.

The factory takes the store_config parameter that is used to reinitialize the store if the factory is sent to a remote process where the store has not already been initialized.

Parameters:

  • key (ConnectorKeyT) –

    Key corresponding to object in store.

  • store_config (StoreConfig) –

    Store configuration used to reinitialize the store if needed.

  • evict (bool, default: False ) –

    If True, evict the object from the store once resolve() is called.

  • deserializer (DeserializerT | None, default: None ) –

    Optional callable used to deserialize the byte string. If None, the default deserializer (deserialize()) will be used.

Source code in proxystore/store/factory.py
def __init__(
    self,
    key: ConnectorKeyT,
    store_config: StoreConfig,
    *,
    evict: bool = False,
    deserializer: DeserializerT | None = None,
) -> None:
    self.key = key
    self.store_config = store_config
    self.evict = evict
    self.deserializer = deserializer

    # The following are not included when a factory is serialized
    # because they are specific to that instance of the factory
    self._obj_future: Future[T] | None = None

get_store

get_store() -> Store[ConnectorT]

Get store and reinitialize if necessary.

Source code in proxystore/store/factory.py
def get_store(self) -> Store[ConnectorT]:
    """Get store and reinitialize if necessary."""
    return proxystore.store.get_or_create_store(
        self.store_config,
        register=True,
    )

resolve

resolve() -> T

Get object associated with key from store.

Raises:

  • ProxyResolveMissingKeyError

    If the key associated with this factory does not exist in the store.

Source code in proxystore/store/factory.py
def resolve(self) -> T:
    """Get object associated with key from store.

    Raises:
        ProxyResolveMissingKeyError: If the key associated with this
            factory does not exist in the store.
    """
    with Timer() as timer:
        store = self.get_store()
        obj = store.get(
            self.key,
            deserializer=self.deserializer,
            default=_MISSING_OBJECT,
        )

        if obj is _MISSING_OBJECT:
            raise ProxyResolveMissingKeyError(
                self.key,
                type(store),
                store.name,
            )

        if self.evict:
            store.evict(self.key)

    if store.metrics is not None:
        total_time = timer.elapsed_ns
        store.metrics.add_time('factory.resolve', self.key, total_time)

    return cast(T, obj)

resolve_async

resolve_async() -> None

Asynchronously get object associated with key from store.

Source code in proxystore/store/factory.py
def resolve_async(self) -> None:
    """Asynchronously get object associated with key from store."""
    logger.debug(f'Starting asynchronous resolve of {self.key}')
    self._obj_future = _default_pool.submit(self.resolve)

get_store

get_store(val: str | Proxy[T]) -> Store[Any] | None

Get a registered store by name.

Parameters:

  • val (str | Proxy[T]) –

    Name of the store to get or a Proxy instance.

Returns:

  • Store[Any] | None

    Store if a store matching the name or belonging to the proxy exists. If the store does not exist, returns None.

Raises:

  • ProxyStoreFactoryError

    If the value is a proxy but does not contain a factory of type StoreFactory.

Source code in proxystore/store/__init__.py
def get_store(val: str | Proxy[T]) -> Store[Any] | None:
    """Get a registered store by name.

    Args:
        val: name of the store to get or a [`Proxy`][proxystore.proxy.Proxy]
            instance.

    Returns:
        [`Store`][proxystore.store.base.Store] if a store matching the \
        name or belonging to the proxy exists. If the store does not exist, \
        returns `None`.

    Raises:
        ProxyStoreFactoryError: If the value is a proxy but does not contain a
            factory of type
            [`StoreFactory`][proxystore.store.base.StoreFactory].
    """
    if isinstance(val, Proxy):
        # If the object is a proxy, get the factory that will access the store
        factory = get_factory(val)
        if isinstance(factory, StoreFactory):
            return factory.get_store()
        else:
            raise ProxyStoreFactoryError(
                'The proxy must contain a factory with type '
                f'{StoreFactory.__name__}. {type(factory).__name__} '
                'is not supported.',
            )
    else:
        name = val

    with _stores_lock:
        if name in _stores:
            return _stores[name]
        return None
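
A short sketch of both lookup paths (the store name and connector are illustrative):

from proxystore.connectors.local import LocalConnector
from proxystore.store import Store
from proxystore.store import get_store
from proxystore.store import register_store
from proxystore.store import unregister_store

store = Store('lookup-example', LocalConnector())
register_store(store)

assert get_store('lookup-example') is store
assert get_store('unknown-name') is None  # Unregistered names return None.

# A proxy created by the store can also be used to look up its store.
proxy = store.proxy('value')
assert get_store(proxy) is store

unregister_store(store)
store.close()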

get_or_create_store

get_or_create_store(
    store_config: StoreConfig, *, register: bool = True
) -> Store[Any]

Get a registered store or initialize a new instance from the config.

Parameters:

  • store_config (StoreConfig) –

    Store configuration used to reinitialize the store if needed.

  • register (bool, default: True ) –

    Optionally register the store if a new instance was initialized.

Returns:

  • Store[Any]

    Store instance.

Source code in proxystore/store/__init__.py
def get_or_create_store(
    store_config: StoreConfig,
    *,
    register: bool = True,
) -> Store[Any]:
    """Get a registered store or initialize a new instance from the config.

    Args:
        store_config: Store configuration used to reinitialize the store if
            needed.
        register: Optionally register the store if a new instance was
            initialized.

    Returns:
        [`Store`][proxystore.store.base.Store] instance.
    """
    with _stores_lock:
        store = get_store(store_config.name)
        if store is None:
            store = Store.from_config(store_config)
            if register:
                # Use exist_ok=True because the store may have registered
                # itself during initialization if register=True.
                register_store(store, exist_ok=True)
        return store
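
A brief sketch of the get-or-create behavior (the store registers itself at initialization via register=True; names are illustrative):

from proxystore.connectors.local import LocalConnector
from proxystore.store import Store
from proxystore.store import get_or_create_store

with Store('shared-example', LocalConnector(), register=True) as store:
    config = store.config()

    # The name is already registered, so the existing instance is returned
    # rather than a new one being created.
    assert get_or_create_store(config) is store

# In a fresh process, the same config would instead initialize a new store.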

register_store

register_store(
    store: Store[Any], exist_ok: bool = False
) -> None

Register the store instance to the global registry.

Note

Global means globally accessible within the Python process.

Tip

Use the store_registration context manager to automatically register and unregister stores.

Parameters:

  • store (Store[Any]) –

    Store instance to register.

  • exist_ok (bool, default: False ) –

    If a store with the same name exists, overwrite it.

Raises:

  • StoreExistsError

    If a store with the same name is already registered and exist_ok is false.

Source code in proxystore/store/__init__.py
def register_store(store: Store[Any], exist_ok: bool = False) -> None:
    """Register the store instance to the global registry.

    Note:
        Global means globally accessible within the Python process.

    Tip:
        Use the [`store_registration`][proxystore.store.store_registration]
        context manager to automatically register and unregister stores.

    Args:
        store: Store instance to register.
        exist_ok: If a store with the same name exists, overwrite it.

    Raises:
        StoreExistsError: If a store with the same name is already registered
            and `exist_ok` is false.
    """
    with _stores_lock:
        if store.name in _stores and not exist_ok:
            raise StoreExistsError(
                f'A store named "{store.name}" already exists.',
            )

        _stores[store.name] = store
        logger.info(f'Registered a store named "{store.name}"')
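
A brief sketch of the exist_ok behavior (names and connectors are illustrative):

from proxystore.connectors.local import LocalConnector
from proxystore.store import Store
from proxystore.store import register_store
from proxystore.store import unregister_store

first = Store('registry-example', LocalConnector())
register_store(first)

# Registering another store under the same name raises StoreExistsError
# unless exist_ok=True is passed, in which case the entry is overwritten.
second = Store('registry-example', LocalConnector())
register_store(second, exist_ok=True)

unregister_store('registry-example')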

store_registration

store_registration(
    *stores: Store[Any], exist_ok: bool = False
) -> Generator[None, None, None]

Context manager that registers and unregisters a set of stores.

Example
from proxystore.connectors.local import LocalConnector
from proxystore.store import Store
from proxystore.store import store_registration

with Store('store', LocalConnector()) as store:
    with store_registration(store):
        ...

stores = [
    Store('store1', LocalConnector()),
    Store('store2', LocalConnector()),
]
with store_registration(*stores):
    ...

Parameters:

  • stores (Store[Any], default: () ) –

    Set of Store instances to register then unregister when the context manager is exited.

  • exist_ok (bool, default: False ) –

    If a store with the same name exists, overwrite it.

Raises:

  • StoreExistsError

    If a store with the same name is already registered and exist_ok is false.

Source code in proxystore/store/__init__.py
@contextlib.contextmanager
def store_registration(
    *stores: Store[Any],
    exist_ok: bool = False,
) -> Generator[None, None, None]:
    """Context manager that registers and unregisters a set of stores.

    Example:
        ```python
        from proxystore.connectors.local import LocalConnector
        from proxystore.store import Store
        from proxystore.store import store_registration

        with Store('store', LocalConnector()) as store:
            with store_registration(store):
                ...

        stores = [
            Store('store1', LocalConnector()),
            Store('store2', LocalConnector()),
        ]
        with store_registration(*stores):
            ...
        ```

    Args:
        stores: Set of [`Store`][proxystore.store.base.Store] instances to
            register then unregister when the context manager is exited.
        exist_ok: If a store with the same name exists, overwrite it.

    Raises:
        StoreExistsError: If a store with the same name is already registered
            and `exist_ok` is false.
    """
    for store in stores:
        register_store(store, exist_ok=exist_ok)

    yield

    for store in stores:
        unregister_store(store)

unregister_store

unregister_store(name_or_store: str | Store[Any]) -> None

Unregister the store instance from the global registry.

Note

This function is a no-op if no store matching the name exists (i.e., no exception will be raised).

Parameters:

  • name_or_store (str | Store[Any]) –

    Name of the store to unregister or a store itself.

Source code in proxystore/store/__init__.py
def unregister_store(name_or_store: str | Store[Any]) -> None:
    """Unregisters the store instance from the global registry.

    Note:
        This function is a no-op if no store matching the name
        exists (i.e., no exception will be raised).

    Args:
        name_or_store: Name of the store to unregister or a store itself.
    """
    name = (
        name_or_store if isinstance(name_or_store, str) else name_or_store.name
    )
    with _stores_lock:
        if name in _stores:
            del _stores[name]
            logger.info(f'Unregistered a store named {name}')
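
A short sketch showing both accepted argument types and the no-op behavior (names are illustrative):

from proxystore.connectors.local import LocalConnector
from proxystore.store import Store
from proxystore.store import get_store
from proxystore.store import register_store
from proxystore.store import unregister_store

store = Store('cleanup-example', LocalConnector())
register_store(store)

# Either the store instance or its registered name may be passed.
unregister_store(store)
assert get_store('cleanup-example') is None

# Unregistering a name that is no longer registered is a no-op.
unregister_store('cleanup-example')
store.close()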