proxystore.p2p.relay.client

Client interface to a relay server.

RelayClient

RelayClient(
    address: str,
    *,
    client_name: str | None = None,
    client_uuid: UUID | None = None,
    extra_headers: dict[str, str] | None = None,
    reconnect_task: bool = True,
    ssl_context: SSLContext | None = None,
    timeout: float = 10,
    verify_certificate: bool = True
)

Client interface to a relay server.

This interface abstracts the low-level WebSocket connection to a relay server to provide automatic reconnection.

Tip

This class can be used as an async context manager!

from proxystore.p2p.relay.client import RelayClient

async with RelayClient(...) as client:
    await client.send(...)
    message = await client.recv(...)

Note

WebSocket connections are not opened until a message is sent, a message is received, or connect() is called. Initializing the client with await will call connect().

client = await RelayClient(...)
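
The lazy-connection behavior described in the note can be illustrated with a small stand-in class. This is a minimal sketch, not the RelayClient implementation: LazyClient and its attributes are hypothetical, and the handshake is replaced by a placeholder sleep. It shows one way an object can defer connecting until first use while still supporting both await and async with.

```python
from __future__ import annotations

import asyncio
from typing import Any, Generator


class LazyClient:
    """Hypothetical stand-in for a client that connects on demand."""

    def __init__(self) -> None:
        self.connected = False
        self.sent: list[str] = []

    async def connect(self) -> None:
        # No-op if already connected.
        if not self.connected:
            await asyncio.sleep(0)  # placeholder for a real handshake
            self.connected = True

    async def close(self) -> None:
        self.connected = False

    async def send(self, message: str) -> None:
        # The first send opens the connection if nothing else has yet.
        await self.connect()
        self.sent.append(message)

    async def _connected_self(self) -> LazyClient:
        await self.connect()
        return self

    def __await__(self) -> Generator[Any, None, LazyClient]:
        # `await LazyClient()` connects, then evaluates to the instance.
        return self._connected_self().__await__()

    async def __aenter__(self) -> LazyClient:
        await self.connect()
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        await self.close()


async def demo() -> tuple[bool, bool, bool]:
    lazy = LazyClient()
    before_send = lazy.connected  # constructing does not connect
    await lazy.send('hello')
    after_send = lazy.connected

    eager = await LazyClient()  # awaiting connects immediately
    return before_send, after_send, eager.connected


result = asyncio.run(demo())
```

Constructing the stand-in leaves it unconnected until send() runs, while awaiting it connects right away.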

Parameters:

  • address (str) –

    Address of the relay server. Should start with ws:// or wss://.

  • client_name (str | None, default: None ) –

    Optional name of the client to use when registering with the relay server. If None, the hostname will be used.

  • client_uuid (UUID | None, default: None ) –

    Optional UUID of the client to use when registering with the relay server. If None, one will be generated.

  • extra_headers (dict[str, str] | None, default: None ) –

    Arbitrary HTTP headers to add to the handshake request. If connecting to a relay server with authentication, such as one using the GlobusAuthenticator, the headers should include the Authorization header containing the bearer token.

  • reconnect_task (bool, default: True ) –

    Spawn a background task which will automatically reconnect to the relay server when the websocket client closes. Otherwise, reconnections will only be attempted when sending or receiving a message.

  • ssl_context (SSLContext | None, default: None ) –

    Custom SSL context to pass to websockets.connect(). A TLS context is created with ssl.create_default_context() when connecting to a wss:// URI and ssl_context is not provided.

  • timeout (float, default: 10 ) –

    Time in seconds to wait for the relay server to reply to the registration request when connecting.

  • verify_certificate (bool, default: True ) –

    Verify the relay server's SSL certificate. Only used if ssl_context is None and connecting to a wss:// URI.
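
The ssl_context, verify_certificate, and extra_headers parameters can be prepared ahead of time. The sketch below mirrors the documented defaults using only the standard library. default_ssl_context and bearer_headers are hypothetical helpers, the addresses and token are placeholders, and the Bearer scheme is an assumption about what the relay server's authenticator expects.

```python
from __future__ import annotations

import ssl


def default_ssl_context(
    address: str,
    verify_certificate: bool = True,
) -> ssl.SSLContext | None:
    """Hypothetical helper mirroring the documented TLS defaults."""
    if not address.startswith('wss://'):
        # Plain ws:// connections do not use TLS.
        return None
    context = ssl.create_default_context()
    if not verify_certificate:
        # check_hostname must be disabled before verify_mode is relaxed.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    return context


def bearer_headers(token: str) -> dict[str, str]:
    """Hypothetical helper building an Authorization header."""
    return {'Authorization': f'Bearer {token}'}


strict = default_ssl_context('wss://relay.example.com')
relaxed = default_ssl_context(
    'wss://relay.example.com',
    verify_certificate=False,
)
plain = default_ssl_context('ws://localhost:8700')
headers = bearer_headers('EXAMPLE_TOKEN')
```

Values built this way could then be passed to the constructor through the ssl_context and extra_headers keyword arguments.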

Raises:

  • RelayRegistrationError

    If the connection to the relay server is closed, or if the relay server does not reply to the registration request within the timeout or replies with an error.

  • ValueError

    If address does not start with ws:// or wss://.

Source code in proxystore/p2p/relay/client.py
def __init__(
    self,
    address: str,
    *,
    client_name: str | None = None,
    client_uuid: uuid.UUID | None = None,
    extra_headers: dict[str, str] | None = None,
    reconnect_task: bool = True,
    ssl_context: ssl.SSLContext | None = None,
    timeout: float = 10,
    verify_certificate: bool = True,
) -> None:
    if not (address.startswith('ws://') or address.startswith('wss://')):
        raise ValueError(
            'Relay server address must start with ws:// or wss://. '
            f'Got {address}.',
        )

    self._address = address
    self._name = hostname() if client_name is None else client_name
    self._uuid = uuid.uuid4() if client_uuid is None else client_uuid
    self._timeout = timeout

    if self._address.startswith('wss://') and ssl_context is None:
        ssl_context = ssl.create_default_context()
        if not verify_certificate:
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE

    self._extra_headers = extra_headers
    self._ssl_context = ssl_context
    self._create_reconnect_task = reconnect_task

    self._initial_backoff_seconds = 1.0

    self._connect_lock = asyncio.Lock()
    self._reconnect_task: asyncio.Task[None] | None = None
    self._websocket: WebSocketClientProtocol | None = None

name property

name: str

Name of client as registered with relay server.

uuid property

uuid: UUID

UUID of client as registered with relay server.

websocket property

Websocket connection to the relay server.

Raises:

  • RelayNotConnectedError

    If the websocket connection to the relay server has not been established.

connect() async

connect(retry: bool = True) -> None

Connect to the relay server.

Note

Typically this does not need to be called because the send and receive methods will automatically call this.

Note

This method is a no-op if a connection is already established. Otherwise, a new connection is attempted; if retry is True, failed attempts are retried with exponential backoff.

Parameters:

  • retry (bool, default: True ) –

    Retry the connection with exponential backoff starting at one second and increasing to a max of 60 seconds.
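
The retry delays implied by this description can be worked out directly. This sketch assumes the delay doubles after each failed attempt and is capped at 60 seconds. backoff_schedule is a hypothetical helper for illustration, not part of proxystore.

```python
from __future__ import annotations


def backoff_schedule(
    attempts: int,
    initial: float = 1.0,
    maximum: float = 60.0,
) -> list[float]:
    """Return the wait before each retry under capped doubling."""
    delays = []
    delay = initial
    for _ in range(attempts):
        delays.append(delay)
        # Double the delay, but never exceed the cap.
        delay = min(delay * 2, maximum)
    return delays


delays = backoff_schedule(8)
print(delays)
```

Under these assumptions the first six retries wait between 1 and 32 seconds, and every later retry waits 60 seconds.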

Source code in proxystore/p2p/relay/client.py
async def connect(self, retry: bool = True) -> None:
    """Connect to the relay server.

    Note:
        Typically this does not need to be called because the
        send and receive methods will automatically call this.

    Note:
        This method is a no-op if a connection is already established.
        Otherwise, a new connection will be attempted with
        exponential backoff when `retry` is True for connection failures.

    Args:
        retry: Retry the connection with exponential backoff starting at
            one second and increasing to a max of 60 seconds.
    """
    async with self._connect_lock:
        if self._websocket is not None and self._websocket.open:
            return

        backoff_seconds = self._initial_backoff_seconds
        while True:
            try:
                self._websocket = await self._register(
                    timeout=self._timeout,
                )
                if (
                    self._reconnect_task is None
                    and self._create_reconnect_task
                ):
                    self._reconnect_task = spawn_guarded_background_task(
                        self._reconnect_on_close,
                    )
                    self._reconnect_task.set_name('relay-client-reconnect')
            except (
                # Exceptions that we should wait and retry again for
                ConnectionRefusedError,
                asyncio.TimeoutError,
                websockets.exceptions.ConnectionClosed,
            ) as e:
                if not retry:
                    raise

                logger.warning(
                    f'Registration with relay server at {self._address} '
                    f'failed because of {e}. Retrying connection in '
                    f'{backoff_seconds} seconds',
                )
                await asyncio.sleep(backoff_seconds)
                backoff_seconds = min(backoff_seconds * 2, 60)
            else:
                # Coverage doesn't detect the singular break but it does
                # get executed to break from the loop
                break  # pragma: no cover

close() async

close() -> None

Close the connection to the relay server.

Source code in proxystore/p2p/relay/client.py
async def close(self) -> None:
    """Close the connection to the relay server."""
    if self._reconnect_task is not None:
        self._reconnect_task.cancel()
        try:
            await self._reconnect_task
        except asyncio.CancelledError:
            pass

    if self._websocket is not None:
        await self._websocket.close()
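
Cancelling a background task and then awaiting it, as close() does with the reconnect task, is a general asyncio pattern. The sketch below shows it in isolation using only the standard library. watch_connection and stop_task are hypothetical names used for illustration.

```python
import asyncio


async def watch_connection() -> None:
    """Hypothetical long-running task standing in for a reconnect loop."""
    while True:
        await asyncio.sleep(3600)


async def stop_task(task: asyncio.Task) -> bool:
    """Cancel a task and wait until it has actually finished."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected: the task was cancelled on request.
        pass
    return task.cancelled()


async def main() -> bool:
    task = asyncio.create_task(watch_connection())
    await asyncio.sleep(0)  # give the task a chance to start
    return await stop_task(task)


cancelled = asyncio.run(main())
```

Awaiting the task after cancel() ensures it has finished unwinding before any resources it depends on are released.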

recv() async

recv() -> RelayMessage

Receive the next message.

Returns:

  • RelayMessage

    The message received from the relay server.

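Raises:

  • RelayMessageDecodeError

    If the message received cannot be decoded into the appropriate message type.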

Source code in proxystore/p2p/relay/client.py
async def recv(self) -> RelayMessage:
    """Receive the next message.

    Returns:
        The message received from the relay server.

    Raises:
        RelayMessageDecodeError: If the message received cannot
            be decoded into the appropriate message type.
    """
    try:
        websocket = self.websocket
    except RelayNotConnectedError:
        await self.connect()
        websocket = self.websocket

    message_str = await websocket.recv()
    if not isinstance(message_str, str):
        raise AssertionError('Received non-string from websocket.')
    return decode_relay_message(message_str)

send() async

send(message: RelayMessage) -> None

Send a message.

Parameters:

  • message (RelayMessage) –

    The message to send to the relay server.

Source code in proxystore/p2p/relay/client.py
async def send(self, message: RelayMessage) -> None:
    """Send a message.

    Args:
        message: The message to send to the relay server.
    """
    message_str = encode_relay_message(message)

    try:
        websocket = self.websocket
    except RelayNotConnectedError:
        await self.connect()
        websocket = self.websocket

    await websocket.send(message_str)