Dask Distributed with ProxyStore
Last updated 24 April 2024
This guide walks through using ProxyStore in Dask Distributed. ProxyStore can be used to efficiently pass large intermediate values between function invocations.
Note
Some familiarity with using Dask Distributed and ProxyStore is assumed. Check out the Dask Distributed Quickstart and ProxyStore Get Started to learn more.
Installation
Create a new virtual environment of your choosing and install Dask Distributed and ProxyStore.
Note
The versions below are the latest versions of these packages that were available when this guide was written. These instructions should generally work with newer versions as well.
$ python -m venv venv
$ . venv/bin/activate
$ pip install "dask[distributed]==2024.4.2" proxystore==0.6.5
Using Dask Distributed
Dask Distributed is a library for futures-based distributed computing. The Client.submit() and Client.map() methods behave similarly to those of concurrent.futures.Executor. Consider this trivial example where we submit sum() on a list of numbers.
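A minimal sketch of this example follows. It assumes the list holds list(range(100)), which matches the "Result: 4950" output shown later in this guide. It also assumes a local cluster started with processes=False, so the scheduler and workers run as threads in the current process. That choice is only for convenience: with worker processes (the Client default), a script needs its code under an if __name__ == '__main__': guard.

```python
from dask.distributed import Client

# processes=False runs the scheduler and workers as threads in this process,
# keeping the sketch self-contained; worker processes need a main guard.
client = Client(processes=False)

x = list(range(100))
future = client.submit(sum, x)  # returns a Future, like Executor.submit()
result = future.result()        # blocks until the task finishes
print(f'Result: {result}')

client.close()
```

As with concurrent.futures, Client.submit(fn, *args) schedules a single call and returns a future whose result() waits for the value.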
Using ProxyStore
Dask Distributed has many built-in optimizations for data management when working with array-like data (e.g., NumPy arrays or Pandas dataframes). However, other large objects can degrade performance when they are serialized along with the task graph. ProxyStore provides a seamless alternative for passing objects to and from task invocations.
Here, we will modify the above example to use ProxyStore's FileConnector to communicate intermediate data. This example works the same with any Connector implementation, but different implementations can yield different performance benefits depending on the characteristics of the data or of the Dask Distributed deployment.
- Setting populate_target=True is always recommended with Dask Distributed.
- Setting register=True is always recommended with Dask Distributed.
As expected, the result is the same. Under the hood, ProxyStore serializes x and puts the value in the connector. The resulting proxy acts like a reference to x, which is now stored in a shared location. The reference-like nature of proxy means that Dask does not end up serializing or transferring x itself; rather, Dask serializes the lightweight proxy.

The transparent nature of proxy means that, when used by the task, proxy resolves to and acts like x, so the program behaves exactly the same.
Performance Tips
In the above example, we set two flags, register and populate_target, which improve the performance of ProxyStore in Dask Distributed applications.

Passing register=True will call register_store() automatically to register the Store instance globally by name. This enables proxies to reuse the same store instance, improving performance by sharing the same cache and stateful connections.
The most important setting for ProxyStore performance in Dask Distributed is populate_target=True. When True, created proxies will be "pre-resolved" and have their __class__ and __hash__ attributes cached inside the proxy. This allows Dask to call hash() and isinstance() on a proxy without needing to resolve the proxy.
If we instead set populate_target=False and run the example with DEBUG-level logging enabled, we see that the target object of the proxy is retrieved three times.
$ python
INFO:proxystore.store:Registered a store named "dask"
INFO:proxystore.store.base:Initialized Store("dask", connector=FileConnector(directory=/tmp/proxystore-cache), serializer=default, deserializer=default, cache_size=16, metrics=False)
DEBUG:proxystore.store.base:Store(name="dask"): PUT FileKey(filename='3682883f-40bd-4990-bec0-73242f56067a') in 0.058 ms
DEBUG:proxystore.store.base:Store(name="dask"): PROXY FileKey(filename='3682883f-40bd-4990-bec0-73242f56067a') in 0.108 ms
DEBUG:proxystore.store.base:Store(name="dask"): GET FileKey(filename='3682883f-40bd-4990-bec0-73242f56067a') in 0.026 ms (cached=False)
DEBUG:proxystore.store.base:Store(name="dask"): GET FileKey(filename='3682883f-40bd-4990-bec0-73242f56067a') in 0.002 ms (cached=True)
INFO:proxystore.store:Registered a store named "dask"
INFO:proxystore.store.base:Initialized Store("dask", connector=FileConnector(directory=/tmp/proxystore-cache), serializer=default, deserializer=default, cache_size=16, metrics=False)
INFO:proxystore.store:Registered a store named "dask"
DEBUG:proxystore.store.base:Store(name="dask"): GET FileKey(filename='3682883f-40bd-4990-bec0-73242f56067a') in 0.034 ms (cached=False)
Result: 4950
INFO:proxystore.store:Unregistered a store named dask
Each GET message corresponds to an instance of proxy being resolved. In this example, this happens (1) when the Dask client serializes proxy, (2) on the Dask scheduler when the task request message is processed, and (3) on the Dask worker when proxy is actually used in the computation. If x were very large or costly to retrieve, these extra retrievals could significantly increase the application's memory usage or harmfully increase task dispatch latency.
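One way to get DEBUG output like the log above uses standard-library logging, configured at the top of the script before the Store is created. This sketch raises only the proxystore.* loggers (the names visible in the log) to DEBUG and leaves other libraries at WARNING. It is an assumption about how the log was produced, and records emitted inside separate worker processes may require configuring logging there as well.

```python
import logging

# Root handler using the default "LEVEL:logger:message" format seen in the log.
logging.basicConfig(level=logging.WARNING)

# Let DEBUG records (PUT/PROXY/GET timings) through for proxystore.* loggers only.
logging.getLogger('proxystore').setLevel(logging.DEBUG)
```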
Running the example again with logging enabled but populate_target=True produces a single GET message, corresponding to the Dask worker resolving proxy when the sum is computed. Retrieving the target only where it is actually used is optimal for performance.
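The three-versus-one GET count can be reproduced with a toy model in plain Python. ToyProxy and run_pipeline are hypothetical names, and this is not how proxystore's Proxy is implemented. The client and scheduler stages each inspect their own fresh, unresolved copy with isinstance() and hash(), mimicking deserialized proxies, and the worker stage actually sums its copy. A tuple stands in for x because, unlike a list, it is hashable.

```python
class ToyProxy:
    """Toy lazy reference for illustration only; not proxystore's Proxy."""

    gets = 0  # simulated store GETs, counted across all ToyProxy instances

    def __init__(self, store, key, cached_class=None, cached_hash=None):
        self._store = store
        self._key = key
        self._cached_class = cached_class
        self._cached_hash = cached_hash
        self._target = None

    def _resolve(self):
        # Fetch the target from the "store" at most once per proxy instance.
        if self._target is None:
            ToyProxy.gets += 1
            self._target = self._store[self._key]
        return self._target

    @property
    def __class__(self):
        # isinstance() falls back to obj.__class__, so a cached class avoids a GET.
        if self._cached_class is not None:
            return self._cached_class
        return type(self._resolve())

    def __hash__(self):
        if self._cached_hash is not None:
            return self._cached_hash
        return hash(self._resolve())

    def __iter__(self):
        return iter(self._resolve())


def run_pipeline(populate_target):
    store = {'x': tuple(range(100))}  # dict standing in for a Connector
    cached = {}
    if populate_target:
        # Capture metadata when the proxy is created, while x is in memory.
        x = store['x']
        cached = {'cached_class': type(x), 'cached_hash': hash(x)}

    ToyProxy.gets = 0
    # Client and scheduler each inspect their own unresolved copy.
    for _stage in ('client', 'scheduler'):
        proxy_copy = ToyProxy(store, 'x', **cached)
        isinstance(proxy_copy, tuple)
        hash(proxy_copy)
    # The worker actually computes with its copy.
    result = sum(ToyProxy(store, 'x', **cached))
    return result, ToyProxy.gets


print(run_pipeline(populate_target=False))  # (4950, 3)
print(run_pipeline(populate_target=True))   # (4950, 1)
```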
Memory Management
By default, the Store will not delete stored objects once they are no longer needed. In the above example, this means that x will be stored in the FileConnector until Store.close() is called and the directory /tmp/proxystore-cache is deleted. (Here, Store.close() is called when exiting the with context block.)

However, Connector implementations are not required to clear stored objects when closed. With such a connector, the shared object x would be "leaked" because it was never deleted once the application no longer needed it.
ProxyStore provides many opt-in mechanisms for automated management of shared objects. For single-use proxies, passing evict=True to Store.proxy() will automatically delete the object from the store once the proxy is resolved. In more complex scenarios where a proxy may be used by many processes, Lifetimes or Ownership can be used. Check out the Object Lifetimes guide to learn more.