proxystore.stream
Streaming interface.
Warning
The streaming interfaces are experimental and may change in future releases.
Tip
Check out the Streaming Guide to learn more!
StreamConsumer
StreamConsumer(
subscriber: Subscriber, *, filter_: Filter | None = None
)
Bases: Generic[T]
Stream consumer interface.
This interface acts as an iterator that will yield items from the stream until the stream is closed.
Note
The StreamConsumer can be used as a context manager.
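The iteration and context-manager behavior described above can be illustrated with a small, stdlib-only model. This is a sketch under stated assumptions, not the ProxyStore implementation: `ToyConsumer`, the `queue.Queue` standing in for a Subscriber, and the `END_OF_STREAM` sentinel standing in for an end of stream event are all hypothetical.

```python
from __future__ import annotations

import queue
from collections.abc import Iterator
from types import TracebackType
from typing import Any

# Hypothetical sentinel standing in for an end of stream event.
END_OF_STREAM = object()


class ToyConsumer:
    """Illustrative model of a stream consumer; not the ProxyStore API."""

    def __init__(self, events: queue.Queue[Any]) -> None:
        self._events = events  # stands in for a Subscriber
        self.closed = False

    def __enter__(self) -> ToyConsumer:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        # Leaving the with block closes the consumer.
        self.close()

    def __iter__(self) -> Iterator[Any]:
        return self

    def __next__(self) -> Any:
        return self.next()

    def next(self) -> Any:
        item = self._events.get()  # blocks until an event arrives
        if item is END_OF_STREAM:
            # End of stream: stop iterating without calling close().
            raise StopIteration
        return item

    def close(self) -> None:
        self.closed = True


events: queue.Queue[Any] = queue.Queue()
for item in ('a', 'b', END_OF_STREAM):
    events.put(item)

with ToyConsumer(events) as consumer:
    received = list(consumer)
```

In this model, reaching the end of the stream only stops iteration; it is leaving the with block that calls close().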
Tip
This class is generic, so it is recommended that the type of objects in the stream be annotated appropriately.
consumer = StreamConsumer[str](...)
reveal_type(consumer.next())
# ProxyOr[str]
reveal_type(consumer.next_object())
# str
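If the stream is heterogeneous or the object types are not known ahead of time, it may be appropriate to annotate the stream with Any.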
Warning
If you encounter unexpected ProxyResolveMissingKeyError errors, it may be due to proxies from the stream being resolved multiple times after the first resolve triggered an eviction of the underlying data. If this is the case, confirm that the evict flag on StreamProducer.send() is set correctly and that no code is inadvertently resolving proxies earlier than you expect.
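To see why a second resolve can fail when evict is set, consider a toy dict-backed store. `ToyStore` and its `resolve()` method are hypothetical stand-ins, not the ProxyStore Store API, and the KeyError plays the role of ProxyResolveMissingKeyError.

```python
from typing import Any


class ToyStore:
    """Hypothetical dict-backed stand-in for a Store; not the ProxyStore API."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def put(self, key: str, obj: Any) -> None:
        self._data[key] = obj

    def resolve(self, key: str, *, evict: bool) -> Any:
        # A missing key is the toy analogue of ProxyResolveMissingKeyError.
        if key not in self._data:
            raise KeyError(key)
        # With evict=True the data is removed on the first resolve.
        return self._data.pop(key) if evict else self._data[key]


store = ToyStore()
store.put('obj-1', [1, 2, 3])
first = store.resolve('obj-1', evict=True)

second_failed = False
try:
    store.resolve('obj-1', evict=True)  # data was already evicted
except KeyError:
    second_failed = True

store.put('obj-2', 'shared')
# evict=False leaves the data in place, so repeated resolves succeed
# (and the data must later be removed manually).
repeats = [store.resolve('obj-2', evict=False) for _ in range(2)]
```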
Warning
The consumer is not thread-safe.
Attributes:
- subscriber – Subscriber interface.
Parameters:
- subscriber (Subscriber) – Object which implements one of the Subscriber protocols. Used to listen for new event messages indicating new objects in the stream.
- filter_ (Filter | None, default: None) – Optional filter to apply to event metadata received from the stream. If the filter returns True, the event will be dropped (i.e., not yielded back to the user), and the object associated with that event will be deleted if the evict flag was set on the producer side.
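A filter is applied to each event's metadata, and returning True drops the event. The sketch below assumes a filter can be any callable that accepts the metadata dict and returns a bool; the `priority` metadata key is hypothetical. Under that assumption, it could be passed as `StreamConsumer(subscriber, filter_=drop_low_priority)`.

```python
from typing import Any


def drop_low_priority(metadata: dict[str, Any]) -> bool:
    """Return True to drop the event (hypothetical 'priority' metadata key)."""
    # Events without a 'priority' key are treated as priority 0 and dropped.
    return metadata.get('priority', 0) < 1


events = [
    {'priority': 2, 'name': 'a'},
    {'priority': 0, 'name': 'b'},
    {'name': 'c'},
]
# Keep only the events the filter does not drop.
kept = [m['name'] for m in events if not drop_low_priority(m)]
```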
Source code in proxystore/stream/_consumer.py
__iter__
Return an iterator that will yield stream items.
The type of the yielded items matches the return type of next().
close
Close the consumer.
Warning
By default, this will close the Subscriber interface but will not close the Store interfaces.
Parameters:
- stores (bool, default: False) – Close and unregister the Store instances used to resolve objects consumed from the stream.
- subscriber (bool, default: True) – Close the Subscriber interface.
iter_with_metadata
Create an iterator that yields tuples of metadata and items.
The type of the yielded items matches the return type of next().
This is different from iter(consumer), which yields only items and drops the metadata.
iter_objects
iter_objects() -> Generator[T, None, None]
Create an iterator that yields objects from the stream.
iter_objects_with_metadata
Create an iterator that yields tuples of metadata and objects.
next
next() -> ProxyOr[T]
Get the next item in the stream.
Note
This method has the potential side effect of initializing and globally registering a new Store instance. This will happen at most once per topic because the producer can map topic names to Store instances. This class will keep track of the Store instances used by the stream and will close and unregister them when this class is closed.
Returns:
- ProxyOr[T] – Proxy[T] is returned if the topic was associated with a Store in the StreamProducer; otherwise T is returned.
Raises:
- StopIteration – When an end of stream event is received from a producer. Note that this does not call close().
next_with_metadata
Get the next item with metadata in the stream.
Note
This method has the same potential side effects and return type as next().
Returns:
- dict[str, Any] – Dictionary of user-provided metadata associated with the object.
- ProxyOr[T] – Next item in the stream, as returned by next().
Raises:
- StopIteration – When an end of stream event is received from a producer. Note that this does not call close().
next_object
Get the next object in the stream.
Note
This method has the same potential side effects as next().
Raises:
- StopIteration – When an end of stream event is received from a producer. Note that this does not call close().
- ValueError – If the store does not return an object using the key included in the object's event metadata.
next_object_with_metadata
Get the next object with metadata in the stream.
Note
This method has the same potential side effects as next().
Returns:
- dict[str, Any] – Dictionary of user-provided metadata associated with the object.
- T – Next object in the stream.
Raises:
- StopIteration – When an end of stream event is received from a producer. Note that this does not call close().
StreamProducer
StreamProducer(
publisher: Publisher,
*,
aggregator: Callable[[list[T]], T] | None = None,
batch_size: int = 1,
default_store: Store[Any] | None = None,
filter_: Filter | None = None,
stores: Mapping[str, Store[Any] | None] | None = None
)
Bases: Generic[T]
Stream producer interface.
This interface enables streaming objects in a manner which decouples bulk object transfer from event notifications. Topics can be associated with a Store which will be used for bulk object storage and communication. Event metadata, including the key to the object in the store, is communicated through a message broker using the Publisher and Subscriber interfaces.
The associated StreamConsumer can be used to iterate on proxies of objects from the stream.
This interface can also be used without a Store, in which case the object is included in the event metadata and communicated directly through the message broker.
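The two modes described above differ only in what travels through the message broker. The following sketch models that difference with plain dicts; `make_event`, `resolve_event`, and the `'key'`/`'object'` event fields are hypothetical and do not reflect ProxyStore's actual event format.

```python
from __future__ import annotations

import uuid
from typing import Any


def make_event(
    obj: Any,
    metadata: dict[str, Any],
    store: dict[str, Any] | None,
) -> dict[str, Any]:
    """Build a hypothetical event dict; the field names are illustrative only."""
    if store is None:
        # No store mapped to the topic: the object travels inside the event.
        return {'metadata': metadata, 'object': obj}
    # Store mapped to the topic: bulk data goes to the store and only the
    # key travels through the message broker.
    key = str(uuid.uuid4())
    store[key] = obj
    return {'metadata': metadata, 'key': key}


def resolve_event(event: dict[str, Any], store: dict[str, Any] | None) -> Any:
    """Recover the object from a hypothetical event."""
    if 'key' in event:
        if store is None:
            raise ValueError('event references a store but none was given')
        return store[event['key']]
    return event['object']


store: dict[str, Any] = {}
with_store = make_event(b'x' * 1024, {'step': 1}, store)
inline = make_event('small', {'step': 2}, None)
```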
Note
The StreamProducer can be used as a context manager.
Tip
This class is generic, so it is recommended that the type of objects in the stream be annotated appropriately. This is useful for enabling a static type checker to validate that the correct object types are published to the stream.
Warning
The producer is not thread-safe.
Attributes:
- publisher – Publisher interface.
Parameters:
- publisher (Publisher) – Object which implements one of the Publisher protocols. Used to publish event messages when new objects are added to the stream.
- aggregator (Callable[[list[T]], T] | None, default: None) – Optional aggregator which takes as input the batch of objects and returns a single object of the same type when invoked. The size of the batch passed to the aggregator is controlled by the batch_size parameter. When aggregation is used, the metadata associated with the aggregated object will be the union of each metadata dict from each object in the batch.
- batch_size (int, default: 1) – Batch size used for batching and aggregation.
- default_store (Store[Any] | None, default: None) – Specify the default Store to be used with topics not explicitly set in stores. If no default is provided, objects are included directly in the event.
- filter_ (Filter | None, default: None) – Optional filter to apply prior to sending objects to the stream. If the filter returns True for a given object's metadata, the object will not be sent to the stream. The filter is applied before aggregation or batching.
- stores (Mapping[str, Store[Any] | None] | None, default: None) – Mapping from topic names to an optional Store instance used to store and communicate serialized objects streamed to that topic. If the value associated with a topic is None, the object is included directly in the event.
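As an illustration of the aggregator and batch_size parameters, here is a hypothetical aggregator for a stream of list[int] objects, plus a helper that takes the union of the batch's metadata dicts. The documentation does not say how duplicate metadata keys are resolved; this sketch assumes later dicts overwrite earlier ones. Such a function might be passed as `StreamProducer[list[int]](publisher, aggregator=concat_aggregator, batch_size=3)`.

```python
from typing import Any


def concat_aggregator(batch: list[list[int]]) -> list[int]:
    """Hypothetical aggregator: merge a batch of lists into one list."""
    merged: list[int] = []
    for item in batch:
        merged.extend(item)
    return merged


def union_metadata(metadatas: list[dict[str, Any]]) -> dict[str, Any]:
    """Union of metadata dicts; later keys overwrite earlier ones (assumed)."""
    merged: dict[str, Any] = {}
    for metadata in metadatas:
        merged.update(metadata)
    return merged


batch = [[1, 2], [3], [4, 5]]
metas = [{'source': 'a'}, {'run': 7}, {'source': 'b'}]
aggregated = concat_aggregator(batch)
aggregated_meta = union_metadata(metas)
```

Note that the aggregator returns a single object of the same type as its inputs, matching the Callable[[list[T]], T] signature documented above.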
Source code in proxystore/stream/_producer.py
close
Close the producer.
Warning
Objects buffered in an incomplete batch will be lost. Call flush() to ensure that all objects are sent before closing, or pass a list of topics to flush and close.
Warning
By default, this will close the Publisher interface but will not close the Store interfaces.
Parameters:
- topics (Iterable[str], default: ()) – Topics to send end of stream events to. Equivalent to calling close_topics() first.
- publisher (bool, default: True) – Close the Publisher interface.
- stores (bool, default: False) – Close and unregister the Store instances.
close_topics
close_topics(*topics: str) -> None
Send an end of stream event to each topic.
A StreamConsumer will raise a StopIteration exception when an end of stream event is received. The end of stream event is still ordered, however, so all previously sent events will be consumed before the end of stream event is propagated.
Note
This will flush the topic buffer.
Parameters:
- topics (str, default: ()) – Topics to send end of stream events to.
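The ordering guarantee can be modeled with a FIFO queue: closing a topic flushes its buffer and then appends the end of stream marker, so a consumer sees every earlier event first. `ToyTopic`, `consume()`, and `END_OF_STREAM` are hypothetical names used only for this sketch.

```python
from collections import deque
from typing import Any

END_OF_STREAM = object()  # hypothetical end of stream marker


class ToyTopic:
    """Illustrative FIFO topic; not the ProxyStore Publisher/Subscriber API."""

    def __init__(self) -> None:
        self.buffer: list[Any] = []  # unflushed objects (the batch buffer)
        self.published: deque[Any] = deque()  # events visible to consumers

    def flush(self) -> None:
        self.published.extend(self.buffer)
        self.buffer.clear()

    def close(self) -> None:
        # Flush first so buffered objects precede the end of stream event.
        self.flush()
        self.published.append(END_OF_STREAM)


def consume(topic: ToyTopic) -> list[Any]:
    """Read events in order until the end of stream marker."""
    items = []
    while topic.published:
        event = topic.published.popleft()
        if event is END_OF_STREAM:
            break  # a real consumer would raise StopIteration here
        items.append(event)
    return items


topic = ToyTopic()
topic.buffer.extend(['a', 'b'])
topic.close()
```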
flush
flush_topic
flush_topic(topic: str) -> None
Flush the batch buffer for a topic.
This method:
- Pops the current batch of objects off the topic buffer.
- Applies the aggregator to the batch if one was provided.
- Puts the batch of objects in the Store.
- Creates a new batch event using the keys returned by the store and additional metadata.
- Publishes the event to the stream via the Publisher.
Parameters:
- topic (str) – Topic to flush.
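The five flush steps above can be sketched as a standalone function. This is a toy model under stated assumptions: a dict stands in for the Store, a callable stands in for the Publisher, uuid4 strings stand in for store keys, and the event layout is hypothetical.

```python
from __future__ import annotations

import uuid
from collections.abc import Callable
from typing import Any


def flush_topic(
    buffer: list[tuple[Any, dict[str, Any]]],
    store: dict[str, Any],
    publish: Callable[[dict[str, Any]], None],
    aggregator: Callable[[list[Any]], Any] | None = None,
) -> None:
    """Toy model of the documented flush steps; names are hypothetical."""
    # 1. Pop the current batch off the topic buffer.
    batch = buffer[:]
    buffer.clear()
    objects = [obj for obj, _ in batch]
    metadatas = [meta for _, meta in batch]
    # 2. Apply the aggregator, if any, combining metadata by union.
    if aggregator is not None:
        merged: dict[str, Any] = {}
        for meta in metadatas:
            merged.update(meta)
        objects, metadatas = [aggregator(objects)], [merged]
    # 3. Put the objects in the store, collecting the keys.
    keys = []
    for obj in objects:
        key = str(uuid.uuid4())
        store[key] = obj
        keys.append(key)
    # 4. Build one batch event from the keys and metadata.
    event = {'keys': keys, 'metadata': metadatas}
    # 5. Publish the event.
    publish(event)


published: list[dict[str, Any]] = []
store: dict[str, Any] = {}
buffer = [(1, {'i': 0}), (2, {'i': 1}), (3, {'j': 2})]
flush_topic(buffer, store, published.append, aggregator=sum)
```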
send
Send an item to the stream.
This method:
- Applies the filter to the metadata associated with this event, skipping this object if the filter returns True.
- Adds the object to the internal event buffer for this topic.
- Flushes the event buffer once the batch size is reached.
Warning
Careful consideration should be given to the setting of the evict flag. When set to True, the corresponding proxy yielded by the consumer of the stream will only be resolvable once. If you encounter unexpected ProxyResolveMissingKeyError errors, it may be due to proxies from the stream being resolved multiple times after the first resolve triggered an eviction of the underlying data.
Parameters:
- topic (str) – Stream topic to send the object to.
- obj (T) – Object to send via the stream.
- evict (bool, default: True) – Evict the object from the Store once the object is consumed by a StreamConsumer. Set to False if a single object in the stream will be consumed by multiple consumers. Note that when set to False, data eviction must be handled manually. This parameter is ignored if a Store is not mapped to this topic.
- metadata (dict[str, Any] | None, default: None) – Dictionary containing metadata about the object. This can be used by the producer or consumer to filter new object events. The default value None is replaced with an empty dict.
Raises:
- TopicClosedError – If the topic has already been closed via close_topics().
- ValueError – If no store associated with topic is found in the mapping of topics to stores and no default store is provided.
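The send() steps, together with the TopicClosedError behavior and the fate of objects left in an incomplete batch, can be modeled in a few lines. `ToyProducer` is hypothetical and keeps batches in memory instead of using a Store or Publisher; `TopicClosedError` here is a local stand-in class, and the filter is assumed to be any callable taking the metadata dict.

```python
from __future__ import annotations

from collections import defaultdict
from collections.abc import Callable
from typing import Any


class TopicClosedError(Exception):
    """Local stand-in for the documented exception (hypothetical here)."""


class ToyProducer:
    """Toy model of the documented send() steps; not the ProxyStore class."""

    def __init__(
        self,
        batch_size: int = 1,
        filter_: Callable[[dict[str, Any]], bool] | None = None,
    ) -> None:
        self.batch_size = batch_size
        self.filter_ = filter_
        self.buffers: defaultdict[str, list[Any]] = defaultdict(list)
        self.published: list[tuple[str, list[Any]]] = []
        self.closed_topics: set[str] = set()

    def send(
        self, topic: str, obj: Any, metadata: dict[str, Any] | None = None
    ) -> None:
        if topic in self.closed_topics:
            raise TopicClosedError(topic)
        metadata = {} if metadata is None else metadata
        # 1. Apply the filter to the metadata; True means skip this object.
        if self.filter_ is not None and self.filter_(metadata):
            return
        # 2. Add the object to this topic's buffer.
        self.buffers[topic].append(obj)
        # 3. Flush once the batch size is reached.
        if len(self.buffers[topic]) >= self.batch_size:
            self.flush_topic(topic)

    def flush_topic(self, topic: str) -> None:
        batch = self.buffers.pop(topic, [])
        if batch:  # toy choice: empty batches publish nothing
            self.published.append((topic, batch))

    def close_topics(self, *topics: str) -> None:
        for topic in topics:
            self.flush_topic(topic)
            self.closed_topics.add(topic)


producer = ToyProducer(batch_size=2, filter_=lambda m: m.get('skip', False))
producer.send('t', 1)
producer.send('t', 2, {'skip': True})  # dropped by the filter
producer.send('t', 3)                  # batch of two reached: flushed
producer.send('t', 4)                  # buffered in an incomplete batch
```

With batch_size=2, the object 4 stays in the buffer until a flush or close_topics() publishes it, which is the situation the warning on close() describes.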