Skip to content

proxystore.p2p.relay.client

Client interface to a relay server.

RelayClient

RelayClient(
    address: str,
    *,
    client_name: str | None = None,
    client_uuid: UUID | None = None,
    extra_headers: dict[str, str] | None = None,
    reconnect_task: bool = True,
    ssl_context: SSLContext | None = None,
    timeout: float = 10,
    verify_certificate: bool = True
)

Client interface to a relay server.

This interface abstracts the low-level WebSocket connection to a relay server to provide automatic reconnection.

Tip

This class can be used as an async context manager!

from proxystore.p2p.relay.client import RelayClient

async with RelayClient(...) as client:
    await client.send(...)
    message = await client.recv(...)

Note

WebSocket connections are not opened until a message is sent, a message is received, or connect() is called. Initializing the client with await will call connect().

client = await RelayClient(...)

Parameters:

  • address (str) –

    Address of the relay server. Should start with ws:// or wss://.

  • client_name (str | None, default: None ) –

    Optional name of the client to use when registering with the relay server. If None, the hostname will be used.

  • client_uuid (UUID | None, default: None ) –

    Optional UUID of the client to use when registering with the relay server. If None, one will be generated.

  • extra_headers (dict[str, str] | None, default: None ) –

    Arbitrary HTTP headers to add to the handshake request. If connecting to a relay server with authentication, such as one using the GlobusAuthenticator, the headers should include the Authorization header containing the bearer token.

  • reconnect_task (bool, default: True ) –

    Spawn a background task which will automatically reconnect to the relay server when the websocket client closes. Otherwise, reconnections will only be attempted when sending or receiving a message.

  • ssl_context (SSLContext | None, default: None ) –

    Custom SSL context to pass to websockets.connect(). A TLS context is created with ssl.create_default_context() when connecting to a wss:// URI and ssl_context is not provided.

  • timeout (float, default: 10 ) –

    Time to wait in seconds on relay server connection.

  • verify_certificate (bool, default: True ) –

    Verify the relay server's SSL certificate. Only used if ssl_context is None and connecting to a wss:// URI.

Raises:

  • RelayRegistrationError

    If the connection to the relay server is closed, does not reply to the registration request within the timeout, or replies with an error.

  • ValueError

    If address does not start with ws:// or wss://.

Source code in proxystore/p2p/relay/client.py
def __init__(
    self,
    address: str,
    *,
    client_name: str | None = None,
    client_uuid: uuid.UUID | None = None,
    extra_headers: dict[str, str] | None = None,
    reconnect_task: bool = True,
    ssl_context: ssl.SSLContext | None = None,
    timeout: float = 10,
    verify_certificate: bool = True,
) -> None:
    if not (address.startswith('ws://') or address.startswith('wss://')):
        raise ValueError(
            'Relay server address must start with ws:// or wss://.'
            f'Got {address}.',
        )

    self._address = address
    self._name = hostname() if client_name is None else client_name
    self._uuid = uuid.uuid4() if client_uuid is None else client_uuid
    self._timeout = timeout

    if self._address.startswith('wss://') and ssl_context is None:
        ssl_context = ssl.create_default_context()
        if not verify_certificate:
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE

    self._extra_headers = extra_headers
    self._ssl_context = ssl_context
    self._create_reconnect_task = reconnect_task

    self._initial_backoff_seconds = 1.0
    # HTTP status codes from the relay server that are unrecoverable
    # from when connecting.
    #   - 4001: UnauthorizedError
    #   - 4002: ForbiddenError
    self._unrecoverable_status_codes = (4001, 4002)

    self._connect_lock = asyncio.Lock()
    self._reconnect_task: asyncio.Task[None] | None = None
    self._websocket: WebSocketClientProtocol | None = None

name property

name: str

Name of client as registered with relay server.

uuid property

uuid: UUID

UUID of client as registered with relay server.

websocket property

websocket: WebSocketClientProtocol

Websocket connection to the relay server.

Raises:

connect async

connect(retry: bool = True) -> None

Connect to the relay server.

Note

Typically this does not need to be called because the send and receive methods will automatically call this.

Note

This method is a no-op if a connection is already established. Otherwise, a new connection will be attempted with exponential backoff when retry is True for connection failures.

Parameters:

  • retry (bool, default: True ) –

    Retry the connection with exponential backoff starting at one second and increasing to a max of 60 seconds. Retrying is only performed for certain connection error types: ConnectionRefusedError, TimeoutError, and certain ConnectionClosed types. Specifically, retrying will not be performed if the connection closed status code indicates forbidden or unauthorized errors. These typically cannot be recovered from and must be addressed by the user.

Source code in proxystore/p2p/relay/client.py
async def connect(self, retry: bool = True) -> None:
    """Connect to the relay server.

    Note:
        Typically this does not need to be called because the
        send and receive methods will automatically call this.

    Note:
        This method is a no-op if a connection is already established.
        Otherwise, a new connection will be attempted with
        exponential backoff when `retry` is True for connection failures.

    Args:
        retry: Retry the connection with exponential backoff starting at
            one second and increasing to a max of 60 seconds. Retrying is
            only performed for certain connection error types:
            [`ConnectionRefusedError`][ConnectionRefusedError],
            [`TimeoutError`][asyncio.TimeoutError], and certain
            [`ConnectionClosed`][websockets.exceptions.ConnectionClosed]
            types. Specifically, retrying will not be performed if the
            connection closed status code indicates forbidden or
            unauthorized errors. These typically cannot be recovered
            from and must be addressed by the user.
    """
    async with self._connect_lock:
        if self._websocket is not None and self._websocket.open:
            return

        backoff_seconds = self._initial_backoff_seconds
        while True:
            try:
                self._websocket = await self._register(
                    timeout=self._timeout,
                )
                if (
                    self._reconnect_task is None
                    and self._create_reconnect_task
                ):
                    self._reconnect_task = spawn_guarded_background_task(
                        self._reconnect_on_close,
                    )
                    self._reconnect_task.set_name('relay-client-reconnect')
            except (
                # May occur if relay is unavailable
                ConnectionRefusedError,
                # May occur if relay is too slow to respond
                asyncio.TimeoutError,
                # May occur if client experiences temporary DNS failure
                socket.gaierror,
                websockets.exceptions.ConnectionClosed,
            ) as e:
                if not retry:
                    raise

                if (
                    isinstance(e, websockets.exceptions.ConnectionClosed)
                    and e.code in self._unrecoverable_status_codes
                ):
                    raise

                logger.warning(
                    f'Registration with relay server at {self._address} '
                    f'failed because of {e}. Retrying connection in '
                    f'{backoff_seconds} seconds',
                )
                await asyncio.sleep(backoff_seconds)
                backoff_seconds = min(backoff_seconds * 2, 60)
            else:
                # Coverage doesn't detect the singular break but it does
                # get executed to break from the loop
                break  # pragma: no cover

close async

close() -> None

Close the connection to the relay server.

Source code in proxystore/p2p/relay/client.py
async def close(self) -> None:
    """Close the connection to the relay server."""
    if self._reconnect_task is not None:
        self._reconnect_task.cancel()
        try:
            await self._reconnect_task
        except asyncio.CancelledError:
            pass

    if self._websocket is not None:
        await self._websocket.close()

recv async

recv() -> RelayMessage

Receive the next message.

Returns:

  • RelayMessage

    The message received from the relay server.

Raises:

Source code in proxystore/p2p/relay/client.py
async def recv(self) -> RelayMessage:
    """Receive the next message.

    Returns:
        The message received from the relay server.

    Raises:
        RelayMessageDecodeError: If the message received cannot
            be decoded into the appropriate message type.
    """
    try:
        websocket = self.websocket
    except RelayNotConnectedError:
        await self.connect()
        websocket = self.websocket

    message_str = await websocket.recv()
    if not isinstance(message_str, str):
        raise AssertionError('Received non-string from websocket.')
    return decode_relay_message(message_str)

send async

send(message: RelayMessage) -> None

Send a message.

Parameters:

  • message (RelayMessage) –

    The message to send to the relay server.

Source code in proxystore/p2p/relay/client.py
async def send(self, message: RelayMessage) -> None:
    """Send a message.

    Args:
        message: The message to send to the relay server.
    """
    message_str = encode_relay_message(message)

    try:
        websocket = self.websocket
    except RelayNotConnectedError:
        await self.connect()
        websocket = self.websocket

    await websocket.send(message_str)