Skip to content

proxystore.store.multi

MultiStore Implementation.

PolicyDict

Bases: TypedDict

JSON compatible representation of a Policy.

Policy dataclass

Policy that allows validating a set of constraints.

is_valid

is_valid(
    *,
    size: int | None = None,
    subset_tags: Iterable[str] | None = None,
    superset_tags: Iterable[str] | None = None
) -> bool

Check if a set of constraints is valid for this policy.

Note

All arguments are optional keyword arguments that default to None. If left as the default, that constraint will not be checked against the policy.

Parameters:

  • size (int | None) –

    Object size.

  • subset_tags (Iterable[str] | None) –

    Set of tags that must be a subset of the Policy's subset_tags to be valid.

  • superset_tags (Iterable[str] | None) –

    Set of tags that must be a superset of the Policy's superset_tags to be valid.

Returns:

  • bool

    If the provided constraints are valid for the policy.

Source code in proxystore/store/multi.py
def is_valid(
    self,
    *,
    size: int | None = None,
    subset_tags: Iterable[str] | None = None,
    superset_tags: Iterable[str] | None = None,
) -> bool:
    """Check whether a set of constraints satisfies this policy.

    Note:
        Every argument is an optional keyword argument defaulting to
        `None`. A constraint left as `None` is skipped and is never
        compared against the policy.

    Args:
        size: Object size.
        subset_tags: Tags that must form a subset of the Policy's
            `subset_tags` to be valid.
        superset_tags: Tags that must form a superset of the Policy's
            `superset_tags` to be valid.

    Returns:
        `True` if every provided constraint is valid for the policy,
        otherwise `False`.
    """
    # The size has to lie within the inclusive [min_size, max_size] range.
    if size is not None:
        if size < self.min_size or size > self.max_size:
            return False

    if subset_tags is not None:
        requested_subset = set(subset_tags)
        if not requested_subset.issubset(self.subset_tags):
            return False

    # The superset check is the last constraint, so its outcome (or the
    # absence of the constraint) is the final answer.
    return superset_tags is None or set(superset_tags).issuperset(
        self.superset_tags,
    )

as_dict

as_dict() -> PolicyDict

Convert the Policy to a JSON compatible dict.

Usage

>>> policy = Policy(...)
>>> policy_dict = policy.as_dict()
>>> Policy(**policy_dict) == policy
True

Source code in proxystore/store/multi.py
def as_dict(self) -> PolicyDict:
    """Return a JSON compatible dict representation of the Policy.

    Usage:
        >>> policy = Policy(...)
        >>> policy_dict = policy.as_dict()
        >>> Policy(**policy_dict) == policy
        True
    """
    # dataclasses.asdict(self) would also work, but constructing the
    # PolicyDict explicitly keeps the returned dict typed.
    policy_dict = PolicyDict(
        priority=self.priority,
        min_size=self.min_size,
        max_size=self.max_size,
        subset_tags=self.subset_tags,
        superset_tags=self.superset_tags,
    )
    return policy_dict

MultiStoreKey

Bases: NamedTuple

Key to objects in a MultiStore.

MultiStore

MultiStore(
    name: str,
    *,
    stores: dict[str, Policy]
    | dict[Store[Any], Policy]
    | Sequence[_StorePolicyArgs],
    cache_size: int = 0,
    stats: bool = False
) -> None

Bases: Store[MultiStoreKey]

Policy based manager for a collection of Store.

Note

This store does not implement get_bytes() or set_bytes() because MultiStore.get() and MultiStore.set() forward operations to the corresponding store.

Warning

MultiStore.close() will call Store.close() on all the stores managed by the instance and unregister them.

Parameters:

  • name (str) –

    Name of this store instance.

  • stores (dict[str, Policy] | dict[Store[Any], Policy] | Sequence[_StorePolicyArgs]) –

    Mapping of stores (either Store instances or string names of registered stores) to the corresponding Policy. If Store instances are passed, the instances will be registered.

  • cache_size (int) –

    Size of LRU cache (in # of objects). If 0, the cache is disabled. The cache is local to the Python process.

  • stats (bool) –

    collect stats on store operations.

Source code in proxystore/store/multi.py
def __init__(
    self,
    name: str,
    *,
    stores: dict[str, Policy]
    | dict[Store[Any], Policy]
    | Sequence[_StorePolicyArgs],
    cache_size: int = 0,
    stats: bool = False,
) -> None:
    # Caching and stats are left to the wrapped stores, so both are
    # disabled on the base class regardless of the arguments given here.
    # The kwargs property is overridden, so an empty dict is passed.
    super().__init__(name, cache_size=0, stats=False, kwargs={})

    # Maps each managed store's name to its (store, policy) pair.
    self._stores: dict[str, _StorePolicy] = {}

    if isinstance(stores, dict):
        # Mapping form: keys are either store instances or the names of
        # stores that must already be registered.
        for store_or_name, store_policy in stores.items():
            if not isinstance(store_or_name, str):
                resolved = store_or_name
            else:
                resolved = get_store(store_or_name)
                if resolved is None:
                    raise RuntimeError(
                        f'A store named "{store_or_name}" is not registered.',
                    )

            self._stores[resolved.name] = _StorePolicy(
                store=resolved,
                policy=store_policy,
            )
    elif isinstance(stores, Sequence):
        # Sequence form: each entry describes a store by name, kind, and
        # kwargs. A store registered under that name is reused; otherwise
        # a new one is constructed from the entry.
        for spec in stores:
            resolved = get_store(spec['name'])
            if resolved is None:
                resolved = spec['kind'](spec['name'], **spec['kwargs'])

            self._stores[resolved.name] = _StorePolicy(
                store=resolved,
                policy=spec['policy'],
            )
    else:
        raise AssertionError('Unreachable.')

    # Registering the stores lets multiple `MultiStore` instances in a
    # process share the same underlying stores for caching/efficiency.
    for managed_store, _policy in self._stores.values():
        register_store(managed_store, exist_ok=True)

    # Store names ordered from highest to lowest policy priority.
    self._stores_by_priority = sorted(
        self._stores,
        key=lambda store_name: self._stores[store_name].policy.priority,
        reverse=True,
    )

proxy

proxy(
    obj: T,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    subset_tags: Iterable[str] = (),
    superset_tags: Iterable[str] = (),
    **kwargs: Any
) -> Proxy[T]

Create a proxy that will resolve to an object in the store.

Warning

If the factory requires reinstantiating the store to correctly resolve the object, the factory should reinstantiate the store with the same arguments used to instantiate the store that created the proxy/factory. I.e. the proxy() method should pass any arguments given to Store along to the factory so the factory can correctly recreate the store if the factory is resolved in a different Python process.

Parameters:

  • obj (T) –

    Object to place in store and return proxy for.

  • serializer (SerializerT | None) –

    optional callable which serializes the object. If None, the default serializer (serialize()) will be used.

  • deserializer (DeserializerT | None) –

    Optional callable used by the factory to deserialize the byte string. If None, the default deserializer (deserialize()) will be used.

  • subset_tags (Iterable[str]) –

    Iterable of tags that must be a subset of a store policy's subset_tags to match that store.

  • superset_tags (Iterable[str]) –

    Iterable of tags that must be a superset of a store policy's superset_tags to match that store.

  • kwargs (Any) –

    Additional arguments to pass to the factory.

Returns:

  • Proxy[T]

    A proxy of the object.

Source code in proxystore/store/multi.py
def proxy(
    self,
    obj: T,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    subset_tags: Iterable[str] = (),
    superset_tags: Iterable[str] = (),
    **kwargs: Any,
) -> Proxy[T]:
    """Store an object and return a proxy that resolves to it.

    Warning:
        If the factory has to reinstantiate the store in order to
        resolve the object, it should do so with the same arguments
        that were used to create the store which produced the
        proxy/factory. In other words, the
        [`proxy()`][proxystore.store.multi.MultiStore.proxy] method
        should forward any arguments given to
        [`Store`][proxystore.store.base.Store]
        to the factory so that the factory can recreate the store
        correctly when it is resolved in a different Python process.

    Args:
        obj: Object to place in store and return proxy for.
        serializer: Optional callable which serializes the
            object. If `None`, the default serializer
            ([`serialize()`][proxystore.serialize.serialize]) will be used.
        deserializer: Optional callable used by the factory
            to deserialize the byte string. If `None`, the default
            deserializer ([`deserialize()`][proxystore.serialize.deserialize])
            will be used.
        subset_tags: Iterable of tags that must be a subset
            of a store policy's `subset_tags` to match that store.
        superset_tags: Iterable of tags that must be a
            superset of a store policy's `superset_tags` to match that
            store.
        kwargs: Additional arguments to pass to the factory.

    Returns:
        A proxy of the object.
    """
    # Arguments that decide how and where the object gets stored.
    placement = {
        'serializer': serializer,
        'subset_tags': subset_tags,
        'superset_tags': superset_tags,
    }
    stored_key = self.set(obj, **placement)
    return self.proxy_from_key(
        stored_key,
        deserializer=deserializer,
        **kwargs,
    )

proxy_batch

proxy_batch(
    objs: Sequence[T],
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    subset_tags: Iterable[str] = (),
    superset_tags: Iterable[str] = (),
    **kwargs: Any
) -> list[Proxy[T]]

Create proxies for batch of objects in the store.

See Store.proxy() for more details.

Parameters:

  • objs (Sequence[T]) –

    Objects to place in store and return proxies for.

  • serializer (SerializerT | None) –

    Optional callable which serializes the object. If None, the default serializer (serialize()) will be used.

  • deserializer (DeserializerT | None) –

    Optional callable used by the factory to deserialize the byte string. If None, the default deserializer (deserialize()) will be used.

  • subset_tags (Iterable[str]) –

    Iterable of tags that must be a subset of a store policy's subset_tags to match that store.

  • superset_tags (Iterable[str]) –

    Iterable of tags that must be a superset of a store policy's superset_tags to match that store.

  • kwargs (Any) –

    Additional arguments to pass to the Factory.

Returns:

  • list[Proxy[T]]

    A list of proxies of the objects.

Source code in proxystore/store/multi.py
def proxy_batch(
    self,
    objs: Sequence[T],
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    subset_tags: Iterable[str] = (),
    superset_tags: Iterable[str] = (),
    **kwargs: Any,
) -> list[Proxy[T]]:
    """Store a batch of objects and return a proxy for each one.

    See [`Store.proxy()`][proxystore.store.base.Store.proxy] for more
    details.

    Args:
        objs: Objects to place in store and return proxies for.
        serializer: Optional callable which serializes the
            object. If `None`, the default serializer
            ([`serialize()`][proxystore.serialize.serialize]) will be used.
        deserializer: Optional callable used by the factory
            to deserialize the byte string. If `None`, the default
            deserializer
            ([`deserialize()`][proxystore.serialize.deserialize]) will be
            used.
        subset_tags: Iterable of tags that must be a subset
            of a store policy's `subset_tags` to match that store.
        superset_tags: Iterable of tags that must be a
            superset of a store policy's `superset_tags` to match that
            store.
        kwargs: Additional arguments to pass to the Factory.

    Returns:
        A list of proxies of the objects.
    """
    # Arguments that decide how and where the objects get stored.
    placement = {
        'serializer': serializer,
        'subset_tags': subset_tags,
        'superset_tags': superset_tags,
    }
    stored_keys = self.set_batch(objs, **placement)
    # One proxy per returned key, in the order the keys were returned.
    return [
        self.proxy_from_key(
            stored_key,
            deserializer=deserializer,
            **kwargs,
        )
        for stored_key in stored_keys
    ]

locked_proxy

locked_proxy(
    obj: T,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    subset_tags: Iterable[str] = (),
    superset_tags: Iterable[str] = (),
    **kwargs: Any
) -> ProxyLocker[T]

Create a proxy locker that will prevent resolution.

Parameters:

  • obj (T) –

    Object to place in store and create proxy of.

  • serializer (SerializerT | None) –

    Optional callable which serializes the object. If None, the default serializer (serialize()) will be used.

  • deserializer (DeserializerT | None) –

    Optional callable used by the factory to deserialize the byte string. If None, the default deserializer (deserialize()) will be used.

  • subset_tags (Iterable[str]) –

    Iterable of tags that must be a subset of a store policy's subset_tags to match that store.

  • superset_tags (Iterable[str]) –

    Iterable of tags that must be a superset of a store policy's superset_tags to match that store.

  • kwargs (Any) –

    Additional arguments to pass to the Factory.

Returns:

  • ProxyLocker[T]

    A proxy wrapped in a ProxyLocker.

Source code in proxystore/store/multi.py
def locked_proxy(
    self,
    obj: T,
    serializer: SerializerT | None = None,
    deserializer: DeserializerT | None = None,
    subset_tags: Iterable[str] = (),
    superset_tags: Iterable[str] = (),
    **kwargs: Any,
) -> ProxyLocker[T]:
    """Store an object and return its proxy wrapped in a locker.

    The locker prevents resolution of the proxy.

    Args:
        obj: Object to place in store and create proxy of.
        serializer: Optional callable which serializes the
            object. If `None`, the default serializer
            ([`serialize()`][proxystore.serialize.serialize]) will be used.
        deserializer: Optional callable used by the factory
            to deserialize the byte string. If `None`, the default
            deserializer
            ([`deserialize()`][proxystore.serialize.deserialize]) will be
            used.
        subset_tags: Iterable of tags that must be a subset
            of a store policy's `subset_tags` to match that store.
        superset_tags: Iterable of tags that must be a
            superset of a store policy's `superset_tags` to match that
            store.
        kwargs: Additional arguments to pass to the Factory.

    Returns:
        A proxy wrapped in a [`ProxyLocker`][proxystore.proxy.ProxyLocker].
    """
    # Create the proxy first, forwarding every argument unchanged, then
    # wrap it.
    unlocked = self.proxy(
        obj,
        serializer=serializer,
        deserializer=deserializer,
        subset_tags=subset_tags,
        superset_tags=superset_tags,
        **kwargs,
    )
    return ProxyLocker(unlocked)

set

set(
    obj: Any,
    *,
    serializer: SerializerT | None = None,
    subset_tags: Iterable[str] = (),
    superset_tags: Iterable[str] = ()
) -> MultiStoreKey

Set key-object pair in store.

Parameters:

  • obj (Any) –

    Object to be placed in the store.

  • serializer (SerializerT | None) –

    Optional callable which serializes the object. If None, the default serializer (serialize()) will be used.

  • subset_tags (Iterable[str]) –

    Iterable of tags that must be a subset of a store policy's subset_tags to match that store.

  • superset_tags (Iterable[str]) –

    Iterable of tags that must be a superset of a store policy's superset_tags to match that store.

Returns:

  • MultiStoreKey

    A key that can be used to retrieve the object.

Raises:

  • TypeError

    If the output of serializer is not bytes.

Source code in proxystore/store/multi.py
def set(
    self,
    obj: Any,
    *,
    serializer: SerializerT | None = None,
    subset_tags: Iterable[str] = (),
    superset_tags: Iterable[str] = (),
) -> MultiStoreKey:
    """Set key-object pair in store.

    The object is serialized once and then offered to the managed stores
    in priority order. The first store whose policy accepts the
    serialized size and the tags receives the object.

    Args:
        obj: Object to be placed in the store.
        serializer: Optional callable which serializes the
            object. If `None`, the default serializer
            ([`serialize()`][proxystore.serialize.serialize]) will be used.
        subset_tags: Iterable of tags that must be a subset
            of a store policy's `subset_tags` to match that store.
        superset_tags: Iterable of tags that must be a
            superset of a store policy's `superset_tags` to match that
            store.

    Returns:
        A key that can be used to retrieve the object.

    Raises:
        TypeError: If the output of `serializer` is not bytes.
        ValueError: If no store policy is suitable for the constraints.
    """
    data = default_serializer(obj) if serializer is None else serializer(obj)

    if not isinstance(data, bytes):
        raise TypeError('Serializer must produce bytes.')

    # The tags are checked against every policy in turn. A one-shot
    # iterator (e.g. a generator) would be exhausted by the first check
    # and look empty to all later policies, so materialize them once.
    subset_tags = tuple(subset_tags)
    superset_tags = tuple(superset_tags)
    size = len(data)

    for store_name in self._stores_by_priority:
        store, policy = self._stores[store_name]
        if not policy.is_valid(
            size=size,
            subset_tags=subset_tags,
            superset_tags=superset_tags,
        ):
            continue

        # We already serialized object so pass identity
        # function to avoid duplicate serialization
        key = store.set(data, serializer=_identity)
        return MultiStoreKey(
            store_name=store.name,
            store_key=key,
        )

    # Reached only when every policy rejected the constraints.
    raise ValueError(
        'No store policy was suitable for the constraints: '
        f'subset_tags={subset_tags}, superset_tags={superset_tags}.',
    )

set_batch

set_batch(
    objs: Sequence[Any],
    *,
    serializer: SerializerT | None = None,
    subset_tags: Iterable[str] = (),
    superset_tags: Iterable[str] = ()
) -> list[MultiStoreKey]

Set objects in store.

Parameters:

  • objs (Sequence[Any]) –

    An iterable of objects to be placed in the store.

  • serializer (SerializerT | None) –

    Optional callable which serializes the object. If None, the default serializer (serialize()) will be used.

  • subset_tags (Iterable[str]) –

    Iterable of tags that must be a subset of a store policy's subset_tags to match that store.

  • superset_tags (Iterable[str]) –

    Iterable of tags that must be a superset of a store policy's superset_tags to match that store.

Returns:

  • list[MultiStoreKey]

    List of keys that can be used to retrieve the objects.

Raises:

  • TypeError

    If the output of serializer is not bytes.

Source code in proxystore/store/multi.py
def set_batch(
    self,
    objs: Sequence[Any],
    *,
    serializer: SerializerT | None = None,
    subset_tags: Iterable[str] = (),
    superset_tags: Iterable[str] = (),
) -> list[MultiStoreKey]:
    """Set objects in store.

    Each object is placed individually via `self.set()` using the same
    serializer and tags.

    Args:
        objs: Sequence of objects to be placed in the store.
        serializer: Optional callable which serializes the
            object. If `None`, the default serializer
            ([`serialize()`][proxystore.serialize.serialize]) will be used.
        subset_tags: Iterable of tags that must be a subset
            of a store policy's `subset_tags` to match that store.
        superset_tags: Iterable of tags that must be a
            superset of a store policy's `superset_tags` to match that
            store.

    Returns:
        List of keys that can be used to retrieve the objects.

    Raises:
        TypeError: If the output of `serializer` is not bytes.
    """
    # The same tags apply to every object. A one-shot iterator (e.g. a
    # generator) would be consumed while placing the first object and
    # look empty for all the others, so materialize the tags once.
    subset_tags = tuple(subset_tags)
    superset_tags = tuple(superset_tags)

    return [
        self.set(
            obj,
            serializer=serializer,
            subset_tags=subset_tags,
            superset_tags=superset_tags,
        )
        for obj in objs
    ]