
proxystore.stream.protocols

Protocols used by the stream interfaces.

Publisher/Subscriber

The Publisher and Subscriber are Protocols that define the publisher and subscriber interfaces to a pub/sub-like messaging system.

In general, these protocols enforce no implementation details beyond the interface. For example, an implementation may support any producer-to-consumer configuration (e.g., 1:1, 1:N, N:N). A set of shims implementing these protocols for third-party message brokers is provided in proxystore.stream.shims.
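To illustrate that the topology is left to the implementation, the following is a minimal in-memory sketch of a 1:N configuration. The names FanOutBroker, BrokerPublisher, and QueueSubscriber are hypothetical and not part of ProxyStore. The None end-of-stream sentinel, and the choice that closing the publisher ends the stream for every subscriber, are assumptions of this sketch only.

```python
from __future__ import annotations

import queue
from collections import defaultdict


class FanOutBroker:
    """Hypothetical broker that copies each message to every topic subscriber."""

    def __init__(self) -> None:
        self._queues: dict[str, list[queue.Queue]] = defaultdict(list)

    def subscribe(self, topic: str) -> QueueSubscriber:
        q: queue.Queue = queue.Queue()
        self._queues[topic].append(q)
        return QueueSubscriber(q)

    def deliver(self, topic: str, message: bytes | None) -> None:
        for q in self._queues[topic]:
            q.put(message)

    def end_all(self) -> None:
        # Assumption: None marks end-of-stream for every subscriber.
        for topic in list(self._queues):
            self.deliver(topic, None)


class BrokerPublisher:
    """Provides the documented send() and close() methods."""

    def __init__(self, broker: FanOutBroker) -> None:
        self._broker = broker

    def send(self, topic: str, message: bytes) -> None:
        self._broker.deliver(topic, message)

    def close(self) -> None:
        self._broker.end_all()


class QueueSubscriber:
    """Iterable that yields messages until the end-of-stream sentinel."""

    def __init__(self, q: queue.Queue) -> None:
        self._queue = q
        self._done = False

    def __iter__(self) -> QueueSubscriber:
        return self

    def __next__(self) -> bytes:
        if self._done:
            raise StopIteration
        message = self._queue.get()
        if message is None:
            self._done = True
            raise StopIteration
        return message

    def close(self) -> None:
        self._done = True


broker = FanOutBroker()
first = broker.subscribe('events')
second = broker.subscribe('events')

publisher = BrokerPublisher(broker)
publisher.send('events', b'hello')
publisher.close()

# Both subscribers receive their own copy of the message (1:N).
received_first = list(first)
received_second = list(second)
```

A 1:1 or N:N variant would change only the broker; the publisher and subscriber method signatures stay the same.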

Plugins

Additional protocols, such as the Filter, define plugins that the StreamProducer and/or StreamConsumer use to alter their behavior.

Publisher

Bases: Protocol

Publisher interface to message stream.

close

close() -> None

Close this publisher.

Source code in proxystore/stream/protocols.py
def close(self) -> None:
    """Close this publisher."""
    ...

send

send(topic: str, message: bytes) -> None

Publish a message to the stream.

Parameters:

  • topic (str) – Stream topic to publish message to.

  • message (bytes) – Message as bytes to publish to the stream.

Source code in proxystore/stream/protocols.py
def send(self, topic: str, message: bytes) -> None:
    """Publish a message to the stream.

    Args:
        topic: Stream topic to publish message to.
        message: Message as bytes to publish to the stream.
    """
    ...
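
As a rough illustration of the structural typing involved, the sketch below defines a class with the documented send() and close() methods and checks it against a local stand-in protocol. PublisherLike and RecordingPublisher are hypothetical names used only here; PublisherLike mirrors the documented signatures so the example runs without ProxyStore installed. Raising an error on send() after close() is an assumption of this sketch, not documented behavior.

```python
from __future__ import annotations

from typing import Protocol, runtime_checkable


@runtime_checkable
class PublisherLike(Protocol):
    """Local stand-in mirroring the documented Publisher methods."""

    def close(self) -> None: ...

    def send(self, topic: str, message: bytes) -> None: ...


class RecordingPublisher:
    """Hypothetical publisher that stores messages in memory per topic."""

    def __init__(self) -> None:
        self.sent: dict[str, list[bytes]] = {}
        self.closed = False

    def send(self, topic: str, message: bytes) -> None:
        # Assumption: sending after close is treated as an error.
        if self.closed:
            raise RuntimeError('publisher is closed')
        self.sent.setdefault(topic, []).append(message)

    def close(self) -> None:
        self.closed = True


publisher = RecordingPublisher()
publisher.send('metrics', b'cpu=0.42')
publisher.send('metrics', b'cpu=0.57')
publisher.close()

# No inheritance is needed: the check only looks for the required methods.
is_publisher = isinstance(publisher, PublisherLike)
```

A recording publisher of this kind can be handy in unit tests, where the messages a producer emits need to be inspected without a real broker.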

Subscriber

Bases: Protocol

Subscriber interface to message stream.

The subscriber protocol is an iterable object which yields objects from the stream until the stream is closed.

close

close() -> None

Close this subscriber.

Source code in proxystore/stream/protocols.py
def close(self) -> None:
    """Close this subscriber."""
    ...
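
The consumption pattern described above can be sketched as follows. ListSubscriber is a hypothetical class backed by a fixed list of messages; treating an exhausted list as a closed stream, and yielding bytes, are assumptions made so the example terminates on its own.

```python
from __future__ import annotations

from collections import deque
from typing import Iterable


class ListSubscriber:
    """Hypothetical subscriber that replays a fixed sequence of messages."""

    def __init__(self, messages: Iterable[bytes]) -> None:
        self._messages = deque(messages)
        self._closed = False

    def __iter__(self) -> ListSubscriber:
        return self

    def __next__(self) -> bytes:
        # Assumption: an exhausted backlog counts as a closed stream.
        if self._closed or not self._messages:
            raise StopIteration
        return self._messages.popleft()

    def close(self) -> None:
        self._closed = True


subscriber = ListSubscriber([b'a', b'b', b'stop', b'never-seen'])
received = []
try:
    for message in subscriber:
        if message == b'stop':
            break
        received.append(message)
finally:
    # Always release the subscriber, even if processing fails.
    subscriber.close()

# After close(), iteration yields nothing further.
leftover = list(subscriber)
```

Wrapping the loop in try/finally ensures close() is called even when the consumer exits early.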

Filter

Bases: Protocol

Filter protocol.

A filter takes the dictionary of metadata associated with a new object event and returns a boolean indicating whether the event should be dropped. That is, if the filter returns True, the event is filtered out of the stream and lost.

__call__

__call__(metadata: dict[str, Any]) -> bool

Apply the filter to event metadata.

Source code in proxystore/stream/protocols.py
def __call__(self, metadata: dict[str, Any]) -> bool:
    """Apply the filter to event metadata."""
    ...
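
A minimal sketch of a callable matching this signature is shown below. TagFilter and the 'tag' and 'id' metadata keys are hypothetical; the actual metadata keys depend on what the producer attaches to each event. Note the polarity: returning True means the event is dropped.

```python
from __future__ import annotations

from typing import Any


class TagFilter:
    """Hypothetical filter that drops events whose 'tag' is blocked."""

    def __init__(self, blocked: set[str]) -> None:
        self._blocked = blocked

    def __call__(self, metadata: dict[str, Any]) -> bool:
        # True means "drop this event"; events without a tag are kept.
        return metadata.get('tag') in self._blocked


events = [
    {'tag': 'debug', 'id': 1},
    {'tag': 'result', 'id': 2},
    {'id': 3},
]

drop = TagFilter({'debug'})
kept = [event for event in events if not drop(event)]
```

Because the protocol only requires `__call__`, a plain function or lambda with the same signature would satisfy it as well.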