PeerManager(
relay_client: RelayClient,
*,
timeout: int = 30,
peer_channels: int = 1
)
Peer Connections Manager.
Handles establishing peer connections via
aiortc, responding to
requests for new peer connections from the relay server, and sending
and receiving data to/from existing peer connections.
Example
from proxystore.p2p.manager import PeerManager
from proxystore.p2p.relay import BasicRelayClient
relay_client = BasicRelayClient(relay_server_address)
pm1 = await PeerManager(relay_client)
pm2 = await PeerManager(relay_client)
await pm1.send(pm2.uuid, 'hello hello')
source_uuid, message = await pm2.recv()
assert source_uuid == pm1.uuid
assert message == 'hello hello'
await pm1.close()
await pm2.close()
Note
The class can also be used as an asynchronous context manager.
async with PeerManager(..) as manager:
...
Parameters:
-
relay_client
(RelayClient
)
–
Established client interface to a relay server.
-
timeout
(int
, default:
30
)
–
Timeout in seconds when waiting for a peer connection to be
established.
-
peer_channels
(int
, default:
1
)
–
Number of data channels to split message sending over
between each pair of peers.
Raises:
-
ValueError
–
If the relay server address does not start with "ws://"
or "wss://".
Source code in proxystore/p2p/manager.py
def __init__(
    self,
    relay_client: RelayClient,
    *,
    timeout: int = 30,
    peer_channels: int = 1,
) -> None:
    """Store the configuration and create empty connection bookkeeping.

    Only attributes are assigned here; connecting to the relay server is
    done separately by `async_init()`.

    Args:
        relay_client: Client interface to a relay server.
        timeout: Timeout in seconds when waiting for a peer connection to
            be established.
        peer_channels: Number of datachannels to split message sending
            over between each peer.
    """
    # Caller-supplied configuration.
    self._relay_client = relay_client
    self._timeout = timeout
    self._peer_channels = peer_channels

    # Per-connection state. Both dicts are keyed by the frozenset of the
    # two peer UUIDs taking part in the connection.
    self._peers: dict[frozenset[UUID], PeerConnection] = {}
    self._tasks: dict[frozenset[UUID], asyncio.Task[None]] = {}
    self._peers_lock = asyncio.Lock()

    # Queue of (source UUID, message) pairs handed out by recv().
    self._message_queue: asyncio.Queue[tuple[UUID, bytes | str]] = (
        asyncio.Queue()
    )

    # Task handling relay server messages; created by async_init().
    self._server_task: asyncio.Task[None] | None = None
|
name
property
Name of client as registered with relay server.
uuid
property
UUID of client as registered with relay server.
relay_client
property
relay_client: RelayClient
Relay client interface.
Raises:
async_init()
async
Connect to relay server and begin listening to incoming messages.
Source code in proxystore/p2p/manager.py
async def async_init(self) -> None:
    """Connect to relay server and begin listening to incoming messages.

    The relay client's `connect()` is awaited on every call. The
    background task running `_handle_server_messages` is only spawned
    when no such task has been stored yet.
    """
    await self._relay_client.connect()

    if self._server_task is not None:
        # A handler task already exists; do not spawn a second one.
        return

    self._server_task = handler = spawn_guarded_background_task(
        self._handle_server_messages,
    )
    handler.set_name('peer-manager-server-message-handler')
|
close()
async
Close the connection manager.
Warning
This will close all created peer connections and close the
connection to the relay server.
Source code in proxystore/p2p/manager.py
async def close(self) -> None:
    """Close the connection manager.

    Cancels the relay server message handler task (if one was started),
    cancels every peer message handler task, closes every registered peer
    connection, and finally closes the relay client.

    Warning:
        This will close all created peer connections and close the
        connection to the relay server.
    """
    if self._server_task is not None:
        self._server_task.cancel()
        try:
            await self._server_task
        except (asyncio.CancelledError, SafeTaskExitError):
            # Expected result of cancelling the handler task.
            pass

    # Iterate over a snapshot: every await below yields to the event
    # loop, where close_connection() can pop entries from self._tasks.
    # Iterating the live view would then fail with "dictionary changed
    # size during iteration".
    for task in list(self._tasks.values()):
        task.cancel()
        try:
            await task
        except (asyncio.CancelledError, SafeTaskExitError):
            # Expected result of cancelling the handler task.
            pass

    async with self._peers_lock:
        # Snapshot as a cheap safeguard in case closing a connection
        # leads to self._peers being modified without this lock.
        for connection in list(self._peers.values()):
            await connection.close()

    await self.relay_client.close()
    logger.info(f'{self._log_prefix}: peer manager closed')
|
close_connection()
async
close_connection(peers: Iterable[UUID]) -> None
Close a peer connection if it exists.
This will close the associated
PeerConnection
and
cancel the asyncio task handling peer messages. If the
PeerManager
is used to
send a message from the peer again, a new connection will be
established.
Parameters:
-
peers
(Iterable[UUID]
)
–
Iterable containing the two peer UUIDs taking part in the
connection that should be closed.
Source code in proxystore/p2p/manager.py
async def close_connection(self, peers: Iterable[UUID]) -> None:
    """Close a peer connection if it exists.

    Closes the
    [`PeerConnection`][proxystore.p2p.connection.PeerConnection]
    registered for the given pair of peers and cancels the asyncio task
    handling that peer's messages. Each step is skipped when nothing is
    registered for the pair. If the
    [`PeerManager`][proxystore.p2p.manager.PeerManager] is later asked
    for a connection to the peer again, a new one will be established.

    Args:
        peers: Iterable containing the two peer UUIDs taking part in the
            connection that should be closed.
    """
    pair = frozenset(peers)

    # NOTE(review): the lock is assumed to guard only the pop below;
    # confirm the intended scope against the original module.
    async with self._peers_lock:
        connection = self._peers.pop(pair, None)

    if connection is not None:
        peer_names = ', '.join(str(peer) for peer in pair)
        logger.info(
            f'{self._log_prefix} Closing connection between peers: '
            f'{peer_names}',
        )
        await connection.close()

    task = self._tasks.pop(pair, None)
    if task is None:
        return

    task.cancel()
    try:
        await task
    except (asyncio.CancelledError, SafeTaskExitError):
        # Expected result of cancelling the handler task.
        pass
|
recv()
async
recv() -> tuple[UUID, bytes | str]
Receive next message from a peer.
Returns:
-
tuple[UUID, bytes | str]
–
Tuple containing the UUID of the peer that sent the message and the message itself.
Source code in proxystore/p2p/manager.py
| async def recv(self) -> tuple[UUID, bytes | str]:
"""Receive next message from a peer.
Returns:
Tuple containing the UUID of the peer that sent the message \
and the message itself.
"""
return await self._message_queue.get()
|
send()
async
send(
peer_uuid: UUID,
message: bytes | str,
timeout: float = 30,
) -> None
Send message to peer.
Parameters:
-
peer_uuid
(UUID
)
–
UUID of peer to send message to.
-
message
(bytes | str
)
–
Message to send to peer.
-
timeout
(float
, default:
30
)
–
Timeout to wait on peer connection to be ready.
Raises:
-
PeerConnectionTimeoutError
–
If the peer connection is not established within the timeout.
Source code in proxystore/p2p/manager.py
| async def send(
self,
peer_uuid: UUID,
message: bytes | str,
timeout: float = 30,
) -> None:
"""Send message to peer.
Args:
peer_uuid: UUID of peer to send message to.
message: Message to send to peer.
timeout: Timeout to wait on peer connection to be ready.
Raises:
PeerConnectionTimeoutError: If the peer connection is not
established within the timeout.
"""
connection = await self.get_connection(peer_uuid)
await connection.send(message, timeout)
|
get_connection()
async
get_connection(peer_uuid: UUID) -> PeerConnection
Get connection to the peer.
Parameters:
-
peer_uuid
(UUID
)
–
UUID of peer to make connection with.
Returns:
-
PeerConnection
–
The peer connection object.
Source code in proxystore/p2p/manager.py
async def get_connection(self, peer_uuid: UUID) -> PeerConnection:
    """Get connection to the peer.

    Returns the connection already registered for this pair of peers if
    there is one. Otherwise a new
    [`PeerConnection`][proxystore.p2p.connection.PeerConnection] is
    created and registered, an offer is sent to the peer, a background
    task is spawned to handle the peer's messages, and
    `close_connection()` is registered as the connection's close
    callback.

    If sending the offer fails or is cancelled, the registration is
    rolled back and the exception propagates, so a later call starts
    over with a fresh connection.

    Args:
        peer_uuid: UUID of peer to make connection with.

    Returns:
        The peer connection object.
    """
    peers = frozenset({self.uuid, peer_uuid})

    async with self._peers_lock:
        if peers in self._peers:
            return self._peers[peers]

        connection = PeerConnection(
            self.relay_client,
            channels=self._peer_channels,
        )
        self._peers[peers] = connection

        logger.info(
            f'{self._log_prefix}: opening peer connection with '
            f'{peer_uuid}',
        )
        try:
            await connection.send_offer(peer_uuid)
        except BaseException:
            # Without this rollback the entry would stay cached with no
            # handler task and no close callback, and every later call
            # would return it. BaseException is caught so cancellation
            # is covered too; the exception is always re-raised.
            if self._peers.get(peers) is connection:
                del self._peers[peers]
            raise

        self._tasks[peers] = spawn_guarded_background_task(
            self._handle_peer_messages,
            peer_uuid,
            connection,
        )
        self._tasks[peers].set_name(
            f'handle-peer-messages-{self.uuid}-{peer_uuid}',
        )
        connection.on_close_callback(self.close_connection, peers)
        return connection
|