proxystore.stream.shims.queue

Python queue-based pub/sub implementation.

Warning

This implementation is meant for streaming between Python threads within the same process, or between Python processes on the same machine. Each queue topic may only have one subscriber.
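
The single-subscriber restriction follows from how Python queues work: each `get()` removes the item, so two readers on one queue split the messages instead of each seeing all of them. A minimal sketch using only the standard library (the two consumer lists are hypothetical stand-ins for two subscribers):

```python
import queue

# One shared queue stands in for a single topic.
topic_queue = queue.Queue()

for i in range(4):
    topic_queue.put(f'message-{i}'.encode())

# Two hypothetical consumers take turns reading from the same queue.
seen_by_a = []
seen_by_b = []
while not topic_queue.empty():
    seen_by_a.append(topic_queue.get())
    if not topic_queue.empty():
        seen_by_b.append(topic_queue.get())

# Each consumer only received half of the stream.
print(seen_by_a)
print(seen_by_b)
```

Giving each subscriber its own topic, and therefore its own queue, avoids this splitting.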

QueuePublisher

QueuePublisher(
    queues: Mapping[str, multiprocessing.Queue[bytes] | queue.Queue[bytes]],
    *,
    block: bool = True,
    timeout: float | None = None
)

Publisher built on Python queues.

Warning

Each topic can only have one subscriber.

Parameters:

  • queues (Mapping[str, multiprocessing.Queue[bytes] | queue.Queue[bytes]]) –

    Mapping of topic name to Python queue.

  • block (bool, default: True ) –

    Block until a free slot is available when sending a new message to the queue.

  • timeout (float | None, default: None ) –

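    Maximum number of seconds to block when block is True. None means wait indefinitely. If no slot frees up in time, the underlying put() raises queue.Full.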

Source code in proxystore/stream/shims/queue.py
def __init__(
    self,
    queues: Mapping[
        str,
        multiprocessing.Queue[bytes] | queue.Queue[bytes],
    ],
    *,
    block: bool = True,
    timeout: float | None = None,
) -> None:
    self._queues = queues
    self._block = block
    self._timeout = timeout

close

close() -> None

Close this publisher.

Source code in proxystore/stream/shims/queue.py
def close(self) -> None:
    """Close this publisher."""
    for q in self._queues.values():
        if isinstance(q, multiprocessing.queues.Queue):
            q.close()
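
The `isinstance` check above matters because only multiprocessing queues expose `close()`. A small standard-library sketch of that difference (variable names are illustrative):

```python
import multiprocessing
import multiprocessing.queues
import queue

thread_queue = queue.Queue()
process_queue = multiprocessing.Queue()

# queue.Queue has no close() method, so it must be skipped.
thread_has_close = hasattr(thread_queue, 'close')

# multiprocessing.Queue() returns a multiprocessing.queues.Queue instance.
is_mp_queue = isinstance(process_queue, multiprocessing.queues.Queue)

# Closing signals that this process will put no more data on the queue.
process_queue.close()

print(thread_has_close, is_mp_queue)
```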

send_message

send_message(topic: str, message: bytes) -> None

Publish a message to the stream.

Parameters:

  • topic (str) –

    Stream topic to publish message to.

  • message (bytes) –

    Message as bytes to publish to the stream.

Raises:

  • ValueError

    if a queue with the name topic does not exist.

Source code in proxystore/stream/shims/queue.py
def send_message(self, topic: str, message: bytes) -> None:
    """Publish a message to the stream.

    Args:
        topic: Stream topic to publish message to.
        message: Message as bytes to publish to the stream.

    Raises:
        ValueError: if a queue with the name `topic` does not exist.
    """
    if topic not in self._queues:
        raise ValueError(f'Unknown topic "{topic}".')
    self._queues[topic].put(
        message,
        block=self._block,
        timeout=self._timeout,
    )
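
To illustrate the two failure modes, the sketch below reproduces the lookup-then-put logic with a plain function rather than the real class. The `send` helper and the `'events'` topic are hypothetical; only the standard-library `queue` module is used.

```python
import queue

# Hypothetical topic mapping with a bounded queue holding one item.
queues = {'events': queue.Queue(maxsize=1)}


def send(topic, message, *, block=True, timeout=None):
    # Mirrors the lookup-then-put logic in the source listing above.
    if topic not in queues:
        raise ValueError(f'Unknown topic "{topic}".')
    queues[topic].put(message, block=block, timeout=timeout)


send('events', b'first')  # fills the only slot

# With block=False a full queue raises queue.Full immediately.
try:
    send('events', b'second', block=False)
    full_raised = False
except queue.Full:
    full_raised = True

# An unregistered topic is rejected before any queue is touched.
try:
    send('missing', b'payload')
    unknown_topic_error = None
except ValueError as e:
    unknown_topic_error = str(e)

print(full_raised, unknown_topic_error)
```

With the default `block=True` and `timeout=None`, the second `send` would instead wait until a consumer frees the slot.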

QueueSubscriber

QueueSubscriber(
    queue: multiprocessing.Queue[bytes] | queue.Queue[bytes],
    *,
    block: bool = True,
    timeout: float | None = None
)

Subscriber to a QueuePublisher topic.

Warning

Each topic can only have one subscriber.

Parameters:

  • queue (multiprocessing.Queue[bytes] | queue.Queue[bytes]) –

    Queue shared with the QueuePublisher to pull messages from.

  • block (bool, default: True ) –

    Block until the next message is available in the queue.

  • timeout (float | None, default: None ) –

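    Maximum number of seconds to block when block is True. None means wait indefinitely. If no message arrives in time, the underlying get() raises queue.Empty.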

Source code in proxystore/stream/shims/queue.py
def __init__(
    self,
    queue: multiprocessing.Queue[bytes] | queue.Queue[bytes],
    *,
    block: bool = True,
    timeout: float | None = None,
) -> None:
    self._queue = queue
    self._block = block
    self._timeout = timeout

next_message

next_message() -> bytes

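Get the next message.

Raises:

  • StopIteration

    if the underlying queue raises ValueError, which a multiprocessing queue does once it has been closed.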

Source code in proxystore/stream/shims/queue.py
def next_message(self) -> bytes:
    """Get the next message."""
    try:
        message = self._queue.get(block=self._block, timeout=self._timeout)
    except ValueError:
        raise StopIteration from None

    return message
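
Note that only `ValueError` is translated into `StopIteration`; a timeout surfaces as `queue.Empty`. The sketch below mirrors that logic with a standalone function (a hypothetical stand-in, not the real class) so the behavior can be checked with the standard library alone.

```python
import queue

topic_queue = queue.Queue()
topic_queue.put(b'hello')


def next_message(q, *, block=True, timeout=None):
    # Same shape as the method above: only ValueError ends the stream.
    try:
        return q.get(block=block, timeout=timeout)
    except ValueError:
        raise StopIteration from None


first = next_message(topic_queue, timeout=0.1)

# The queue is now empty, so the timeout expires and queue.Empty propagates.
try:
    next_message(topic_queue, timeout=0.1)
    empty_raised = False
except queue.Empty:
    empty_raised = True

print(first, empty_raised)
```

Callers that set a timeout may therefore want to handle `queue.Empty` themselves.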

close

close() -> None

Close this subscriber.

Source code in proxystore/stream/shims/queue.py
def close(self) -> None:
    """Close this subscriber."""
    pass