PeerManager(
uuid: UUID,
relay_server: str,
name: str | None = None,
*,
timeout: int = 30,
peer_channels: int = 1,
verify_certificate: bool = True
) -> None
Peer Connections Manager.
Handles establishing peer connections via
aiortc, responding to
requests for new peer connections from the relay server, and sending
and receiving data to/from existing peer connections.
Example
from proxystore.p2p.manager import PeerManager
pm1 = await PeerManager(uuid.uuid4(), relay_server_address)
pm2 = await PeerManager(uuid.uuid4(), relay_server_address)
await pm1.send(pm2.uuid, 'hello hello')
source_uuid, message = await pm2.recv()
assert source_uuid == pm1.uuid
assert message == 'hello hello'
pm1.close()
pm2.close()
Note
The class can also be used as a context manager.
async with PeerManager(..) as manager:
...
Warning
The class must be initialized with await or inside an async with
statement to correctly configure all async tasks and connections.
manager = await PeerManager(...)
manager.close()
async with PeerManager(...) as manager:
...
Parameters:
-
uuid
(
UUID
)
– UUID of the client.
-
relay_server
(
str
)
– Address of relay server to use for establishing
peer-to-peer connections.
-
name
(
str | None
)
– Readable name of the client to use in logging. If unspecified,
the hostname will be used.
-
timeout
(
int
)
– Timeout in seconds when waiting for a peer or relay server
connection to be established.
-
peer_channels
(
int
)
– number of datachannels to split message sending over
between each peer.
-
verify_certificate
(
bool
)
– Verify the relay server's SSL certificate,
Raises:
Source code in proxystore/p2p/manager.py
| def __init__(
self,
uuid: UUID,
relay_server: str,
name: str | None = None,
*,
timeout: int = 30,
peer_channels: int = 1,
verify_certificate: bool = True,
) -> None:
if not (
relay_server.startswith('ws://')
or relay_server.startswith('wss://')
):
raise ValueError(
'Relay server address must start with ws:// or wss://'
f'Got {relay_server}.',
)
self._uuid = uuid
self._relay_server = relay_server
self._name = name if name is not None else utils.hostname()
self._timeout = timeout
self._peer_channels = peer_channels
self._verify_certificate = verify_certificate
self._peers_lock = asyncio.Lock()
self._peers: dict[frozenset[UUID], PeerConnection] = {}
self._message_queue: asyncio.Queue[
tuple[UUID, bytes | str]
] = asyncio.Queue()
self._server_task: asyncio.Task[None] | None = None
self._tasks: dict[frozenset[UUID], asyncio.Task[None]] = {}
self._relay_server_client_or_none: RelayServerClient | None = None
|
uuid
property
UUID of the peer manager.
name
property
Name of the peer manager.
async_init()
async
Connect to relay server.
Source code in proxystore/p2p/manager.py
| async def async_init(self) -> None:
"""Connect to relay server."""
if self._relay_server_client_or_none is None:
ssl_context = ssl.create_default_context()
if not self._verify_certificate:
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
client = RelayServerClient(
address=self._relay_server,
client_uuid=self._uuid,
client_name=self._name,
timeout=self._timeout,
ssl=ssl_context
if self._relay_server.startswith('wss://')
else None,
)
await client.connect()
self._relay_server_client_or_none = client
logger.info(
f'{self._log_prefix}: registered as peer with relay '
f'server at {self._relay_server}',
)
if self._server_task is None:
self._server_task = spawn_guarded_background_task(
self._handle_server_messages,
)
|
close()
async
Close the connection manager.
Source code in proxystore/p2p/manager.py
| async def close(self) -> None:
"""Close the connection manager."""
if self._server_task is not None:
self._server_task.cancel()
try:
await self._server_task
except (asyncio.CancelledError, SafeTaskExitError):
pass
for task in self._tasks.values():
task.cancel()
try:
await task
except (asyncio.CancelledError, SafeTaskExitError):
pass
async with self._peers_lock:
for connection in self._peers.values():
await connection.close()
if self._relay_server_client_or_none is not None:
await self._relay_server_client_or_none.close()
logger.info(f'{self._log_prefix}: peer manager closed')
|
close_connection()
async
close_connection(peers: Iterable[UUID]) -> None
Close a peer connection if it exists.
This will close the associated
PeerConnection
and
cancel the asyncio task handling peer messages. If the
PeerManager
is used to
send a message from the peer again, a new connection will be
established.
Parameters:
Source code in proxystore/p2p/manager.py
| async def close_connection(self, peers: Iterable[UUID]) -> None:
"""Close a peer connection if it exists.
This will close the associated
[`PeerConnection`][proxystore.p2p.connection.PeerConnection] and
cancel the asyncio task handling peer messages. If the
[`PeerManager`][proxystore.p2p.manager.PeerManager] is used to
send a message from the peer again, a new connection will be
established.
Args:
peers: Iterable containing the two peer UUIDs taking part in the
connection that should be closed.
"""
peers = frozenset(peers)
async with self._peers_lock:
connection = self._peers.pop(peers, None)
if connection is not None:
logger.info(
f'{self._log_prefix} Closing connection between peers: '
f'{", ".join(str(peer) for peer in peers)}',
)
await connection.close()
task = self._tasks.pop(peers, None)
if task is not None:
task.cancel()
try:
await task
except (asyncio.CancelledError, SafeTaskExitError):
pass
|
recv()
async
recv() -> tuple[UUID, bytes | str]
Receive next message from a peer.
Returns:
Source code in proxystore/p2p/manager.py
| async def recv(self) -> tuple[UUID, bytes | str]:
"""Receive next message from a peer.
Returns:
Tuple containing the UUID of the peer that sent the message \
and the message itself.
"""
return await self._message_queue.get()
|
send()
async
send(
peer_uuid: UUID,
message: bytes | str,
timeout: float = 30,
) -> None
Send message to peer.
Parameters:
Raises:
Source code in proxystore/p2p/manager.py
| async def send(
self,
peer_uuid: UUID,
message: bytes | str,
timeout: float = 30,
) -> None:
"""Send message to peer.
Args:
peer_uuid: UUID of peer to send message to.
message: Message to send to peer.
timeout: Timeout to wait on peer connection to be ready.
Raises:
PeerConnectionTimeoutError: If the peer connection is not
established within the timeout.
"""
connection = await self.get_connection(peer_uuid)
await connection.send(message, timeout)
|
get_connection()
async
get_connection(peer_uuid: UUID) -> PeerConnection
Get connection to the peer.
Parameters:
Returns:
Source code in proxystore/p2p/manager.py
| async def get_connection(self, peer_uuid: UUID) -> PeerConnection:
"""Get connection to the peer.
Args:
peer_uuid: UUID of peer to make connection with.
Returns:
The peer connection object.
"""
peers = frozenset({self._uuid, peer_uuid})
async with self._peers_lock:
if peers in self._peers:
return self._peers[peers]
connection = PeerConnection(
self._relay_server_client,
channels=self._peer_channels,
)
self._peers[peers] = connection
logger.info(
f'{self._log_prefix}: opening peer connection with '
f'{peer_uuid}',
)
await connection.send_offer(peer_uuid)
self._tasks[peers] = spawn_guarded_background_task(
self._handle_peer_messages,
peer_uuid,
connection,
)
connection.on_close_callback(self.close_connection, peers)
return connection
|