Skip to content

proxystore.store.factory

Factory implementations.

StoreFactory

StoreFactory(
    key: ConnectorKeyT,
    store_config: StoreConfig,
    *,
    evict: bool = False,
    deserializer: DeserializerT | None = None
)

Bases: Generic[ConnectorT, T]

Factory that resolves an object from a store.

Adds support for asynchronously retrieving objects from a Store instance.

The factory takes the store_config parameter that is used to reinitialize the store if the factory is sent to a remote process where the store has not already been initialized.

Parameters:

  • key (ConnectorKeyT) –

    Key corresponding to object in store.

  • store_config (StoreConfig) –

    Store configuration used to reinitialize the store if needed.

  • evict (bool, default: False ) –

    If True, evict the object from the store once resolve() is called.

  • deserializer (DeserializerT | None, default: None ) –

    Optional callable used to deserialize the byte string. If None, the default deserializer (deserialize()) will be used.

Source code in proxystore/store/factory.py
def __init__(
    self,
    key: ConnectorKeyT,
    store_config: StoreConfig,
    *,
    evict: bool = False,
    deserializer: DeserializerT | None = None,
) -> None:
    self.key = key
    self.store_config = store_config
    self.evict = evict
    self.deserializer = deserializer

    # The following are not included when a factory is serialized
    # because they are specific to that instance of the factory
    self._obj_future: Future[T] | None = None

get_store()

get_store() -> Store[ConnectorT]

Get store and reinitialize if necessary.

Source code in proxystore/store/factory.py
def get_store(self) -> Store[ConnectorT]:
    """Get store and reinitialize if necessary."""
    return proxystore.store.get_or_create_store(
        self.store_config,
        register=True,
    )

resolve()

resolve() -> T

Get object associated with key from store.

Raises:

Source code in proxystore/store/factory.py
def resolve(self) -> T:
    """Get object associated with key from store.

    Raises:
        ProxyResolveMissingKeyError: If the key associated with this
            factory does not exist in the store.
    """
    with Timer() as timer:
        store = self.get_store()
        obj = store.get(
            self.key,
            deserializer=self.deserializer,
            default=_MISSING_OBJECT,
        )

        if obj is _MISSING_OBJECT:
            raise ProxyResolveMissingKeyError(
                self.key,
                type(store),
                store.name,
            )

        if self.evict:
            store.evict(self.key)

    if store.metrics is not None:
        total_time = timer.elapsed_ns
        store.metrics.add_time('factory.resolve', self.key, total_time)

    return cast(T, obj)

resolve_async()

resolve_async() -> None

Asynchronously get object associated with key from store.

Source code in proxystore/store/factory.py
def resolve_async(self) -> None:
    """Asynchronously get object associated with key from store."""
    logger.debug(f'Starting asynchronous resolve of {self.key}')
    self._obj_future = _default_pool.submit(self.resolve)

PollingStoreFactory

PollingStoreFactory(
    key: ConnectorKeyT,
    store_config: StoreConfig,
    *,
    deserializer: DeserializerT | None = None,
    evict: bool = False,
    polling_interval: float = 1,
    polling_backoff_factor: float = 1,
    polling_interval_limit: float | None = None,
    polling_timeout: float | None = None
)

Bases: StoreFactory[ConnectorT, T]

Factory that polls a store until and object can be resolved.

This is an extension of the StoreFactory with the resolve() method overridden to poll the store until the target object is available.

Parameters:

  • key (ConnectorKeyT) –

    Key corresponding to object in store.

  • store_config (StoreConfig) –

    Store configuration used to reinitialize the store if needed.

  • deserializer (DeserializerT | None, default: None ) –

    Optional callable used to deserialize the byte string. If None, the default deserializer (deserialize()) will be used.

  • evict (bool, default: False ) –

    If True, evict the object from the store once resolve() is called.

  • polling_interval (float, default: 1 ) –

    Initial seconds to sleep between polling the store for the object.

  • polling_backoff_factor (float, default: 1 ) –

    Multiplicative factor applied to the polling_interval applied after each unsuccessful poll.

  • polling_interval_limit (float | None, default: None ) –

    Maximum polling interval allowed. Prevents the backoff factor from increasing the current polling interval to unreasonable values.

  • polling_timeout (float | None, default: None ) –

    Optional maximum number of seconds to poll for. If the timeout is reached an error is raised.

Source code in proxystore/store/factory.py
def __init__(
    self,
    key: ConnectorKeyT,
    store_config: StoreConfig,
    *,
    deserializer: DeserializerT | None = None,
    evict: bool = False,
    polling_interval: float = 1,
    polling_backoff_factor: float = 1,
    polling_interval_limit: float | None = None,
    polling_timeout: float | None = None,
) -> None:
    super().__init__(
        key,
        store_config,
        evict=evict,
        deserializer=deserializer,
    )
    self._polling_interval = polling_interval
    self._polling_backoff_factor = polling_backoff_factor
    self._polling_interval_limit = polling_interval_limit
    self._polling_timeout = polling_timeout

get_store()

get_store() -> Store[ConnectorT]

Get store and reinitialize if necessary.

Source code in proxystore/store/factory.py
def get_store(self) -> Store[ConnectorT]:
    """Get store and reinitialize if necessary."""
    return proxystore.store.get_or_create_store(
        self.store_config,
        register=True,
    )

resolve_async()

resolve_async() -> None

Asynchronously get object associated with key from store.

Source code in proxystore/store/factory.py
def resolve_async(self) -> None:
    """Asynchronously get object associated with key from store."""
    logger.debug(f'Starting asynchronous resolve of {self.key}')
    self._obj_future = _default_pool.submit(self.resolve)

resolve()

resolve() -> T

Get object associated with key from store.

Raises:

Source code in proxystore/store/factory.py
def resolve(self) -> T:
    """Get object associated with key from store.

    Raises:
        ProxyResolveMissingKeyError: If the object associated with the
            key is not available after `polling_timeout` seconds.
    """
    with Timer() as timer:
        store = self.get_store()
        sleep_interval = self._polling_interval
        time_waited = 0.0

        while True:
            obj = store.get(
                self.key,
                deserializer=self.deserializer,
                default=_MISSING_OBJECT,
            )

            # Break because we found the object or we hit the timeout
            if obj is not _MISSING_OBJECT or (
                self._polling_timeout is not None
                and time_waited >= self._polling_timeout
            ):
                break

            time.sleep(sleep_interval)
            time_waited += sleep_interval
            new_interval = sleep_interval * self._polling_backoff_factor
            sleep_interval = (
                new_interval
                if self._polling_interval_limit is None
                else min(new_interval, self._polling_interval_limit)
            )

        if obj is _MISSING_OBJECT:
            raise ProxyResolveMissingKeyError(
                self.key,
                type(store),
                store.name,
            )
        elif self.evict:
            store.evict(self.key)

    if store.metrics is not None:
        total_time = timer.elapsed_ns
        store.metrics.add_time(
            'factory.polling_resolve',
            self.key,
            total_time,
        )

    return cast(T, obj)