Skip to content

proxystore.p2p.manager

Manager of many peer-to-peer connections.

PeerManager

PeerManager(
    relay_client: RelayClient,
    *,
    timeout: int = 30,
    peer_channels: int = 1
)

Peer Connections Manager.

Handles establishing peer connections via aiortc, responding to requests for new peer connections from the relay server, and sending and receiving data to/from existing peer connections.

Example
from proxystore.p2p.manager import PeerManager
from proxystore.p2p.relay import BasicRelayClient

relay_client = BasicRelayClient(relay_server_address)

pm1 = await PeerManager(relay_client)
pm2 = await PeerManager(relay_client)

await pm1.send(pm2.uuid, 'hello hello')
source_uuid, message = await pm2.recv()
assert source_uuid == pm1.uuid
assert message == 'hello hello'

await pm1.close()
await pm2.close()
Note

The class can also be used as an asynchronous context manager.

async with PeerManager(..) as manager:
    ...

Parameters:

  • relay_client (RelayClient) –

    Established client interface to a relay server.

  • timeout (int, default: 30 ) –

    Timeout in seconds when waiting for a peer connection to be established.

  • peer_channels (int, default: 1 ) –

    number of datachannels to split message sending over between each peer.

Raises:

  • ValueError

    If the relay server address does not start with "ws://" or "wss://".

Source code in proxystore/p2p/manager.py
def __init__(
    self,
    relay_client: RelayClient,
    *,
    timeout: int = 30,
    peer_channels: int = 1,
) -> None:
    self._relay_client = relay_client
    self._timeout = timeout
    self._peer_channels = peer_channels

    self._peers_lock = asyncio.Lock()
    self._peers: dict[frozenset[UUID], PeerConnection] = {}

    self._message_queue: asyncio.Queue[tuple[UUID, bytes | str]] = (
        asyncio.Queue()
    )
    self._server_task: asyncio.Task[None] | None = None
    self._tasks: dict[frozenset[UUID], asyncio.Task[None]] = {}

name property

name: str

Name of client as registered with relay server.

uuid property

uuid: UUID

UUID of client as registered with relay server.

relay_client property

relay_client: RelayClient

Relay client interface.

Raises:

async_init async

async_init() -> None

Connect to relay server and being listening to incoming messages.

Source code in proxystore/p2p/manager.py
async def async_init(self) -> None:
    """Connect to relay server and being listening to incoming messages."""
    await self._relay_client.connect()
    if self._server_task is None:
        self._server_task = spawn_guarded_background_task(
            self._handle_server_messages,
        )
        self._server_task.set_name('peer-manager-server-message-handler')

close async

close() -> None

Close the connection manager.

Warning

This will close all create peer connections and close the connection to the relay server.

Source code in proxystore/p2p/manager.py
async def close(self) -> None:
    """Close the connection manager.

    Warning:
        This will close all create peer connections and close the
        connection to the relay server.
    """
    if self._server_task is not None:
        self._server_task.cancel()
        try:
            await self._server_task
        except (asyncio.CancelledError, SafeTaskExitError):
            pass

    for task in self._tasks.values():
        task.cancel()
        try:
            await task
        except (asyncio.CancelledError, SafeTaskExitError):
            pass

    async with self._peers_lock:
        for connection in self._peers.values():
            await connection.close()

    await self.relay_client.close()
    logger.info(f'{self._log_prefix}: peer manager closed')

close_connection async

close_connection(peers: Iterable[UUID]) -> None

Close a peer connection if it exists.

This will close the associated PeerConnection and cancel the asyncio task handling peer messages. If the PeerManager is used to send a message from the peer again, a new connection will be established.

Parameters:

  • peers (Iterable[UUID]) –

    Iterable containing the two peer UUIDs taking part in the connection that should be closed.

Source code in proxystore/p2p/manager.py
async def close_connection(self, peers: Iterable[UUID]) -> None:
    """Close a peer connection if it exists.

    This will close the associated
    [`PeerConnection`][proxystore.p2p.connection.PeerConnection] and
    cancel the asyncio task handling peer messages. If the
    [`PeerManager`][proxystore.p2p.manager.PeerManager] is used to
    send a message from the peer again, a new connection will be
    established.

    Args:
        peers: Iterable containing the two peer UUIDs taking part in the
            connection that should be closed.
    """
    peers = frozenset(peers)
    async with self._peers_lock:
        connection = self._peers.pop(peers, None)
    if connection is not None:
        logger.info(
            f'{self._log_prefix} Closing connection between peers: '
            f'{", ".join(str(peer) for peer in peers)}',
        )
        await connection.close()
    task = self._tasks.pop(peers, None)
    if task is not None:
        task.cancel()
        try:
            await task
        except (asyncio.CancelledError, SafeTaskExitError):
            pass

recv async

recv() -> tuple[UUID, bytes | str]

Receive next message from a peer.

Returns:

  • tuple[UUID, bytes | str]

    Tuple containing the UUID of the peer that sent the message and the message itself.

Source code in proxystore/p2p/manager.py
async def recv(self) -> tuple[UUID, bytes | str]:
    """Receive next message from a peer.

    Returns:
        Tuple containing the UUID of the peer that sent the message \
        and the message itself.
    """
    return await self._message_queue.get()

send async

send(
    peer_uuid: UUID,
    message: bytes | str,
    timeout: float = 30,
) -> None

Send message to peer.

Parameters:

  • peer_uuid (UUID) –

    UUID of peer to send message to.

  • message (bytes | str) –

    Message to send to peer.

  • timeout (float, default: 30 ) –

    Timeout to wait on peer connection to be ready.

Raises:

Source code in proxystore/p2p/manager.py
async def send(
    self,
    peer_uuid: UUID,
    message: bytes | str,
    timeout: float = 30,
) -> None:
    """Send message to peer.

    Args:
        peer_uuid: UUID of peer to send message to.
        message: Message to send to peer.
        timeout: Timeout to wait on peer connection to be ready.

    Raises:
        PeerConnectionTimeoutError: If the peer connection is not
            established within the timeout.
    """
    connection = await self.get_connection(peer_uuid)
    await connection.send(message, timeout)

get_connection async

get_connection(peer_uuid: UUID) -> PeerConnection

Get connection to the peer.

Parameters:

  • peer_uuid (UUID) –

    UUID of peer to make connection with.

Returns:

Source code in proxystore/p2p/manager.py
async def get_connection(self, peer_uuid: UUID) -> PeerConnection:
    """Get connection to the peer.

    Args:
        peer_uuid: UUID of peer to make connection with.

    Returns:
        The peer connection object.
    """
    peers = frozenset({self.uuid, peer_uuid})

    async with self._peers_lock:
        if peers in self._peers:
            return self._peers[peers]

        connection = PeerConnection(
            self.relay_client,
            channels=self._peer_channels,
        )
        self._peers[peers] = connection

    logger.info(
        f'{self._log_prefix}: opening peer connection with '
        f'{peer_uuid}',
    )
    await connection.send_offer(peer_uuid)

    self._tasks[peers] = spawn_guarded_background_task(
        self._handle_peer_messages,
        peer_uuid,
        connection,
    )
    self._tasks[peers].set_name(
        f'handle-peer-messages-{self.uuid}-{peer_uuid}',
    )

    connection.on_close_callback(self.close_connection, peers)
    return connection