proxystore.store.executor

Executor wrapper that automatically proxies input and output objects.

The following example wraps a ProcessPoolExecutor to automatically proxy certain input and output values. Here, we create a Store using a FileConnector. StoreExecutor takes a should_proxy argument, a callable that determines which input and output values should be proxied. In this example, we use ProxyType(str), which causes only instances of str to be proxied. Inputs and outputs of any other type are left unproxied.

from concurrent.futures import ProcessPoolExecutor

from proxystore.connectors.file import FileConnector
from proxystore.proxy import Proxy
from proxystore.store import Store
from proxystore.store.executor import StoreExecutor, ProxyType

base_executor = ProcessPoolExecutor()
store = Store('executor-example', FileConnector('./object-cache'))

def concat(base: str, *, num: int) -> str:
    return f'{base}-{num}'

with StoreExecutor(
    base_executor,
    store=store,
    should_proxy=ProxyType(str),
) as executor:
    future = executor.submit(concat, 'foobar', num=42)
    result = future.result()

    assert isinstance(result, Proxy)
    assert result == 'foobar-42'

The call to concat above takes a str and an int as inputs and produces a str output. Because we configured the StoreExecutor to proxy only str instances, only the str input and the str output were proxied. The int input was not proxied.

The should_proxy callable passed to StoreExecutor can be as complicated as you want. For example, you could write one that checks whether an array is larger than some threshold.
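As a rough sketch of the size-threshold idea, the class below is a hypothetical should_proxy callable. ProxyLargerThan and threshold_bytes are illustrative names, not part of ProxyStore. The sketch assumes that "array" means any object exposing the buffer protocol (bytes, array.array, a NumPy array, and so on) and treats everything else as not proxied. A module-level class is used instead of a lambda because the callable must be serializable.

```python
from array import array
from typing import Any


class ProxyLargerThan:
    """Hypothetical should-proxy callable based on buffer size in bytes."""

    def __init__(self, threshold_bytes: int) -> None:
        self.threshold_bytes = threshold_bytes

    def __call__(self, obj: Any) -> bool:
        try:
            # memoryview accepts any object that exposes the buffer protocol.
            with memoryview(obj) as view:
                return view.nbytes > self.threshold_bytes
        except TypeError:
            # Not a buffer (e.g., int, str, list): never proxy.
            return False


should_proxy = ProxyLargerThan(threshold_bytes=1024)

small = array('d', [0.0] * 8)
large = array('d', [0.0] * 1000)

assert not should_proxy(small)
assert should_proxy(large)
assert not should_proxy('a string is not a buffer')
```

An instance such as should_proxy above could then be passed as the should_proxy argument of StoreExecutor in place of ProxyType(str).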

ProxyAlways

Should-proxy callable which always returns True.

ProxyNever

Should-proxy callable which always returns False.

ProxyType

ProxyType(*types: type)

Proxy objects with matching types.

Example
from proxystore.store.executor import ProxyType

should_proxy = ProxyType(float, str)
assert not should_proxy([1, 2, 3])
assert should_proxy(3.14)
assert should_proxy('Hello, World!')

Parameters:

  • types (type, default: () ) –

    Variable number of object types for which objects of that type should be proxied.

Source code in proxystore/store/executor.py
def __init__(self, *types: type) -> None:
    self.types = types
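The source excerpt above shows only the constructor. As a sketch of how such a callable could behave, the stand-in below assumes the check is a plain isinstance test against the stored types. TypeMatcher is a hypothetical name, and this is not the library's implementation.

```python
from typing import Any


class TypeMatcher:
    """Hypothetical stand-in mirroring the ProxyType example above."""

    def __init__(self, *types: type) -> None:
        self.types = types

    def __call__(self, value: Any) -> bool:
        # Assumption: matching means isinstance, so subclasses also match.
        return isinstance(value, self.types)


should_proxy = TypeMatcher(float, str)
assert not should_proxy([1, 2, 3])
assert should_proxy(3.14)
assert should_proxy('Hello, World!')

# With no types given, isinstance(value, ()) is always False.
assert not TypeMatcher()(3.14)
```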

StoreExecutor

StoreExecutor(
    executor: Executor,
    store: Store[Any],
    should_proxy: Callable[[Any], bool] | None = None,
    *,
    ownership: bool = True,
    close_store: bool = True
)

Bases: Executor

Executor wrapper that automatically proxies arguments and results.

By default, the StoreExecutor will automatically manage the memory of proxied objects by evicting proxied inputs after execution has completed (via callbacks on the futures) and using Ownership for result values.

Tip

This class is also compatible with some executor-like clients such as the Dask Distributed Client. While functionally compatible, mypy may consider the usage invalid if the specific client does not inherit from Executor.

Warning

Proxy Ownership may not be compatible with every executor type. If you encounter errors such as ReferenceInvalidError, set ownership=False and consider using alternate mechanisms for evicting data associated with proxies.

For example, ownership=True is not currently compatible with the Dask Distributed Client because Dask will maintain multiple references to the resulting OwnedProxy which breaks the ownership rules.

Parameters:

  • executor (Executor) –

    Executor to use for scheduling callables. This class takes ownership of executor, meaning that, when closed, it will also close executor.

  • store (Store[Any]) –

    Store to use for proxying arguments and results. This class takes ownership of store, meaning that, when closed, it will also close store.

  • should_proxy (Callable[[Any], bool] | None, default: None ) –

    Callable used to determine which arguments and results should be proxied. This is only applied to positional arguments, keyword arguments, and return values. Container types will not be recursively checked. The callable must be serializable. None defaults to ProxyNever.

  • ownership (bool, default: True ) –

    Use OwnedProxy for result values rather than Proxy types. OwnedProxy types will evict the proxied data from the store when they get garbage collected. If False and default proxies are used, it is the responsibility of the caller to clean up data associated with any result proxies.

  • close_store (bool, default: True ) –

    Close store when this executor is closed.
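To illustrate the should_proxy rule above (the predicate sees each positional argument, keyword argument, and return value as a whole and does not look inside containers), the sketch below applies a predicate to top-level values only. select_top_level and is_str are hypothetical helpers written for this illustration; they are not ProxyStore functions.

```python
from typing import Any, Callable


def select_top_level(
    should_proxy: Callable[[Any], bool],
    *args: Any,
    **kwargs: Any,
) -> list[Any]:
    """Return the top-level argument values the predicate selects."""
    values = [*args, *kwargs.values()]
    # Each value is tested as-is; containers are not unpacked.
    return [value for value in values if should_proxy(value)]


def is_str(value: Any) -> bool:
    return isinstance(value, str)


selected = select_top_level(is_str, 'top', ['nested'], 7, name='kw')
assert selected == ['top', 'kw']
```

The string inside the list is not selected. Under this rule, a nested value is only proxied if the predicate returns True for the container that holds it.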

Source code in proxystore/store/executor.py
def __init__(
    self,
    executor: Executor,
    store: Store[Any],
    should_proxy: Callable[[Any], bool] | None = None,
    *,
    ownership: bool = True,
    close_store: bool = True,
) -> None:
    if should_proxy is None:
        should_proxy = ProxyNever()

    self.executor = executor
    self.store = store
    self.should_proxy: Callable[[Any], bool] = should_proxy
    self.ownership = ownership
    self.close_store = close_store

    self._registered: dict[
        Callable[..., Any],
        _FunctionWrapper[Any, Any],
    ] = {}

submit

submit(
    function: Callable[P, R],
    /,
    *args: args,
    **kwargs: kwargs,
) -> Future[R | Proxy[R]]

Schedule the callable to be executed.

Parameters:

  • function (Callable[P, R]) –

    Callable to execute.

  • args (args, default: () ) –

    Positional arguments.

  • kwargs (kwargs, default: {} ) –

    Keyword arguments.

Returns:

  • Future[R | Proxy[R]]

    Future representing the result of the execution of the callable.

Source code in proxystore/store/executor.py
def submit(
    self,
    function: Callable[P, R],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[R | Proxy[R]]:
    """Schedule the callable to be executed.

    Args:
        function: Callable to execute.
        args: Positional arguments.
        kwargs: Keyword arguments.

    Returns:
        [`Future`][concurrent.futures.Future] representing \
        the result of the execution of the callable.
    """
    # We cast the transformed args and kwargs back to P.args and P.kwargs,
    # but note that those types aren't exactly correct. Some items
    # may be Proxy[T] rather than T, but this is not practical to type.
    pargs, keys1 = _proxy_iterable(args, self.store, self.should_proxy)
    pargs = cast(P.args, pargs)

    pkwargs, keys2 = _proxy_mapping(kwargs, self.store, self.should_proxy)
    pkwargs = cast(P.kwargs, pkwargs)

    wrapped = self._wrapped(function)
    future = self.executor.submit(wrapped, *pargs, **pkwargs)

    future.add_done_callback(_evict_callback(self.store, keys1 + keys2))
    return future
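The eviction step in submit relies on Future.add_done_callback. The sketch below models that pattern with a plain dict standing in for the store and a ThreadPoolExecutor as the wrapped executor. The names store, make_evict_callback, and the key strings are hypothetical, and the callback only approximates what _evict_callback presumably does.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable

# Stand-in for a store: maps keys to stored input objects.
store: dict[str, Any] = {'key-1': 'foobar', 'key-2': 'unrelated'}


def make_evict_callback(
    data: dict[str, Any],
    keys: list[str],
) -> Callable[[Future[Any]], None]:
    """Build a callback that removes the given keys once a future is done."""

    def _callback(_: Future[Any]) -> None:
        for key in keys:
            data.pop(key, None)

    return _callback


def concat(base: str, *, num: int) -> str:
    return f'{base}-{num}'


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(concat, store['key-1'], num=42)
    future.add_done_callback(make_evict_callback(store, ['key-1']))
    result = future.result()

# Leaving the with block waits for the worker, so the callback has run.
assert result == 'foobar-42'
assert 'key-1' not in store
assert 'key-2' in store
```

Note that future.result() can return slightly before done callbacks have finished, which is why the assertions are placed after the executor has shut down.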

map

map(
    function: Callable[P, R],
    *iterables: Iterable[args],
    **kwargs: Any
) -> Iterator[R | Proxy[R]]

Map a function onto iterables of arguments.

Parameters:

  • function (Callable[P, R]) –

    A callable that will take as many arguments as there are passed iterables.

  • iterables (Iterable[args], default: () ) –

    Variable number of iterables.

  • kwargs (Any, default: {} ) –

    Keyword arguments to pass to self.executor.map().

Returns:

  • Iterator[R | Proxy[R]]

    An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order.

Source code in proxystore/store/executor.py
def map(  # type: ignore[override]
    self,
    function: Callable[P, R],
    *iterables: Iterable[P.args],
    **kwargs: Any,
) -> Iterator[R | Proxy[R]]:
    """Map a function onto iterables of arguments.

    Args:
        function: A callable that will take as many arguments as there are
            passed iterables.
        iterables: Variable number of iterables.
        kwargs: Keyword arguments to pass to `self.executor.map()`.

    Returns:
        An iterator equivalent to: `map(func, *iterables)` but the calls \
        may be evaluated out-of-order.
    """
    iterables, keys = _proxy_iterable(
        iterables,
        self.store,
        self.should_proxy,
    )

    wrapped = self._wrapped(function)
    results = self.executor.map(wrapped, *iterables, **kwargs)

    def _result_iterator() -> Generator[R, None, None]:
        for result in results:
            if isinstance(result, _FutureProtocol):
                # Some Executor-like classes return futures from map()
                # so we internally handle that here.
                timeout = kwargs.get('timeout')
                yield result.result(timeout=timeout)
            else:
                yield result

        # Wait to evict input proxies until all results have been received.
        # Waiting is needed because there is no guarantee what order tasks
        # complete in.
        for key in keys:
            self.store.evict(key)

    return _result_iterator()
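One consequence of the generator-based implementation above is that input keys are evicted only once the returned iterator has been fully consumed. The sketch below reproduces that control flow with a dict standing in for the store. results_then_evict is a hypothetical helper, not part of ProxyStore.

```python
from collections.abc import Iterable, Iterator
from typing import Any


def results_then_evict(
    results: Iterable[Any],
    data: dict[str, Any],
    keys: list[str],
) -> Iterator[Any]:
    """Yield every result, then remove the input keys from the store."""
    for result in results:
        yield result
    # Reached only when the caller asks for an item past the last result.
    for key in keys:
        data.pop(key, None)


store = {'in-1': 1, 'in-2': 2}
iterator = results_then_evict(map(str, [1, 2]), store, ['in-1', 'in-2'])

assert next(iterator) == '1'
assert len(store) == 2  # Nothing evicted while results are outstanding.

assert list(iterator) == ['2']
assert store == {}  # Evicted once the iterator is exhausted.
```

In this sketch, a caller that abandons the iterator early never reaches the eviction loop, so the keys would remain in the store.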

shutdown

shutdown(
    wait: bool = True, *, cancel_futures: bool = False
) -> None

Shutdown the executor and close the store.

Warning

This will close the Store passed to this StoreExecutor instance if close_store=True, but it is possible the store is reinitialized again if ownership=True was configured and register=True was passed to the store. Any OwnedProxy instances returned by functions invoked through this executor that are still alive will evict themselves once they are garbage collected. Eviction requires a store instance so the garbage collection processes can inadvertently reinitialize and register a store that was previously closed.

Note

Arguments are only used if the wrapped executor is an instance of Python's Executor.

Parameters:

  • wait (bool, default: True ) –

    Wait on all pending futures to complete.

  • cancel_futures (bool, default: False ) –

    Cancel all pending futures that the executor has not started running.

Source code in proxystore/store/executor.py
def shutdown(
    self,
    wait: bool = True,
    *,
    cancel_futures: bool = False,
) -> None:
    """Shutdown the executor and close the store.

    Warning:
        This will close the [`Store`][proxystore.store.base.Store] passed
        to this [`StoreExecutor`][proxystore.store.executor.StoreExecutor]
        instance if `close_store=True`, but it is possible the store is
        reinitialized again if `ownership=True` was configured and
        `register=True` was passed to the store. Any
        [`OwnedProxy`][proxystore.store.ref.OwnedProxy]
        instances returned by functions invoked through this executor that
        are still alive will evict themselves once they are garbage
        collected. Eviction requires a store instance so the garbage
        collection processes can inadvertently reinitialize and register
        a store that was previously closed.

    Note:
        Arguments are only used if the wrapped executor is an instance
        of Python's [`Executor`][concurrent.futures.Executor].

    Args:
        wait: Wait on all pending futures to complete.
        cancel_futures: Cancel all pending futures that the executor
            has not started running.
    """
    if isinstance(self.executor, Executor):
        self.executor.shutdown(
            wait=wait,
            cancel_futures=cancel_futures,
        )
    elif hasattr(self.executor, 'close'):
        # Handle Executor-like classes that don't quite follow the
        # Executor protocol, such as the Dask Distributed Client.
        self.executor.close()
    else:
        warnings.warn(
            f'Cannot shutdown {type(self.executor).__name__} because it '
            f'is not a subclass of {Executor.__name__} nor does it have '
            'a close() method.',
            category=RuntimeWarning,
            stacklevel=2,
        )

    if self.close_store:
        self.store.close()
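The three-way dispatch in shutdown can be exercised without ProxyStore. In the sketch below, shutdown_any is a hypothetical free function that mirrors the branching shown above (minus the cancel_futures argument and the store handling), and FakeClient is an invented executor-like class with only a close() method.

```python
import warnings
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any


def shutdown_any(executor: Any, wait: bool = True) -> str:
    """Shut down an executor-like object and report which path was taken."""
    if isinstance(executor, Executor):
        executor.shutdown(wait=wait)
        return 'shutdown'
    if hasattr(executor, 'close'):
        # Executor-like classes that expose close() instead of shutdown().
        executor.close()
        return 'close'
    warnings.warn(
        f'Cannot shutdown {type(executor).__name__}.',
        category=RuntimeWarning,
        stacklevel=2,
    )
    return 'warned'


class FakeClient:
    """Invented executor-like class that exposes only close()."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


client = FakeClient()
assert shutdown_any(ThreadPoolExecutor(max_workers=1)) == 'shutdown'
assert shutdown_any(client) == 'close'
assert client.closed

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    assert shutdown_any(object()) == 'warned'

assert caught[0].category is RuntimeWarning
```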