Skip to content

proxystore.stream.shims.kafka

Kafka publisher and subscriber shims.

Shims to the confluent-kafka package.

KafkaPublisher

KafkaPublisher(client: Producer)

Kafka publisher shim.

Parameters:

  • client (Producer) –

    Kafka producer client.

Source code in proxystore/stream/shims/kafka.py
def __init__(self, client: confluent_kafka.Producer) -> None:
    self.client = client

close()

close() -> None

Close this publisher.

Source code in proxystore/stream/shims/kafka.py
def close(self) -> None:
    """Close this publisher."""
    self.client.flush()

send()

send(topic: str, message: bytes) -> None

Publish a message to the stream.

Parameters:

  • topic (str) –

    Stream topic to publish message to.

  • message (bytes) –

    Message as bytes to publish to the stream.

Source code in proxystore/stream/shims/kafka.py
def send(self, topic: str, message: bytes) -> None:
    """Publish a message to the stream.

    Args:
        topic: Stream topic to publish message to.
        message: Message as bytes to publish to the stream.
    """
    self.client.produce(topic, message)
    self.client.flush()

KafkaSubscriber

KafkaSubscriber(client: Consumer)

Kafka subscriber shim.

This shim is an iterable object which will yield bytes messages from the stream, blocking on the next message, until the stream is closed.

Parameters:

  • client (Consumer) –

    Kafka consumer client. The client must already be subscribed to the relevant topics.

Source code in proxystore/stream/shims/kafka.py
def __init__(self, client: confluent_kafka.Consumer) -> None:
    self.client = client

close()

close() -> None

Close this subscriber.

Source code in proxystore/stream/shims/kafka.py
def close(self) -> None:
    """Close this subscriber."""
    self.client.close()