Skip to content

proxystore.stream.events

Event types.

Event module-attribute

Event union type.

EndOfStreamEvent dataclass

EndOfStreamEvent(topic: str)

End of stream event.

from_dict classmethod

from_dict(data: dict[str, Any]) -> EndOfStreamEvent

Create a new event instance from its dictionary representation.

Source code in proxystore/stream/events.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> EndOfStreamEvent:
    """Build an event from a mapping of constructor arguments.

    Args:
        data: Keyword arguments for the class constructor, keyed by
            argument name.

    Returns:
        Instance of `cls` constructed from the items of `data`.
    """
    instance = cls(**data)
    return instance

NewObjectEvent dataclass

NewObjectEvent(
    topic: str, obj: Any, metadata: dict[str, Any]
)

New object in stream event.

from_dict classmethod

from_dict(data: dict[str, Any]) -> NewObjectEvent

Create a new event instance from its dictionary representation.

Source code in proxystore/stream/events.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> NewObjectEvent:
    """Create a new event instance from its dictionary representation.

    Args:
        data: Keyword arguments for the class constructor, keyed by
            argument name.

    Returns:
        Instance of `cls` constructed from the items of `data`.
    """
    # Construct through ``cls`` rather than a hard-coded class name so that
    # calling this on a subclass yields an instance of that subclass.
    return cls(**data)

NewObjectKeyEvent dataclass

NewObjectKeyEvent(
    topic: str,
    key_type: str,
    raw_key: list[Any],
    evict: bool,
    metadata: dict[str, Any],
    store_config: StoreConfig,
)

New object key in stream event.

from_dict classmethod

from_dict(data: dict[str, Any]) -> NewObjectKeyEvent

Create a new event instance from its dictionary representation.

Source code in proxystore/stream/events.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> NewObjectKeyEvent:
    """Create a new event instance from its dictionary representation.

    Args:
        data: Keyword arguments for the class constructor, keyed by
            argument name.

    Returns:
        Instance of `cls` constructed from the items of `data`.
    """
    # Construct through ``cls`` rather than a hard-coded class name so that
    # calling this on a subclass yields an instance of that subclass.
    return cls(**data)

from_key classmethod

from_key(
    key: tuple[Any, ...],
    *,
    evict: bool,
    metadata: dict[str, Any],
    store_config: StoreConfig,
    topic: str
) -> NewObjectKeyEvent

Create a new event from a key and metadata.

Source code in proxystore/stream/events.py
@classmethod
def from_key(
    cls,
    key: tuple[Any, ...],
    *,
    evict: bool,
    metadata: dict[str, Any],
    store_config: StoreConfig,
    topic: str,
) -> NewObjectKeyEvent:
    """Build an event describing an object key.

    The key is stored as two fields: the result of `get_object_path` for
    the key's type, and a list of the key's elements.

    Args:
        key: Key tuple to record in the event.
        evict: Value for the event's `evict` field.
        metadata: Value for the event's `metadata` field.
        store_config: Value for the event's `store_config` field.
        topic: Value for the event's `topic` field.

    Returns:
        Instance of `cls` populated from the arguments.
    """
    key_type_path = get_object_path(type(key))
    key_items = list(key)
    return cls(
        topic=topic,
        key_type=key_type_path,
        raw_key=key_items,
        evict=evict,
        metadata=metadata,
        store_config=store_config,
    )

get_key

get_key() -> Any

Get the object key associated with the event.

Source code in proxystore/stream/events.py
def get_key(self) -> Any:
    """Rebuild the key object described by this event.

    Returns:
        Result of calling the object resolved from `self.key_type` (via
        `import_from_path`) with the elements of `self.raw_key` as
        positional arguments.
    """
    key_cls = import_from_path(self.key_type)
    key = key_cls(*self.raw_key)
    return key

EventBatch dataclass

EventBatch(topic: str, events: list[Event])

Batch of stream events.

Warning

All events must be for the same topic.

from_dict classmethod

from_dict(data: dict[str, Any]) -> EventBatch

Create a new event instance from its dictionary representation.

Source code in proxystore/stream/events.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> EventBatch:
    """Build an event batch from its dictionary representation.

    Args:
        data: Mapping with an `'events'` entry (an iterable of event
            dictionaries, each converted with `dict_to_event`) and a
            `'topic'` entry.

    Returns:
        Instance of `cls` holding the converted events and the topic.
    """
    decoded = []
    for entry in data['events']:
        decoded.append(dict_to_event(entry))
    batch_topic = data['topic']
    return cls(events=decoded, topic=batch_topic)  # type: ignore[arg-type]

event_to_dict

event_to_dict(event: Event | EventBatch) -> dict[str, Any]

Convert event to dict.

Source code in proxystore/stream/events.py
def event_to_dict(event: Event | EventBatch) -> dict[str, Any]:
    """Produce the dictionary representation of an event or event batch.

    A batch becomes a dictionary with an `'events'` list (each member
    converted by this same function) and its `'topic'`; any other event is
    converted with `dataclasses.asdict`. In both cases an `'event_type'`
    entry is added holding the name `_EventMapping` associates with the
    event's type.

    Args:
        event: Event or batch of events to convert.

    Returns:
        Dictionary representation including the `'event_type'` entry.
    """
    if not isinstance(event, EventBatch):
        payload = dataclasses.asdict(event)
    else:
        # Batches are expanded by hand so every member event is converted
        # through this function and gets its own 'event_type' entry.
        members = [event_to_dict(member) for member in event.events]
        payload = {
            'events': members,
            'topic': event.topic,
        }
    payload['event_type'] = _EventMapping(type(event)).name
    return payload

dict_to_event

dict_to_event(data: dict[str, Any]) -> Event | EventBatch

Convert dict to event.

Source code in proxystore/stream/events.py
def dict_to_event(data: dict[str, Any]) -> Event | EventBatch:
    """Convert dict to event.

    Args:
        data: Dictionary representation of an event. It must contain an
            `'event_type'` entry naming a member of `_EventMapping`; the
            remaining entries are handed to that member's `from_dict`.
            The dictionary is not modified.

    Returns:
        Event produced by the `from_dict` of the class selected by the
        `'event_type'` entry.

    Raises:
        KeyError: If `data` has no `'event_type'` entry.
    """
    # Pop from a shallow copy: removing 'event_type' from the caller's own
    # dictionary would make a second conversion of the same dict fail.
    fields = dict(data)
    event_type = fields.pop('event_type')
    # NOTE(review): an unknown event_type presumably raises from the
    # _EventMapping lookup - confirm against its definition.
    return _EventMapping[event_type].value.from_dict(fields)

event_to_bytes

event_to_bytes(event: Event | EventBatch) -> bytes

Convert event to byte-string.

Source code in proxystore/stream/events.py
def event_to_bytes(event: Event | EventBatch) -> bytes:
    """Encode an event or event batch as a byte-string.

    Args:
        event: Event or batch of events to encode.

    Returns:
        Output of `serialize` applied to the event's dictionary
        representation (as produced by `event_to_dict`).
    """
    as_dict = event_to_dict(event)
    return serialize(as_dict)

bytes_to_event

bytes_to_event(s: bytes) -> Event | EventBatch

Convert byte-string to event.

Source code in proxystore/stream/events.py
def bytes_to_event(s: bytes) -> Event | EventBatch:
    """Decode a byte-string into an event or event batch.

    Args:
        s: Byte-string to decode.

    Returns:
        Event built by `dict_to_event` from the output of `deserialize`
        applied to `s`.
    """
    as_dict = deserialize(s)
    return dict_to_event(as_dict)