Skip to content

proxystore.stream.protocols

Stream interface protocols.

Publisher/Subscriber

The Publisher and Subscriber are Protocols which define the publisher and subscriber interfaces to a pub/sub-like message broker system.

In general, these protocols do not enforce any other implementation details besides the interface. For example, implementations could choose to support any producer-to-consumer configurations (e.g., 1:1, 1:N, N:N).

There are two variants of publisher/subscribers:

  • Event: The lower-level variant that are responsible for publishing and retrieving EventBatch types. These require more complexity to implement but can support finer optimization.
  • Message: The higher-level variant that publishes pre-serialized messages in the form of bytes-strings and receives messages on the subscriber side. These are simple to implement.

A set of shims to third-party message brokers are provided in proxystore.stream.shims.

Plugins

Additional protocols, such as the Filter, are plugins used by the StreamProducer and/or StreamConsumer that alter their behavior.

Publisher module-attribute

Publisher = Union['EventPublisher', 'MessagePublisher']

Publisher union type.

Subscriber module-attribute

Subscriber = Union['EventSubscriber', 'MessageSubscriber']

Subscriber union type.

EventPublisher

Bases: Protocol

Publisher interface to an event stream.

close

close() -> None

Close this publisher.

Source code in proxystore/stream/protocols.py
def close(self) -> None:
    """Close this publisher."""
    ...

send_events

send_events(events: EventBatch) -> None

Publish event with optional data to the stream.

Parameters:

  • events (EventBatch) –

    Batch of events to publish.

Source code in proxystore/stream/protocols.py
def send_events(self, events: EventBatch) -> None:
    """Publish event with optional data to the stream.

    Args:
        events: Batch of events to publish.
    """
    ...

EventSubscriber

Bases: Protocol

Subscriber interface to an event stream.

The subscriber protocol is an iterable object which yields objects from the stream until the stream is closed.

close

close() -> None

Close this subscriber.

Source code in proxystore/stream/protocols.py
def close(self) -> None:
    """Close this subscriber."""
    ...

next_events

next_events() -> EventBatch

Get the next event batch.

Source code in proxystore/stream/protocols.py
def next_events(self) -> EventBatch:
    """Get the next event batch."""
    ...

MessagePublisher

Bases: Protocol

Publisher interface to message stream.

close

close() -> None

Close this publisher.

Source code in proxystore/stream/protocols.py
def close(self) -> None:
    """Close this publisher."""
    ...

send_message

send_message(topic: str, message: bytes) -> None

Publish a message to the stream.

Parameters:

  • topic (str) –

    Stream topic to publish message to.

  • message (bytes) –

    Message as bytes to publish to the stream.

Source code in proxystore/stream/protocols.py
def send_message(self, topic: str, message: bytes) -> None:
    """Publish a message to the stream.

    Args:
        topic: Stream topic to publish message to.
        message: Message as bytes to publish to the stream.
    """
    ...

MessageSubscriber

Bases: Protocol

Subscriber interface to message stream.

The subscriber protocol is an iterable object which yields objects from the stream until the stream is closed.

close

close() -> None

Close this subscriber.

Source code in proxystore/stream/protocols.py
def close(self) -> None:
    """Close this subscriber."""
    ...

next_message

next_message() -> bytes

Get the next message.

Source code in proxystore/stream/protocols.py
def next_message(self) -> bytes:
    """Get the next message."""
    ...

Filter

Bases: Protocol

Filter protocol.

A filter takes as input the dictionary of metadata associated with a new object event and returns a boolean indicating if the event should be dropped. I.e., if the filter returns True, the event will be filtered out of the stream and lost.

__call__

__call__(metadata: dict[str, Any]) -> bool

Apply the filter to event metadata.

Source code in proxystore/stream/protocols.py
def __call__(self, metadata: dict[str, Any]) -> bool:
    """Apply the filter to event metadata."""
    ...