proxystore.stream.interface
Stream producer and consumer interfaces.
Note
The StreamProducer
and StreamConsumer
are re-exported in proxystore.stream
for
convenience.
StreamProducer
StreamProducer(
publisher: Publisher,
stores: Mapping[str | None, Store[Any]],
*,
aggregator: Callable[[list[T]], T] | None = None,
batch_size: int = 1,
filter_: Filter | None = None
)
Bases: Generic[T]
Proxy stream producer interface.
Note
The StreamProducer
can
be used as a context manager.
Warning
The producer is not thread-safe.
Tip
This class is generic, so it is recommended that the type of objects in the stream be annotated appropriately. This is useful for enabling a static type checker to validate that the correct object types are published to the stream.
Attributes:
-
publisher
–Publisher interface.
Parameters:
-
publisher
(Publisher
) –Object which implements the
Publisher
protocol. Used to publish event messages when new objects are added to the stream.
-
stores
(Mapping[str | None, Store[Any]]
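) –Mapping of topics to the Store used for objects sent to each topic. A store mapped to the None key serves as the default for topics that are not in the mapping.
-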
aggregator
(Callable[[list[T]], T] | None
, default:None
) –Optional aggregator which takes as input the batch of objects and returns a single object of the same type when invoked. The size of the batch passed to the aggregator is controlled by the
batch_size
parameter. When aggregation is used, the metadata associated with the aggregated object will be the union of the metadata dicts of all objects in the batch.
-
batch_size
(int
, default:1
) –Batch size used for batching and aggregation.
-
filter_
(Filter | None
, default:None
) –Optional filter to apply prior to sending objects to the stream. If the filter returns
True
for a given object's metadata, the object will not be sent to the stream. The filter is applied before aggregation or batching.
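The ordering of filtering, batching, and aggregation described above can be sketched without the library. This is a minimal illustration, not the ProxyStore implementation: prepare_batches is a hypothetical helper, objects are paired with their metadata dicts, and resolving metadata key conflicts in favor of later objects is an assumption of this sketch.

```python
from __future__ import annotations

from typing import Any, Callable, TypeVar

T = TypeVar('T')


def prepare_batches(
    items: list[tuple[T, dict[str, Any]]],
    *,
    batch_size: int = 1,
    aggregator: Callable[[list[T]], T] | None = None,
    filter_: Callable[[dict[str, Any]], bool] | None = None,
) -> list[list[tuple[T, dict[str, Any]]]]:
    """Hypothetical helper mirroring the documented order of steps."""
    # 1. The filter runs first: True means the object is NOT sent.
    kept = [
        (obj, metadata)
        for obj, metadata in items
        if filter_ is None or not filter_(metadata)
    ]
    batches = []
    # 2. Only complete batches are emitted; leftovers stay buffered.
    for start in range(0, len(kept) - batch_size + 1, batch_size):
        batch = kept[start:start + batch_size]
        if aggregator is not None:
            # 3. Reduce the batch to one object whose metadata is the
            #    union of the batch's metadata dicts (later keys win,
            #    which is an assumption of this sketch).
            merged: dict[str, Any] = {}
            for _, metadata in batch:
                merged.update(metadata)
            batch = [(aggregator([obj for obj, _ in batch]), merged)]
        batches.append(batch)
    return batches


events = [
    (1, {'first': True}),
    (2, {'skip': True}),
    (3, {'second': True}),
    (4, {}),
]
result = prepare_batches(
    events,
    batch_size=2,
    aggregator=sum,
    filter_=lambda metadata: metadata.get('skip', False),
)
```

Here the object 2 is filtered out, 1 and 3 form one aggregated batch, and 4 remains in an incomplete batch that is never emitted.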
Source code in proxystore/stream/interface.py
close
Close the producer.
Warning
Objects buffered in an incomplete batch will be lost. Call
flush()
to ensure
that all objects are sent before closing.
Warning
By default, this will close the
Publisher
interface,
but will not close the Store
interfaces.
Parameters:
-
topics
(Iterable[str]
, default:()
) –Topics to send end of stream events to. Equivalent to calling
close_topics()
first.
-
publisher
(bool
, default:True
) –Close the
Publisher
interface.
-
stores
(bool
, default:False
) –Close and unregister the
Store
instances.
close_topics
close_topics(*topics: str) -> None
Publish an end of stream event to each topic.
A StreamConsumer
will raise a StopIteration
exception when an
end of stream event is received. The end of stream event is still
ordered, however, so all previously sent events will be consumed
before the end of stream event is propagated.
Note
This will flush the topic buffer.
Parameters:
-
topics
(str
, default:()
) –Topics to send end of stream events to.
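The ordering guarantee can be pictured with a plain FIFO queue. This is only a sketch under stated assumptions: END_OF_STREAM is a hypothetical sentinel and consume is a hypothetical generator, neither of which is part of the ProxyStore API.

```python
from __future__ import annotations

import queue
from typing import Any, Iterator

END_OF_STREAM = object()  # hypothetical sentinel for this sketch


def consume(events: queue.Queue[Any]) -> Iterator[Any]:
    """Yield events in order until the end of stream marker arrives."""
    while True:
        event = events.get()
        if event is END_OF_STREAM:
            # Returning ends the generator, so the caller's next()
            # raises StopIteration.
            return
        yield event


events: queue.Queue[Any] = queue.Queue()
for value in ('a', 'b', 'c'):
    events.put(value)
events.put(END_OF_STREAM)  # analogous to closing the topic

received = list(consume(events))
```

Because the marker travels through the same ordered channel as the data, every event sent before it is delivered first.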
flush
flush_topic
flush_topic(topic: str) -> None
Flush the batch buffer for a topic.
This method:
- Pops the current batch of objects off the topic buffer.
- Applies the aggregator to the batch if one was provided.
- Puts the batch of objects in the
Store
.
- Creates a new batch event using the keys returned by the store and additional metadata.
- Publishes the event to the stream via the
Publisher
.
Parameters:
-
topic
(str
) –Topic to flush.
Raises:
-
ValueError
–if a store associated with
topic
is not found in the mapping of topics to stores nor a default store is provided.
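The flush steps listed above can be sketched with in-memory stand-ins. Everything here is hypothetical: plain dicts play the role of stores (with the None key read as the default store), a list plays the role of the publisher, and the event layout is invented for illustration rather than taken from ProxyStore.

```python
from __future__ import annotations

import uuid
from typing import Any, Callable

# Hypothetical stand-ins for the stores, publisher, and topic buffers.
stores: dict[str | None, dict[str, Any]] = {None: {}}
published: list[dict[str, Any]] = []
buffers: dict[str, list[tuple[Any, dict[str, Any]]]] = {}


def flush_topic(
    topic: str,
    aggregator: Callable[[list[Any]], Any] | None = None,
) -> None:
    # Find the store for the topic, falling back to the default store.
    store = stores.get(topic, stores.get(None))
    if store is None:
        raise ValueError(f'No store found for topic {topic!r}.')
    # 1. Pop the current batch off the topic buffer.
    batch = buffers.pop(topic, [])
    if not batch:
        return
    # 2. Apply the aggregator, merging metadata across the batch.
    if aggregator is not None:
        merged: dict[str, Any] = {}
        for _, metadata in batch:
            merged.update(metadata)
        batch = [(aggregator([obj for obj, _ in batch]), merged)]
    # 3. Put each object in the store and remember its key.
    keys = []
    for obj, _ in batch:
        key = uuid.uuid4().hex
        store[key] = obj
        keys.append(key)
    # 4. Build one batch event from the keys and the metadata.
    event = {
        'topic': topic,
        'keys': keys,
        'metadata': [metadata for _, metadata in batch],
    }
    # 5. Publish the event.
    published.append(event)


buffers['sensor'] = [([1, 2], {'step': 0}), ([3], {'step': 1})]
flush_topic('sensor')
```

After the call, the buffer for the topic is empty, the objects live in the default store, and a single event referencing both keys has been published.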
send
Send an item to the stream.
This method:
- Applies the filter to the metadata associated with this event,
skipping streaming this object if the filter returns
True
.
- Adds the object to the internal event buffer for this topic.
- Flushes the event buffer once the batch size is reached.
Warning
Careful consideration should be given to the setting of the
evict
flag. When set to True
, the corresponding proxy
yielded by the consumer of the stream will only be resolvable
once. If you encounter unexpected
ProxyResolveMissingKeyError
errors, it may be due to proxies from the stream being resolved
multiple times but the first resolve triggered an eviction
of the underlying data.
Parameters:
-
topic
(str
) –Stream topic to send the object to.
-
obj
(T
) –Object to send via the stream.
-
evict
(bool
, default:True
) –Evict the object from the
Store
once the object is consumed by a
StreamConsumer
. Set to
False
if a single object in the stream will be consumed by multiple consumers. Note that when set to
False
, data eviction must be handled manually.
-
metadata
(dict[str, Any] | None
, default:None
) –Dictionary containing metadata about the object. This can be used by the producer or consumer to filter new object events. The default value
None
is replaced with an empty
dict
.
Raises:
-
TopicClosedError
–if the
topic
has already been closed via
close_topics()
.
-
ValueError
–if a store associated with
topic
is not found in the mapping of topics to stores nor a default store is provided.
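The consequence of the evict flag can be shown with a toy store. OneShotStore is a hypothetical stand-in, and the KeyError below only plays the role that ProxyResolveMissingKeyError plays in the warning above; the real store and proxy machinery are not modeled.

```python
from typing import Any


class OneShotStore:
    """Hypothetical stand-in for a store that can evict on read."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def put(self, key: str, obj: Any) -> None:
        self._data[key] = obj

    def get(self, key: str, *, evict: bool) -> Any:
        # With evict=True the first read removes the object.
        if evict:
            return self._data.pop(key)
        return self._data[key]

    def exists(self, key: str) -> bool:
        return key in self._data


store = OneShotStore()
store.put('once', 'payload')
store.put('shared', 'payload')

# evict=True: the first read succeeds, the second cannot find the key.
first = store.get('once', evict=True)
try:
    store.get('once', evict=True)
except KeyError:
    second_read_failed = True
else:
    second_read_failed = False

# evict=False: repeated reads work, e.g. for multiple consumers, but
# the object stays in the store until it is removed manually.
reads = [store.get('shared', evict=False) for _ in range(3)]
still_stored = store.exists('shared')
```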
StreamConsumer
StreamConsumer(
subscriber: Subscriber, *, filter_: Filter | None = None
)
Bases: Generic[T]
Proxy stream consumer interface.
This interface acts as an iterator that will yield items from the stream until the stream is closed.
Note
The StreamConsumer
can
be used as a context manager.
Tip
This class is generic, so it is recommended that the type of objects in the stream be annotated appropriately.
If the stream is heterogeneous or object types are not known ahead of time, it may be appropriate to annotate the stream with
Any
.
Warning
If you encounter unexpected
ProxyResolveMissingKeyError
errors, it may be due to proxies from the stream being resolved
multiple times but the first resolve triggered an eviction
of the underlying data. If this is the case, confirm that the
evict
flag on
StreamProducer.send()
is set correctly and that no code is incidentally resolving
proxies earlier than you expect.
Note
The consumer is not thread-safe.
Attributes:
-
subscriber
–Subscriber interface.
Parameters:
-
subscriber
(Subscriber
) –Object which implements the
Subscriber
protocol. Used to listen for new event messages indicating new objects in the stream.
-
filter_
(Filter | None
, default:None
) –Optional filter to apply to event metadata received from the stream. If the filter returns
True
, the event will be dropped (i.e., not yielded back to the user), and the object associated with that event will be deleted if the
evict
flag was set on the producer side.
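The consumer-side filter behavior can be sketched as follows. This is an illustration under assumptions, not the library's code: consume_events is a hypothetical function, a dict stands in for the store, and the event fields (key, metadata, evict) are invented for the example.

```python
from __future__ import annotations

from typing import Any, Callable, Iterator


def consume_events(
    events: list[dict[str, Any]],
    store: dict[str, Any],
    filter_: Callable[[dict[str, Any]], bool] | None = None,
) -> Iterator[tuple[dict[str, Any], Any]]:
    """Hypothetical sketch of consumer-side filtering."""
    for event in events:
        if filter_ is not None and filter_(event['metadata']):
            # Dropped events are never yielded; their objects are
            # deleted only if the producer asked for eviction.
            if event['evict']:
                store.pop(event['key'], None)
            continue
        yield event['metadata'], store[event['key']]


store = {'k1': 'a', 'k2': 'b', 'k3': 'c'}
events = [
    {'key': 'k1', 'metadata': {'drop': False}, 'evict': True},
    {'key': 'k2', 'metadata': {'drop': True}, 'evict': True},
    {'key': 'k3', 'metadata': {'drop': True}, 'evict': False},
]
yielded = list(
    consume_events(events, store, filter_=lambda md: md['drop']),
)
remaining_keys = sorted(store)
```

Note that the object behind k3 is filtered out but stays in the store, because eviction was not requested for it.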
__iter__
close
Close the consumer.
Warning
By default, this will close the
Subscriber
interface,
but will not close the Store
interfaces.
Parameters:
-
stores
(bool
, default:False
) –Close and unregister the
Store
instances used to resolve objects consumed from the stream.
-
subscriber
(bool
, default:True
) –Close the
Subscriber
interface.
iter_with_metadata
Return an iterator that yields tuples of metadata and proxies.
Note
This is different from iter(consumer)
which will yield
only proxies of objects in the stream.
iter_objects
iter_objects() -> Generator[T, None, None]
Return an iterator that yields objects from the stream.
Note
This is different from iter(consumer)
which will yield
proxies of objects in the stream.
iter_objects_with_metadata
Return an iterator that yields tuples of metadata and objects.
Note
This is different from iter(consumer)
which will yield
proxies of objects in the stream.
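The four iteration styles differ only in what each item looks like. The sketch below uses hypothetical stand-ins to show the shapes: LazyRef imitates a proxy by deferring the store lookup until resolve() is called, and the module-level generator functions mirror the method names but are not the ProxyStore implementation.

```python
from __future__ import annotations

from typing import Any, Iterator


class LazyRef:
    """Hypothetical stand-in for a proxy: resolves on demand."""

    def __init__(self, store: dict[str, Any], key: str) -> None:
        self._store = store
        self._key = key
        self.resolved = False

    def resolve(self) -> Any:
        self.resolved = True
        return self._store[self._key]


store = {'k1': 'alpha', 'k2': 'beta'}
events = [('k1', {'n': 1}), ('k2', {'n': 2})]


def iter_with_metadata() -> Iterator[tuple[dict[str, Any], LazyRef]]:
    for key, metadata in events:
        yield metadata, LazyRef(store, key)


def iter_refs() -> Iterator[LazyRef]:
    # Shape of iter(consumer): lazy references only.
    for _, ref in iter_with_metadata():
        yield ref


def iter_objects_with_metadata() -> Iterator[tuple[dict[str, Any], Any]]:
    for metadata, ref in iter_with_metadata():
        yield metadata, ref.resolve()


def iter_objects() -> Iterator[Any]:
    for _, obj in iter_objects_with_metadata():
        yield obj


refs = list(iter_refs())
untouched = all(not ref.resolved for ref in refs)
objects = list(iter_objects())
pairs = list(iter_objects_with_metadata())
```

Iterating over references moves no data until a reference is resolved, whereas the object variants fetch each object as it is yielded.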
next
next() -> Proxy[T]
Return a proxy of the next object in the stream.
Note
This method has the potential side effect of initializing and
globally registering a new Store
instance. This will happen at most once per topic because the
producer can map topic names to
Store
instances. This class will
keep track of the Store
instances
used by the stream and will close and unregister them when this
class is closed.
Raises:
-
StopIteration
–when an end of stream event is received from a producer. Note that this does not call
close()
.
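Because the end of the stream does not close the consumer, a manual loop still needs explicit cleanup. The sketch below uses FakeConsumer, a hypothetical stand-in with the same next() and close() method names, to show one way of structuring such a loop.

```python
from typing import Any, Iterable


class FakeConsumer:
    """Hypothetical stand-in exposing next() and close()."""

    def __init__(self, items: Iterable[Any]) -> None:
        self._items = iter(items)
        self.closed = False

    def next(self) -> Any:
        # Raises StopIteration once the items are exhausted.
        return next(self._items)

    def close(self) -> None:
        self.closed = True


consumer = FakeConsumer(['x', 'y'])
seen = []
try:
    while True:
        try:
            seen.append(consumer.next())
        except StopIteration:
            break
    # The stream has ended, but nothing has closed the consumer yet.
    closed_after_stop = consumer.closed
finally:
    consumer.close()
```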
next_with_metadata
Return a tuple of metadata and proxy for the next object.
Note
This method has the same potential side effects as
next()
.
Returns:
-
dict[str, Any]
–Dictionary of user-provided metadata associated with the object.
-
Proxy[T]
–Proxy of the next object in the stream.
Raises:
-
StopIteration
–when an end of stream event is received from a producer. Note that this does not call
close()
.
next_object
Return the next object in the stream.
Note
This method has the same potential side effects as
next()
.
Raises:
-
StopIteration
–when an end of stream event is received from a producer. Note that this does not call
close()
.
-
ValueError
–if the store does not return an object using the key included in the object's event metadata.
next_object_with_metadata
Return a tuple of metadata and the next object in the stream.
Note
This method has the same potential side effects as
next()
.
Returns:
-
dict[str, Any]
–Dictionary of user-provided metadata associated with the object.
-
T
–Next object in the stream.
Raises:
-
StopIteration
–when an end of stream event is received from a producer. Note that this does not call
close()
.