proxystore.stream.shims.zmq

ZeroMQ pub/sub interface.

Note

Unlike some of the other shims, which interface with a third-party message broker, this shim's subscriber connects directly to the publisher. The publisher must therefore already be running when the subscriber is created; otherwise, the subscriber will fail.

ZeroMQPublisher

ZeroMQPublisher(address: str, port: int)

ZeroMQ publisher interface.

Parameters:

  • address (str) –

    Address to bind to. The socket binds to the full address 'tcp://{address}:{port}'.

  • port (int) –

    Port to bind to.

Source code in proxystore/stream/shims/zmq.py
def __init__(self, address: str, port: int) -> None:
    self._context = zmq.Context()
    self._socket = self._context.socket(zmq.PUB)
    self._socket.bind(f'tcp://{address}:{port}')
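
As a small illustration of the endpoint format described above, the following sketch builds the same 'tcp://{address}:{port}' string outside the class. The helper name `tcp_endpoint` and the port 5555 are assumptions made for this example and are not part of proxystore.

```python
def tcp_endpoint(address: str, port: int) -> str:
    """Build an endpoint string in the 'tcp://{address}:{port}' form.

    Hypothetical helper for illustration only; the shim formats the
    string inline in __init__.
    """
    return f'tcp://{address}:{port}'


# A loopback address keeps the publisher reachable only from this host.
loopback = tcp_endpoint('127.0.0.1', 5555)
print(loopback)  # tcp://127.0.0.1:5555

# ZeroMQ interprets '*' as "all available interfaces" when binding.
# A subscriber cannot connect to '*'; it needs a concrete host instead.
wildcard = tcp_endpoint('*', 5555)
print(wildcard)  # tcp://*:5555
```

The subscriber builds its endpoint the same way, so the address and port passed to ZeroMQSubscriber should name a host and port where the publisher is reachable.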

close

close() -> None

Close this publisher.

Source code in proxystore/stream/shims/zmq.py
def close(self) -> None:
    """Close this publisher."""
    self._context.destroy()
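
Because the publisher exposes a plain close() method, one possible way to guarantee cleanup is the standard library's `contextlib.closing`. The sketch below uses a hypothetical `StandInPublisher` class with the same send()/close() shape so that it runs without a ZeroMQ installation; it is not the shim itself.

```python
from contextlib import closing


class StandInPublisher:
    """Hypothetical stand-in mirroring the shim's send()/close() methods."""

    def __init__(self) -> None:
        self.sent = []       # (topic, message) pairs recorded by send()
        self.closed = False  # set to True once close() has run

    def send(self, topic: str, message: bytes) -> None:
        self.sent.append((topic, message))

    def close(self) -> None:
        self.closed = True


publisher = StandInPublisher()

# closing() calls publisher.close() when the block exits,
# including when send() raises an exception.
with closing(publisher) as pub:
    pub.send('metrics', b'payload')

print(publisher.closed)  # True
```

The same pattern should apply to any object with a close() method, including the subscriber.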

send

send(topic: str, message: bytes) -> None

Publish a message to the stream.

Parameters:

  • topic (str) –

    Stream topic to publish message to.

  • message (bytes) –

    Message as bytes to publish to the stream.

Source code in proxystore/stream/shims/zmq.py
def send(self, topic: str, message: bytes) -> None:
    """Publish a message to the stream.

    Args:
        topic: Stream topic to publish message to.
        message: Message as bytes to publish to the stream.
    """
    self._socket.send_multipart((topic.encode(), message))
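
The send() source above passes a two-part tuple to send_multipart(): the topic encoded to bytes, followed by the message. The sketch below reproduces that layout in plain Python. The function names `to_frames` and `from_frames` are assumptions for illustration, and `from_frames` is a guess at how a receiver could split the parts, not code taken from the library.

```python
def to_frames(topic: str, message: bytes) -> tuple[bytes, bytes]:
    """Mirror the (topic.encode(), message) tuple built in send()."""
    return (topic.encode(), message)


def from_frames(frames: list[bytes]) -> tuple[str, bytes]:
    """Hypothetical inverse: split a two-frame message into its parts."""
    topic_frame, payload = frames
    return topic_frame.decode(), payload


frames = to_frames('metrics', b'\x00\x01binary payload')
print(frames)  # (b'metrics', b'\x00\x01binary payload')

# The payload frame is passed through untouched, so arbitrary bytes survive.
topic, payload = from_frames(list(frames))
print(topic)  # metrics
```

Keeping the topic in its own frame means the message bytes never need to be escaped or prefixed by the caller.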

ZeroMQSubscriber

ZeroMQSubscriber(
    address: str, port: int, *, topic: str = ""
)

ZeroMQ subscriber interface.

This subscriber is an iterable object which yields bytes messages indefinitely from the stream while connected to a publisher.

Parameters:

  • address (str) –

    Publisher address to connect to. The full address will be constructed as 'tcp://{address}:{port}'.

  • port (int) –

    Publisher port to connect to.

  • topic (str, default: '' ) –

    Topic to subscribe to. The default '' subscribes to all topics.

Source code in proxystore/stream/shims/zmq.py
def __init__(self, address: str, port: int, *, topic: str = '') -> None:
    self._context = zmq.Context()
    self._socket = self._context.socket(zmq.SUB)
    self._socket.connect(f'tcp://{address}:{port}')
    self._socket.setsockopt(zmq.SUBSCRIBE, topic.encode())
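
ZeroMQ applies SUBSCRIBE filters as byte-prefix matches on the start of each message, which is why the default topic '' receives everything. The following is a minimal pure-Python model of that rule; the function name `matches` and the sample topics are assumptions for illustration, and the real filtering happens inside ZeroMQ rather than in Python.

```python
def matches(subscription: str, topic: str) -> bool:
    """Model the prefix filter: a message passes when its topic bytes
    start with the subscribed bytes."""
    return topic.encode().startswith(subscription.encode())


topics = ['metrics', 'metrics.cpu', 'logs']

# A non-empty subscription also matches longer topics sharing the prefix.
prefix_hits = [t for t in topics if matches('metrics', t)]
print(prefix_hits)  # ['metrics', 'metrics.cpu']

# Every byte string starts with the empty prefix, so '' matches all topics.
all_hits = [t for t in topics if matches('', t)]
print(all_hits)  # ['metrics', 'metrics.cpu', 'logs']
```

One consequence is that topic names sharing a common prefix are not isolated from each other, so a subscriber to 'metrics' would also see messages published to 'metrics.cpu'.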

close

close() -> None

Close this subscriber.

Source code in proxystore/stream/shims/zmq.py
def close(self) -> None:
    """Close this subscriber."""
    self._context.destroy()