proxystore.connectors.dim.ucx¶
UCX-based distributed in-memory connector implementation.
UCXConnector ¶
UCXConnector(
port: int,
address: str | None = None,
interface: str | None = None,
timeout: float = 1,
) -> None
UCX-based distributed in-memory connector.
Note
The first instance of this connector created on a process will
spawn a UCXServer
that will store data. Hence, this connector just acts as an interface
to that server.
Parameters:
-
port
(
int
) –The desired port for the spawned server.
-
address
(
str | None
) –The IP address of the network interface to use. Has precedence over
interface
if both are provided. -
interface
(
str | None
) –The network interface to use.
address
has precedence if both args are defined. -
timeout
(
float
) –Timeout in seconds to try connecting to local server before spawning one.
Raises:
-
ServerTimeoutError
–If a local server cannot be connected to within
timeout
seconds, and a new local server does not response withintimeout
seconds after being started.
Source code in proxystore/connectors/dim/ucx.py
close() ¶
Close the connector.
Parameters:
-
kill_server
(
bool
) –Whether to kill the server process. If this instance did not spawn the local node's server process, this is a no-op.
Source code in proxystore/connectors/dim/ucx.py
config() ¶
Get the connector configuration.
The configuration contains all the information needed to reconstruct the connector object.
Source code in proxystore/connectors/dim/ucx.py
from_config()
classmethod
¶
Create a new connector instance from a configuration.
Parameters:
evict() ¶
Evict the object associated with the key.
Parameters:
-
key
(
DIMKey
) –Key associated with object to evict.
exists() ¶
Check if an object associated with the key exists.
Parameters:
-
key
(
DIMKey
) –Key potentially associated with stored object.
Returns:
-
bool
–If an object associated with the key exists.
Source code in proxystore/connectors/dim/ucx.py
get() ¶
Get the serialized object associated with the key.
Parameters:
-
key
(
DIMKey
) –Key associated with the object to retrieve.
Returns:
-
bytes | None
–Serialized object or
None
if the object does not exist.
Source code in proxystore/connectors/dim/ucx.py
get_batch() ¶
Get a batch of serialized objects associated with the keys.
Parameters:
Returns:
-
list[bytes | None]
–List with same order as
keys
with the serialized objects orNone
if the corresponding key does not have an associated object.
Source code in proxystore/connectors/dim/ucx.py
put() ¶
Put a serialized object in the store.
Parameters:
-
obj
(
bytes
) –Serialized object to put in the store.
Returns:
-
DIMKey
–Key which can be used to retrieve the object.
Source code in proxystore/connectors/dim/ucx.py
put_batch() ¶
Put a batch of serialized objects in the store.
Parameters:
Returns:
Source code in proxystore/connectors/dim/ucx.py
UCXServer ¶
UCXServer implementation.
Source code in proxystore/connectors/dim/ucx.py
evict() ¶
Evict the object associated with the key.
Parameters:
-
key
(
str
) –Key associated with object to evict.
exists() ¶
get() ¶
put() ¶
Put data in the store.
Parameters:
handle_rpc() ¶
Process an RPC request.
Parameters:
-
rpc
(
RPC
) –Client RPC to process.
Returns:
-
RPCResponse
–Response containing result or an exception if the operation failed.
Source code in proxystore/connectors/dim/ucx.py
handler()
async
¶
Handle endpoint requests.
Parameters:
-
ep
(
ucp.Endpoint
) –The endpoint making the request.
Source code in proxystore/connectors/dim/ucx.py
run_server()
async
¶
Listen and reply to RPCs from clients.
Warning
This function does not return until SIGINT or SIGTERM is received.
Parameters:
-
port
(
int
) –Port the server should listen on.
Source code in proxystore/connectors/dim/ucx.py
start_server() ¶
Run a local server.
Note
This function creates an event loop and executes
run_server()
within
that loop.
Parameters:
-
port
(
int
) –Port the server should listen on.
Source code in proxystore/connectors/dim/ucx.py
spawn_server() ¶
spawn_server(
address: str,
port: int,
*,
spawn_timeout: float = 5.0,
kill_timeout: float | None = 1.0
) -> multiprocessing.context.SpawnProcess
Spawn a local server running in a separate process.
Note
An atexit
callback is registered which will terminate the spawned
server process when the calling process exits.
Parameters:
-
address
(
str
) –IP address the server will listen on.
-
port
(
int
) –Port the server will listen on.
-
spawn_timeout
(
float
) –Max time in seconds to wait for the server to start.
-
kill_timeout
(
float | None
) –Max time in seconds to wait for the server to shutdown on exit.
Returns:
-
multiprocessing.context.SpawnProcess
–The process that the server is running in.
Source code in proxystore/connectors/dim/ucx.py
wait_for_server_async()
async
¶
Wait until the server responds.
Parameters:
-
address
(
str
) –Host IP of the server to ping.
-
port
(
int
) –Port of the server to ping.
-
timeout
(
float
) –Max time in seconds to wait for server response.
Raises:
-
ServerTimeoutError
–If the server does not respond within the timeout.
Source code in proxystore/connectors/dim/ucx.py
wait_for_server() ¶
Wait until the server responds.
Note
This function calls
wait_for_server_async()
using asyncio.run()
.
Parameters:
-
address
(
str
) –The host IP of the server to ping.
-
port
(
int
) –Theport of the server to ping.
-
timeout
(
float
) –The max time in seconds to wait for server response.
Raises:
-
ServerTimeoutError
–If the server does not respond within the timeout.
Source code in proxystore/connectors/dim/ucx.py
reset_ucp_async()
async
¶
Hard reset all of UCP.
UCP provides ucp.reset()
; however, this function does not correctly
shutdown all asyncio tasks and readers. This function wraps
ucp.reset()
and additionally removes all readers on the event loop
and cancels/awaits all asyncio tasks.