proxystore.stream.shims.zmq

ZeroMQ pub/sub interface.

Note

Unlike the other shims, which interface with a third-party message broker, the subscriber here connects directly to the publisher. As a result, if the publisher is not alive when the subscriber is created, the subscriber will fail.

ZeroMQPublisher

ZeroMQPublisher(address: str, port: int)

ZeroMQ publisher interface.

Parameters:

  • address (str) –

    Address to bind to. The socket binds to the full address 'tcp://{address}:{port}'.

  • port (int) –

    Port to bind to.

Source code in proxystore/stream/shims/zmq.py
def __init__(self, address: str, port: int) -> None:
    self._context = zmq.Context()
    self._socket = self._context.socket(zmq.PUB)
    self._socket.bind(f'tcp://{address}:{port}')

close

close() -> None

Close this publisher.

Source code in proxystore/stream/shims/zmq.py
def close(self) -> None:
    """Close this publisher."""
    self._context.destroy()

send_message

send_message(topic: str, message: bytes) -> None

Publish a message to the stream.

Parameters:

  • topic (str) –

    Stream topic to publish message to.

  • message (bytes) –

    Message as bytes to publish to the stream.

Source code in proxystore/stream/shims/zmq.py
def send_message(self, topic: str, message: bytes) -> None:
    """Publish a message to the stream.

    Args:
        topic: Stream topic to publish message to.
        message: Message as bytes to publish to the stream.
    """
    self._socket.send_multipart((topic.encode(), message))
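
As the listing above shows, each published message travels as a two-frame multipart message: the encoded topic first, then the payload. The sketch below reproduces that layout in plain Python without opening a socket. The helper names `build_frames` and `payload_from_frames` are hypothetical and exist only for illustration; they are not part of ProxyStore or pyzmq.

```python
def build_frames(topic: str, message: bytes) -> tuple[bytes, bytes]:
    """Return the two frames that send_message() hands to send_multipart()."""
    # str.encode() defaults to UTF-8, so the topic frame is UTF-8 bytes.
    return (topic.encode(), message)


def payload_from_frames(frames: tuple[bytes, bytes]) -> bytes:
    """Drop the topic frame and keep the payload, as next_message() does."""
    _, message = frames
    return message


frames = build_frames('metrics', b'\x00\x01')
print(frames)
print(payload_from_frames(frames))
```

Because the topic is a separate frame, the payload itself stays untouched and can hold arbitrary bytes.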

ZeroMQSubscriber

ZeroMQSubscriber(
    address: str, port: int, *, topic: str = ""
)

ZeroMQ subscriber interface.

This subscriber is an iterable object which yields bytes messages indefinitely from the stream while connected to a publisher.

Parameters:

  • address (str) –

    Publisher address to connect to. The full address will be constructed as 'tcp://{address}:{port}'.

  • port (int) –

    Publisher port to connect to.

  • topic (str, default: '' ) –

    Topic to subscribe to. The default '' subscribes to all topics.

Source code in proxystore/stream/shims/zmq.py
def __init__(self, address: str, port: int, *, topic: str = '') -> None:
    self._context = zmq.Context()
    self._socket = self._context.socket(zmq.SUB)
    self._socket.connect(f'tcp://{address}:{port}')
    self._socket.setsockopt(zmq.SUBSCRIBE, topic.encode())
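
ZeroMQ SUB sockets filter by byte prefix rather than by exact match, which is why the empty default topic matches every message. The sketch below mimics that rule in plain Python so the behavior is easy to inspect. `matches_subscription` is a hypothetical helper written for this illustration, not a ProxyStore or pyzmq function.

```python
def matches_subscription(subscription: str, topic: str) -> bool:
    """Return True if a message on `topic` would pass a SUB prefix filter."""
    # ZeroMQ compares the subscribed bytes against the start of the
    # first frame, which this shim fills with the encoded topic.
    return topic.encode().startswith(subscription.encode())


print(matches_subscription('', 'sensor-a'))        # True: empty prefix matches all
print(matches_subscription('sensor', 'sensor-a'))  # True: 'sensor' is a prefix
print(matches_subscription('sensor-a', 'sensor'))  # False: topic is too short
```

One consequence is that a subscriber created with `topic='sensor'` would also receive messages published to `'sensor-a'` and `'sensor-b'`, so topic names that share a prefix need care.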

next_message

next_message() -> bytes

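Get the next message. This call blocks until a message arrives and returns only the payload; the topic frame is discarded.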

Source code in proxystore/stream/shims/zmq.py
def next_message(self) -> bytes:
    """Get the next message."""
    _, message = self._socket.recv_multipart()
    return message
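
A minimal end-to-end sketch of the two shims on one machine follows. The loopback address, port 5555, the topic name, and the 0.5 second pause are all assumptions chosen for illustration. The pause is a crude way to let the subscription reach the publisher, since ZeroMQ PUB sockets drop messages that have no matching subscriber yet.

```python
import time


def round_trip(address: str = '127.0.0.1', port: int = 5555) -> bytes:
    """Publish one message and return what a subscriber receives."""
    # Imported lazily so this sketch can be loaded without pyzmq installed.
    from proxystore.stream.shims.zmq import ZeroMQPublisher
    from proxystore.stream.shims.zmq import ZeroMQSubscriber

    # Bind the publisher first; the subscriber connects directly to it.
    publisher = ZeroMQPublisher(address, port)
    subscriber = ZeroMQSubscriber(address, port, topic='demo')
    try:
        # Assumed pause: give the subscription time to reach the publisher.
        time.sleep(0.5)
        publisher.send_message('demo', b'hello')
        # Blocks until a message on a matching topic arrives.
        return subscriber.next_message()
    finally:
        subscriber.close()
        publisher.close()
```

Calling `round_trip()` with a free port should return the published payload. In a real deployment the publisher and subscriber would normally live in separate processes, and a fixed pause may not be a reliable synchronization method.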

close

close() -> None

Close this subscriber.

Source code in proxystore/stream/shims/zmq.py
def close(self) -> None:
    """Close this subscriber."""
    self._context.destroy()
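
Both classes expose a `close()` method, so the standard library's `contextlib.closing` is one way to make sure it runs even when an error occurs. The sketch below uses a hypothetical stand-in class, `StandInShim`, so that it runs without pyzmq installed; with ProxyStore available, the same pattern would wrap a real publisher or subscriber.

```python
from contextlib import closing


class StandInShim:
    """Hypothetical stand-in exposing close() like the shims above."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


# With ProxyStore installed, the same pattern would wrap the real object,
# e.g. closing(ZeroMQSubscriber(address, port)).
with closing(StandInShim()) as shim:
    pass  # send or receive messages here

print(shim.closed)  # True: close() ran when the block exited
```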