PeerConnection(
uuid: UUID,
name: str,
websocket: WebSocketClientProtocol,
*,
channels: int = 1
) -> None
Peer-to-peer connection.
Interface for establishing a peer-to-peer connection via WebRTC
aiortc and
sending/receiving messages between the two peers. The peer-to-peer
connection is established using a central and publicly accessible
signaling server.
Warning
Applications should prefer using the
PeerManager
rather than using
the PeerConnection
class.
Example
from proxystore.p2p.connection import PeerConnection
from proxystore.p2p.messages import decode
from proxystore.p2p.server import connect
uuid1, name1, websocket1 = await connect(signaling_server_address)
connection1 = PeerConnection(uuid1, name1, websocket1)
uuid2, name2, websocket2 = await connect(signaling_server_address)
connection2 = PeerConnection(uuid2, name2, websocket2)
await connection1.send_offer(uuid2)
offer = decode(await websocket2.recv())
await connection2.handle_server_message(offer)
answer = decode(await websocket1.recv())
await connection1.handle_server_message(answer)
await connection1.ready()
await connection2.ready()
await connection1.send('hello')
assert await connection2.recv() == 'hello'
await connection2.send('hello hello')
assert await connection1.recv() == 'hello hello'
await websocket1.close()
await websocket2.close()
await connection1.close()
await connection2.close()
Parameters:
-
uuid
(
UUID
)
– UUID of this client.
-
name
(
str
)
– Readable name of this client for logging.
-
websocket
(
WebSocketClientProtocol
)
– Websocket connection to the signaling server.
-
channels
(
int
)
– Number of datachannels to open with peer.
Source code in proxystore/p2p/connection.py
| def __init__(
self,
uuid: UUID,
name: str,
websocket: WebSocketClientProtocol,
*,
channels: int = 1,
) -> None:
self._uuid = uuid
self._name = name
self._websocket = websocket
self._max_channels = channels
self._handshake_success: asyncio.Future[
bool
] = asyncio.get_running_loop().create_future()
self._pc = RTCPeerConnection()
self._incoming_queue: asyncio.Queue[bytes | str] = asyncio.Queue()
self._incoming_chunks: dict[int, list[Chunk]] = defaultdict(list)
# Max size of unsigned long (4 bytes) is 2^32 - 1
self._message_counter = AtomicCounter(size=2**32 - 1)
# Used by offerer to count how many of the channels it opened are ready
self._ready = 0
self._channels: dict[str, RTCDataChannel] = {}
self._channel_buffer_low: dict[str, asyncio.Event] = {}
self._peer_uuid: UUID | None = None
self._peer_name: str | None = None
|
state
property
Get the current connection state.
Returns:
-
str
– One of 'connected', 'connecting', 'closed', 'failed', or 'new'.
close
async
Terminate the peer connection.
Source code in proxystore/p2p/connection.py
| async def close(self) -> None:
"""Terminate the peer connection."""
logger.info(f'{self._log_prefix}: closing connection')
# Flush send buffers before close
# https://github.com/aiortc/aiortc/issues/547
for channel in self._channels.values():
transport = channel._RTCDataChannel__transport
await transport._data_channel_flush()
await transport._transmit()
await self._pc.close()
|
send
async
send(message: bytes | str, timeout: float = 30) -> None
Send message to peer.
Parameters:
Raises:
Source code in proxystore/p2p/connection.py
| async def send(self, message: bytes | str, timeout: float = 30) -> None:
"""Send message to peer.
Args:
message: Message to send to peer.
timeout: Timeout to wait on peer connection to be ready.
Raises:
PeerConnectionTimeoutError: If the peer connection is not
established within the timeout.
"""
await self.ready(timeout)
chunk_size = (
MAX_CHUNK_SIZE_STRING
if isinstance(message, str)
else MAX_CHUNK_SIZE_BYTES
)
message_id = self._message_counter.increment()
channel_names = list(self._channels.keys())
for i, chunk in enumerate(chunkify(message, chunk_size, message_id)):
channel_name = channel_names[i % len(channel_names)]
channel = self._channels[channel_name]
buffer_low = self._channel_buffer_low[channel_name]
if channel.bufferedAmount > channel.bufferedAmountLowThreshold:
await buffer_low.wait()
buffer_low.clear()
channel.send(bytes(chunk))
logger.debug(f'{self._log_prefix}: sending message to peer')
|
recv
async
Receive next message from peer.
Returns:
Source code in proxystore/p2p/connection.py
| async def recv(self) -> bytes | str:
"""Receive next message from peer.
Returns:
Message received from peer.
"""
return await self._incoming_queue.get()
|
send_offer
async
send_offer(peer_uuid: UUID) -> None
Send offer for peering via signaling server.
Parameters:
Source code in proxystore/p2p/connection.py
| async def send_offer(self, peer_uuid: UUID) -> None:
"""Send offer for peering via signaling server.
Args:
peer_uuid: UUID of peer client to establish connection with.
"""
for i in range(self._max_channels):
label = f'p2p-{i}-{self._max_channels}'
channel = self._pc.createDataChannel(label, ordered=False)
buffer_low = asyncio.Event()
channel.on('open', self._on_open)
channel.on('bufferedamountlow', buffer_low.set)
channel.on('message', self._on_message)
self._channels[label] = channel
self._channel_buffer_low[label] = buffer_low
await self._pc.setLocalDescription(await self._pc.createOffer())
message = messages.PeerConnection(
source_uuid=self._uuid,
source_name=self._name,
peer_uuid=peer_uuid,
description_type='offer',
description=object_to_string(self._pc.localDescription),
)
message_str = messages.encode(message)
logger.info(f'{self._log_prefix}: sending offer to {peer_uuid}')
await self._websocket.send(message_str)
|
send_answer
async
send_answer(peer_uuid: UUID) -> None
Send answer to peering request via signaling server.
Parameters:
Source code in proxystore/p2p/connection.py
| async def send_answer(self, peer_uuid: UUID) -> None:
"""Send answer to peering request via signaling server.
Args:
peer_uuid: UUID of peer client that sent the initial offer.
"""
@self._pc.on('datachannel')
def on_datachannel(channel: RTCDataChannel) -> None:
logger.info(f'{self._log_prefix}: peer channel established')
# TODO: note this is first channel opened
match = re.search(r'(\d+)-(\d+)$', channel.label)
if match is None:
raise AssertionError(
f'Got mislabled datachannel {channel.label}',
)
total = int(match.group(2))
buffer_low = asyncio.Event()
self._channels[channel.label] = channel
self._channel_buffer_low[channel.label] = buffer_low
channel.on('bufferedamountlow', buffer_low.set)
channel.on('message', self._on_message)
if len(self._channels) >= total:
self._handshake_success.set_result(True)
await self._pc.setLocalDescription(await self._pc.createAnswer())
message = messages.PeerConnection(
source_uuid=self._uuid,
source_name=self._name,
peer_uuid=peer_uuid,
description_type='answer',
description=object_to_string(self._pc.localDescription),
)
message_str = messages.encode(message)
logger.info(f'{self._log_prefix}: sending answer to {peer_uuid}')
await self._websocket.send(message_str)
|
handle_server_message
async
handle_server_message(
message: messages.PeerConnection,
) -> None
Handle message from the signaling server.
Parameters:
Source code in proxystore/p2p/connection.py
| async def handle_server_message(
self,
message: messages.PeerConnection,
) -> None:
"""Handle message from the signaling server.
Args:
message: Message received from the signaling server.
"""
if message.error is not None:
self._handshake_success.set_exception(
PeerConnectionError(
'Received error message from signaling server: '
f'{str(message.error)}',
),
)
return
if message.description_type == 'offer':
logger.info(
f'{self._log_prefix}: received offer from '
f'{message.source_uuid} ({message.source_name})',
)
obj = object_from_string(message.description)
elif message.description_type == 'answer':
logger.info(
f'{self._log_prefix}: received answer from '
f'{message.source_uuid} ({message.source_name})',
)
obj = object_from_string(message.description)
else:
raise AssertionError(
'P2P connection message does not contain either an offer or '
'an answer',
)
if isinstance(obj, RTCSessionDescription):
await self._pc.setRemoteDescription(obj)
self._peer_uuid = message.source_uuid
self._peer_name = message.source_name
if obj.type == 'offer':
await self.send_answer(message.source_uuid)
elif isinstance(obj, RTCIceCandidate): # pragma: no cover
# We should not receive an RTCIceCandidate message via the
# signaling server but this is here following the aiortc example.
# https://github.com/aiortc/aiortc/blob/713fb644b95328f8ec1ac2cbb54def0424cc6645/examples/datachannel-cli/cli.py#L30 # noqa: E501
await self._pc.addIceCandidate(obj)
elif obj is BYE: # pragma: no cover
raise AssertionError('received BYE message')
else:
raise AssertionError('received unknown message')
|
ready
async
ready(timeout: float | None = None) -> None
Wait for connection to be ready.
Parameters:
Raises:
Source code in proxystore/p2p/connection.py
| async def ready(self, timeout: float | None = None) -> None:
"""Wait for connection to be ready.
Args:
timeout: The maximum time in seconds to wait for
the peer connection to establish. If None, block until
the connection is established.
Raises:
PeerConnectionTimeoutError: If the connection is not ready within
the timeout.
PeerConnectionError: If there is an error establishing the peer
connection.
"""
try:
await asyncio.wait_for(self._handshake_success, timeout)
except asyncio.TimeoutError as e:
raise PeerConnectionTimeoutError(
'Timeout waiting for peer to peer connection to establish '
f'in {self._log_prefix}.',
) from e
|