Skip to content

proxystore.stream.events

Event metadata type.

Warning

Event types are not considered as part of the public API and may change at any time without warning. Events are created and consumed internally by the StreamProducer and StreamConsumer and never exposed to client code.

Event module-attribute

Event union type.

EndOfStreamEvent dataclass

EndOfStreamEvent()

End of stream event.

from_dict classmethod

from_dict(data: dict[str, Any]) -> EndOfStreamEvent

Create a new event instance from its dictionary representation.

Source code in proxystore/stream/events.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> EndOfStreamEvent:
    """Create a new event instance from its dictionary representation."""
    return cls()

NewObjectEvent dataclass

NewObjectEvent(
    key_type: str,
    raw_key: list[Any],
    evict: bool,
    metadata: dict[str, Any],
)

New object in stream event metadata.

from_dict classmethod

from_dict(data: dict[str, Any]) -> NewObjectEvent

Create a new event instance from its dictionary representation.

Source code in proxystore/stream/events.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> NewObjectEvent:
    """Create a new event instance from its dictionary representation."""
    return NewObjectEvent(**data)

from_key classmethod

from_key(
    key: tuple[Any, ...],
    evict: bool,
    metadata: dict[str, Any],
) -> NewObjectEvent

Create a new event from a key and metadata.

Source code in proxystore/stream/events.py
@classmethod
def from_key(
    cls,
    key: tuple[Any, ...],
    evict: bool,
    metadata: dict[str, Any],
) -> NewObjectEvent:
    """Create a new event from a key and metadata."""
    return cls(
        key_type=get_object_path(type(key)),
        raw_key=list(key),
        evict=evict,
        metadata=metadata,
    )

get_key

get_key() -> Any

Get the object key associated with the event.

Source code in proxystore/stream/events.py
def get_key(self) -> Any:
    """Get the object key associated with the event."""
    key_type = import_from_path(self.key_type)
    return key_type(*self.raw_key)

EventBatch dataclass

EventBatch(
    events: list[Event],
    topic: str,
    store_config: StoreConfig,
)

Batch of stream events.

from_dict classmethod

from_dict(data: dict[str, Any]) -> EventBatch

Create a new event instance from its dictionary representation.

Source code in proxystore/stream/events.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> EventBatch:
    """Create a new event instance from its dictionary representation."""
    events = [dict_to_event(d) for d in data['events']]
    return cls(
        events=events,  # type: ignore[arg-type]
        topic=data['topic'],
        store_config=StoreConfig(**data['store_config']),
    )

event_to_dict

event_to_dict(event: Event | EventBatch) -> dict[str, Any]

Convert event to dict.

Source code in proxystore/stream/events.py
def event_to_dict(event: Event | EventBatch) -> dict[str, Any]:
    """Convert event to dict."""
    if isinstance(event, EventBatch):
        data = {
            'events': [event_to_dict(e) for e in event.events],
            'topic': event.topic,
            'store_config': event.store_config.model_dump(),
        }
    else:
        data = dataclasses.asdict(event)
    data['event_type'] = _EventMapping(type(event)).name
    return data

dict_to_event

dict_to_event(data: dict[str, Any]) -> Event | EventBatch

Convert dict to event.

Source code in proxystore/stream/events.py
def dict_to_event(data: dict[str, Any]) -> Event | EventBatch:
    """Convert dict to event."""
    event_type = data.pop('event_type')
    event = _EventMapping[event_type].value.from_dict(data)
    return event

event_to_bytes

event_to_bytes(event: Event | EventBatch) -> bytes

Convert event to byte-string.

Source code in proxystore/stream/events.py
def event_to_bytes(event: Event | EventBatch) -> bytes:
    """Convert event to byte-string."""
    data = event_to_dict(event)
    return json.dumps(data).encode()

bytes_to_event

bytes_to_event(s: bytes) -> Event | EventBatch

Convert byte-string to event.

Source code in proxystore/stream/events.py
def bytes_to_event(s: bytes) -> Event | EventBatch:
    """Convert byte-string to event."""
    data = json.loads(s.decode())
    return dict_to_event(data)