proxystore.stream.interface

Stream producer and consumer interfaces.

Note

The StreamProducer and StreamConsumer are re-exported in proxystore.stream for convenience.

StreamProducer

StreamProducer(
    publisher: Publisher,
    stores: Mapping[str | None, Store[Any]],
    *,
    aggregator: Callable[[list[T]], T] | None = None,
    batch_size: int = 1,
    filter_: Filter | None = None
)

Bases: Generic[T]

Proxy stream producer interface.

Note

The StreamProducer can be used as a context manager.

with StreamProducer(...) as stream:
    for item in ...:
        stream.send(item)

Warning

The producer is not thread-safe.

Tip

This class is generic, so it is recommended that the type of objects in the stream be annotated appropriately. This is useful for enabling a static type checker to validate that the correct object types are published to the stream.

producer = StreamProducer[str](...)
# mypy will raise an error that StreamProducer.send() expects a str
# but got a list[int].
producer.send('default', [1, 2, 3])

Attributes:

  • publisher

    Publisher interface.

Parameters:

  • publisher (Publisher) –

    Object which implements the Publisher protocol. Used to publish event messages when new objects are added to the stream.

  • stores (Mapping[str | None, Store[Any]]) –

    Mapping from topic names to the Store instance used to store and communicate serialized objects streamed to that topic. The None topic can be used to specify a default Store used for topics not present in this mapping.

  • aggregator (Callable[[list[T]], T] | None, default: None ) –

    Optional aggregator which takes as input the batch of objects and returns a single object of the same type when invoked. The size of the batch passed to the aggregator is controlled by the batch_size parameter. When aggregation is used, the metadata associated with the aggregated object will be the union of each metadata dict from each object in the batch.

  • batch_size (int, default: 1 ) –

    Batch size used for batching and aggregation.

  • filter_ (Filter | None, default: None ) –

    Optional filter to apply prior to sending objects to the stream. If the filter returns True for a given object's metadata, the object will not be sent to the stream. The filter is applied before aggregation or batching.
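
As a rough illustration of the aggregator contract described above, the sketch below reduces a batch to a single object of the same type and merges the per-object metadata with successive `dict.update` calls, which is how the `flush_topic()` source on this page builds the union (later objects win on duplicate keys). The names `concat_aggregator` and `merge_metadata` are hypothetical helpers, not part of ProxyStore.

```python
from typing import Any


def concat_aggregator(batch: list[str]) -> str:
    """Hypothetical aggregator: list[T] -> T, here with T = str."""
    return ''.join(batch)


def merge_metadata(batch_metadata: list[dict[str, Any]]) -> dict[str, Any]:
    """Union of the metadata dicts; later dicts override earlier keys."""
    merged: dict[str, Any] = {}
    for metadata in batch_metadata:
        merged.update(metadata)
    return merged


aggregated = concat_aggregator(['a', 'b', 'c'])
merged = merge_metadata([{'id': 1, 'src': 'x'}, {'id': 2}])
```

A function like this would presumably be passed as `aggregator=concat_aggregator` together with a `batch_size` greater than one.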

Source code in proxystore/stream/interface.py
def __init__(
    self,
    publisher: Publisher,
    stores: Mapping[str | None, Store[Any]],
    *,
    aggregator: Callable[[list[T]], T] | None = None,
    batch_size: int = 1,
    filter_: Filter | None = None,
) -> None:
    self.publisher = publisher
    self._stores = stores
    self._aggregator = aggregator
    self._batch_size = batch_size
    self._filter: Filter = filter_ if filter_ is not None else NullFilter()

    # Mapping between topic and buffers
    self._buffer: dict[str, _TopicBuffer[T]] = defaultdict(
        lambda: _TopicBuffer([], False),
    )

close

close(
    *,
    topics: Iterable[str] = (),
    publisher: bool = True,
    stores: bool = False
) -> None

Close the producer.

Warning

Objects buffered in an incomplete batch will be lost. Call flush() to ensure that all objects are sent before closing.

Warning

By default, this will close the Publisher interface, but will not close the Store interfaces.

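Parameters:

  • topics (Iterable[str], default: () ) –

    Topics to send end of stream events to. Equivalent to calling close_topics() first.

  • publisher (bool, default: True ) –

    Close the Publisher interface.

  • stores (bool, default: False ) –

    Close and unregister the Store instances.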

Source code in proxystore/stream/interface.py
def close(
    self,
    *,
    topics: Iterable[str] = (),
    publisher: bool = True,
    stores: bool = False,
) -> None:
    """Close the producer.

    Warning:
        Objects buffered in an incomplete batch will be lost. Call
        [`flush()`][proxystore.stream.interface.StreamProducer.flush] to ensure
        that all objects are sent before closing.

    Warning:
        By default, this will close the
        [`Publisher`][proxystore.stream.protocols.Publisher] interface,
        but will **not** close the [`Store`][proxystore.store.base.Store]
        interfaces.

    Args:
        topics: Topics to send end of stream events to. Equivalent to
            calling [`close_topics()`][proxystore.stream.interface.StreamProducer.close_topics]
            first.
        publisher: Close the
            [`Publisher`][proxystore.stream.protocols.Publisher] interface.
        stores: Close and [unregister][proxystore.store.unregister_store]
            the [`Store`][proxystore.store.base.Store] instances.
    """  # noqa: E501
    self.close_topics(*topics)
    if stores:
        for store in self._stores.values():
            store.close()
            unregister_store(store)
    if publisher:
        self.publisher.close()

close_topics

close_topics(*topics: str) -> None

Publish an end of stream event to each topic.

A StreamConsumer will raise a StopIteration exception when an end of stream event is received. The end of stream event is still ordered, however, so all previously sent events will be consumed before the end of stream event is propagated.

Note

This will flush the topic buffer.

Parameters:

  • topics (str, default: () ) –

    Topics to send end of stream events to.
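
The ordering guarantee can be pictured with a toy model: the end of stream marker sits in the same ordered queue as the object events, so a consumer sees every earlier event before iteration stops. This is only a sketch under that assumption; `END_OF_STREAM` and `ToyConsumer` are hypothetical stand-ins and do not reflect how ProxyStore transports events.

```python
from collections import deque

# Hypothetical sentinel standing in for the end of stream event.
END_OF_STREAM = object()


class ToyConsumer:
    """Iterator that stops when it reaches the end of stream marker."""

    def __init__(self, events: deque) -> None:
        self._events = events

    def __iter__(self) -> 'ToyConsumer':
        return self

    def __next__(self) -> object:
        event = self._events.popleft()
        if event is END_OF_STREAM:
            raise StopIteration
        return event


# Two objects were sent before the topic was closed.
events = deque(['a', 'b', END_OF_STREAM])
received = list(ToyConsumer(events))
```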

Source code in proxystore/stream/interface.py
def close_topics(self, *topics: str) -> None:
    """Publish an end of stream event to each topic.

    A [`StreamConsumer`][proxystore.stream.interface.StreamConsumer]
    will raise a [`StopIteration`][StopIteration] exception when an
    end of stream event is received. The end of stream event is still
    ordered, however, so all prior sent events will be consumed first
    before the end of stream event is propagated.

    Note:
        This will flush the topic buffer.

    Args:
        topics: Topics to send end of stream events to.
    """
    for topic in topics:
        self._buffer[topic].closed = True
        self.flush_topic(topic)

flush

flush() -> None

Flush batch buffers for all topics.

Source code in proxystore/stream/interface.py
def flush(self) -> None:
    """Flush batch buffers for all topics."""
    for topic in self._buffer:
        self.flush_topic(topic)

flush_topic

flush_topic(topic: str) -> None

Flush the batch buffer for a topic.

This method:

  1. Pops the current batch of objects off the topic buffer.
  2. Applies the aggregator to the batch if one was provided.
  3. Puts the batch of objects in the Store.
  4. Creates a new batch event using the keys returned by the store and additional metadata.
  5. Publishes the event to the stream via the Publisher.

Parameters:

  • topic (str) –

    Topic to flush.

Raises:

  • ValueError

    if no store is associated with topic in the mapping of topics to stores and no default store is provided.
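
The store lookup that leads to this error can be sketched in isolation. The helper below mirrors the branch order in the source listing that follows (topic-specific store first, then the `None` default, then `ValueError`); `resolve_store` is a hypothetical name, and plain strings stand in for `Store` instances.

```python
from typing import Any, Mapping, Optional


def resolve_store(stores: Mapping[Optional[str], Any], topic: str) -> Any:
    """Pick the store for a topic, falling back to the None default."""
    if topic in stores:
        return stores[topic]
    if None in stores:
        return stores[None]
    raise ValueError(
        f'No store associated with topic "{topic}" found or default store.',
    )


# Strings stand in for Store instances in this sketch.
stores = {'images': 'image-store', None: 'default-store'}
image_store = resolve_store(stores, 'images')
fallback_store = resolve_store(stores, 'logs')
```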

Source code in proxystore/stream/interface.py
def flush_topic(self, topic: str) -> None:
    """Flush the batch buffer for a topic.

    This method:

    1. Pops the current batch of objects off the topic buffer.
    2. Applies the aggregator to the batch if one was provided.
    3. Puts the batch of objects in the
       [`Store`][proxystore.store.base.Store].
    4. Creates a new batch event using the keys returned by the store and
       additional metadata.
    5. Publishes the event to the stream via the
       [`Publisher`][proxystore.stream.protocols.Publisher].

    Args:
        topic: Topic to flush.

    Raises:
        ValueError: if a store associated with `topic` is not found
            in the mapping of topics to stores nor a default store is
            provided.
    """
    objects = self._buffer[topic].objects
    closed = self._buffer[topic].closed

    if len(objects) == 0 and not closed:
        # No events to send so quick return
        return

    # Reset buffer
    self._buffer[topic].objects = []

    if self._aggregator is not None and len(objects) > 0:
        obj = self._aggregator([item.obj for item in objects])
        evict = any([item.evict for item in objects])
        metadata: dict[str, Any] = {}
        for item in objects:
            metadata.update(item.metadata)
        objects = [_BufferedObject(obj, evict, metadata)]

    if topic in self._stores:
        store = self._stores[topic]
    elif None in self._stores:
        store = self._stores[None]
    else:
        raise ValueError(
            f'No store associated with topic "{topic}" found or '
            'default store.',
        )

    events: list[Event] = []

    if len(objects) > 0:
        keys = store.put_batch([item.obj for item in objects])

        for key, item in zip(keys, objects):
            event = NewObjectEvent.from_key(
                key,
                evict=item.evict,
                metadata=item.metadata,
            )
            events.append(event)

    if closed:
        events.append(EndOfStreamEvent())

    # If there are no new events and the stream wasn't closed we should
    # have early exited
    assert len(events) > 0

    batch_event = EventBatch(events, topic, store.config())
    message = event_to_bytes(batch_event)
    self.publisher.send(topic, message)

send

send(
    topic: str,
    obj: T,
    *,
    evict: bool = True,
    metadata: dict[str, Any] | None = None
) -> None

Send an item to the stream.

This method:

  1. Applies the filter to the metadata associated with this event, skipping streaming this object if the filter returns True.
  2. Adds the object to the internal event buffer for this topic.
  3. Flushes the event buffer once the batch size is reached.

Warning

Careful consideration should be given to the setting of the evict flag. When set to True, the corresponding proxy yielded by the consumer of the stream will only be resolvable once. If you encounter unexpected ProxyResolveMissingKeyError errors, it may be due to proxies from the stream being resolved multiple times but the first resolve triggered an eviction of the underlying data.

Parameters:

  • topic (str) –

    Stream topic to send the object to.

  • obj (T) –

    Object to send via the stream.

  • evict (bool, default: True ) –

    Evict the object from the Store once the object is consumed by a StreamConsumer. Set to False if a single object in the stream will be consumed by multiple consumers. Note that when set to False, data eviction must be handled manually.

  • metadata (dict[str, Any] | None, default: None ) –

    Dictionary containing metadata about the object. This can be used by the producer or consumer to filter new object events. The default value None is replaced with an empty dict.

Raises:

  • TopicClosedError

    if the topic has already been closed via close_topics().

  • ValueError

    if no store is associated with topic in the mapping of topics to stores and no default store is provided.
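
The three steps of send() can be modeled with a small stand-in class. `ToyProducer` below is hypothetical and only imitates the filter, buffer, and flush-at-batch-size logic; it does not touch a Store or Publisher. It also shows why an incomplete batch stays buffered until it is flushed explicitly.

```python
from collections import defaultdict
from typing import Any, Callable, Optional


class ToyProducer:
    """Stand-in that models only filtering and batching."""

    def __init__(
        self,
        batch_size: int,
        drop: Callable[[dict[str, Any]], bool],
    ) -> None:
        self.batch_size = batch_size
        self.drop = drop
        self.buffers: dict[str, list[Any]] = defaultdict(list)
        self.published: list[tuple[str, list[Any]]] = []

    def send(
        self,
        topic: str,
        obj: Any,
        metadata: Optional[dict[str, Any]] = None,
    ) -> None:
        metadata = {} if metadata is None else metadata
        if self.drop(metadata):  # Step 1: filter returned True, skip
            return
        self.buffers[topic].append(obj)  # Step 2: buffer the object
        if len(self.buffers[topic]) >= self.batch_size:  # Step 3: flush
            self.flush_topic(topic)

    def flush_topic(self, topic: str) -> None:
        batch, self.buffers[topic] = self.buffers[topic], []
        if batch:
            self.published.append((topic, batch))


producer = ToyProducer(batch_size=2, drop=lambda m: m.get('skip', False))
producer.send('t', 1)
producer.send('t', 2, metadata={'skip': True})  # Dropped by the filter
producer.send('t', 3)  # Buffer reaches batch_size, so [1, 3] is flushed
producer.send('t', 4)  # Stays buffered until an explicit flush
```

In this model the object `4` is never published unless `flush_topic('t')` is called, which corresponds to the warning on close() about incomplete batches.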

Source code in proxystore/stream/interface.py
def send(
    self,
    topic: str,
    obj: T,
    *,
    evict: bool = True,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Send an item to the stream.

    This method:

    1. Applies the filter to the metadata associated with this event,
       skipping streaming this object if the filter returns `True`.
    2. Adds the object to the internal event buffer for this topic.
    3. Flushes the event buffer once the batch size is reached.

    Warning:
        Careful consideration should be given to the setting of the
        `evict` flag. When set to `True`, the corresponding proxy
        yielded by the consumer of the stream will only be resolvable
        once. If you encounter unexpected
        [`ProxyResolveMissingKeyError`][proxystore.store.exceptions.ProxyResolveMissingKeyError]
        errors, it may be due to proxies from the stream being resolved
        multiple times but the first resolve triggered an eviction
        of the underlying data.

    Args:
        topic: Stream topic to send the object to.
        obj: Object to send via the stream.
        evict: Evict the object from the
            [`Store`][proxystore.store.base.Store] once the object is
            consumed by a
            [`StreamConsumer`][proxystore.stream.interface.StreamConsumer].
            Set to `False` if a single object in the stream will be
            consumed by multiple consumers. Note that when set to `False`,
            data eviction must be handled manually.
        metadata: Dictionary containing metadata about the object. This
            can be used by the producer or consumer to filter new
            object events. The default value `None` is replaced with an
            empty [`dict`][dict].

    Raises:
        TopicClosedError: if the `topic` has already been closed via
            [`close_topics()`][proxystore.stream.interface.StreamProducer.close_topics].
        ValueError: if a store associated with `topic` is not found
            in the mapping of topics to stores nor a default store is
            provided.
    """
    if self._buffer[topic].closed:
        raise TopicClosedError(f'Topic "{topic}" has been closed.')

    metadata = metadata if metadata is not None else {}
    if self._filter(metadata):
        return

    item = _BufferedObject(obj, evict, metadata)
    self._buffer[topic].objects.append(item)

    if len(self._buffer[topic].objects) >= self._batch_size:
        self.flush_topic(topic)

StreamConsumer

StreamConsumer(
    subscriber: Subscriber, *, filter_: Filter | None = None
)

Bases: Generic[T]

Proxy stream consumer interface.

This interface acts as an iterator that will yield items from the stream until the stream is closed.

Note

The StreamConsumer can be used as a context manager.

with StreamConsumer(...) as stream:
    for item in stream:
        ...

Tip

This class is generic, so it is recommended that the type of objects in the stream be annotated appropriately.

consumer = StreamConsumer[str](...)
reveal_type(consumer.next())
# Proxy[str]

If the stream is heterogeneous or object types are not known ahead of time, it may be appropriate to annotate the stream with Any.

consumer = StreamConsumer[Any](...)
reveal_type(consumer.next())
# Proxy[Any]

Warning

If you encounter unexpected ProxyResolveMissingKeyError errors, it may be due to proxies from the stream being resolved multiple times but the first resolve triggered an eviction of the underlying data. If this is the case, confirm that the evict flag on StreamProducer.send() is set correctly and that no code is incidentally resolving proxies before you expect.

Note

The consumer is not thread-safe.

Attributes:

  • subscriber

    Subscriber interface.

Parameters:

  • subscriber (Subscriber) –

    Object which implements the Subscriber protocol. Used to listen for new event messages indicating new objects in the stream.

  • filter_ (Filter | None, default: None ) –

    Optional filter to apply to event metadata received from the stream. If the filter returns True, the event will be dropped (i.e., not yielded back to the user), and the object associated with that event will be deleted if the evict flag was set on the producer side.
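
A filter can be thought of as a callable that receives an event's metadata dict and returns True when the event should be dropped. The sketch below assumes exactly that call shape, based on the `self._filter(metadata)` call in the producer source on this page; `MinPriorityFilter` and the `priority` metadata key are hypothetical, and whether this class fully satisfies the Filter protocol is an assumption.

```python
from typing import Any


class MinPriorityFilter:
    """Hypothetical filter: drop events below a priority threshold."""

    def __init__(self, threshold: int) -> None:
        self.threshold = threshold

    def __call__(self, metadata: dict[str, Any]) -> bool:
        # Returning True means the event is filtered out (dropped).
        return metadata.get('priority', 0) < self.threshold


# (metadata, object) pairs standing in for events received from a stream.
events = [({'priority': 5}, 'a'), ({'priority': 1}, 'b'), ({}, 'c')]
drop_low = MinPriorityFilter(threshold=3)
yielded = [obj for metadata, obj in events if not drop_low(metadata)]
```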

Source code in proxystore/stream/interface.py
def __init__(
    self,
    subscriber: Subscriber,
    *,
    filter_: Filter | None = None,
) -> None:
    self.subscriber = subscriber
    self._stores: dict[str, Store[Any]] = {}
    self._filter: Filter = filter_ if filter_ is not None else NullFilter()

    self._current_batch: EventBatch | None = None

__iter__

__iter__() -> Self

Return an iterator that will yield proxies of stream objects.

Source code in proxystore/stream/interface.py
def __iter__(self) -> Self:
    """Return an iterator that will yield proxies of stream objects."""
    return self

__next__

__next__() -> Proxy[T]

Alias for next().

Source code in proxystore/stream/interface.py
def __next__(self) -> Proxy[T]:
    """Alias for [`next()`][proxystore.stream.interface.StreamConsumer.next]."""  # noqa: E501
    return self.next()

close

close(
    *, stores: bool = False, subscriber: bool = True
) -> None

Close the consumer.

Warning

By default, this will close the Subscriber interface, but will not close the Store interfaces.

Parameters:

  • stores (bool, default: False ) –

    Close and unregister the Store instances used to resolve objects consumed from the stream.

  • subscriber (bool, default: True ) –

    Close the Subscriber interface.

Source code in proxystore/stream/interface.py
def close(self, *, stores: bool = False, subscriber: bool = True) -> None:
    """Close the consumer.

    Warning:
        By default, this will close the
        [`Subscriber`][proxystore.stream.protocols.Subscriber] interface,
        but will **not** close the [`Store`][proxystore.store.base.Store]
        interfaces.

    Args:
        stores: Close and [unregister][proxystore.store.unregister_store]
            the [`Store`][proxystore.store.base.Store] instances
            used to resolve objects consumed from the stream.
        subscriber: Close the
            [`Subscriber`][proxystore.stream.protocols.Subscriber]
            interface.
    """
    if stores:
        for store in self._stores.values():
            store.close()
            unregister_store(store)
    if subscriber:
        self.subscriber.close()

iter_with_metadata

iter_with_metadata() -> (
    Generator[tuple[dict[str, Any], Proxy[T]], None, None]
)

Return an iterator that yields tuples of metadata and proxies.

Note

This is different from iter(consumer) which will yield only proxies of objects in the stream.
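
The generator methods on this class share one pattern: call the corresponding next_*() method in a loop and turn its StopIteration into a plain return. The explicit try/except matters because, since Python 3.7 (PEP 479), a StopIteration that escapes a generator body is converted into a RuntimeError. The sketch below shows the pattern with a hypothetical helper, `iterate_until_stop`, wrapped around an ordinary iterator instead of a stream.

```python
from typing import Callable, Generator, TypeVar

T = TypeVar('T')


def iterate_until_stop(next_fn: Callable[[], T]) -> Generator[T, None, None]:
    """Yield results of next_fn() until it raises StopIteration."""
    while True:
        try:
            yield next_fn()
        except StopIteration:
            # End the generator cleanly instead of leaking StopIteration.
            return


# An ordinary iterator of (metadata, object) pairs stands in for a stream.
source = iter([({'n': 1}, 'a'), ({'n': 2}, 'b')])
pairs = list(iterate_until_stop(lambda: next(source)))
```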

Source code in proxystore/stream/interface.py
def iter_with_metadata(
    self,
) -> Generator[tuple[dict[str, Any], Proxy[T]], None, None]:
    """Return an iterator that yields tuples of metadata and proxies.

    Note:
        This is different from `iter(consumer)` which will yield
        *only* proxies of objects in the stream.
    """
    while True:
        try:
            yield self.next_with_metadata()
        except StopIteration:
            return

iter_objects

iter_objects() -> Generator[T, None, None]

Return an iterator that yields objects from the stream.

Note

This is different from iter(consumer) which will yield proxies of objects in the stream.

Source code in proxystore/stream/interface.py
def iter_objects(self) -> Generator[T, None, None]:
    """Return an iterator that yields objects from the stream.

    Note:
        This is different from `iter(consumer)` which will yield
        proxies of objects in the stream.
    """
    while True:
        try:
            yield self.next_object()
        except StopIteration:
            return

iter_objects_with_metadata

iter_objects_with_metadata() -> (
    Generator[tuple[dict[str, Any], T], None, None]
)

Return an iterator that yields tuples of metadata and objects.

Note

This is different from iter(consumer) which will yield proxies of objects in the stream.

Source code in proxystore/stream/interface.py
def iter_objects_with_metadata(
    self,
) -> Generator[tuple[dict[str, Any], T], None, None]:
    """Return an iterator that yields tuples of metadata and objects.

    Note:
        This is different from `iter(consumer)` which will yield
        proxies of objects in the stream.
    """
    while True:
        try:
            yield self.next_object_with_metadata()
        except StopIteration:
            return

next

next() -> Proxy[T]

Return a proxy of the next object in the stream.

Note

This method has the potential side effect of initializing and globally registering a new Store instance. This will happen at most once per topic because the producer can map topic names to Store instances. This class will keep track of the Store instances used by the stream and will close and unregister them when close(stores=True) is called.

Raises:

  • StopIteration

    when an end of stream event is received from a producer. Note that this does not call close().

Source code in proxystore/stream/interface.py
def next(self) -> Proxy[T]:
    """Return a proxy of the next object in the stream.

    Note:
        This method has the potential side effect of initializing and
        globally registering a new [`Store`][proxystore.store.base.Store]
        instance. This will happen at most once per topic because the
        producer can map topic names to
        [`Store`][proxystore.store.base.Store] instances. This class will
        keep track of the [`Store`][proxystore.store.base.Store] instances
        used by the stream and will close and unregister them when this
        class is closed.

    Raises:
        StopIteration: when an end of stream event is received from a
            producer. Note that this does not call
            [`close()`][proxystore.stream.interface.StreamConsumer.close].
    """
    _, proxy = self.next_with_metadata()
    return proxy

next_with_metadata

next_with_metadata() -> tuple[dict[str, Any], Proxy[T]]

Return a tuple of metadata and proxy for the next object.

Note

This method has the same potential side effects as next().

Returns:

  • dict[str, Any]

    Dictionary of user-provided metadata associated with the object.

  • Proxy[T]

    Proxy of the next object in the stream.

Raises:

  • StopIteration

    when an end of stream event is received from a producer. Note that this does not call close().

Source code in proxystore/stream/interface.py
def next_with_metadata(self) -> tuple[dict[str, Any], Proxy[T]]:
    """Return a tuple of metadata and proxy for the next object.

    Note:
        This method has the same potential side effects as
        [`next()`][proxystore.stream.interface.StreamConsumer.next].

    Returns:
        Dictionary of user-provided metadata associated with the object.
        Proxy of the next object in the stream.

    Raises:
        StopIteration: when an end of stream event is received from a
            producer. Note that this does not call
            [`close()`][proxystore.stream.interface.StreamConsumer.close].
    """
    event_info = self._next_event_with_filter()
    store = self._get_store(event_info)
    event = event_info.event
    key = event.get_key()

    proxy: Proxy[T] = store.proxy_from_key(key, evict=event.evict)
    return event.metadata, proxy

next_object

next_object() -> T

Return the next object in the stream.

Note

This method has the same potential side effects as next().

Raises:

  • StopIteration

    when an end of stream event is received from a producer. Note that this does not call close().

  • ValueError

    if the store does not return an object using the key included in the object's event metadata.

Source code in proxystore/stream/interface.py
def next_object(self) -> T:
    """Return the next object in the stream.

    Note:
        This method has the same potential side effects as
        [`next()`][proxystore.stream.interface.StreamConsumer.next].

    Raises:
        StopIteration: when an end of stream event is received from a
            producer. Note that this does not call
            [`close()`][proxystore.stream.interface.StreamConsumer.close].
        ValueError: if the store does not return an object using the key
            included in the object's event metadata.
    """
    _, obj = self.next_object_with_metadata()
    return obj

next_object_with_metadata

next_object_with_metadata() -> tuple[dict[str, Any], T]

Return a tuple of metadata and the next object in the stream.

Note

This method has the same potential side effects as next().

Returns:

  • dict[str, Any]

    Dictionary of user-provided metadata associated with the object.

  • T

    Next object in the stream.

Raises:

  • StopIteration

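    when an end of stream event is received from a producer. Note that this does not call close().

  • ValueError

    if the store does not return an object using the key included in the object's event metadata.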
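
The get, check, and evict sequence used by this method can be sketched with a dict-backed stand-in. `ToyStore` and `consume` are hypothetical names that imitate only the control flow in the source listing below. The second call shows what happens when an evicted key is requested again.

```python
from typing import Any, Optional


class ToyStore:
    """Dict-backed stand-in for a Store; not the ProxyStore class."""

    def __init__(self, data: dict[str, Any]) -> None:
        self.data = dict(data)

    def get(self, key: str) -> Optional[Any]:
        return self.data.get(key)

    def evict(self, key: str) -> None:
        self.data.pop(key, None)


def consume(
    store: ToyStore,
    key: str,
    evict: bool,
    metadata: dict[str, Any],
) -> tuple[dict[str, Any], Any]:
    obj = store.get(key)
    if obj is None:
        raise ValueError(f'Store returned None for key={key}.')
    if evict:
        store.evict(key)
    return metadata, obj


store = ToyStore({'k1': 'payload'})
first = consume(store, 'k1', evict=True, metadata={'n': 1})

try:
    consume(store, 'k1', evict=True, metadata={'n': 1})
    second_failed = False
except ValueError:
    # The first call evicted the object, so the key no longer resolves.
    second_failed = True
```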

Source code in proxystore/stream/interface.py
def next_object_with_metadata(self) -> tuple[dict[str, Any], T]:
    """Return a tuple of metadata and the next object in the stream.

    Note:
        This method has the same potential side effects as
        [`next()`][proxystore.stream.interface.StreamConsumer.next].

    Returns:
        Dictionary of user-provided metadata associated with the object.
        Next object in the stream.

    Raises:
        StopIteration: when an end of stream event is received from a
            producer. Note that this does not call
            [`close()`][proxystore.stream.interface.StreamConsumer.close].
    """
    event_info = self._next_event_with_filter()
    store = self._get_store(event_info)
    event = event_info.event
    key = event.get_key()

    obj = store.get(key)
    if obj is None:
        raise ValueError(
            f'Store(name="{store.name}") returned None for key={key}.',
        )

    if event.evict:
        store.evict(key)

    return event.metadata, cast(T, obj)