Skip to content

proxystore.p2p.manager

Manager of many peer-to-peer connections.

PeerManager

PeerManager(
    uuid: UUID,
    relay_server: str,
    name: str | None = None,
    *,
    timeout: int = 30,
    peer_channels: int = 1,
    verify_certificate: bool = True
) -> None

Peer Connections Manager.

Handles establishing peer connections via aiortc, responding to requests for new peer connections from the relay server, and sending and receiving data to/from existing peer connections.

Example
from proxystore.p2p.manager import PeerManager

pm1 = await PeerManager(uuid.uuid4(), relay_server_address)
pm2 = await PeerManager(uuid.uuid4(), relay_server_address)

await pm1.send(pm2.uuid, 'hello hello')
source_uuid, message = await pm2.recv()
assert source_uuid == pm1.uuid
assert message == 'hello hello'

await pm1.close()
await pm2.close()
Note

The class can also be used as a context manager.

async with PeerManager(..) as manager:
    ...
Warning

The class must be initialized with await or inside an async with statement to correctly configure all async tasks and connections.

manager = await PeerManager(...)
await manager.close()
async with PeerManager(...) as manager:
    ...

Parameters:

  • uuid (UUID) –

    UUID of the client.

  • relay_server (str) –

    Address of relay server to use for establishing peer-to-peer connections.

  • name (str | None) –

    Readable name of the client to use in logging. If unspecified, the hostname will be used.

  • timeout (int) –

    Timeout in seconds when waiting for a peer or relay server connection to be established.

  • peer_channels (int) –

    Number of data channels over which message sending is split for each peer connection.

  • verify_certificate (bool) –

    Verify the relay server's SSL certificate.

Raises:

  • ValueError

    If the relay server address does not start with "ws://" or "wss://".

Source code in proxystore/p2p/manager.py
def __init__(
    self,
    uuid: UUID,
    relay_server: str,
    name: str | None = None,
    *,
    timeout: int = 30,
    peer_channels: int = 1,
    verify_certificate: bool = True,
) -> None:
    """Validate the relay server address and set up the manager state.

    No connection is opened here; the relay server client and the
    relay server message task are created later by ``async_init()``.

    Args:
        uuid: UUID of the client.
        relay_server: Address of relay server to use for establishing
            peer-to-peer connections. Must start with ``ws://`` or
            ``wss://``.
        name: Readable name of the client to use in logging. If
            unspecified, the value of ``utils.hostname()`` is used.
        timeout: Timeout in seconds when waiting for a peer or relay
            server connection to be established.
        peer_channels: Number of data channels used by each peer
            connection.
        verify_certificate: Verify the relay server's SSL certificate.

    Raises:
        ValueError: If the relay server address does not start with
            "ws://" or "wss://".
    """
    # str.startswith accepts a tuple of prefixes, so one call covers both
    # schemes.
    if not relay_server.startswith(('ws://', 'wss://')):
        # The first literal ends with ". " so the two adjacent literals do
        # not run together in the final message.
        raise ValueError(
            'Relay server address must start with ws:// or wss://. '
            f'Got {relay_server}.',
        )
    self._uuid = uuid
    self._relay_server = relay_server
    self._name = name if name is not None else utils.hostname()
    self._timeout = timeout
    self._peer_channels = peer_channels
    self._verify_certificate = verify_certificate

    # Peer connections keyed by the unordered pair of peer UUIDs; the
    # lock guards lookups/insertions/removals on the mapping.
    self._peers_lock = asyncio.Lock()
    self._peers: dict[frozenset[UUID], PeerConnection] = {}

    # Queue of (source UUID, message) tuples that recv() reads from.
    self._message_queue: asyncio.Queue[
        tuple[UUID, bytes | str]
    ] = asyncio.Queue()
    # Background task for relay server messages; set by async_init().
    self._server_task: asyncio.Task[None] | None = None
    # Per-connection background tasks, keyed like self._peers.
    self._tasks: dict[frozenset[UUID], asyncio.Task[None]] = {}
    # Relay server client; stays None until async_init() connects.
    self._relay_server_client_or_none: RelayServerClient | None = None

uuid property

uuid: UUID

UUID of the peer manager.

name property

name: str

Name of the peer manager.

async_init() async

async_init() -> None

Connect to relay server.

Source code in proxystore/p2p/manager.py
async def async_init(self) -> None:
    """Connect to the relay server and start handling its messages.

    Each step is skipped when it has already been performed: the relay
    server client is only created while none is stored, and the server
    message task is only spawned while none is stored.
    """
    if self._relay_server_client_or_none is None:
        tls_context = ssl.create_default_context()
        if not self._verify_certificate:
            # Certificate verification was disabled by the caller.
            tls_context.check_hostname = False
            tls_context.verify_mode = ssl.CERT_NONE

        # The SSL context is only passed along for wss:// addresses.
        uses_tls = self._relay_server.startswith('wss://')
        relay_client = RelayServerClient(
            address=self._relay_server,
            client_uuid=self._uuid,
            client_name=self._name,
            timeout=self._timeout,
            ssl=tls_context if uses_tls else None,
        )
        await relay_client.connect()
        self._relay_server_client_or_none = relay_client

        logger.info(
            f'{self._log_prefix}: registered as peer with '
            f'relay server at {self._relay_server}',
        )

    if self._server_task is None:
        server_handler = self._handle_server_messages
        self._server_task = spawn_guarded_background_task(server_handler)

close() async

close() -> None

Close the connection manager.

Source code in proxystore/p2p/manager.py
async def close(self) -> None:
    """Close the connection manager.

    Cancels the relay server message task (if one was started) and every
    peer task, closes all stored peer connections while holding the peers
    lock, and finally closes the relay server client if one was created.
    """
    if self._server_task is not None:
        self._server_task.cancel()
        try:
            await self._server_task
        except (asyncio.CancelledError, SafeTaskExitError):
            # Expected outcome of cancelling the task.
            pass
    # Iterate over a snapshot: awaiting a task yields to the event loop,
    # and close_connection() pops entries from self._tasks without holding
    # a lock. Mutating the dict while iterating its live view would raise
    # "RuntimeError: dictionary changed size during iteration".
    for task in list(self._tasks.values()):
        task.cancel()
        try:
            await task
        except (asyncio.CancelledError, SafeTaskExitError):
            # Expected outcome of cancelling the task.
            pass
    async with self._peers_lock:
        for connection in self._peers.values():
            await connection.close()
    if self._relay_server_client_or_none is not None:
        await self._relay_server_client_or_none.close()
    logger.info(f'{self._log_prefix}: peer manager closed')

close_connection() async

close_connection(peers: Iterable[UUID]) -> None

Close a peer connection if it exists.

This will close the associated PeerConnection and cancel the asyncio task handling peer messages. If the PeerManager is used to send a message to the peer again, a new connection will be established.

Parameters:

  • peers (Iterable[UUID]) –

    Iterable containing the two peer UUIDs taking part in the connection that should be closed.

Source code in proxystore/p2p/manager.py
async def close_connection(self, peers: Iterable[UUID]) -> None:
    """Close the connection between two peers, if one is stored.

    The matching
    [`PeerConnection`][proxystore.p2p.connection.PeerConnection] is
    removed from the manager and closed, and the asyncio task stored
    for the same pair of peers is cancelled and awaited. Nothing
    happens for a pair the manager does not know about.

    Args:
        peers: Iterable containing the two peer UUIDs taking part in the
            connection that should be closed.
    """
    pair = frozenset(peers)

    # Only the removal from the mapping happens under the lock.
    async with self._peers_lock:
        connection = self._peers.pop(pair, None)

    if connection is not None:
        peer_names = ', '.join(map(str, pair))
        logger.info(
            f'{self._log_prefix} Closing connection between peers: '
            f'{peer_names}',
        )
        await connection.close()

    handler_task = self._tasks.pop(pair, None)
    if handler_task is None:
        return

    handler_task.cancel()
    try:
        await handler_task
    except (asyncio.CancelledError, SafeTaskExitError):
        # Expected outcome of cancelling the task.
        pass

recv() async

recv() -> tuple[UUID, bytes | str]

Receive next message from a peer.

Returns:

  • tuple[UUID, bytes | str]

    Tuple containing the UUID of the peer that sent the message and the message itself.

Source code in proxystore/p2p/manager.py
async def recv(self) -> tuple[UUID, bytes | str]:
    """Wait for the next peer message in the manager's message queue.

    Returns:
        Tuple of the UUID of the sending peer and the message itself.
    """
    next_item = await self._message_queue.get()
    return next_item

send() async

send(
    peer_uuid: UUID,
    message: bytes | str,
    timeout: float = 30,
) -> None

Send message to peer.

Parameters:

  • peer_uuid (UUID) –

    UUID of peer to send message to.

  • message (bytes | str) –

    Message to send to peer.

  • timeout (float) –

    Timeout to wait on peer connection to be ready.

Raises:

  • PeerConnectionTimeoutError

    If the peer connection is not established within the timeout.

Source code in proxystore/p2p/manager.py
async def send(
    self,
    peer_uuid: UUID,
    message: bytes | str,
    timeout: float = 30,
) -> None:
    """Send a message to a peer over its peer connection.

    The connection is obtained through ``get_connection()``, and the
    message and timeout are forwarded to the connection's ``send()``.

    Args:
        peer_uuid: UUID of peer to send message to.
        message: Message to send to peer.
        timeout: Timeout to wait on peer connection to be ready.

    Raises:
        PeerConnectionTimeoutError: If the peer connection is not
            established within the timeout.
    """
    peer_connection = await self.get_connection(peer_uuid)
    pending_send = peer_connection.send(message, timeout)
    await pending_send

get_connection() async

get_connection(peer_uuid: UUID) -> PeerConnection

Get connection to the peer.

Parameters:

  • peer_uuid (UUID) –

    UUID of peer to make connection with.

Returns:

  • PeerConnection

    The peer connection object.

Source code in proxystore/p2p/manager.py
async def get_connection(self, peer_uuid: UUID) -> PeerConnection:
    """Return the connection to a peer, creating it on first use.

    A connection already stored for this pair of peers is returned
    as-is. Otherwise a new connection is stored, an offer is sent to the
    peer, a background task is spawned to handle the peer's messages,
    and ``close_connection`` is registered as the on-close callback.

    Args:
        peer_uuid: UUID of peer to make connection with.

    Returns:
        The peer connection object.
    """
    pair = frozenset({self._uuid, peer_uuid})

    async with self._peers_lock:
        try:
            return self._peers[pair]
        except KeyError:
            # No connection stored for this pair yet; create one below.
            pass

        new_connection = PeerConnection(
            self._relay_server_client,
            channels=self._peer_channels,
        )
        self._peers[pair] = new_connection

    logger.info(
        f'{self._log_prefix}: opening peer connection with {peer_uuid}',
    )
    # The offer is sent after the lock has been released.
    await new_connection.send_offer(peer_uuid)

    message_task = spawn_guarded_background_task(
        self._handle_peer_messages,
        peer_uuid,
        new_connection,
    )
    self._tasks[pair] = message_task
    new_connection.on_close_callback(self.close_connection, pair)
    return new_connection