proxystore.stream
Streaming interface.
Warning
The streaming interfaces are experimental and may change in future releases.
Tip
Check out the Streaming Guide to learn more!
StreamConsumer(
    subscriber: Subscriber, *, filter_: Filter | None = None
)
Bases: Generic[T]
Stream consumer interface.
This interface acts as an iterator that will yield items from the stream until the stream is closed.
Note
The StreamConsumer can
be used as a context manager.
Tip
This class is generic, so it is recommended that the type of objects in the stream be annotated appropriately.
consumer = StreamConsumer[str](...)
reveal_type(consumer.next())
# ProxyOr[str]
reveal_type(consumer.next_object())
# str
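If the objects in the stream are of mixed or unknown types, the stream can instead be annotated with Any.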
Warning
If you encounter unexpected
ProxyResolveMissingKeyError
errors, it may be due to proxies from the stream being resolved
multiple times but the first resolve triggered an eviction
of the underlying data. If this is the case, confirm that the
evict flag on
StreamProducer.send()
is set correctly and that no code is resolving
proxies earlier than you expect.
Warning
The consumer is not thread-safe.
Attributes:
- subscriber – Subscriber interface.
Parameters:
- subscriber (Subscriber) – Object which implements one of the Subscriber protocols. Used to listen for new event messages indicating new objects in the stream.
- filter_ (Filter | None, default: None) – Optional filter to apply to event metadata received from the stream. If the filter returns True, the event will be dropped (i.e., not yielded back to the user), and the object associated with that event will be deleted if the evict flag was set on the producer side.
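The filter_ semantics (returning True means "drop") can be pictured with a toy, in-memory model. This is a sketch under stated assumptions, not proxystore code: events are plain dicts with a "metadata" key, apply_consumer_filter and FilterFn are hypothetical names, and eviction of dropped objects is not modeled.

```python
from __future__ import annotations

from typing import Any, Callable, Iterable, Iterator

# Hypothetical filter type: receives event metadata, returns True to drop.
FilterFn = Callable[[dict[str, Any]], bool]


def apply_consumer_filter(
    events: Iterable[dict[str, Any]],
    filter_: FilterFn | None = None,
) -> Iterator[dict[str, Any]]:
    """Yield only the events that the filter does not reject (toy model)."""
    for event in events:
        if filter_ is not None and filter_(event["metadata"]):
            # Dropped events are never yielded back to the caller.
            continue
        yield event


events = [
    {"metadata": {"step": 1}, "obj": "a"},
    {"metadata": {"step": 2}, "obj": "b"},
    {"metadata": {"step": 3}, "obj": "c"},
]
# Drop every event whose "step" is even.
kept = [e["obj"] for e in apply_consumer_filter(events, lambda m: m["step"] % 2 == 0)]
print(kept)  # ['a', 'c']
```

With no filter, every event passes through unchanged.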
Source code in proxystore/stream/_consumer.py
Return an iterator that will yield stream items.
The type of the yielded items matches the return type of
next().
Close the consumer.
Warning
By default, this will close the
Subscriber interface,
but will not close the Store
interfaces.
Parameters:
- stores (bool, default: False) – Close and unregister the Store instances used to resolve objects consumed from the stream.
- subscriber (bool, default: True) – Close the Subscriber interface.
Create an iterator that yields tuples of metadata and items.
The type of the yielded items matches the return type of
next().
This differs from iter(consumer), which yields only the items
and drops the metadata.
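To make the difference concrete, the following toy generators contrast yielding (metadata, item) tuples with yielding bare items. They are hypothetical plain-Python stand-ins (STREAM, iter_with_metadata and iter_items are illustrative names) and do not import or call proxystore.

```python
from __future__ import annotations

from typing import Any, Iterator

# Toy stream contents: (metadata, item) pairs as a consumer might receive them.
STREAM = [({"index": 0}, "alpha"), ({"index": 1}, "beta")]


def iter_with_metadata() -> Iterator[tuple[dict[str, Any], str]]:
    """Yield (metadata, item) tuples, like iter_with_metadata()."""
    yield from STREAM


def iter_items() -> Iterator[str]:
    """Yield items only, like iter(consumer); the metadata is dropped."""
    for _metadata, item in iter_with_metadata():
        yield item


for metadata, item in iter_with_metadata():
    print(metadata["index"], item)

print(list(iter_items()))  # ['alpha', 'beta']
```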
iter_objects() -> Generator[T, None, None]
Create an iterator that yields objects from the stream.
Create an iterator that yields tuples of metadata and objects.
next() -> ProxyOr[T]
Get the next item in the stream.
Note
This method has the potential side effect of initializing and
globally registering a new Store
instance. This will happen at most once per topic because the
producer can map topic names to
Store instances. The consumer keeps
track of the Store instances
used by the stream and can close and unregister them when the
consumer is closed (see the stores parameter of close()).
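The "at most once per topic" behavior amounts to lazily creating and caching one store per topic. A minimal sketch, assuming a plain dict cache keyed by topic; ToyStore and ToyStoreTracker are hypothetical classes that skip the global registration step and only record whether a store was closed.

```python
from __future__ import annotations


class ToyStore:
    """Hypothetical stand-in for a Store; only records whether it was closed."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    def close(self) -> None:
        self.closed = True


class ToyStoreTracker:
    """Create at most one store per topic and optionally close them all."""

    def __init__(self) -> None:
        self._stores: dict[str, ToyStore] = {}
        self.created = 0

    def get_store(self, topic: str) -> ToyStore:
        # Lazily create the store the first time a topic is seen, then reuse it.
        if topic not in self._stores:
            self._stores[topic] = ToyStore(f"store-{topic}")
            self.created += 1
        return self._stores[topic]

    def close(self, stores: bool = False) -> None:
        # Like the consumer's close(), stores stay open unless stores=True.
        if stores:
            for store in self._stores.values():
                store.close()
            self._stores.clear()


tracker = ToyStoreTracker()
first = tracker.get_store("temperatures")
again = tracker.get_store("temperatures")
other = tracker.get_store("images")
print(first is again, tracker.created)  # True 2
tracker.close(stores=True)
print(first.closed, other.closed)  # True True
```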
Returns:
- ProxyOr[T] – Proxy[T] is returned if the topic was associated with a Store in the StreamProducer; otherwise T is returned.
Raises:
- StopIteration – When an end of stream event is received from a producer. Note that this does not call close().
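The StopIteration behavior, together with the context-manager note for this class, can be mimicked in plain Python. This is a toy sketch, not proxystore's implementation: ToyConsumer and END_OF_STREAM are hypothetical, and events come from an in-memory list rather than a Subscriber. It shows that iteration ends at the end of stream event, while closing only happens when the with block exits.

```python
from __future__ import annotations

from collections import deque
from typing import Any, Iterator

# Hypothetical sentinel standing in for an end of stream event.
END_OF_STREAM: dict[str, Any] = {"type": "end_of_stream"}


class ToyConsumer:
    """Toy consumer: yields objects until it sees END_OF_STREAM.

    Unlike a real consumer, it does not block waiting for new events; it
    assumes the event list it is given ends with END_OF_STREAM.
    """

    def __init__(self, events: list[dict[str, Any]]) -> None:
        self._events = deque(events)
        self.closed = False

    def __iter__(self) -> Iterator[Any]:
        return self

    def __next__(self) -> Any:
        return self.next()

    def next(self) -> Any:
        event = self._events.popleft()
        if event is END_OF_STREAM:
            # End of iteration; close() is deliberately NOT called here.
            raise StopIteration
        return event["obj"]

    def close(self) -> None:
        self.closed = True

    def __enter__(self) -> ToyConsumer:
        return self

    def __exit__(self, *exc: object) -> None:
        # Leaving the with block is what closes the consumer.
        self.close()


with ToyConsumer([{"obj": 1}, {"obj": 2}, END_OF_STREAM]) as consumer:
    received = list(consumer)  # list() stops cleanly on StopIteration
    closed_inside = consumer.closed

print(received, closed_inside, consumer.closed)  # [1, 2] False True
```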
Get the next item with metadata in the stream.
Note
This method has the same potential side effects and return type
as next().
Returns:
- dict[str, Any] – Dictionary of user-provided metadata associated with the object.
- ProxyOr[T] – Next item in the stream: a proxy of the next object if the topic is associated with a Store, otherwise the object itself.
Raises:
- StopIteration – When an end of stream event is received from a producer. Note that this does not call close().
Get the next object in the stream.
Note
This method has the same potential side effects as
next().
Raises:
- StopIteration – When an end of stream event is received from a producer. Note that this does not call close().
- ValueError – If the store does not return an object using the key included in the object's event metadata.
Get the next object with metadata in the stream.
Note
This method has the same potential side effects as
next().
Returns:
- dict[str, Any] – Dictionary of user-provided metadata associated with the object.
- T – Next object in the stream.
Raises:
- StopIteration – When an end of stream event is received from a producer. Note that this does not call close().
StreamProducer(
    publisher: Publisher,
    *,
    aggregator: Callable[[list[T]], T] | None = None,
    batch_size: int = 1,
    default_store: Store[Any] | None = None,
    filter_: Filter | None = None,
    stores: Mapping[str, Store[Any] | None] | None = None
)
Bases: Generic[T]
Stream producer interface.
This interface enables streaming objects in a manner which decouples
bulk object transfer from event notifications. Topics can be associated
with a Store which will be used for bulk
object storage and communication. Event metadata, including the key to
the object in the store, is communicated through a message broker using
the Publisher and
Subscriber interfaces.
The associated StreamConsumer can
be used to iterate over proxies of objects from the stream.
This interface can also be used without a
Store, in which case the object is
included in the event metadata and communicated directly through the
message broker.
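The decoupling described above can be sketched with a dict standing in for the Store and plain dicts as events. This is a hypothetical model (make_event and the event layout are assumptions, not proxystore's wire format): with a store, only a key travels in the event; without one, the object itself does.

```python
from __future__ import annotations

import uuid
from typing import Any


def make_event(
    obj: Any,
    metadata: dict[str, Any],
    store: dict[str, Any] | None,
) -> dict[str, Any]:
    """Build a toy event: a key reference if a store is used, else the object."""
    if store is None:
        # No store: the object itself travels through the message broker.
        return {"metadata": metadata, "obj": obj}
    # Store: the bulk data goes to the store; only the key is in the event.
    key = str(uuid.uuid4())
    store[key] = obj
    return {"metadata": metadata, "key": key}


store: dict[str, Any] = {}
big = b"x" * 1_000_000
by_ref = make_event(big, {"topic": "data"}, store)
inline = make_event("small", {"topic": "logs"}, None)
print(sorted(by_ref), sorted(inline))  # ['key', 'metadata'] ['metadata', 'obj']
```

The event published for the large payload stays small regardless of the payload size, which is the point of routing bulk data through a Store.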
Note
The StreamProducer can
be used as a context manager.
Tip
This class is generic, so it is recommended that the type of objects in the stream be annotated appropriately. This is useful for enabling a static type checker to validate that the correct object types are published to the stream.
Warning
The producer is not thread-safe.
Attributes:
- publisher – Publisher interface.
Parameters:
- publisher (Publisher) – Object which implements one of the Publisher protocols. Used to publish event messages when new objects are added to the stream.
- aggregator (Callable[[list[T]], T] | None, default: None) – Optional aggregator which takes as input the batch of objects and returns a single object of the same type when invoked. The size of the batch passed to the aggregator is controlled by the batch_size parameter. When aggregation is used, the metadata associated with the aggregated object will be the union of the metadata dicts of all objects in the batch.
- batch_size (int, default: 1) – Batch size used for batching and aggregation.
- default_store (Store[Any] | None, default: None) – Default Store to use for topics not explicitly set in stores. If no default is provided, objects are included directly in the event.
- filter_ (Filter | None, default: None) – Optional filter to apply prior to sending objects to the stream. If the filter returns True for a given object's metadata, the object will not be sent to the stream. The filter is applied before aggregation or batching.
- stores (Mapping[str, Store[Any] | None] | None, default: None) – Mapping from topic names to an optional Store instance used to store and communicate serialized objects streamed to that topic. If the value associated with a topic is None, the object is included directly in the event.
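A toy illustration of aggregation with a metadata union, assuming the batch is a list of (object, metadata) pairs. aggregate_batch is a hypothetical helper, and the rule that later keys win when two metadata dicts share a key is an assumption of this sketch, not documented behavior.

```python
from __future__ import annotations

from typing import Any, Callable, TypeVar

T = TypeVar("T")


def aggregate_batch(
    batch: list[tuple[T, dict[str, Any]]],
    aggregator: Callable[[list[T]], T],
) -> tuple[T, dict[str, Any]]:
    """Reduce a batch of (object, metadata) pairs to one pair (toy model)."""
    objects = [obj for obj, _ in batch]
    merged: dict[str, Any] = {}
    for _, metadata in batch:
        # Union of the metadata dicts; on key clashes, later entries win here.
        merged.update(metadata)
    return aggregator(objects), merged


batch = [([1, 2], {"source": "a"}), ([3], {"run": 7})]
# The aggregator concatenates lists, so the result has the same type as each input.
obj, metadata = aggregate_batch(batch, lambda lists: [x for lst in lists for x in lst])
print(obj, metadata)  # [1, 2, 3] {'source': 'a', 'run': 7}
```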
Source code in proxystore/stream/_producer.py
Close the producer.
Warning
Objects buffered in an incomplete batch will be lost. Call
flush() to ensure
that all objects are sent before closing, or pass a list of
topics to flush and close.
Warning
By default, this will close the
Publisher interface,
but will not close the Store
interfaces.
Parameters:
- topics (Iterable[str], default: ()) – Topics to send end of stream events to. Equivalent to calling close_topics() first.
- publisher (bool, default: True) – Close the Publisher interface.
- stores (bool, default: False) – Close and unregister the Store instances.
close_topics(*topics: str) -> None
Send an end of stream event to each topic.
A StreamConsumer
will raise a StopIteration exception when an
end of stream event is received. Events remain ordered, however, so
all previously sent events are consumed before the end of stream
event is propagated.
Note
This will flush the topic buffer.
Parameters:
- topics (str, default: ()) – Topics to send end of stream events to.
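The ordering guarantee of close_topics() can be pictured with a toy producer that buffers per topic. In this sketch, ToyTopicProducer and END_OF_STREAM are hypothetical and a list stands in for the message broker; close_topics() flushes the buffer before appending the end-of-stream marker, so a reader that stops at the marker still sees every earlier object.

```python
from __future__ import annotations

from typing import Any

END_OF_STREAM = "<end-of-stream>"  # hypothetical sentinel payload


class ToyTopicProducer:
    """Buffers objects per topic; close_topics() flushes, then marks the end."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        self.buffers: dict[str, list[Any]] = {}
        self.published: list[tuple[str, Any]] = []  # (topic, payload) in order

    def send(self, topic: str, obj: Any) -> None:
        buffer = self.buffers.setdefault(topic, [])
        buffer.append(obj)
        if len(buffer) >= self.batch_size:
            self.flush_topic(topic)

    def flush_topic(self, topic: str) -> None:
        batch = self.buffers.pop(topic, [])
        if batch:
            self.published.append((topic, batch))

    def close_topics(self, *topics: str) -> None:
        for topic in topics:
            # Flush first so buffered objects precede the end-of-stream marker.
            self.flush_topic(topic)
            self.published.append((topic, END_OF_STREAM))


producer = ToyTopicProducer(batch_size=2)
for i in range(3):
    producer.send("t", i)
producer.close_topics("t")
print(producer.published)  # [('t', [0, 1]), ('t', [2]), ('t', '<end-of-stream>')]

# A reader that stops at the marker still receives the partial last batch.
received = []
for _topic, payload in producer.published:
    if payload == END_OF_STREAM:
        break
    received.extend(payload)
print(received)  # [0, 1, 2]
```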
flush_topic(topic: str) -> None
Flush the batch buffer for a topic.
This method:
- Pops the current batch of objects off the topic buffer.
- Applies the aggregator to the batch if one was provided.
- Puts the batch of objects in the
   Store.
- Creates a new batch event using the keys returned by the store and additional metadata.
- Publishes the event to the stream via the
   Publisher.
Parameters:
- topic (str) – Topic to flush.
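The flush steps map onto a short function. This is a minimal sketch, assuming a dict as the Store, a list as the broker, and uuid4 strings as keys (all hypothetical stand-ins, not proxystore's code); the aggregation step is left out for brevity.

```python
from __future__ import annotations

import uuid
from typing import Any

# A topic buffer holds (object, metadata) pairs awaiting a flush.
Buffer = list[tuple[Any, dict[str, Any]]]


def flush_topic(
    topic: str,
    buffers: dict[str, Buffer],
    store: dict[str, Any],
    broker: list[dict[str, Any]],
) -> None:
    """Toy version of the flush steps (aggregation omitted)."""
    # Pop the current batch of (object, metadata) pairs off the topic buffer.
    batch = buffers.pop(topic, [])
    if not batch:
        return
    events = []
    for obj, metadata in batch:
        # Put the object in the store and keep only its key for the event.
        key = str(uuid.uuid4())
        store[key] = obj
        events.append({"key": key, "metadata": metadata})
    # Publish a single batch event covering the whole batch.
    broker.append({"topic": topic, "events": events})


buffers: dict[str, Buffer] = {"sim": [("frame-0", {"i": 0}), ("frame-1", {"i": 1})]}
store: dict[str, Any] = {}
broker: list[dict[str, Any]] = []
flush_topic("sim", buffers, store, broker)
event = broker[0]
print(len(event["events"]), [store[e["key"]] for e in event["events"]])
# 2 ['frame-0', 'frame-1']
```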
Send an item to the stream.
This method:
- Applies the filter to the metadata associated with this event,
   skipping this object if the filter returns True.
- Adds the object to the internal event buffer for this topic.
- Flushes the event buffer once the batch size is reached.
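The three steps can be modeled with a small class. This is a hypothetical sketch: ToySender and the local TopicClosedError are stand-ins rather than proxystore's classes, "flushing" just moves the batch to a list, and the check order (closed topic first, then filter) is an assumption. The closed-topic check mirrors the TopicClosedError listed under Raises for this method.

```python
from __future__ import annotations

from typing import Any, Callable


class TopicClosedError(Exception):
    """Hypothetical local stand-in for proxystore's TopicClosedError."""


class ToySender:
    """Toy model of send(): filter, buffer, then flush at the batch size."""

    def __init__(
        self,
        batch_size: int = 1,
        filter_: Callable[[dict[str, Any]], bool] | None = None,
    ) -> None:
        self.batch_size = batch_size
        self.filter_ = filter_
        self.buffers: dict[str, list[Any]] = {}
        self.closed_topics: set[str] = set()
        self.flushed: list[list[Any]] = []  # stands in for stored/published batches

    def send(
        self,
        topic: str,
        obj: Any,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        if topic in self.closed_topics:
            raise TopicClosedError(topic)
        metadata = {} if metadata is None else metadata
        # 1. Skip the object entirely if the filter returns True.
        if self.filter_ is not None and self.filter_(metadata):
            return
        # 2. Add the object to this topic's buffer.
        buffer = self.buffers.setdefault(topic, [])
        buffer.append(obj)
        # 3. Flush the buffer once the batch size is reached.
        if len(buffer) >= self.batch_size:
            self.flushed.append(self.buffers.pop(topic))


sender = ToySender(batch_size=2, filter_=lambda m: m.get("debug", False))
sender.send("t", "a")
sender.send("t", "skip-me", {"debug": True})
sender.send("t", "b")
sender.send("t", "c")
print(sender.flushed, sender.buffers)  # [['a', 'b']] {'t': ['c']}

sender.closed_topics.add("t")
try:
    sender.send("t", "late")
except TopicClosedError:
    print("topic already closed")
```

Note that "c" is still sitting in the buffer at the end, which is exactly the situation the close() warning about incomplete batches describes.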
Warning
Careful consideration should be given to the setting of the
evict flag. When set to True, the corresponding proxy
yielded by the consumer of the stream will only be resolvable
once. If you encounter unexpected
ProxyResolveMissingKeyError
errors, it may be due to proxies from the stream being resolved
multiple times but the first resolve triggered an eviction
of the underlying data.
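Why evict=True makes the data one-shot can be seen in a toy model. The sketch is hypothetical: ToyRef and MissingKeyError are stand-ins (the latter for ProxyResolveMissingKeyError), a dict plays the Store, and, unlike a real proxy, ToyRef does not cache the resolved value, so each call reads the store again. Two references to the same key model, for example, a copy of the same proxy resolved elsewhere.

```python
from __future__ import annotations

from typing import Any


class MissingKeyError(Exception):
    """Hypothetical stand-in for ProxyResolveMissingKeyError."""


class ToyRef:
    """Toy lazy reference to store[key]; does not cache, unlike a real proxy."""

    def __init__(self, store: dict[str, Any], key: str, evict: bool) -> None:
        self._store = store
        self._key = key
        self._evict = evict

    def resolve(self) -> Any:
        if self._key not in self._store:
            raise MissingKeyError(self._key)
        if self._evict:
            # Eviction deletes the data as it is read.
            return self._store.pop(self._key)
        return self._store[self._key]


store = {"k": "payload"}
first = ToyRef(store, "k", evict=True)
duplicate = ToyRef(store, "k", evict=True)  # e.g., a copy of the same reference
print(first.resolve())  # payload
try:
    duplicate.resolve()
except MissingKeyError:
    print("second resolve failed: the data was already evicted")

# With evict=False the data survives, but cleanup becomes the caller's job.
kept = {"k": "payload"}
ToyRef(kept, "k", evict=False).resolve()
print("k" in kept)  # True
```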
Parameters:
- topic (str) – Stream topic to send the object to.
- obj (T) – Object to send via the stream.
- evict (bool, default: True) – Evict the object from the Store once the object is consumed by a StreamConsumer. Set to False if a single object in the stream will be consumed by multiple consumers. Note that when set to False, data eviction must be handled manually. This parameter is ignored if a Store is not mapped to this topic.
- metadata (dict[str, Any] | None, default: None) – Dictionary containing metadata about the object. This can be used by the producer or consumer to filter new object events. The default value None is replaced with an empty dict.
Raises:
- TopicClosedError – If the topic has already been closed via close_topics().
- ValueError – If a store associated with topic is not found in the mapping of topics to stores and no default store is provided.