proxystore.stream.shims.redis

Redis publisher and subscriber shims.

Shims to the redis-py Publish / Subscribe interface.

RedisPublisher

RedisPublisher(hostname: str, port: int, **kwargs: Any)

Redis pub/sub publisher shim.

Note

In Redis pub/sub, every active subscriber receives every message, and a message is dropped if no subscribers are present when it is published. The RedisQueuePublisher provides message persistence and delivers each message to a single consumer.
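The delivery semantics described in this note can be illustrated with a small in-memory model. This is only a sketch of the behavior, not Redis or ProxyStore code; the `InMemoryPubSub` class and the topic name `events` are hypothetical.

```python
from __future__ import annotations

from collections import defaultdict


class InMemoryPubSub:
    """Toy broker that mimics fire-and-forget pub/sub delivery."""

    def __init__(self) -> None:
        # Maps each topic to the inboxes of its current subscribers.
        self._inboxes: dict[str, list[list[bytes]]] = defaultdict(list)

    def subscribe(self, topic: str) -> list[bytes]:
        """Register a new subscriber and return its inbox."""
        inbox: list[bytes] = []
        self._inboxes[topic].append(inbox)
        return inbox

    def publish(self, topic: str, message: bytes) -> int:
        """Copy the message to every current subscriber.

        Returns the number of receivers. With no subscribers the
        message is stored nowhere, so it is lost.
        """
        receivers = self._inboxes[topic]
        for inbox in receivers:
            inbox.append(message)
        return len(receivers)


broker = InMemoryPubSub()
dropped = broker.publish('events', b'early')  # nobody is listening yet
first = broker.subscribe('events')
second = broker.subscribe('events')
delivered = broker.publish('events', b'late')  # both subscribers get a copy
```

Neither subscriber ever sees `b'early'`, which is the situation the queue-based shims are meant to avoid.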

Parameters:

  • hostname (str) –

    Redis server hostname.

  • port (int) –

    Redis server port.

  • kwargs (Any, default: {} ) –

    Extra keyword arguments to pass to redis.Redis().

Source code in proxystore/stream/shims/redis.py
def __init__(
    self,
    hostname: str,
    port: int,
    **kwargs: Any,
) -> None:
    self._redis_client = redis.StrictRedis(
        host=hostname,
        port=port,
        **kwargs,
    )

close

close() -> None

Close this publisher.

Source code in proxystore/stream/shims/redis.py
def close(self) -> None:
    """Close this publisher."""
    self._redis_client.close()

send

send(topic: str, message: bytes) -> None

Publish a message to the stream.

Parameters:

  • topic (str) –

    Stream topic to publish message to.

  • message (bytes) –

    Message as bytes to publish to the stream.

Source code in proxystore/stream/shims/redis.py
def send(self, topic: str, message: bytes) -> None:
    """Publish a message to the stream.

    Args:
        topic: Stream topic to publish message to.
        message: Message as bytes to publish to the stream.
    """
    self._redis_client.publish(topic, message)
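A minimal usage sketch for the `send` and `close` methods documented above. The helper `publish_all`, the `Publisher` protocol, and the `RecordingPublisher` stand-in are hypothetical names introduced here so the example runs without a Redis server; they are not part of ProxyStore.

```python
from __future__ import annotations

from typing import Iterable, Protocol


class Publisher(Protocol):
    """Structural type covering the two documented publisher methods."""

    def send(self, topic: str, message: bytes) -> None: ...

    def close(self) -> None: ...


def publish_all(
    publisher: Publisher,
    topic: str,
    messages: Iterable[bytes],
) -> int:
    """Send every message to the topic and always close the publisher."""
    sent = 0
    try:
        for message in messages:
            publisher.send(topic, message)
            sent += 1
    finally:
        # Release the connection even if a send fails part way through.
        publisher.close()
    return sent


class RecordingPublisher:
    """Hypothetical stand-in that records calls instead of using Redis."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, bytes]] = []
        self.closed = False

    def send(self, topic: str, message: bytes) -> None:
        self.sent.append((topic, message))

    def close(self) -> None:
        self.closed = True


recorder = RecordingPublisher()
count = publish_all(recorder, 'my-topic', [b'a', b'b'])
```

With a reachable server, an instance such as `RedisPublisher('localhost', 6379)` could be passed in place of the recorder; the hostname and port here are placeholders.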

RedisSubscriber

RedisSubscriber(
    hostname: str,
    port: int,
    topic: str | Sequence[str],
    **kwargs: Any
)

Redis pub/sub subscriber shim.

This shim is an iterable object which will yield bytes messages from the stream, blocking on the next message, until the stream is closed.

Parameters:

  • hostname (str) –

    Redis server hostname.

  • port (int) –

    Redis server port.

  • topic (str | Sequence[str]) –

    Topic or sequence of topics to subscribe to.

  • kwargs (Any, default: {} ) –

    Extra keyword arguments to pass to redis.Redis().

Source code in proxystore/stream/shims/redis.py
def __init__(
    self,
    hostname: str,
    port: int,
    topic: str | Sequence[str],
    **kwargs: Any,
) -> None:
    self._redis_client = redis.StrictRedis(
        host=hostname,
        port=port,
        **kwargs,
    )
    self._topics = [topic] if isinstance(topic, str) else topic
    self._pubsub_client = self._redis_client.pubsub()
    self._pubsub_client.subscribe(*self._topics)
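The `isinstance` check in the constructor matters because a `str` is itself a sequence of one-character strings, so unpacking it directly would subscribe to each character as a separate topic. The sketch below isolates that step; `normalize_topics` is a hypothetical helper name, and unlike the source it copies the sequence into a new list.

```python
from __future__ import annotations

from typing import Sequence


def normalize_topics(topic: str | Sequence[str]) -> list[str]:
    """Return the topics as a list, wrapping a lone string."""
    return [topic] if isinstance(topic, str) else list(topic)


single = normalize_topics('events')
several = normalize_topics(('events', 'alerts'))
# Without the check, a lone string would be split into characters.
split_by_mistake = list('events')
```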

close

close() -> None

Close this subscriber.

Source code in proxystore/stream/shims/redis.py
def close(self) -> None:
    """Close this subscriber."""
    self._pubsub_client.unsubscribe()
    self._pubsub_client.close()
    self._redis_client.close()
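Because the subscriber is an iterable that blocks on each message, a consumer usually needs its own stopping rule. The sketch below reads a fixed number of messages and then closes the subscriber. The `take` helper and the `FakeSubscriber` stand-in are hypothetical and exist only so the example runs without a Redis server.

```python
from __future__ import annotations

import itertools
from typing import Iterable


def take(subscriber, count: int) -> list[bytes]:
    """Read up to ``count`` messages, then close the subscriber."""
    try:
        # islice stops pulling from the iterator after ``count`` items.
        return list(itertools.islice(subscriber, count))
    finally:
        subscriber.close()


class FakeSubscriber:
    """Hypothetical stand-in that yields canned messages."""

    def __init__(self, messages: Iterable[bytes]) -> None:
        self._messages = iter(messages)
        self.closed = False

    def __iter__(self) -> FakeSubscriber:
        return self

    def __next__(self) -> bytes:
        return next(self._messages)

    def close(self) -> None:
        self.closed = True


fake = FakeSubscriber([b'one', b'two', b'three'])
received = take(fake, 2)
```

A real `RedisSubscriber` could be passed to `take` in the same way, in which case the call blocks until `count` messages have arrived.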

RedisQueuePublisher

RedisQueuePublisher(
    hostname: str, port: int, **kwargs: Any
)

Redis queue publisher shim.

Note

Only a single subscriber will be able to read each message sent to the queue. The RedisPublisher uses pub/sub and supports broadcasting messages to all active subscribers.

Parameters:

  • hostname (str) –

    Redis server hostname.

  • port (int) –

    Redis server port.

  • kwargs (Any, default: {} ) –

    Extra keyword arguments to pass to redis.Redis().

Source code in proxystore/stream/shims/redis.py
def __init__(
    self,
    hostname: str,
    port: int,
    **kwargs: Any,
) -> None:
    self._redis_client = redis.StrictRedis(
        host=hostname,
        port=port,
        **kwargs,
    )

close

close() -> None

Close this publisher.

Source code in proxystore/stream/shims/redis.py
def close(self) -> None:
    """Close this publisher."""
    self._redis_client.close()

send

send(topic: str, message: bytes) -> None

Publish a message to the stream.

Parameters:

  • topic (str) –

    Stream topic to publish message to.

  • message (bytes) –

    Message as bytes to publish to the stream.

Source code in proxystore/stream/shims/redis.py
def send(self, topic: str, message: bytes) -> None:
    """Publish a message to the stream.

    Args:
        topic: Stream topic to publish message to.
        message: Message as bytes to publish to the stream.
    """
    self._redis_client.rpush(topic, message)
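The source above shows that `send` appends to the tail of a Redis list with `rpush`. The following in-memory model sketches why each message is then read only once, assuming the consumer pops from the head of the list (the consuming call is not shown on this page). `InMemoryQueue` and the topic name `jobs` are hypothetical.

```python
from __future__ import annotations

from collections import defaultdict, deque


class InMemoryQueue:
    """Toy store that mimics list-backed queue semantics."""

    def __init__(self) -> None:
        self._lists: dict[str, deque[bytes]] = defaultdict(deque)

    def rpush(self, topic: str, message: bytes) -> int:
        """Append to the tail and return the new list length."""
        self._lists[topic].append(message)
        return len(self._lists[topic])

    def lpop(self, topic: str) -> bytes | None:
        """Remove and return the head, or None if the list is empty."""
        items = self._lists[topic]
        return items.popleft() if items else None


store = InMemoryQueue()
# Messages persist even though no consumer exists yet.
store.rpush('jobs', b'first')
store.rpush('jobs', b'second')
# Each pop removes the message, so two consumers never share one.
worker_a = store.lpop('jobs')
worker_b = store.lpop('jobs')
worker_c = store.lpop('jobs')
```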

RedisQueueSubscriber

RedisQueueSubscriber(
    hostname: str,
    port: int,
    topic: str,
    *,
    timeout: int | None = None,
    **kwargs: Any
)

Redis queue subscriber shim.

This shim is an iterable object which will yield bytes messages from the queue, blocking on the next message. With the default timeout of None, it blocks indefinitely.

Parameters:

  • hostname (str) –

    Redis server hostname.

  • port (int) –

    Redis server port.

  • topic (str) –

    Topic to subscribe to (i.e., the name of the key corresponding to a Redis list).

  • timeout (int | None, default: None ) –

    Timeout for waiting on the next item. If None, each wait uses a one-second timeout and is retried indefinitely.

  • kwargs (Any, default: {} ) –

    Extra keyword arguments to pass to redis.Redis().

Source code in proxystore/stream/shims/redis.py
def __init__(
    self,
    hostname: str,
    port: int,
    topic: str,
    *,
    timeout: int | None = None,
    **kwargs: Any,
) -> None:
    self._redis_client = redis.StrictRedis(
        host=hostname,
        port=port,
        **kwargs,
    )
    self._topic = topic
    self._timeout = timeout

close

close() -> None

Close this subscriber.

Source code in proxystore/stream/shims/redis.py
def close(self) -> None:
    """Close this subscriber."""
    self._redis_client.close()
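The timeout behavior described for this class can be sketched as a retry loop. This is an assumption-laden illustration, not the library's implementation: `next_message` and `pop` are hypothetical names, `pop(wait)` stands for a blocking pop that returns None once `wait` seconds elapse, and raising `TimeoutError` for an explicit timeout is an assumption, since this page does not state what happens in that case.

```python
from __future__ import annotations

from typing import Callable


def next_message(
    pop: Callable[[int], bytes | None],
    timeout: int | None = None,
) -> bytes:
    """Wait for the next message using the documented timeout rule."""
    # With no timeout, wait in one-second slices and keep retrying.
    wait = 1 if timeout is None else timeout
    while True:
        value = pop(wait)
        if value is not None:
            return value
        if timeout is not None:
            # Assumption: an explicit timeout surfaces as an error.
            raise TimeoutError(f'no message within {timeout} seconds')


waits: list[int] = []
replies = iter([None, None, b'ready'])


def fake_pop(wait: int) -> bytes | None:
    """Hypothetical pop that times out twice before returning data."""
    waits.append(wait)
    return next(replies)


result = next_message(fake_pop)
```

Waiting in short slices rather than blocking once without limit gives the loop regular points at which it regains control, for example to notice an interrupt.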