Skip to content

proxystore.p2p.manager

Manager of many peer-to-peer connections.

PeerManager

PeerManager(
    uuid: UUID,
    signaling_server: str,
    name: str | None = None,
    *,
    timeout: int = 30,
    peer_channels: int = 1,
    verify_certificate: bool = True
) -> None

Peer Connections Manager.

Handles establishing peer connections via aiortc, responding to requests for new peer connections from the signaling server, and sending and receiving data to/from existing peer connections.

Example
from proxystore.p2p.manager import PeerManager

pm1 = await PeerManager(uuid.uuid4(), signaling_server_address)
pm2 = await PeerManager(uuid.uuid4(), signaling_server_address)

await pm1.send(pm2.uuid, 'hello hello')
source_uuid, message = await pm2.recv()
assert source_uuid == pm1.uuid
assert message == 'hello hello'

pm1.close()
pm2.close()
Note

The class can also be used as a context manager.

async with PeerManager(..) as manager:
    ...
Warning

The class must be initialized with await or inside an async with statement to correctly configure all async tasks and connections.

manager = await PeerManager(...)
manager.close()
async with PeerManager(...) as manager:
    ...

Parameters:

  • uuid (UUID) –

    UUID of the client.

  • signaling_server (str) –

    Address of signaling server to use for establishing peer-to-peer connections.

  • name (str | None) –

    Readable name of the client to use in logging. If unspecified, the hostname will be used.

  • timeout (int) –

    Timeout in seconds when waiting for a peer or signaling server connection to be established.

  • peer_channels (int) –

    number of datachannels to split message sending over between each peer.

  • verify_certificate (bool) –

    Verify the signaling server's SSL certificate,

Raises:

  • ValueError

    If the signaling server address does not start with "ws://" or "wss://".

Source code in proxystore/p2p/manager.py
def __init__(
    self,
    uuid: UUID,
    signaling_server: str,
    name: str | None = None,
    *,
    timeout: int = 30,
    peer_channels: int = 1,
    verify_certificate: bool = True,
) -> None:
    if not (
        signaling_server.startswith('ws://')
        or signaling_server.startswith('wss://')
    ):
        raise ValueError(
            'Signaling server address must start with ws:// or wss://'
            f'Got {signaling_server}.',
        )
    self._uuid = uuid
    self._signaling_server = signaling_server
    self._name = name if name is not None else utils.hostname()
    self._timeout = timeout
    self._peer_channels = peer_channels
    self._verify_certificate = verify_certificate

    self._peers_lock = asyncio.Lock()
    self._peers: dict[frozenset[UUID], PeerConnection] = {}

    self._message_queue: asyncio.Queue[
        tuple[UUID, bytes | str]
    ] = asyncio.Queue()
    self._server_task: asyncio.Task[None] | None = None
    self._tasks: dict[frozenset[UUID], asyncio.Task[None]] = {}
    self._websocket_or_none: WebSocketClientProtocol | None = None

uuid property

uuid: UUID

UUID of the peer manager.

name property

name: str

Name of the peer manager.

async_init async

async_init() -> None

Connect to signaling server.

Source code in proxystore/p2p/manager.py
async def async_init(self) -> None:
    """Connect to signaling server."""
    if self._websocket_or_none is None:
        ssl_context = ssl.create_default_context()
        if not self._verify_certificate:
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE

        uuid, _, socket = await connect(
            address=self._signaling_server,
            uuid=self._uuid,
            name=self._name,
            timeout=self._timeout,
            ssl=ssl_context
            if self._signaling_server.startswith('wss://')
            else None,
        )

        if uuid != self._uuid:
            raise PeerRegistrationError(
                'Signaling server responded to registration request '
                f'with non-matching UUID. Received {uuid} but expected '
                f'{self._uuid}.',
            )
        self._websocket_or_none = socket
        logger.info(
            f'{self._log_prefix}: registered as peer with signaling '
            f'server at {self._signaling_server}',
        )
    if self._server_task is None:
        self._server_task = spawn_guarded_background_task(
            self._handle_server_messages,
        )

close async

close() -> None

Close the connection manager.

Source code in proxystore/p2p/manager.py
async def close(self) -> None:
    """Close the connection manager."""
    if self._server_task is not None:
        self._server_task.cancel()
        try:
            await self._server_task
        except (asyncio.CancelledError, SafeTaskExitError):
            pass
    for task in self._tasks.values():
        task.cancel()
        try:
            await task
        except (asyncio.CancelledError, SafeTaskExitError):
            pass
    async with self._peers_lock:
        for connection in self._peers.values():
            await connection.close()
    if self._websocket_or_none is not None:
        await self._websocket_or_none.close()
    logger.info(f'{self._log_prefix}: peer manager closed')

recv async

recv() -> tuple[UUID, bytes | str]

Receive next message from a peer.

Returns:

  • UUID

    Tuple containing the UUID of the peer that sent the message

  • bytes | str

    and the message itself.

Source code in proxystore/p2p/manager.py
async def recv(self) -> tuple[UUID, bytes | str]:
    """Receive next message from a peer.

    Returns:
        Tuple containing the UUID of the peer that sent the message
        and the message itself.
    """
    return await self._message_queue.get()

send async

send(
    peer_uuid: UUID,
    message: bytes | str,
    timeout: float = 30,
) -> None

Send message to peer.

Parameters:

  • peer_uuid (UUID) –

    UUID of peer to send message to.

  • message (bytes | str) –

    Message to send to peer.

  • timeout (float) –

    Timeout to wait on peer connection to be ready.

Raises:

  • PeerConnectionTimeoutError

    If the peer connection is not established within the timeout.

Source code in proxystore/p2p/manager.py
async def send(
    self,
    peer_uuid: UUID,
    message: bytes | str,
    timeout: float = 30,
) -> None:
    """Send message to peer.

    Args:
        peer_uuid: UUID of peer to send message to.
        message: Message to send to peer.
        timeout: Timeout to wait on peer connection to be ready.

    Raises:
        PeerConnectionTimeoutError: If the peer connection is not
            established within the timeout.
    """
    connection = await self.get_connection(peer_uuid)
    await connection.send(message, timeout)

get_connection async

get_connection(peer_uuid: UUID) -> PeerConnection

Get connection to the peer.

Parameters:

  • peer_uuid (UUID) –

    UUID of peer to make connection with.

Returns:

Source code in proxystore/p2p/manager.py
async def get_connection(self, peer_uuid: UUID) -> PeerConnection:
    """Get connection to the peer.

    Args:
        peer_uuid: UUID of peer to make connection with.

    Returns:
        The peer connection object.
    """
    peers = frozenset({self._uuid, peer_uuid})

    async with self._peers_lock:
        if peers in self._peers:
            return self._peers[peers]

        connection = PeerConnection(
            self._uuid,
            self._name,
            self._websocket,
            channels=self._peer_channels,
        )
        self._peers[peers] = connection

    logger.info(
        f'{self._log_prefix}: opening peer connection with '
        f'{peer_uuid}',
    )
    await connection.send_offer(peer_uuid)

    self._tasks[peers] = spawn_guarded_background_task(
        self._handle_peer_messages,
        peer_uuid,
        connection,
    )
    return connection