Skip to content

proxystore.store.base

Base Store Abstract Class.

SerializerT module-attribute

SerializerT = Callable[[Any], bytes]

Serializer type alias.

DeserializerT module-attribute

DeserializerT = Callable[[bytes], Any]

Deserializer type alias.

StoreFactory

StoreFactory(
    key: KeyT,
    store_type: type[Store[KeyT]],
    store_name: str,
    store_kwargs: dict[str, Any] | None = None,
    *,
    evict: bool = False,
    deserializer: DeserializerT | None = None
) -> None

Bases: Factory[T], Generic[KeyT, T]

Base Factory for Stores.

Adds support for asynchronously retrieving objects from a Store.

The factory takes the store_type and store_kwargs parameters that are used to reinitialize the store if the factory is sent to a remote process where the store has not already been initialized.

Parameters:

  • key (KeyT) –

    Key corresponding to object in store.

  • store_type (type[Store[KeyT]]) –

    Type of store this factory will resolve an object from.

  • store_name (str) –

    Name of store.

  • store_kwargs (dict[str, Any] | None) –

    Optional keyword arguments used to reinitialize store.

  • evict (bool) –

    If True, evict the object from the store once resolve() is called.

  • deserializer (DeserializerT | None) –

    Optional callable used to deserialize the byte string. If None, the default deserializer (deserialize()) will be used.

Source code in proxystore/store/base.py
def __init__(
    self,
    key: KeyT,
    store_type: type[Store[KeyT]],
    store_name: str,
    store_kwargs: dict[str, Any] | None = None,
    *,
    evict: bool = False,
    deserializer: DeserializerT | None = None,
) -> None:
    """Initialize the factory.

    Args:
        key: Key of the target object in the store.
        store_type: Store class used to recreate the store if no store
            named `store_name` is registered in this process.
        store_name: Name of the store holding the object.
        store_kwargs: Keyword arguments used to recreate the store.
            `None` is replaced with an empty dict.
        evict: If `True`, evict the object from the store once
            `resolve()` is called.
        deserializer: Optional callable used to deserialize the bytes.
    """
    self.key = key
    self.store_type = store_type
    self.store_name = store_name
    if store_kwargs is None:
        store_kwargs = {}
    self.store_kwargs = store_kwargs
    self.evict = evict
    self.deserializer = deserializer

    # Instance-specific state that is not included when the factory is
    # serialized.
    self._obj_future: Future[T] | None = None
    self.stats: FunctionEventStats | None = None
    if self.store_kwargs.get('stats') is True:
        self.stats = FunctionEventStats()
        # Shadow the bound resolve methods on this instance with wrappers
        # that record timing stats under this factory's key.
        for method_name in ('resolve', 'resolve_async'):
            unwrapped = getattr(self, method_name)
            setattr(
                self,
                method_name,
                self.stats.wrap(unwrapped, preset_key=self.key),
            )

get_store

get_store() -> Store[KeyT]

Get store and reinitialize if necessary.

Raises:

  • ValueError

    If the type of the returned store does not match the expected store type passed to the factory constructor.

Source code in proxystore/store/base.py
def get_store(self) -> Store[KeyT]:
    """Get store and reinitialize if necessary.

    The store registered under `store_name` is looked up first. If none
    is registered, a new `store_type` instance is built from
    `store_kwargs` and registered.

    Raises:
        ValueError: If the type of the returned store does not match the
            expected store type passed to the factory constructor.
    """
    store = ps.store.get_store(self.store_name)
    if store is None:
        store = self.store_type(self.store_name, **self.store_kwargs)
        ps.store.register_store(store)

    if isinstance(store, self.store_type):
        return store

    expected_name = self.store_type.__name__
    raise ValueError(
        f'store_name={self.store_name} passed to '
        f'{type(self).__name__} does not correspond to store of '
        f'type {expected_name}',
    )

resolve

resolve() -> T

Get object associated with key from store.

Raises:

  • ProxyResolveMissingKeyError

    If the key associated with this factory does not exist in the store.

Source code in proxystore/store/base.py
def resolve(self) -> T:
    """Get object associated with key from store.

    If `resolve_async()` previously started a background fetch, the
    result of that pending future is returned and the future is cleared.
    Otherwise the object is fetched synchronously.

    Raises:
        ProxyResolveMissingKeyError: If the key associated with this
            factory does not exist in the store.
    """
    future = self._obj_future
    if future is None:
        return self._get_value()

    # Clear the pending future before waiting on it. If the background
    # fetch failed, the exception propagates once. A later call then
    # performs a fresh synchronous fetch instead of re-raising the same
    # stale failure forever.
    self._obj_future = None
    return future.result()

resolve_async

resolve_async() -> None

Asynchronously get object associated with key from store.

Source code in proxystore/store/base.py
def resolve_async(self) -> None:
    """Asynchronously get object associated with key from store.

    When `_should_resolve_async()` allows it, the fetch is submitted to
    the module's default pool. The pending future is stored so that
    `resolve()` can consume it later.
    """
    if not self._should_resolve_async():
        return
    self._obj_future = _default_pool.submit(self._get_value)

Store

Store(
    name: str,
    *,
    cache_size: int = 16,
    stats: bool = False,
    kwargs: dict[str, Any] | None
) -> None

Bases: Generic[KeyT]

Key-value store interface.

Provides base functionality for interaction with an object store including serialization and caching.

Subclasses of Store must implement create_key(), evict(), exists(), get_bytes(), and set_bytes(). Subclasses may implement close() if needed.

The Store handles caching and stores all objects as key-bytestring pairs, i.e., objects passed to get() or set() will be appropriately (de)serialized before being passed to get_bytes() or set_bytes(), respectively.

Parameters:

  • name (str) –

    Name of the store instance.

  • cache_size (int) –

    Size of LRU cache (in # of objects). If 0, the cache is disabled. The cache is local to the Python process.

  • stats (bool) –

    Collect stats on store operations.

  • kwargs (dict[str, Any] | None) –

    Additional keyword arguments to return from Store.kwargs. I.e., the additional keyword arguments needed to reinitialize this store.

Raises:

  • ValueError

    If cache_size is negative.

Source code in proxystore/store/base.py
def __init__(
    self,
    name: str,
    *,
    cache_size: int = 16,
    stats: bool = False,
    kwargs: dict[str, Any] | None,
) -> None:
    """Initialize the store's cache, reinitialization kwargs, and stats.

    Args:
        name: Name of the store instance.
        cache_size: Size of the LRU cache (in # of objects); 0 disables it.
        stats: Collect stats on store operations.
        kwargs: Additional keyword arguments needed to reinitialize this
            store.

    Raises:
        ValueError: If `cache_size` is negative.
    """
    if cache_size < 0:
        raise ValueError(
            f'Cache size cannot be negative. Got {cache_size}.',
        )

    self.name = name

    self._cache: LRUCache[KeyT, Any] = LRUCache(cache_size)
    # Arguments needed to recreate this store (e.g., by a StoreFactory in
    # another process).
    self._kwargs = {'stats': stats, 'cache_size': cache_size}
    if kwargs is not None:  # pragma: no branch
        self._kwargs.update(kwargs)

    self._stats: FunctionEventStats | None = None
    if stats:
        self._stats = FunctionEventStats()
        # Monkeypatch methods with wrappers to track their stats. The
        # cheap name checks run before getattr(). This avoids evaluating
        # every attribute/property in dir(self), e.g. subclass properties
        # that depend on state not yet set up.
        for attr in dir(self):
            if attr.startswith('_') or attr not in STORE_METHOD_KEY_IS_RESULT:
                continue
            method = getattr(self, attr)
            if not callable(method):
                continue
            # As the flag name suggests, key_is_result presumably tells
            # wrap() to take the key from the method's return value rather
            # than its first argument -- TODO confirm against wrap().
            wrapped = self._stats.wrap(
                method,
                key_is_result=STORE_METHOD_KEY_IS_RESULT[attr],
            )
            setattr(self, attr, wrapped)

    logger.debug(f'initialized {self}')

has_stats property

has_stats: bool

Whether the store keeps track of performance stats.

kwargs property

kwargs: dict[str, Any]

Kwargs for this store instance.

close

close() -> None

Cleanup any objects associated with the store.

Many Store types do not have any objects that require cleaning up, so this method is a no-op by default unless overridden.

Warning

This method should only be called at the end of the program when the store will no longer be used, for example once all proxies have been resolved.

Source code in proxystore/store/base.py
def close(self) -> None:
    """Cleanup any objects associated with the store.

    Many [`Store`][proxystore.store.base.Store] types hold no objects that
    require cleaning up, so this method is a no-op by default unless
    overridden.

    Warning:
        This method should only be called at the end of the program
        when the store will no longer be used, for example once all
        proxies have been resolved.
    """

create_key

create_key(obj: Any) -> KeyT

Create key for the object.

Parameters:

  • obj (Any) –

    Object to be placed in store.

Returns:

  • KeyT

    A key.

Source code in proxystore/store/base.py
def create_key(self, obj: Any) -> KeyT:
    """Create key for the object.

    The base implementation only raises; concrete stores provide their
    own key scheme.

    Args:
        obj: Object to be placed in store.

    Returns:
        A key.
    """
    raise NotImplementedError()

evict abstractmethod

evict(key: KeyT) -> None

Evict object associated with key.

Parameters:

  • key (KeyT) –

    The key corresponding to object in store to evict.

Source code in proxystore/store/base.py
@abstractmethod
def evict(self, key: KeyT) -> None:
    """Evict object associated with key.

    Must be implemented by subclasses; the base version only raises.

    Args:
        key: The key corresponding to object in store to evict.
    """
    raise NotImplementedError()

exists abstractmethod

exists(key: KeyT) -> bool

Check if key exists.

Parameters:

  • key (KeyT) –

    The key to check.

Returns:

  • bool

    If the key exists in the store.

Source code in proxystore/store/base.py
@abstractmethod
def exists(self, key: KeyT) -> bool:
    """Check if key exists.

    Must be implemented by subclasses; the base version only raises.

    Args:
        key: The key to check.

    Returns:
        If the key exists in the store.
    """
    raise NotImplementedError()

get

get(
    key: KeyT,
    *,
    deserializer: DeserializerT | None = None,
    default: object | None = None
) -> Any | None

Return object associated with key.

Parameters:

  • key (KeyT) –

    The key corresponding to object.

  • deserializer (DeserializerT | None) –

    Optional callable used to deserialize the byte string. If None, the default deserializer (deserialize()) will be used.

  • default (object | None) –

    Optionally provide value to be returned if an object associated with the key does not exist.

Returns:

  • Any | None

    The object associated with key or default if key does not exist.

Source code in proxystore/store/base.py
def get(
    self,
    key: KeyT,
    *,
    deserializer: DeserializerT | None = None,
    default: object | None = None,
) -> Any | None:
    """Return object associated with key.

    A locally cached object is returned directly. Otherwise the bytes are
    fetched with `get_bytes()`, deserialized, and cached before being
    returned.

    Args:
        key: The key corresponding to object.
        deserializer: Optional callable used to deserialize the
            byte string. If `None`, the default deserializer
            ([`deserialize()`][proxystore.serialize.deserialize]) will be
            used.
        default: Optionally provide value to be returned if an object
            associated with the key does not exist.

    Returns:
        The object associated with key or `default` if key does not exist.
    """
    if self.is_cached(key):
        cached = self._cache.get(key)
        logger.debug(
            f"GET key='{key}' FROM {self.__class__.__name__}"
            f"(name='{self.name}'): was_cached=True",
        )
        return cached

    raw = self.get_bytes(key)
    if raw is None:
        logger.debug(
            f"GET key='{key}' FROM {self.__class__.__name__}"
            f"(name='{self.name}'): key did not exist, returned default",
        )
        return default

    decode = default_deserializer if deserializer is None else deserializer
    obj = decode(raw)
    self._cache.set(key, obj)
    logger.debug(
        f"GET key='{key}' FROM {self.__class__.__name__}"
        f"(name='{self.name}'): was_cached=False",
    )
    return obj

get_bytes abstractmethod

get_bytes(key: KeyT) -> bytes | None

Get serialized object from remote store.

Parameters:

  • key (KeyT) –

    The key corresponding to the object.

Returns:

  • bytes | None

    The serialized object or None if it does not exist.

Source code in proxystore/store/base.py
@abstractmethod
def get_bytes(self, key: KeyT) -> bytes | None:
    """Get serialized object from remote store.

    Must be implemented by subclasses; the base version only raises.

    Args:
        key: The key corresponding to the object.

    Returns:
        The serialized object or `None` if it does not exist.
    """
    raise NotImplementedError()

is_cached

is_cached(key: KeyT) -> bool

Check if object is cached locally.

Parameters:

  • key (KeyT) –

    The key corresponding to the object.

Returns:

  • bool

    If the object associated with the key is cached.

Source code in proxystore/store/base.py
def is_cached(self, key: KeyT) -> bool:
    """Check if object is cached locally.

    Delegates to the store's local cache.

    Args:
        key: The key corresponding to the object.

    Returns:
        If the object associated with the key is cached.
    """
    cache = self._cache
    return cache.exists(key)

proxy

proxy(
    obj: T,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    **kwargs: Any
) -> Proxy[T]

Create a proxy that will resolve to an object in the store.

Warning

If the factory requires reinstantiating the store to correctly resolve the object, the factory should reinstantiate the store with the same arguments used to instantiate the store that created the proxy/factory. I.e., the proxy() method should pass any arguments given to Store.__init__() along to the factory so the factory can correctly recreate the store if the factory is resolved in a different Python process.

Parameters:

  • obj (T) –

    The object to place in store and return proxy for.

  • serializer (SerializerT | None) –

    Optional callable which serializes the object. If None, the default serializer (serialize()) will be used.

  • deserializer (DeserializerT | None) –

    Optional callable used by the factory to deserialize the byte string. If None, the default deserializer (deserialize()) will be used.

  • kwargs (Any) –

    Additional arguments to pass to the Factory.

Returns:

  • Proxy[T]

    A proxy of the object.

Source code in proxystore/store/base.py
def proxy(
    self,
    obj: T,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    **kwargs: Any,
) -> Proxy[T]:
    """Create a proxy that will resolve to an object in the store.

    Warning:
        If the factory requires reinstantiating the store to correctly
        resolve the object, the factory should reinstantiate the store
        with the same arguments used to instantiate the store that
        created the proxy/factory. I.e., the `proxy()` method
        should pass any arguments given to `Store.__init__()`
        along to the factory so the factory can correctly recreate the
        store if the factory is resolved in a different Python process.

    Args:
        obj: The object to place in store and return proxy for.
        serializer: Optional callable which serializes the
            object. If `None`, the default serializer
            ([`serialize()`][proxystore.serialize.serialize]) will be used.
        deserializer: Optional callable used by the factory
            to deserialize the byte string. If `None`, the default
            deserializer
            ([`deserialize()`][proxystore.serialize.deserialize]) will be
            used.
        kwargs: Additional arguments to pass to the Factory.

    Returns:
        A proxy of the object.
    """
    key = self.set(obj, serializer=serializer)
    store_label = f"{self.__class__.__name__}(name='{self.name}')"
    logger.debug(f"PROXY key='{key}' FROM {store_label}")

    # The factory carries this store's type, name, and kwargs so it can
    # recreate the store when resolved elsewhere.
    factory: StoreFactory[KeyT, T] = StoreFactory(
        key,
        store_type=type(self),
        store_name=self.name,
        store_kwargs=self.kwargs,
        deserializer=deserializer,
        **kwargs,
    )
    return Proxy(factory)

proxy_batch

proxy_batch(
    objs: Sequence[T],
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    **kwargs: Any
) -> list[Proxy[T]]

Create proxies for batch of objects in the store.

See Store.proxy() for more details.

Parameters:

  • objs (Sequence[T]) –

    The objects to place in store and return proxies for.

  • serializer (SerializerT | None) –

    Optional callable which serializes the object. If None, the default serializer (serialize()) will be used.

  • deserializer (DeserializerT | None) –

    Optional callable used by the factory to deserialize the byte string. If None, the default deserializer (deserialize()) will be used.

  • kwargs (Any) –

    Additional arguments to pass to the Factory.

Returns:

  • list[Proxy[T]]

    A list of proxies of the objects.

Source code in proxystore/store/base.py
def proxy_batch(
    self,
    objs: Sequence[T],
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    **kwargs: Any,
) -> list[Proxy[T]]:
    """Create proxies for batch of objects in the store.

    All objects are stored with a single `set_batch()` call. Then one
    proxy is created per returned key via `proxy_from_key()`. See
    [`Store.proxy()`][proxystore.store.base.Store.proxy] for more
    details.

    Args:
        objs: The objects to place in store and return proxies for.
        serializer: Optional callable which serializes the
            object. If `None`, the default serializer
            ([`serialize()`][proxystore.serialize.serialize]) will be used.
        deserializer: Optional callable used by the factory
            to deserialize the byte string. If `None`, the default
            deserializer
            ([`deserialize()`][proxystore.serialize.deserialize]) will be
            used.
        kwargs: Additional arguments to pass to the Factory.

    Returns:
        A list of proxies of the objects.
    """
    proxies = []
    for stored_key in self.set_batch(objs, serializer=serializer):
        proxies.append(
            self.proxy_from_key(
                stored_key,
                deserializer=deserializer,
                **kwargs,
            ),
        )
    return proxies

proxy_from_key

proxy_from_key(
    key: KeyT,
    deserializer: DeserializerT | None = None,
    **kwargs: Any
) -> Proxy[T]

Create a proxy to an object already in the store.

Note

This method will not verify that the key is valid so an error will not be raised until the returned proxy is resolved.

Parameters:

  • key (KeyT) –

    The key corresponding to an object already in the store that will be the target object of the returned proxy.

  • deserializer (DeserializerT | None) –

    Optional callable used by the factory to deserialize the byte string. If None, the default deserializer (deserialize()) will be used.

  • kwargs (Any) –

    Additional arguments to pass to the Factory.

Returns:

  • Proxy[T]

    A proxy.

Source code in proxystore/store/base.py
def proxy_from_key(
    self,
    key: KeyT,
    deserializer: DeserializerT | None = None,
    **kwargs: Any,
) -> Proxy[T]:
    """Create a proxy to an object already in the store.

    Note:
        This method will not verify that the key is valid so an error
        will not be raised until the returned proxy is resolved.

    Args:
        key: The key corresponding to an object already in the store
            that will be the target object of the returned proxy.
        deserializer: Optional callable used by the factory
            to deserialize the byte string. If `None`, the default
            deserializer
            ([`deserialize()`][proxystore.serialize.deserialize]) will be
            used.
        kwargs: Additional arguments to pass to the Factory.

    Returns:
        A proxy.
    """
    store_label = f"{self.__class__.__name__}(name='{self.name}')"
    logger.debug(f"PROXY key='{key}' FROM {store_label}")

    # The factory carries this store's type, name, and kwargs so it can
    # recreate the store when resolved elsewhere.
    factory: StoreFactory[KeyT, T] = StoreFactory(
        key,
        store_type=type(self),
        store_name=self.name,
        store_kwargs=self.kwargs,
        deserializer=deserializer,
        **kwargs,
    )
    return Proxy(factory)

locked_proxy

locked_proxy(
    obj: T,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    **kwargs: Any
) -> ProxyLocker[T]

Create a proxy locker that will prevent resolution.

Parameters:

  • obj (T) –

    The object to place in store and create proxy of.

  • serializer (SerializerT | None) –

    Optional callable which serializes the object. If None, the default serializer (serialize()) will be used.

  • deserializer (DeserializerT | None) –

    Optional callable used by the factory to deserialize the byte string. If None, the default deserializer (deserialize()) will be used.

  • kwargs (Any) –

    Additional arguments to pass to the Factory.

Returns:

  • ProxyLocker[T]

    A proxy wrapped in a ProxyLocker.

Source code in proxystore/store/base.py
def locked_proxy(
    self,
    obj: T,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    **kwargs: Any,
) -> ProxyLocker[T]:
    """Create a proxy locker that will prevent resolution.

    The object is proxied with `proxy()` and the result is wrapped in a
    [`ProxyLocker`][proxystore.proxy.ProxyLocker].

    Args:
        obj: The object to place in store and create proxy of.
        serializer: Optional callable which serializes the
            object. If `None`, the default serializer
            ([`serialize()`][proxystore.serialize.serialize]) will be used.
        deserializer: Optional callable used by the factory
            to deserialize the byte string. If `None`, the default
            deserializer
            ([`deserialize()`][proxystore.serialize.deserialize]) will be
            used.
        kwargs: Additional arguments to pass to the Factory.

    Returns:
        A proxy wrapped in a [`ProxyLocker`][proxystore.proxy.ProxyLocker].
    """
    inner = self.proxy(
        obj,
        serializer=serializer,
        deserializer=deserializer,
        **kwargs,
    )
    return ProxyLocker(inner)

set

set(
    obj: Any, *, serializer: SerializerT | None = None
) -> KeyT

Set key-object pair in store.

Parameters:

  • obj (Any) –

    The object to be placed in the store.

  • serializer (SerializerT | None) –

    Optional callable which serializes the object. If None, the default serializer (serialize()) will be used.

Returns:

  • KeyT

    A key that can be used to retrieve the object.

Raises:

  • TypeError

    If the output of serializer is not bytes.

Source code in proxystore/store/base.py
def set(
    self,
    obj: Any,
    *,
    serializer: SerializerT | None = None,
) -> KeyT:
    """Set key-object pair in store.

    The object is serialized to bytes, a key is derived from those bytes
    with `create_key()`, and the pair is written with `set_bytes()`.

    Args:
        obj: The object to be placed in the store.
        serializer: Optional callable which serializes the
            object. If `None`, the default serializer
            ([`serialize()`][proxystore.serialize.serialize]) will be used.

    Returns:
        A key that can be used to retrieve the object.

    Raises:
        TypeError: If the output of `serializer` is not bytes.
    """
    encode = default_serializer if serializer is None else serializer
    data = encode(obj)
    if not isinstance(data, bytes):
        raise TypeError('Serializer must produce bytes.')

    key = self.create_key(data)
    self.set_bytes(key, data)

    store_label = f"{self.__class__.__name__}(name='{self.name}')"
    logger.debug(f"SET key='{key}' IN {store_label}")
    return key

set_batch

set_batch(
    objs: Sequence[Any],
    *,
    serializer: SerializerT | None = None
) -> list[KeyT]

Set objects in store.

Parameters:

  • objs (Sequence[Any]) –

    Iterable of objects to be placed in the store.

  • serializer (SerializerT | None) –

    Optional callable which serializes the object. If None, the default serializer (serialize()) will be used.

Returns:

  • list[KeyT]

    List of keys that can be used to retrieve the objects.

Raises:

  • TypeError

    If the output of serializer is not bytes.

Source code in proxystore/store/base.py
def set_batch(
    self,
    objs: Sequence[Any],
    *,
    serializer: SerializerT | None = None,
) -> list[KeyT]:
    """Set objects in store.

    Each object is stored in order with `set()`.

    Args:
        objs: Sequence of objects to be placed in the store.
        serializer: Optional callable which serializes the
            object. If `None`, the default serializer
            ([`serialize()`][proxystore.serialize.serialize]) will be used.

    Returns:
        List of keys that can be used to retrieve the objects.

    Raises:
        TypeError: If the output of `serializer` is not bytes.
    """
    keys = []
    for item in objs:
        keys.append(self.set(item, serializer=serializer))
    return keys

set_bytes abstractmethod

set_bytes(key: KeyT, data: bytes) -> None

Set serialized object in remote store with key.

Parameters:

  • key (KeyT) –

    The key corresponding to the object.

  • data (bytes) –

    The serialized object.

Source code in proxystore/store/base.py
@abstractmethod
def set_bytes(self, key: KeyT, data: bytes) -> None:
    """Set serialized object in remote store with key.

    Must be implemented by subclasses; the base version only raises.

    Args:
        key: The key corresponding to the object.
        data: The serialized object.
    """
    raise NotImplementedError()

stats

stats(
    key_or_proxy: KeyT | Proxy[T],
) -> dict[str, TimeStats]

Get stats on the store.

Parameters:

  • key_or_proxy (KeyT | Proxy[T]) –

    A key to get stats for or a proxy to extract the key from.

Returns:

  • dict[str, TimeStats]

    A dict with keys corresponding to method names and values which are TimeStats instances with the statistics for calls to the corresponding method with the specified key.

Example
{
    "get": TimeStats(
        calls=32,
        avg_time_ms=0.0123,
        min_time_ms=0.0012,
        max_time_ms=0.1234,
    ),
    "set": TimeStats(...),
    "evict": TimeStats(...),
    ...
}

Raises:

  • ValueError

    If self was initialized with stats=False.

Source code in proxystore/store/base.py
def stats(self, key_or_proxy: KeyT | Proxy[T]) -> dict[str, TimeStats]:
    """Get stats on the store.

    If a proxy is given, its key is extracted and any stats recorded by
    the proxy's factory are merged in first. Stats this store recorded
    for the same key are then added on top.

    Args:
        key_or_proxy: A key to get stats for or a proxy to extract the key
            from.

    Returns:
        A dict with keys corresponding to method names and values which \
        are [`TimeStats`][proxystore.store.stats.TimeStats] instances \
        with the statistics for calls to the corresponding method with \
        the specified key.

    Example:
        ```python
        {
            "get": TimeStats(
                calls=32,
                avg_time_ms=0.0123,
                min_time_ms=0.0012,
                max_time_ms=0.1234,
            ),
            "set": TimeStats(...),
            "evict": TimeStats(...),
            ...
        }
        ```

    Raises:
        ValueError: If `self` was initialized with `#!python stats=False`.
    """
    if self._stats is None:
        raise ValueError(
            'Stats are not being tracked because this store was '
            'initialized with stats=False.',
        )

    collected: dict[str, TimeStats] = {}
    if not isinstance(key_or_proxy, ps.proxy.Proxy):
        key = key_or_proxy
    else:
        key = get_key(key_or_proxy)
        # Merge stats recorded by the proxy's factory (if it tracks any).
        factory_stats = getattr(key_or_proxy.__factory__, 'stats', None)
        if factory_stats is not None:
            for event in factory_stats:
                collected[event.function] = copy.copy(factory_stats[event])

    # Snapshot the events so the mapping can change while we iterate.
    for event in list(self._stats.keys()):
        if event.key == key:
            collected[event.function] = copy.copy(self._stats[event])
    return collected