proxystore.store.executor
Executor wrapper that automatically proxies input and output objects.
The following example wraps a ProcessPoolExecutor to automatically proxy certain input and output values. Here, we create a Store using a FileConnector. StoreExecutor takes a should_proxy argument, which is a callable used to determine which input and output values should be proxied. In this example, we use ProxyType(str), which causes only instances of str to be proxied. All other input or output types will be ignored.
```python
from concurrent.futures import ProcessPoolExecutor

from proxystore.connectors.file import FileConnector
from proxystore.proxy import Proxy
from proxystore.store import Store
from proxystore.store.executor import ProxyType, StoreExecutor


def concat(base: str, *, num: int) -> str:
    return f'{base}-{num}'


if __name__ == '__main__':
    # The guard lets worker processes safely re-import this module
    # on platforms that start processes with "spawn".
    base_executor = ProcessPoolExecutor()
    store = Store('executor-example', FileConnector('./object-cache'))

    with StoreExecutor(
        base_executor,
        store=store,
        should_proxy=ProxyType(str),
    ) as executor:
        future = executor.submit(concat, 'foobar', num=42)
        result = future.result()

        assert isinstance(result, Proxy)
        assert result == 'foobar-42'
```
The execution of concat above uses str and int inputs and produces a str output. Because we configured the StoreExecutor to proxy only str instances, only the str input and output were proxied. The int input was not proxied.
The should_proxy callable passed to StoreExecutor can be as complicated as you want. For example, you could write one that checks whether an array is larger than some threshold.
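A threshold-based should_proxy callable might look like the following sketch. The class name ProxyLargerThan is hypothetical, and so is the size heuristic, which is an assumption made for illustration and is not part of ProxyStore. The heuristic uses an nbytes attribute when the value has one (NumPy arrays do) and otherwise falls back to len(). The callable is a module-level class rather than a lambda so that its instances can be pickled, because the should_proxy callable must be serializable.

```python
import pickle
from typing import Any


class ProxyLargerThan:
    """Hypothetical should-proxy callable: proxy values larger than a size threshold."""

    def __init__(self, threshold_bytes: int) -> None:
        self.threshold_bytes = threshold_bytes

    def __call__(self, value: Any) -> bool:
        # Array-like objects (e.g., NumPy arrays) report their size as nbytes.
        size = getattr(value, 'nbytes', None)
        if size is None:
            # Fall back to len() as a rough size for bytes, str, lists, etc.
            # Values without a length (e.g., int) are treated as small.
            try:
                size = len(value)
            except TypeError:
                return False
        return size > self.threshold_bytes


should_proxy = ProxyLargerThan(1024)
print(should_proxy(b'x' * 4096))  # True
print(should_proxy(b'x' * 16))    # False
print(should_proxy(42))           # False: int has no len()

# Instances survive a pickle round trip, unlike a lambda.
restored = pickle.loads(pickle.dumps(should_proxy))
print(restored.threshold_bytes)   # 1024
```

Such an instance would then be passed as should_proxy=ProxyLargerThan(1024) when constructing the StoreExecutor.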
ProxyAlways
Should-proxy callable which always returns True.
ProxyNever
Should-proxy callable which always returns False.
ProxyType
ProxyType(*types: type)
Proxy objects with matching types.
Example
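The following is a minimal sketch of the matching behavior. It assumes that ProxyType checks values with isinstance() against the types it was given, which means subclasses would also match. TypeMatcher is a hypothetical stand-in that lets the snippet run without ProxyStore installed. With ProxyStore, you would construct ProxyType(str, float) directly.

```python
from typing import Any


class TypeMatcher:
    """Hypothetical stand-in for ProxyType: proxy values whose type matches."""

    def __init__(self, *types: type) -> None:
        self.types = types

    def __call__(self, value: Any) -> bool:
        # Assumption: matching uses isinstance(), so subclasses also match.
        return isinstance(value, self.types)


should_proxy = TypeMatcher(str, float)
print(should_proxy('foobar'))  # True
print(should_proxy(3.14))      # True
print(should_proxy(42))        # False
print(TypeMatcher()('x'))      # False: with no types, nothing is proxied
```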
Parameters:
- types (type, default: ()) – Variable number of object types for which objects of that type should be proxied.
StoreExecutor
StoreExecutor(
executor: Executor,
store: Store[Any],
should_proxy: Callable[[Any], bool] | None = None,
*,
ownership: bool = True,
close_store: bool = True
)
Bases: Executor
Executor wrapper that automatically proxies arguments and results.
By default, the StoreExecutor automatically manages the memory of proxied objects in two ways. It evicts proxied inputs after execution has completed, using callbacks on the futures, and it uses proxy ownership (OwnedProxy) for result values.
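The callback-based eviction of inputs described above can be sketched with the standard library alone. This is a simplified illustration under stated assumptions and is not ProxyStore's implementation. A plain dict stands in for the Store, and the helper names put, evict, call_with_stored_arg, and submit_with_eviction are hypothetical. A ThreadPoolExecutor is used so that the dict is shared with the workers.

```python
import uuid
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict

# Hypothetical stand-in for a Store: a plain dict of key -> object.
cache: Dict[str, Any] = {}


def put(obj: Any) -> str:
    key = str(uuid.uuid4())
    cache[key] = obj
    return key


def evict(key: str) -> None:
    cache.pop(key, None)


def call_with_stored_arg(function: Callable[[Any], Any], key: str) -> Any:
    # The worker resolves the "proxy" by looking the key up in the store.
    return function(cache[key])


def submit_with_eviction(
    executor: ThreadPoolExecutor, function: Callable[[Any], Any], arg: Any
) -> Future:
    key = put(arg)
    future = executor.submit(call_with_stored_arg, function, key)
    # Evict the stored input once the task finishes, whether it succeeded or failed.
    future.add_done_callback(lambda _: evict(key))
    return future


with ThreadPoolExecutor(max_workers=2) as executor:
    future = submit_with_eviction(executor, len, 'hello')
    print(future.result())  # 5

# Leaving the with-block joins the workers, so the done-callback has run.
print(cache)  # {}
```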
Tip
This class is also compatible with some executor-like clients, such as the Dask Distributed Client. While functionally compatible, mypy may consider the usage invalid if the specific client does not inherit from Executor.
Warning
Proxy ownership may not be compatible with every executor type. If you encounter errors such as ReferenceInvalidError, set ownership=False and consider using alternate mechanisms for evicting data associated with proxies. For example, ownership=True is not currently compatible with the Dask Distributed Client, because Dask will maintain multiple references to the resulting OwnedProxy, which breaks the ownership rules.
Parameters:
- executor (Executor) – Executor to use for scheduling callables. This class takes ownership of executor, meaning that, when closed, it will also close executor.
- store (Store[Any]) – Store to use for proxying arguments and results. This class takes ownership of store, meaning that, when closed, it will also close store.
- should_proxy (Callable[[Any], bool] | None, default: None) – Callable used to determine which arguments and results should be proxied. This is only applied to positional arguments, keyword arguments, and return values. Container types will not be recursively checked. The callable must be serializable. None defaults to ProxyNever.
- ownership (bool, default: True) – Use OwnedProxy for result values rather than Proxy types. OwnedProxy types will evict the proxied data from the store when they get garbage collected. If False and default proxies are used, it is the responsibility of the caller to clean up data associated with any result proxies.
- close_store (bool, default: True) – Close store when this executor is closed.
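The ownership option relies on eviction when a result is garbage collected. The general idea can be illustrated with weakref.finalize. This is a simplified sketch and not ProxyStore's OwnedProxy, whose ownership and borrowing rules are more involved. The Handle class and the dict used as a store are hypothetical. The sketch also relies on CPython's reference counting, which collects an object as soon as its last reference is dropped.

```python
import weakref
from typing import Any, Dict

# Hypothetical stand-in for a Store.
cache: Dict[str, Any] = {}


class Handle:
    """Hypothetical owned reference: evicts its key when garbage collected."""

    def __init__(self, key: str, value: Any) -> None:
        self.key = key
        cache[key] = value
        # The finalizer must not reference self, or self could never be collected.
        weakref.finalize(self, cache.pop, key, None)


handle = Handle('result', 'foobar-42')
extra_reference = handle  # e.g., another component keeping its own reference
del handle
print('result' in cache)  # True: a reference is still alive
del extra_reference
print('result' in cache)  # False: last reference dropped, data evicted
```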
submit
Schedule the callable to be executed.
Parameters:
- function (Callable[P, R]) – Callable to execute.
- args (args, default: ()) – Positional arguments.
- kwargs (kwargs, default: {}) – Keyword arguments.
Returns:
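The following is a rough sketch of how a submit() wrapper could apply should_proxy to positional arguments, keyword arguments, and the return value without recursing into containers. The names wrap_value, proxy_arguments, and ResultProxier are hypothetical. In place of storing a value and returning a Proxy, wrap_value just tags the value in a tuple. ResultProxier is the callable that would be sent to the wrapped executor.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Tuple


def wrap_value(value: Any) -> Tuple[str, Any]:
    # Hypothetical placeholder for "put value in the store and return a proxy".
    return ('proxied', value)


def proxy_arguments(
    args: Tuple[Any, ...],
    kwargs: Dict[str, Any],
    should_proxy: Callable[[Any], bool],
) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
    # Only top-level values are checked; containers are not searched recursively.
    new_args = tuple(wrap_value(a) if should_proxy(a) else a for a in args)
    new_kwargs = {k: wrap_value(v) if should_proxy(v) else v for k, v in kwargs.items()}
    return new_args, new_kwargs


class ResultProxier:
    """Callable submitted to the executor: runs function, then maybe wraps the result."""

    def __init__(self, function: Callable[..., Any], should_proxy: Callable[[Any], bool]) -> None:
        self.function = function
        self.should_proxy = should_proxy

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        result = self.function(*args, **kwargs)
        return wrap_value(result) if self.should_proxy(result) else result


def is_str(value: Any) -> bool:
    return isinstance(value, str)


def concat(base: str, *, num: int) -> str:
    return f'{base}-{num}'


args, kwargs = proxy_arguments(('foobar', ['a', 'b'], 1), {'num': 42, 'tag': 'x'}, is_str)
print(args)    # (('proxied', 'foobar'), ['a', 'b'], 1)
print(kwargs)  # {'num': 42, 'tag': ('proxied', 'x')}

with ThreadPoolExecutor() as executor:
    result = executor.submit(ResultProxier(concat, is_str), 'foobar', num=42).result()
print(result)  # ('proxied', 'foobar-42')
```

The list ['a', 'b'] passes through unchanged even though it contains strings. This mirrors the documented rule that container types are not recursively checked.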
map
map(
function: Callable[P, R],
*iterables: Iterable[args],
**kwargs: Any
) -> Iterator[R | Proxy[R]]
Map a function onto iterables of arguments.
Parameters:
- function (Callable[P, R]) – A callable that will take as many arguments as there are passed iterables.
- iterables (Iterable[args], default: ()) – Variable number of iterables.
- kwargs (Any, default: {}) – Keyword arguments to pass to self.executor.map().

Returns:
- Iterator[R | Proxy[R]] – An iterator equivalent to map(func, *iterables), but the calls may be evaluated out-of-order.
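The contract above ("equivalent to map(func, *iterables), but the calls may be evaluated out-of-order") can be illustrated with a hypothetical helper built on submit(). Calls are submitted up front and may finish in any order, but results are still yielded in input order. map_via_submit and slow_pow are illustrative names only. StoreExecutor.map() forwards its kwargs to self.executor.map(), so this sketch is not a description of its implementation.

```python
import time
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, Callable, Iterable, Iterator


def map_via_submit(
    executor: Executor, function: Callable[..., Any], *iterables: Iterable[Any]
) -> Iterator[Any]:
    # Submit every call up front, like Executor.map(), so they can run concurrently.
    futures = [executor.submit(function, *args) for args in zip(*iterables)]

    def results() -> Iterator[Any]:
        # Yield results in input order, even if later calls finished first.
        for future in futures:
            yield future.result()

    return results()


def slow_pow(base: int, exp: int) -> int:
    # Smaller bases sleep longer, so the first call tends to finish last.
    time.sleep(0.01 * (3 - base))
    return base ** exp


with ThreadPoolExecutor(max_workers=3) as executor:
    squares = list(map_via_submit(executor, slow_pow, [1, 2, 3], [2, 2, 2]))
print(squares)  # [1, 4, 9]
```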
shutdown
Shutdown the executor and close the store.
Warning
This will close the Store passed to this StoreExecutor instance if close_store=True. However, the store may be reinitialized if ownership=True was configured and register=True was passed to the store. Any OwnedProxy instances returned by functions invoked through this executor that are still alive will evict themselves once they are garbage collected. Eviction requires a store instance, so garbage collection can inadvertently reinitialize and register a store that was previously closed.
Note
Arguments are only used if the wrapped executor is an instance of Python's Executor.
Parameters:
- wait (bool, default: True) – Wait on all pending futures to complete.
- cancel_futures (bool, default: False) – Cancel all pending futures that the executor has not started running.
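wait and cancel_futures are only used when the wrapped executor is a concurrent.futures.Executor, so their effect is that of the standard library's Executor.shutdown(). The sketch below uses a plain ThreadPoolExecutor, without ProxyStore, to show what cancel_futures=True does to work that is still queued. Tasks that are already running are unaffected. cancel_futures requires Python 3.9 or newer.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocking_task() -> str:
    started.set()
    release.wait()  # Hold the only worker until released below.
    return 'done'


executor = ThreadPoolExecutor(max_workers=1)
running = executor.submit(blocking_task)
started.wait()  # Ensure the first task occupies the single worker.
pending = executor.submit(pow, 2, 10)  # Queued behind the blocking task.

# Return immediately and cancel anything that has not started yet.
executor.shutdown(wait=False, cancel_futures=True)
release.set()

print(running.result())     # done: it was already running, so it completes
print(pending.cancelled())  # True: it was still queued at shutdown
```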