proxystore.connectors.multi

Multi-connector implementation.

ConnectorPolicyConfig module-attribute

ConnectorPolicyConfig = Tuple[
    str, Dict[str, Any], PolicyDict
]

Type of the configuration for a connector and policy pair.

Element zero is the fully qualified path of the connector type, element one is the connector's configuration dictionary, and element two is the policy in dictionary form.
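
As a rough illustration of this shape, the sketch below builds one entry by hand and round-trips it through JSON. The alias is a local stand-in (the real PolicyDict is a TypedDict, approximated here as a plain dict), and the `store_dir` configuration key is an assumption made for the example only.

```python
import json
import sys
from typing import Any, Dict, Tuple

# Local stand-in for the alias described above (not imported from proxystore).
ConnectorPolicyConfig = Tuple[str, Dict[str, Any], Dict[str, Any]]

entry: ConnectorPolicyConfig = (
    # Element zero: fully qualified path of the connector type.
    'proxystore.connectors.file.FileConnector',
    # Element one: connector configuration (hypothetical contents).
    {'store_dir': '/tmp/proxystore-cache'},
    # Element two: the policy in dictionary form.
    {
        'priority': 0,
        'host_pattern': None,
        'min_size_bytes': 0,
        'max_size_bytes': sys.maxsize,
        'subset_tags': [],
        'superset_tags': [],
    },
)

# JSON has no tuple type, so the entry comes back as a list,
# but it still unpacks into the same three elements.
restored = json.loads(json.dumps(entry))
path, connector_config, policy_dict = restored
```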

PolicyDict

Bases: TypedDict

JSON compatible representation of a Policy.

Policy dataclass

Policy(
    priority: int = 0,
    host_pattern: Iterable[str] | str | None = None,
    min_size_bytes: int = 0,
    max_size_bytes: int = sys.maxsize,
    subset_tags: list[str] = list(),
    superset_tags: list[str] = list(),
)

Policy that allows validating a set of constraints.

Attributes:

  • priority (int) –

    Priority for breaking ties between policies (higher is preferred).

  • host_pattern (Iterable[str] | str | None) –

    Pattern or iterable of patterns of valid hostnames. The hostname returned by hostname() is matched against host_pattern using re.fullmatch(). If host_pattern is an iterable, at least one of the patterns must match the hostname.

  • min_size_bytes (int) –

    Minimum size in bytes allowed.

  • max_size_bytes (int) –

    Maximum size in bytes allowed.

  • subset_tags (list[str]) –

    Subset tags. See is_valid() for more details.

  • superset_tags (list[str]) –

    Superset tags. See is_valid() for more details.

is_valid()

is_valid(
    *,
    size_bytes: int | None = None,
    subset_tags: Iterable[str] | None = None,
    superset_tags: Iterable[str] | None = None
) -> bool

Check if set of constraints is valid for this policy.

Note

All arguments are optional keyword arguments that default to None. If left as the default, that constraint will not be checked against the policy.

Parameters:

  • size_bytes (int | None, default: None ) –

    Object size in bytes.

  • subset_tags (Iterable[str] | None, default: None ) –

    Set of tags that must be a subset of the Policy's subset_tags to be valid.

  • superset_tags (Iterable[str] | None, default: None ) –

    Set of tags that must be a superset of the Policy's superset_tags to be valid.

Returns:

  • bool

    If the provided constraints are valid for the policy.

Source code in proxystore/connectors/multi.py
def is_valid(
    self,
    *,
    size_bytes: int | None = None,
    subset_tags: Iterable[str] | None = None,
    superset_tags: Iterable[str] | None = None,
) -> bool:
    """Check if set of constraints is valid for this policy.

    Note:
        All arguments are optional keyword arguments that default to
        `None`. If left as the default, that constraint will not be
        checked against the policy.

    Args:
        size_bytes: Object size in bytes.
        subset_tags: Set of tags that must be a subset
            of the Policy's `subset_tags` to be valid.
        superset_tags: Set of tags that must be a superset
            of the Policy's `superset_tags` to be valid.

    Returns:
        If the provided constraints are valid for the policy.
    """
    if size_bytes is not None and (
        size_bytes < self.min_size_bytes
        or size_bytes > self.max_size_bytes
    ):
        return False
    if subset_tags is not None and not set(subset_tags).issubset(
        self.subset_tags,
    ):
        return False
    if superset_tags is not None and not set(superset_tags).issuperset(
        self.superset_tags,
    ):
        return False
    return self.is_valid_on_host()
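
The size and tag checks above can be tried out without the library. The following is a minimal sketch that mirrors them on a plain dict and leaves out the host check; `constraints_ok` and the example policy values are hypothetical names chosen for illustration.

```python
from typing import Any, Dict, Iterable, Optional


def constraints_ok(
    policy: Dict[str, Any],
    *,
    size_bytes: Optional[int] = None,
    subset_tags: Optional[Iterable[str]] = None,
    superset_tags: Optional[Iterable[str]] = None,
) -> bool:
    """Mirror the size and tag checks of is_valid() on a plain dict."""
    if size_bytes is not None and not (
        policy['min_size_bytes'] <= size_bytes <= policy['max_size_bytes']
    ):
        return False
    if subset_tags is not None and not set(subset_tags).issubset(
        policy['subset_tags'],
    ):
        return False
    if superset_tags is not None and not set(superset_tags).issuperset(
        policy['superset_tags'],
    ):
        return False
    return True


policy = {
    'min_size_bytes': 0,
    'max_size_bytes': 1000,
    'subset_tags': ['a', 'b'],
    'superset_tags': ['x'],
}

print(constraints_ok(policy))                            # True: nothing is checked
print(constraints_ok(policy, size_bytes=1000))           # True: bounds are inclusive
print(constraints_ok(policy, size_bytes=1001))           # False
print(constraints_ok(policy, subset_tags=['a']))         # True: {'a'} is within {'a', 'b'}
print(constraints_ok(policy, subset_tags=['a', 'c']))    # False: 'c' is not allowed
print(constraints_ok(policy, superset_tags=['x', 'y']))  # True: contains 'x'
print(constraints_ok(policy, superset_tags=[]))          # False: 'x' is missing
```

The last call shows a consequence worth keeping in mind: an empty iterable is still checked (only `None` skips a check), so a policy with non-empty superset_tags rejects a caller that passes no superset tags.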

is_valid_on_host()

is_valid_on_host() -> bool

Check if this policy is valid on the current host.

Source code in proxystore/connectors/multi.py
def is_valid_on_host(self) -> bool:
    """Check if this policy is valid on the current host."""
    if self.host_pattern is None:
        return True

    patterns: Iterable[str]
    if isinstance(self.host_pattern, str):
        patterns = [self.host_pattern]
    else:
        patterns = self.host_pattern
    hostname = utils.hostname()
    return any(re.fullmatch(p, hostname) for p in patterns)
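
Because the check uses re.fullmatch(), a pattern must match the entire hostname, not just a prefix. The sketch below reproduces the matching logic as a standalone function that takes the hostname as an argument; `matches_host` and the hostnames are made up for the example.

```python
import re
from typing import Iterable, Union


def matches_host(
    host_pattern: Union[Iterable[str], str, None],
    hostname: str,
) -> bool:
    """Standalone version of the host check with an explicit hostname."""
    if host_pattern is None:
        return True
    patterns = [host_pattern] if isinstance(host_pattern, str) else host_pattern
    return any(re.fullmatch(p, hostname) for p in patterns)


host = 'login1.example.org'
print(matches_host(None, host))                             # True: no restriction
print(matches_host('login', host))                          # False: only a prefix matches
print(matches_host(r'login\d+\.example\.org', host))        # True: whole name matches
print(matches_host([r'compute-.*', r'login\d+\..*'], host)) # True: one pattern suffices
print(matches_host([], host))                               # False: no pattern to match
```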

as_dict()

as_dict() -> PolicyDict

Convert the Policy to a JSON compatible dict.

Example
>>> policy = Policy(...)
>>> policy_dict = policy.as_dict()
>>> Policy(**policy_dict) == policy
True
Source code in proxystore/connectors/multi.py
def as_dict(self) -> PolicyDict:
    """Convert the Policy to a JSON compatible dict.

    Example:
        ```python
        >>> policy = Policy(...)
        >>> policy_dict = policy.as_dict()
        >>> Policy(**policy_dict) == policy
        True
        ```
    """
    # We could use dataclasses.asdict(self) but this gives us the benefit
    # of typing on the return dict.
    host_pattern = (
        self.host_pattern
        if isinstance(self.host_pattern, str) or self.host_pattern is None
        else list(self.host_pattern)
    )
    return PolicyDict(
        priority=self.priority,
        host_pattern=host_pattern,
        min_size_bytes=self.min_size_bytes,
        max_size_bytes=self.max_size_bytes,
        subset_tags=self.subset_tags,
        superset_tags=self.superset_tags,
    )
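
To illustrate why host_pattern is converted to a list, here is a small sketch using a hypothetical two-field stand-in class (`MiniPolicy`, not the library's Policy). It applies the same conversion and then round-trips the result through JSON.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Union


@dataclass
class MiniPolicy:
    """Hypothetical two-field stand-in for Policy (not the library class)."""

    priority: int = 0
    host_pattern: Union[Iterable[str], str, None] = None

    def as_dict(self) -> Dict[str, Any]:
        # Same conversion as above: strings and None pass through,
        # any other iterable becomes a list.
        host_pattern = (
            self.host_pattern
            if isinstance(self.host_pattern, str) or self.host_pattern is None
            else list(self.host_pattern)
        )
        return {'priority': self.priority, 'host_pattern': host_pattern}


# A set is a valid iterable of patterns, but json.dumps() would raise
# TypeError on it directly. After as_dict() it is a plain list.
policy = MiniPolicy(priority=2, host_pattern={'login-.*'})
payload = json.dumps(policy.as_dict())
restored = MiniPolicy(**json.loads(payload))
print(restored.host_pattern)  # ['login-.*']
```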

MultiConnectorError

Bases: Exception

Exceptions raised by the MultiConnector.

MultiKey

Bases: NamedTuple

Key to objects in MultiConnector.

Attributes:

  • connector_name (str) –

    Name of connector that the associated object is stored in.

  • connector_key (Any) –

    Key associated with the object.

MultiConnector

MultiConnector(
    connectors: dict[str, tuple[Connector[Any], Policy]],
    dormant_connectors: (
        dict[str, ConnectorPolicyConfig] | None
    ) = None,
)

Policy-based manager for a Connector collection.

Example
from proxystore.connectors.file import FileConnector
from proxystore.connectors.multi import Policy
from proxystore.connectors.multi import MultiConnector
from proxystore.connectors.redis import RedisConnector

file_connector = FileConnector(...)
redis_connector = RedisConnector(...)

connectors = {
    'small': (file_connector, Policy(max_size_bytes=1000000)),
    'large': (redis_connector, Policy(min_size_bytes=1000000)),
}
connector = MultiConnector(connectors)
Note

Methods of this class raise MultiConnectorError when passed an invalid key. A key is invalid if the connector that created it is not known to this class instance or if the corresponding connector is dormant.

Parameters:

  • connectors (dict[str, tuple[Connector[Any], Policy]]) –

    Mapping of names to tuples of a Connector and Policy.

  • dormant_connectors (dict[str, ConnectorPolicyConfig] | None, default: None ) –

    Mapping of names to tuples containing the configuration of a dormant connector. A dormant connector is unused in this process but could be initialized and used in another process, for example because the host_pattern of its policy does not match the current host. Creating dormant connector configurations yourself is not recommended. Instead, create your connectors and let the host_pattern of each policy determine when a connector is dormant.

Source code in proxystore/connectors/multi.py
def __init__(
    self,
    connectors: dict[str, tuple[Connector[Any], Policy]],
    dormant_connectors: dict[str, ConnectorPolicyConfig] | None = None,
) -> None:
    self.connectors = {
        name: _ConnectorPolicy(connector, policy)
        for name, (connector, policy) in connectors.items()
    }
    self.dormant_connectors = dormant_connectors

    names = list(self.connectors.keys())
    self.connectors_by_priority = sorted(
        names,
        key=lambda name: self.connectors[name].policy.priority,
        reverse=True,
    )
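
The ordering step can be seen in isolation. The sketch below applies the same `sorted(..., reverse=True)` call to made-up connector names and priorities. Python's sort is stable even with `reverse=True`, so names with equal priority keep their insertion order.

```python
# Hypothetical connector names mapped to their policy priorities.
priorities = {'file': 0, 'redis': 5, 'memory': 5, 'archive': -1}

names = list(priorities.keys())
by_priority = sorted(
    names,
    key=lambda name: priorities[name],
    reverse=True,
)

# 'redis' and 'memory' tie at 5; 'redis' was inserted first, so it stays first.
print(by_priority)  # ['redis', 'memory', 'file', 'archive']
```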

close()

close() -> None

Close the connector and clean up.

Warning

This will call close() on all managed connectors.

Source code in proxystore/connectors/multi.py
def close(self) -> None:
    """Close the connector and clean up.

    Warning:
        This will call `close()` on all managed connectors.
    """
    for connector, _ in self.connectors.values():
        connector.close()

config()

config() -> dict[str, ConnectorPolicyConfig]

Get the connector configuration.

The configuration contains all the information needed to reconstruct the connector object.

Source code in proxystore/connectors/multi.py
def config(self) -> dict[str, ConnectorPolicyConfig]:
    """Get the connector configuration.

    The configuration contains all the information needed to reconstruct
    the connector object.
    """
    configs: dict[str, ConnectorPolicyConfig] = (
        self.dormant_connectors
        if self.dormant_connectors is not None
        else {}
    )
    configs.update(
        {
            name: (
                get_object_path(type(connector)),
                connector.config(),
                policy.as_dict(),
            )
            for name, (connector, policy) in self.connectors.items()
        },
    )
    return configs

from_config() classmethod

from_config(
    config: dict[str, ConnectorPolicyConfig]
) -> MultiConnector

Create a new connector instance from a configuration.

Parameters:

  • config (dict[str, ConnectorPolicyConfig]) –

    Configuration returned by .config().

Source code in proxystore/connectors/multi.py
@classmethod
def from_config(
    cls,
    config: dict[str, ConnectorPolicyConfig],
) -> MultiConnector:
    """Create a new connector instance from a configuration.

    Args:
        config: Configuration returned by `#!python .config()`.
    """
    connectors: dict[str, tuple[Connector[Any], Policy]] = {}
    dormant_connectors: dict[str, ConnectorPolicyConfig] = {}
    for name, (conn_path, conn_config, policy_dict) in config.items():
        policy = Policy(**policy_dict)
        if policy.is_valid_on_host():
            connector_type = import_from_path(conn_path)
            connector = connector_type.from_config(conn_config)
            connectors[name] = (connector, policy)
        else:
            dormant_connectors[name] = config[name]
    return cls(
        connectors=connectors,
        dormant_connectors=dormant_connectors,
    )
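
The split between active and dormant entries can be sketched without importing any connector. In the example below the connector paths, the `partition` helper, and the hostnames are all hypothetical. Only the host_pattern check decides which side an entry lands on.

```python
import re
from typing import Any, Dict, Tuple

Entry = Tuple[str, Dict[str, Any], Dict[str, Any]]


def valid_on_host(policy_dict: Dict[str, Any], hostname: str) -> bool:
    """Host check on a policy dict, with the hostname passed explicitly."""
    pattern = policy_dict.get('host_pattern')
    if pattern is None:
        return True
    patterns = [pattern] if isinstance(pattern, str) else pattern
    return any(re.fullmatch(p, hostname) for p in patterns)


def partition(
    config: Dict[str, Entry],
    hostname: str,
) -> Tuple[Dict[str, Entry], Dict[str, Entry]]:
    """Split a configuration into (active, dormant) entries for a host."""
    active = {n: e for n, e in config.items() if valid_on_host(e[2], hostname)}
    dormant = {n: e for n, e in config.items() if n not in active}
    return active, dormant


config: Dict[str, Entry] = {
    'local': ('example.LocalConnector', {}, {'host_pattern': r'node\d+'}),
    'shared': ('example.SharedConnector', {}, {'host_pattern': None}),
}

on_node, dormant_on_node = partition(config, 'node07')
on_laptop, dormant_on_laptop = partition(config, 'laptop')
print(sorted(on_node), sorted(dormant_on_node))      # ['local', 'shared'] []
print(sorted(on_laptop), sorted(dormant_on_laptop))  # ['shared'] ['local']
```

Keeping the dormant entries in the configuration means the full set of connectors survives a round trip through a host where some of them cannot be used.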

evict()

evict(key: MultiKey) -> None

Evict the object associated with the key.

Parameters:

  • key (MultiKey) –

    Key associated with object to evict.

Source code in proxystore/connectors/multi.py
def evict(self, key: MultiKey) -> None:
    """Evict the object associated with the key.

    Args:
        key: Key associated with object to evict.
    """
    connector = self._connector_from_key(key)
    connector.evict(key.connector_key)

exists()

exists(key: MultiKey) -> bool

Check if an object associated with the key exists.

Parameters:

  • key (MultiKey) –

    Key potentially associated with stored object.

Returns:

  • bool

    If an object associated with the key exists.

Source code in proxystore/connectors/multi.py
def exists(self, key: MultiKey) -> bool:
    """Check if an object associated with the key exists.

    Args:
        key: Key potentially associated with stored object.

    Returns:
        If an object associated with the key exists.
    """
    connector = self._connector_from_key(key)
    return connector.exists(key.connector_key)

get()

get(key: MultiKey) -> bytes | None

Get the serialized object associated with the key.

Parameters:

  • key (MultiKey) –

    Key associated with the object to retrieve.

Returns:

  • bytes | None

    Serialized object or None if the object does not exist.

Source code in proxystore/connectors/multi.py
def get(self, key: MultiKey) -> bytes | None:
    """Get the serialized object associated with the key.

    Args:
        key: Key associated with the object to retrieve.

    Returns:
        Serialized object or `None` if the object does not exist.
    """
    connector = self._connector_from_key(key)
    return connector.get(key.connector_key)

get_batch()

get_batch(keys: Sequence[MultiKey]) -> list[bytes | None]

Get a batch of serialized objects associated with the keys.

Parameters:

  • keys (Sequence[MultiKey]) –

    Sequence of keys associated with objects to retrieve.

Returns:

  • list[bytes | None]

    List with same order as keys with the serialized objects or None if the corresponding key does not have an associated object.

Source code in proxystore/connectors/multi.py
def get_batch(self, keys: Sequence[MultiKey]) -> list[bytes | None]:
    """Get a batch of serialized objects associated with the keys.

    Args:
        keys: Sequence of keys associated with objects to retrieve.

    Returns:
        List with same order as `keys` with the serialized objects or \
        `None` if the corresponding key does not have an associated object.
    """
    return [self.get(key) for key in keys]
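
The key-based methods all route through the connector named in the key. The private lookup helper is not shown on this page, so the following is only a sketch of the behavior described in the class note, using plain dicts as connectors and hypothetical `DemoKey` and `DemoError` stand-ins.

```python
from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Set


class DemoKey(NamedTuple):
    """Hypothetical stand-in with the same two fields as MultiKey."""

    connector_name: str
    connector_key: Any


class DemoError(Exception):
    """Hypothetical stand-in for MultiConnectorError."""


# Plain dicts play the role of active connectors.
active: Dict[str, Dict[Any, bytes]] = {
    'small': {'k1': b'hello'},
    'large': {},
}
dormant: Set[str] = {'remote'}


def get(key: DemoKey) -> Optional[bytes]:
    # Assumed behavior: dormant and unknown connector names are both errors.
    if key.connector_name in dormant:
        raise DemoError(f'connector {key.connector_name!r} is dormant')
    if key.connector_name not in active:
        raise DemoError(f'connector {key.connector_name!r} is unknown')
    return active[key.connector_name].get(key.connector_key)


def get_batch(keys: Sequence[DemoKey]) -> List[Optional[bytes]]:
    # One lookup per key, so results keep the order of the input keys.
    return [get(key) for key in keys]


results = get_batch([DemoKey('small', 'k1'), DemoKey('large', 'missing')])
print(results)  # [b'hello', None]
```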

put()

put(
    obj: bytes,
    subset_tags: Iterable[str] = (),
    superset_tags: Iterable[str] = (),
) -> MultiKey

Put a serialized object in the store.

Parameters:

  • obj (bytes) –

    Serialized object to put in the store.

  • subset_tags (Iterable[str], default: () ) –

    Iterable of tags that must be a subset of a connector's policy subset_tags to match.

  • superset_tags (Iterable[str], default: () ) –

    Iterable of tags that must be a superset of a connector's policy superset_tags to match.

Returns:

  • MultiKey

    Key which can be used to retrieve the object.

Raises:

  • MultiConnectorError –

    If no connector policy matches the arguments.

Source code in proxystore/connectors/multi.py
def put(
    self,
    obj: bytes,
    subset_tags: Iterable[str] = (),
    superset_tags: Iterable[str] = (),
) -> MultiKey:
    """Put a serialized object in the store.

    Args:
        obj: Serialized object to put in the store.
        subset_tags: Iterable of tags that must be a subset
            of a connector's policy `subset_tags` to match.
        superset_tags: Iterable of tags that must be a superset
            of a connector's policy `superset_tags` to match.

    Returns:
        Key which can be used to retrieve the object.

    Raises:
        MultiConnectorError: If no connector policy matches the arguments.
    """
    for connector_name in self.connectors_by_priority:
        connector, policy = self.connectors[connector_name]
        if policy.is_valid(
            size_bytes=len(obj),
            subset_tags=subset_tags,
            superset_tags=superset_tags,
        ):
            key = connector.put(obj)
            return MultiKey(
                connector_name=connector_name,
                connector_key=key,
            )
    raise MultiConnectorError(
        'No connector policy was suitable for the constraints: '
        f'subset_tags={subset_tags}, superset_tags={superset_tags}.',
    )
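
The selection loop picks the first policy, in priority order, that accepts the object. A minimal sketch of that routing follows, with lists standing in for connectors. `DemoPolicy`, `DemoKey`, the connector names, and the use of LookupError are assumptions made for the example, and the host check is left out.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, NamedTuple, Tuple


class DemoKey(NamedTuple):
    connector_name: str
    connector_key: int


@dataclass(frozen=True)
class DemoPolicy:
    """Hypothetical stand-in covering only the size and tag constraints."""

    priority: int = 0
    min_size_bytes: int = 0
    max_size_bytes: int = 2**63 - 1
    subset_tags: Tuple[str, ...] = ()
    superset_tags: Tuple[str, ...] = ()

    def is_valid(
        self,
        size_bytes: int,
        subset_tags: Iterable[str],
        superset_tags: Iterable[str],
    ) -> bool:
        return (
            self.min_size_bytes <= size_bytes <= self.max_size_bytes
            and set(subset_tags).issubset(self.subset_tags)
            and set(superset_tags).issuperset(self.superset_tags)
        )


policies: Dict[str, DemoPolicy] = {
    'default': DemoPolicy(priority=0),
    'fast': DemoPolicy(priority=1, max_size_bytes=1000),
    'secure': DemoPolicy(
        priority=2, subset_tags=('secure',), superset_tags=('secure',),
    ),
}
stores: Dict[str, List[bytes]] = {name: [] for name in policies}
by_priority = sorted(policies, key=lambda n: policies[n].priority, reverse=True)


def put(
    obj: bytes,
    subset_tags: Iterable[str] = (),
    superset_tags: Iterable[str] = (),
) -> DemoKey:
    for name in by_priority:
        if policies[name].is_valid(len(obj), subset_tags, superset_tags):
            stores[name].append(obj)
            return DemoKey(name, len(stores[name]) - 1)
    raise LookupError('no policy matched')


print(put(b'x' * 10).connector_name)    # fast: untagged objects skip 'secure'
print(put(b'x' * 5000).connector_name)  # default: too large for 'fast'
print(put(b'x', ['secure'], ['secure']).connector_name)  # secure
```

An unrecognized tag such as `put(b'x', subset_tags=['other'])` matches none of these policies and raises, because every policy's subset_tags would have to contain `'other'`.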

put_batch()

put_batch(
    objs: Sequence[bytes],
    subset_tags: Iterable[str] = (),
    superset_tags: Iterable[str] = (),
) -> list[MultiKey]

Put a batch of serialized objects in the store.

Warning

This method calls put() individually for each item in the batch so items in the batch can potentially be placed in different connectors.

Parameters:

  • objs (Sequence[bytes]) –

    Sequence of serialized objects to put in the store.

  • subset_tags (Iterable[str], default: () ) –

    Iterable of tags that must be a subset of a connector's policy subset_tags to match.

  • superset_tags (Iterable[str], default: () ) –

    Iterable of tags that must be a superset of a connector's policy superset_tags to match.

Returns:

  • list[MultiKey]

    List of keys with the same order as objs which can be used to retrieve the objects.

Raises:

  • MultiConnectorError –

    If no connector policy matches the arguments.

Source code in proxystore/connectors/multi.py
def put_batch(
    self,
    objs: Sequence[bytes],
    subset_tags: Iterable[str] = (),
    superset_tags: Iterable[str] = (),
) -> list[MultiKey]:
    """Put a batch of serialized objects in the store.

    Warning:
        This method calls
        [`put()`][proxystore.connectors.multi.MultiConnector] individually
        for each item in the batch so items in the batch can potentially
        be placed in different connectors.

    Args:
        objs: Sequence of serialized objects to put in the store.
        subset_tags: Iterable of tags that must be a subset
            of a connector's policy `subset_tags` to match.
        superset_tags: Iterable of tags that must be a superset
            of a connector's policy `superset_tags` to match.

    Returns:
        List of keys with the same order as `objs` which can be used to \
        retrieve the objects.

    Raises:
        MultiConnectorError: If no connector policy matches the arguments.
    """
    return [
        self.put(obj, subset_tags=subset_tags, superset_tags=superset_tags)
        for obj in objs
    ]
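
Since each item is routed on its own, a batch can be spread across connectors. If one item matches no policy, the items before it have already been stored by the time the exception is raised. The sketch below shows both effects with hypothetical size rules and lists standing in for connectors.

```python
from typing import Dict, List, Sequence, Tuple

# Hypothetical rules as (name, min_size_bytes, max_size_bytes),
# already listed in priority order.
RULES = [('small', 0, 100), ('large', 101, 1000)]
stores: Dict[str, List[bytes]] = {'small': [], 'large': []}


def put(obj: bytes) -> Tuple[str, int]:
    for name, lo, hi in RULES:
        if lo <= len(obj) <= hi:
            stores[name].append(obj)
            return (name, len(stores[name]) - 1)
    raise ValueError(f'no rule accepts {len(obj)} bytes')


def put_batch(objs: Sequence[bytes]) -> List[Tuple[str, int]]:
    # One put() per item, as in the method above.
    return [put(obj) for obj in objs]


keys = put_batch([b'a' * 10, b'b' * 500, b'c' * 20])
print(keys)  # [('small', 0), ('large', 0), ('small', 1)]

try:
    put_batch([b'd' * 30, b'e' * 5000])  # second item matches no rule
except ValueError:
    pass

# The first item of the failed batch was stored before the error.
print(len(stores['small']))  # 3
```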