PeerManager(
    uuid: UUID,
    relay_server: str,
    name: str | None = None,
    *,
    timeout: int = 30,
    peer_channels: int = 1,
    verify_certificate: bool = True
) -> None
 
  
  
      Peer Connections Manager.
Handles establishing peer connections via
aiortc, responding to
requests for new peer connections from the relay server, and sending
and receiving data to/from existing peer connections.
  Example
  from proxystore.p2p.manager import PeerManager
pm1 = await PeerManager(uuid.uuid4(), relay_server_address)
pm2 = await PeerManager(uuid.uuid4(), relay_server_address)
await pm1.send(pm2.uuid, 'hello hello')
source_uuid, message = await pm2.recv()
assert source_uuid == pm1.uuid
assert message == 'hello hello'
await pm1.close()
await pm2.close()
 
 
  Note
  The class can also be used as a context manager.
async with PeerManager(..) as manager:
    ...
 
 
  Warning
  The class must be initialized with await or inside an async with
statement to correctly configure all async tasks and connections.
manager = await PeerManager(...)
await manager.close()
 
async with PeerManager(...) as manager:
    ...
 
 
  Parameters:
  
      - 
        uuid
            (
UUID)
        – UUID of the client.
       
      - 
        relay_server
            (
str)
        – Address of relay server to use for establishing
peer-to-peer connections.
       
      - 
        name
            (
str | None)
        – Readable name of the client to use in logging. If unspecified,
the hostname will be used.
       
      - 
        timeout
            (
int)
        – Timeout in seconds when waiting for a peer or relay server
connection to be established.
       
      - 
        peer_channels
            (
int)
        – Number of data channels over which message sending to each
peer is split.
       
      - 
        verify_certificate
            (
bool)
        – Verify the relay server's SSL certificate.
       
  
  Raises:
  
      - ValueError – If the relay server address does not start with
        ws:// or wss://.
          
            Source code in proxystore/p2p/manager.py
             | def __init__(
    self,
    uuid: UUID,
    relay_server: str,
    name: str | None = None,
    *,
    timeout: int = 30,
    peer_channels: int = 1,
    verify_certificate: bool = True,
) -> None:
    if not (
        relay_server.startswith('ws://')
        or relay_server.startswith('wss://')
    ):
        raise ValueError(
            'Relay server address must start with ws:// or wss://'
            f'Got {relay_server}.',
        )
    self._uuid = uuid
    self._relay_server = relay_server
    self._name = name if name is not None else utils.hostname()
    self._timeout = timeout
    self._peer_channels = peer_channels
    self._verify_certificate = verify_certificate
    self._peers_lock = asyncio.Lock()
    self._peers: dict[frozenset[UUID], PeerConnection] = {}
    self._message_queue: asyncio.Queue[
        tuple[UUID, bytes | str]
    ] = asyncio.Queue()
    self._server_task: asyncio.Task[None] | None = None
    self._tasks: dict[frozenset[UUID], asyncio.Task[None]] = {}
    self._relay_server_client_or_none: RelayServerClient | None = None
  | 
 
           
  
  
        uuid
  
  
      property
  
  
  
      UUID of the peer manager.
   
 
        name
  
  
      property
  
  
  
      Name of the peer manager.
   
 
        async_init()
  
  
      async
  
  
  
      Connect to relay server.
      
        Source code in proxystore/p2p/manager.py
         | async def async_init(self) -> None:
    """Connect to relay server."""
    if self._relay_server_client_or_none is None:
        ssl_context = ssl.create_default_context()
        if not self._verify_certificate:
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE
        client = RelayServerClient(
            address=self._relay_server,
            client_uuid=self._uuid,
            client_name=self._name,
            timeout=self._timeout,
            ssl=ssl_context
            if self._relay_server.startswith('wss://')
            else None,
        )
        await client.connect()
        self._relay_server_client_or_none = client
        logger.info(
            f'{self._log_prefix}: registered as peer with relay '
            f'server at {self._relay_server}',
        )
    if self._server_task is None:
        self._server_task = spawn_guarded_background_task(
            self._handle_server_messages,
        )
  | 
 
       
   
 
        close()
  
  
      async
  
  
  
      Close the connection manager.
      
        Source code in proxystore/p2p/manager.py
         | async def close(self) -> None:
    """Close the connection manager."""
    if self._server_task is not None:
        self._server_task.cancel()
        try:
            await self._server_task
        except (asyncio.CancelledError, SafeTaskExitError):
            pass
    for task in self._tasks.values():
        task.cancel()
        try:
            await task
        except (asyncio.CancelledError, SafeTaskExitError):
            pass
    async with self._peers_lock:
        for connection in self._peers.values():
            await connection.close()
    if self._relay_server_client_or_none is not None:
        await self._relay_server_client_or_none.close()
    logger.info(f'{self._log_prefix}: peer manager closed')
  | 
 
       
   
 
        close_connection()
  
  
      async
  
close_connection(peers: Iterable[UUID]) -> None
 
  
  
      Close a peer connection if it exists.
This will close the associated
PeerConnection and
cancel the asyncio task handling peer messages. If the
PeerManager is used to
send a message from the peer again, a new connection will be
established.
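  Parameters:
  
      - peers (Iterable[UUID]) – Iterable containing the two peer UUIDs
        taking part in the connection that should be closed.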
      
        Source code in proxystore/p2p/manager.py
         | async def close_connection(self, peers: Iterable[UUID]) -> None:
    """Close a peer connection if it exists.
    This will close the associated
    [`PeerConnection`][proxystore.p2p.connection.PeerConnection] and
    cancel the asyncio task handling peer messages. If the
    [`PeerManager`][proxystore.p2p.manager.PeerManager] is used to
    send a message from the peer again, a new connection will be
    established.
    Args:
        peers: Iterable containing the two peer UUIDs taking part in the
            connection that should be closed.
    """
    peers = frozenset(peers)
    async with self._peers_lock:
        connection = self._peers.pop(peers, None)
    if connection is not None:
        logger.info(
            f'{self._log_prefix} Closing connection between peers: '
            f'{", ".join(str(peer) for peer in peers)}',
        )
        await connection.close()
    task = self._tasks.pop(peers, None)
    if task is not None:
        task.cancel()
        try:
            await task
        except (asyncio.CancelledError, SafeTaskExitError):
            pass
  | 
 
       
   
 
        recv()
  
  
      async
  
recv() -> tuple[UUID, bytes | str]
 
  
  
      Receive next message from a peer.
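  Returns:
  
      - tuple[UUID, bytes | str] – Tuple containing the UUID of the peer
        that sent the message and the message itself.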
      
        Source code in proxystore/p2p/manager.py
         | async def recv(self) -> tuple[UUID, bytes | str]:
    """Receive next message from a peer.
    Returns:
        Tuple containing the UUID of the peer that sent the message \
        and the message itself.
    """
    return await self._message_queue.get()
  | 
 
       
   
 
        send()
  
  
      async
  
send(
    peer_uuid: UUID,
    message: bytes | str,
    timeout: float = 30,
) -> None
 
  
  
      Send message to peer.
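  Parameters:
  
      - peer_uuid (UUID) – UUID of peer to send message to.
      - message (bytes | str) – Message to send to peer.
      - timeout (float) – Timeout to wait on peer connection to be ready.
  
  Raises:
  
      - PeerConnectionTimeoutError – If the peer connection is not
        established within the timeout.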
  
      
        Source code in proxystore/p2p/manager.py
         | async def send(
    self,
    peer_uuid: UUID,
    message: bytes | str,
    timeout: float = 30,
) -> None:
    """Send message to peer.
    Args:
        peer_uuid: UUID of peer to send message to.
        message: Message to send to peer.
        timeout: Timeout to wait on peer connection to be ready.
    Raises:
        PeerConnectionTimeoutError: If the peer connection is not
            established within the timeout.
    """
    connection = await self.get_connection(peer_uuid)
    await connection.send(message, timeout)
  | 
 
       
   
 
        get_connection()
  
  
      async
  
get_connection(peer_uuid: UUID) -> PeerConnection
 
  
  
      Get connection to the peer.
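  Parameters:
  
      - peer_uuid (UUID) – UUID of peer to make connection with.
  
  Returns:
  
      - PeerConnection – The peer connection object.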
  
      
        Source code in proxystore/p2p/manager.py
         | async def get_connection(self, peer_uuid: UUID) -> PeerConnection:
    """Get connection to the peer.
    Args:
        peer_uuid: UUID of peer to make connection with.
    Returns:
        The peer connection object.
    """
    peers = frozenset({self._uuid, peer_uuid})
    async with self._peers_lock:
        if peers in self._peers:
            return self._peers[peers]
        connection = PeerConnection(
            self._relay_server_client,
            channels=self._peer_channels,
        )
        self._peers[peers] = connection
    logger.info(
        f'{self._log_prefix}: opening peer connection with '
        f'{peer_uuid}',
    )
    await connection.send_offer(peer_uuid)
    self._tasks[peers] = spawn_guarded_background_task(
        self._handle_peer_messages,
        peer_uuid,
        connection,
    )
    connection.on_close_callback(self.close_connection, peers)
    return connection
  |