proxystore.stream

Streaming interface.

Warning

The streaming interfaces are experimental and may change in future releases.

Tip

Check out the Streaming Guide to learn more!

StreamConsumer

StreamConsumer(
    subscriber: Subscriber, *, filter_: Filter | None = None
)

Bases: Generic[T]

Stream consumer interface.

This interface acts as an iterator that will yield items from the stream until the stream is closed.

Note

The StreamConsumer can be used as a context manager.

with StreamConsumer(...) as stream:
    for item in stream:
        ...

Tip

This class is generic, so it is recommended that the type of objects in the stream be annotated appropriately.

consumer = StreamConsumer[str](...)
reveal_type(consumer.next())
# ProxyOr[str]
reveal_type(consumer.next_object())
# str

If the stream is heterogeneous or object types are not known ahead of time, it may be appropriate to annotate the stream with Any.

consumer = StreamConsumer[Any](...)
reveal_type(consumer.next())
# ProxyOr[Any]

Warning

If you encounter unexpected ProxyResolveMissingKeyError errors, a proxy from the stream may have been resolved multiple times while the first resolve triggered an eviction of the underlying data. If this is the case, confirm that the evict flag on StreamProducer.send() is set correctly and that no code is incidentally resolving proxies earlier than you expect.

Warning

The consumer is not thread-safe.

Attributes:

  • subscriber

    Subscriber interface.

Parameters:

  • subscriber (Subscriber) –

    Object which implements one of the Subscriber protocols. Used to listen for new event messages indicating new objects in the stream.

  • filter_ (Filter | None, default: None ) –

    Optional filter to apply to event metadata received from the stream. If the filter returns True, the event will be dropped (i.e., not yielded back to the user), and the object associated with that event will be deleted if the evict flag was set on the producer side.
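
For example, a filter can be a callable that takes the event metadata dict and returns True for events to drop. A minimal sketch, assuming subscriber is an existing Subscriber implementation; the 'index' metadata key is illustrative, not part of the API:

from typing import Any

def drop_even(metadata: dict[str, Any]) -> bool:
    # Drop every event whose producer-supplied index is even.
    return metadata.get('index', 0) % 2 == 0

consumer = StreamConsumer[str](subscriber, filter_=drop_even)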

Source code in proxystore/stream/_consumer.py
def __init__(
    self,
    subscriber: Subscriber,
    *,
    filter_: Filter | None = None,
) -> None:
    self.subscriber = subscriber
    self._stores: dict[str, Store[Any]] = {}
    self._filter: Filter = filter_ if filter_ is not None else NullFilter()

    self._current_batch: EventBatch | None = None

__iter__

__iter__() -> Self

Return an iterator that will yield stream items.

The return type of items is based on that returned by next().

Source code in proxystore/stream/_consumer.py
def __iter__(self) -> Self:
    """Return an iterator that will yield stream items.

    The return type of items is based on that returned by
    [`next()`][proxystore.stream.StreamConsumer.next].
    """
    return self

__next__

__next__() -> ProxyOr[T]

Alias for next().

Source code in proxystore/stream/_consumer.py
def __next__(self) -> ProxyOr[T]:
    """Alias for [`next()`][proxystore.stream.StreamConsumer.next]."""
    return self.next()

close

close(
    *, stores: bool = False, subscriber: bool = True
) -> None

Close the consumer.

Warning

By default, this will close the Subscriber interface, but will not close the Store interfaces.

Parameters:

  • stores (bool, default: False ) –

    Close and unregister the Store instances used to resolve objects consumed from the stream.

  • subscriber (bool, default: True ) –

    Close the Subscriber interface.
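
For example, to also close and unregister the stores used to resolve stream objects once consumption is finished:

consumer.close(stores=True)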

Source code in proxystore/stream/_consumer.py
def close(self, *, stores: bool = False, subscriber: bool = True) -> None:
    """Close the consumer.

    Warning:
        By default, this will close the
        [`Subscriber`][proxystore.stream.protocols.Subscriber] interface,
        but will **not** close the [`Store`][proxystore.store.Store]
        interfaces.

    Args:
        stores: Close and [unregister][proxystore.store.unregister_store]
            the [`Store`][proxystore.store.Store] instances
            used to resolve objects consumed from the stream.
        subscriber: Close the
            [`Subscriber`][proxystore.stream.protocols.Subscriber]
            interface.
    """
    if stores:
        for store in self._stores.values():
            store.close()
            unregister_store(store)
    if subscriber:
        self.subscriber.close()

iter_with_metadata

iter_with_metadata() -> (
    Generator[
        tuple[dict[str, Any], ProxyOr[T]], None, None
    ]
)

Create an iterator that yields tuples of metadata and items.

The return type of items is based on that returned by next(). This is different from iter(consumer) which will yield only items, dropping the metadata.
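
A minimal usage sketch, assuming subscriber is an existing Subscriber implementation and the producer attached a metadata dict to each event:

with StreamConsumer[str](subscriber) as consumer:
    for metadata, item in consumer.iter_with_metadata():
        print(metadata, item)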

Source code in proxystore/stream/_consumer.py
def iter_with_metadata(
    self,
) -> Generator[tuple[dict[str, Any], ProxyOr[T]], None, None]:
    """Create an iterator that yields tuples of metadata and items.

    The return type of items is based on that returned by
    [`next()`][proxystore.stream.StreamConsumer.next].
    This is different from `iter(consumer)` which will yield *only* items,
    dropping the metadata.
    """
    while True:
        try:
            yield self.next_with_metadata()
        except StopIteration:
            return

iter_objects

iter_objects() -> Generator[T, None, None]

Create an iterator that yields objects from the stream.

Source code in proxystore/stream/_consumer.py
def iter_objects(self) -> Generator[T, None, None]:
    """Create an iterator that yields objects from the stream."""
    while True:
        try:
            yield self.next_object()
        except StopIteration:
            return

iter_objects_with_metadata

iter_objects_with_metadata() -> (
    Generator[tuple[dict[str, Any], T], None, None]
)

Create an iterator that yields tuples of metadata and objects.

Source code in proxystore/stream/_consumer.py
def iter_objects_with_metadata(
    self,
) -> Generator[tuple[dict[str, Any], T], None, None]:
    """Create an iterator that yields tuples of metadata and objects."""
    while True:
        try:
            yield self.next_object_with_metadata()
        except StopIteration:
            return

next

next() -> ProxyOr[T]

Get the next item in the stream.

Note

This method has the potential side effect of initializing and globally registering a new Store instance. This will happen at most once per topic because the producer can map topic names to Store instances. This class will keep track of the Store instances used by the stream and will close and unregister them when this class is closed.

Returns:

  • ProxyOr[T] –

    Proxy[T] is returned if the topic was associated with a Store in the StreamProducer, otherwise T is returned.

Raises:

  • StopIteration

    When an end of stream event is received from a producer. Note that this does not call close().

Source code in proxystore/stream/_consumer.py
def next(self) -> ProxyOr[T]:
    """Get the next item in the stream.

    Note:
        This method has the potential side effect of initializing and
        globally registering a new [`Store`][proxystore.store.Store]
        instance. This will happen at most once per topic because the
        producer can map topic names to
        [`Store`][proxystore.store.Store] instances. This class will
        keep track of the [`Store`][proxystore.store.Store] instances
        used by the stream and will close and unregister them when this
        class is closed.

    Returns:
        [`Proxy[T]`][proxystore.proxy.Proxy] is returned if the topic \
        was associated with a [`Store`][proxystore.store.Store] \
        in the [`StreamProducer`][proxystore.stream.StreamProducer] \
        otherwise `T` is returned.

    Raises:
        StopIteration: When an end of stream event is received from a
            producer. Note that this does not call
            [`close()`][proxystore.stream.StreamConsumer.close].
    """
    _, proxy = self.next_with_metadata()
    return proxy

next_with_metadata

next_with_metadata() -> tuple[dict[str, Any], ProxyOr[T]]

Get the next item with metadata in the stream.

Note

This method has the same potential side effects and return type as next().

Returns:

  • dict[str, Any]

    Dictionary of user-provided metadata associated with the object.

  • ProxyOr[T]

    Proxy of the next object in the stream.

Raises:

  • StopIteration

    When an end of stream event is received from a producer. Note that this does not call close().

Source code in proxystore/stream/_consumer.py
def next_with_metadata(self) -> tuple[dict[str, Any], ProxyOr[T]]:
    """Get the next item with metadata in the stream.

    Note:
        This method has the same potential side effects and return type
        as [`next()`][proxystore.stream.StreamConsumer.next].

    Returns:
        Dictionary of user-provided metadata associated with the object.
        Proxy of the next object in the stream.

    Raises:
        StopIteration: When an end of stream event is received from a
            producer. Note that this does not call
            [`close()`][proxystore.stream.StreamConsumer.close].
    """
    event = self._next_event_with_filter()

    if isinstance(event, NewObjectEvent):
        return event.metadata, cast(T, event.obj)
    elif isinstance(event, NewObjectKeyEvent):
        store = self._get_store(event)
        proxy: Proxy[T] = store.proxy_from_key(
            event.get_key(),
            evict=event.evict,
        )
        return event.metadata, proxy
    else:
        raise AssertionError('Unreachable.')

next_object

next_object() -> T

Get the next object in the stream.

Note

This method has the same potential side effects as next().

Raises:

  • StopIteration

    When an end of stream event is received from a producer. Note that this does not call close().

  • ValueError

    If the store does not return an object using the key included in the object's event metadata.

Source code in proxystore/stream/_consumer.py
def next_object(self) -> T:
    """Get the next object in the stream.

    Note:
        This method has the same potential side effects as
        [`next()`][proxystore.stream.StreamConsumer.next].

    Raises:
        StopIteration: When an end of stream event is received from a
            producer. Note that this does not call
            [`close()`][proxystore.stream.StreamConsumer.close].
        ValueError: If the store does not return an object using the key
            included in the object's event metadata.
    """
    _, obj = self.next_object_with_metadata()
    return obj

next_object_with_metadata

next_object_with_metadata() -> tuple[dict[str, Any], T]

Get the next object with metadata in the stream.

Note

This method has the same potential side effects as next().

Returns:

  • dict[str, Any]

    Dictionary of user-provided metadata associated with the object.

  • T

    Next object in the stream.

Raises:

  • StopIteration

    When an end of stream event is received from a producer. Note that this does not call close().

Source code in proxystore/stream/_consumer.py
def next_object_with_metadata(self) -> tuple[dict[str, Any], T]:
    """Get the next object with metadata in the stream.

    Note:
        This method has the same potential side effects as
        [`next()`][proxystore.stream.StreamConsumer.next].

    Returns:
        Dictionary of user-provided metadata associated with the object.
        Next object in the stream.

    Raises:
        StopIteration: When an end of stream event is received from a
            producer. Note that this does not call
            [`close()`][proxystore.stream.StreamConsumer.close].
    """
    event = self._next_event_with_filter()

    if isinstance(event, NewObjectEvent):
        return event.metadata, cast(T, event.obj)
    elif isinstance(event, NewObjectKeyEvent):
        store = self._get_store(event)
        key = event.get_key()
        obj = store.get(key)
        if obj is None:
            raise ValueError(
                f'Store(name="{store.name}") returned None for key={key}.',
            )
        if event.evict:
            store.evict(key)
        return event.metadata, cast(T, obj)
    else:
        raise AssertionError('Unreachable.')

StreamProducer

StreamProducer(
    publisher: Publisher,
    *,
    aggregator: Callable[[list[T]], T] | None = None,
    batch_size: int = 1,
    default_store: Store[Any] | None = None,
    filter_: Filter | None = None,
    stores: Mapping[str, Store[Any] | None] | None = None
)

Bases: Generic[T]

Stream producer interface.

This interface enables streaming objects in a manner which decouples bulk object transfer from event notifications. Topics can be associated with a Store which will be used for bulk object storage and communication. Event metadata, including the key to the object in the store, is communicated through a message broker using the Publisher and Subscriber interfaces. The associated StreamConsumer can be used to iterate on proxies of objects from the stream.

This interface can also be used without a Store, in which case the object is included in the event metadata and communicated directly through the message broker.

Note

The StreamProducer can be used as a context manager.

with StreamProducer(...) as stream:
    for item in ...:
        stream.send(item)

Tip

This class is generic, so it is recommended that the type of objects in the stream be annotated appropriately. This is useful for enabling a static type checker to validate that the correct object types are published to the stream.

producer = StreamProducer[str](...)
# mypy will raise an error that StreamProducer.send() expects a str
# but got a list[int].
producer.send('default', [1, 2, 3])

Warning

The producer is not thread-safe.

Attributes:

  • publisher

    Publisher interface.

Parameters:

  • publisher (Publisher) –

    Object which implements one of the Publisher protocols. Used to publish event messages when new objects are added to the stream.

  • aggregator (Callable[[list[T]], T] | None, default: None ) –

    Optional aggregator which takes as input the batch of objects and returns a single object of the same type when invoked. The size of the batch passed to the aggregator is controlled by the batch_size parameter. When aggregation is used, the metadata associated with the aggregated object will be the union of each metadata dict from each object in the batch.

  • batch_size (int, default: 1 ) –

    Batch size used for batching and aggregation.

  • default_store (Store[Any] | None, default: None ) –

    Specify the default Store to be used with topics not explicitly set in stores. If no default is provided, objects are included directly in the event.

  • filter_ (Filter | None, default: None ) –

    Optional filter to apply prior to sending objects to the stream. If the filter returns True for a given object's metadata, the object will not be sent to the stream. The filter is applied before aggregation or batching.

  • stores (Mapping[str, Store[Any] | None] | None, default: None ) –

    Mapping from topic names to an optional Store instance used to store and communicate serialized objects streamed to that topic. If the value associated with a topic is None, the object is included directly in the event.
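
As a construction sketch: publisher stands in for any object implementing the Publisher protocol, the FileConnector-backed store is only illustrative, and the bytes-joining aggregator assumes a stream of bytes objects:

from proxystore.connectors.file import FileConnector
from proxystore.store import Store

store = Store('stream-example', FileConnector('/tmp/proxystore-cache'))
producer = StreamProducer[bytes](
    publisher,
    stores={'default': store},
    batch_size=4,
    # Merge each batch of four objects into a single object.
    aggregator=lambda batch: b''.join(batch),
)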

Source code in proxystore/stream/_producer.py
def __init__(
    self,
    publisher: Publisher,
    *,
    aggregator: Callable[[list[T]], T] | None = None,
    batch_size: int = 1,
    default_store: Store[Any] | None = None,
    filter_: Filter | None = None,
    stores: Mapping[str, Store[Any] | None] | None = None,
) -> None:
    self.publisher = publisher
    self._default_store = default_store
    self._aggregator = aggregator
    self._batch_size = batch_size
    self._filter: Filter = filter_ if filter_ is not None else NullFilter()
    self._stores = stores

    # Mapping between topic and buffers
    self._buffer: dict[str, _TopicBuffer[T]] = defaultdict(
        lambda: _TopicBuffer([], False),
    )

close

close(
    *,
    topics: Iterable[str] = (),
    publisher: bool = True,
    stores: bool = False
) -> None

Close the producer.

Warning

Objects buffered in an incomplete batch will be lost. Call flush() to ensure that all objects are sent before closing, or pass a list of topics to flush and close.

Warning

By default, this will close the Publisher interface, but will not close the Store interfaces.

Parameters:

  • topics (Iterable[str], default: () ) –

    Topics to send end of stream events to. Equivalent to calling close_topics() first.

  • publisher (bool, default: True ) –

    Close the Publisher interface.

  • stores (bool, default: False ) –

    Close and unregister the Store instances.

Source code in proxystore/stream/_producer.py
def close(
    self,
    *,
    topics: Iterable[str] = (),
    publisher: bool = True,
    stores: bool = False,
) -> None:
    """Close the producer.

    Warning:
        Objects buffered in an incomplete batch will be lost. Call
        [`flush()`][proxystore.stream.StreamProducer] to ensure
        that all objects are sent before closing, or pass a list of
        topics to flush and close.

    Warning:
        By default, this will close the
        [`Publisher`][proxystore.stream.protocols.Publisher] interface,
        but will **not** close the [`Store`][proxystore.store.Store]
        interfaces.

    Args:
        topics: Topics to send end of stream events to. Equivalent to
            calling [`close_topics()`][proxystore.stream.StreamProducer.close_topics]
            first.
        publisher: Close the
            [`Publisher`][proxystore.stream.protocols.Publisher] interface.
        stores: Close and [unregister][proxystore.store.unregister_store]
            the [`Store`][proxystore.store.Store] instances.
    """  # noqa: E501
    self.close_topics(*topics)
    if stores:
        known_stores = {self._default_store}
        if self._stores is not None:
            known_stores.update(self._stores.values())
        for store in known_stores:
            if store is not None:
                store.close()
                unregister_store(store)
    if publisher:
        self.publisher.close()

close_topics

close_topics(*topics: str) -> None

Send an end of stream event to each topic.

A StreamConsumer will raise a StopIteration exception when an end of stream event is received. The end of stream event is still ordered, however, so all previously sent events will be consumed before the end of stream event is propagated.

Note

This will flush the topic buffer.

Parameters:

  • topics (str, default: () ) –

    Topics to send end of stream events to.
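
For example, closing a topic ends iteration on a paired consumer. A sketch, where the topic name is illustrative:

producer.close_topics('default')

# On the consumer side, the loop exits once the end of
# stream event is received.
for item in consumer:
    ...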

Source code in proxystore/stream/_producer.py
def close_topics(self, *topics: str) -> None:
    """Send an end of stream event to each topic.

    A [`StreamConsumer`][proxystore.stream.StreamConsumer]
    will raise a [`StopIteration`][StopIteration] exception when an
    end of stream event is received. The end of stream event is still
    ordered, however, so all prior sent events will be consumed first
    before the end of stream event is propagated.

    Note:
        This will flush the topic buffer.

    Args:
        topics: Topics to send end of stream events to.
    """
    for topic in topics:
        self._buffer[topic].closed = True
        self.flush_topic(topic)

flush

flush() -> None

Flush batch buffers for all topics.

Source code in proxystore/stream/_producer.py
def flush(self) -> None:
    """Flush batch buffers for all topics."""
    for topic in self._buffer:
        self.flush_topic(topic)

flush_topic

flush_topic(topic: str) -> None

Flush the batch buffer for a topic.

This method:

  1. Pops the current batch of objects off the topic buffer.
  2. Applies the aggregator to the batch if one was provided.
  3. Puts the batch of objects in the Store.
  4. Creates a new batch event using the keys returned by the store and additional metadata.
  5. Publishes the event to the stream via the Publisher.

Parameters:

  • topic (str) –

    Topic to flush.

Source code in proxystore/stream/_producer.py
def flush_topic(self, topic: str) -> None:
    """Flush the batch buffer for a topic.

    This method:

    1. Pops the current batch of objects off the topic buffer.
    2. Applies the aggregator to the batch if one was provided.
    3. Puts the batch of objects in the
       [`Store`][proxystore.store.Store].
    4. Creates a new batch event using the keys returned by the store and
       additional metadata.
    5. Publishes the event to the stream via the
       [`Publisher`][proxystore.stream.protocols.Publisher].

    Args:
        topic: Topic to flush.
    """
    objects = self._buffer[topic].objects
    closed = self._buffer[topic].closed

    if len(objects) == 0 and not closed:
        # No events to send so quick return
        return

    # Reset buffer
    self._buffer[topic].objects = []

    if self._aggregator is not None and len(objects) > 0:
        obj = self._aggregator([item.obj for item in objects])
        evict = any([item.evict for item in objects])
        metadata: dict[str, Any] = {}
        for item in objects:
            metadata.update(item.metadata)
        objects = [_BufferedObject(obj, evict, metadata)]

    events: list[Event] = []
    store = self._get_store(topic)

    if len(objects) > 0 and store is not None:
        keys = store.put_batch([item.obj for item in objects])
        config = store.config()

        for key, item in zip(keys, objects):
            events.append(
                NewObjectKeyEvent.from_key(
                    key,
                    evict=item.evict,
                    metadata=item.metadata,
                    store_config=config,
                    topic=topic,
                ),
            )
    elif len(objects) > 0 and store is None:
        for item in objects:
            events.append(
                NewObjectEvent(
                    topic=topic,
                    obj=item.obj,
                    metadata=item.metadata,
                ),
            )

    if closed:
        events.append(EndOfStreamEvent(topic))

    # If there are no new events and the stream wasn't closed we should
    # have early exited
    assert len(events) > 0
    batch_event = EventBatch(topic, events)
    self._send_event(batch_event)

send

send(
    topic: str,
    obj: T,
    *,
    evict: bool = True,
    metadata: dict[str, Any] | None = None
) -> None

Send an item to the stream.

This method:

  1. Applies the filter to the metadata associated with this event, skipping streaming this object if the filter returns True.
  2. Adds the object to the internal event buffer for this topic.
  3. Flushes the event buffer once the batch size is reached.

Warning

Careful consideration should be given to the setting of the evict flag. When set to True, the corresponding proxy yielded by the consumer of the stream will only be resolvable once. If you encounter unexpected ProxyResolveMissingKeyError errors, it may be due to proxies from the stream being resolved multiple times but the first resolve triggered an eviction of the underlying data.

Parameters:

  • topic (str) –

    Stream topic to send the object to.

  • obj (T) –

    Object to send via the stream.

  • evict (bool, default: True ) –

    Evict the object from the Store once the object is consumed by a StreamConsumer. Set to False if a single object in the stream will be consumed by multiple consumers. Note that when set to False, data eviction must be handled manually. This parameter is ignored if a Store is not mapped to this topic.

  • metadata (dict[str, Any] | None, default: None ) –

    Dictionary containing metadata about the object. This can be used by the producer or consumer to filter new object events. The default value None is replaced with an empty dict.

Raises:

  • TopicClosedError

    If the topic has already been closed via close_topics().

  • ValueError

If a store associated with topic is not found in the mapping of topics to stores and no default store is provided.
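
A minimal usage sketch, where the topic name and 'index' metadata key are illustrative:

for i, obj in enumerate(objects):
    producer.send('default', obj, evict=True, metadata={'index': i})

# Flush any partially filled batch, then close the topic.
producer.flush()
producer.close_topics('default')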

Source code in proxystore/stream/_producer.py
def send(
    self,
    topic: str,
    obj: T,
    *,
    evict: bool = True,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Send an item to the stream.

    This method:

    1. Applies the filter to the metadata associated with this event,
       skipping streaming this object if the filter returns `True`.
    2. Adds the object to the internal event buffer for this topic.
    3. Flushes the event buffer once the batch size is reached.

    Warning:
        Careful consideration should be given to the setting of the
        `evict` flag. When set to `True`, the corresponding proxy
        yielded by the consumer of the stream will only be resolvable
        once. If you encounter unexpected
        [`ProxyResolveMissingKeyError`][proxystore.store.exceptions.ProxyResolveMissingKeyError]
        errors, it may be due to proxies from the stream being resolved
        multiple times but the first resolve triggered an eviction
        of the underlying data.

    Args:
        topic: Stream topic to send the object to.
        obj: Object to send via the stream.
        evict: Evict the object from the
            [`Store`][proxystore.store.Store] once the object is
            consumed by a
            [`StreamConsumer`][proxystore.stream.StreamConsumer].
            Set to `False` if a single object in the stream will be
            consumed by multiple consumers. Note that when set to `False`,
            data eviction must be handled manually. This parameter is
            ignored if a [`Store`][proxystore.store.base.Store] is not
            mapped to this topic.
        metadata: Dictionary containing metadata about the object. This
            can be used by the producer or consumer to filter new
            object events. The default value `None` is replaced with an
            empty [`dict`][dict].

    Raises:
        TopicClosedError: If the `topic` has already been closed via
            [`close_topics()`][proxystore.stream.StreamProducer.close_topics].
        ValueError: If a store associated with `topic` is not found
            in the mapping of topics to stores and no default store is
            provided.
    """
    if self._buffer[topic].closed:
        raise TopicClosedError(f'Topic "{topic}" has been closed.')

    metadata = metadata if metadata is not None else {}
    if self._filter(metadata):
        return

    item = _BufferedObject(obj, evict, metadata)
    self._buffer[topic].objects.append(item)

    if len(self._buffer[topic].objects) >= self._batch_size:
        self.flush_topic(topic)