Skip to content

proxystore.stream.shims.kafka

Kafka publisher and subscriber shims.

Shims to the confluent-kafka package.

KafkaPublisher

KafkaPublisher(client: Producer)

Kafka publisher shim.

Parameters:

  • client (Producer) –

    Kafka producer client.

Source code in proxystore/stream/shims/kafka.py
def __init__(self, client: confluent_kafka.Producer) -> None:
    self.client = client

close

close() -> None

Close this publisher.

Source code in proxystore/stream/shims/kafka.py
def close(self) -> None:
    """Close this publisher."""
    self.client.flush()

send_message

send_message(topic: str, message: bytes) -> None

Publish a message to the stream.

Parameters:

  • topic (str) –

    Stream topic to publish message to.

  • message (bytes) –

    Message as bytes to publish to the stream.

Source code in proxystore/stream/shims/kafka.py
def send_message(self, topic: str, message: bytes) -> None:
    """Publish a message to the stream.

    Args:
        topic: Stream topic to publish message to.
        message: Message as bytes to publish to the stream.
    """
    self.client.produce(topic, message)
    self.client.flush()

KafkaSubscriber

KafkaSubscriber(client: Consumer)

Kafka subscriber shim.

This shim is an iterable object which will yield bytes messages from the stream, blocking on the next message, until the stream is closed.

Parameters:

  • client (Consumer) –

    Kafka consumer client. The client must already be subscribed to the relevant topics.

Source code in proxystore/stream/shims/kafka.py
def __init__(self, client: confluent_kafka.Consumer) -> None:
    self.client = client

next_message

next_message() -> bytes

Get the next message.

Source code in proxystore/stream/shims/kafka.py
def next_message(self) -> bytes:
    """Get the next message."""
    message = self.client.poll()
    # Should not be None because we do not specify a poll in timeout.
    assert message is not None
    if message.error() is not None:  # pragma: no cover
        raise confluent_kafka.KafkaException(message.error())
    return message.value()

close

close() -> None

Close this subscriber.

Source code in proxystore/stream/shims/kafka.py
def close(self) -> None:
    """Close this subscriber."""
    self.client.close()