CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-confluent-kafka

Confluent's Python client for Apache Kafka

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/core-producer-consumer.md

Core Producer and Consumer

The core producer and consumer classes provide fundamental Kafka functionality with high performance through the underlying librdkafka C library. These classes support all Kafka features including transactions, exactly-once semantics, and custom partitioning.

Capabilities

Producer

High-performance Kafka producer with support for asynchronous message delivery, custom partitioning, transactions, and delivery guarantees.

class Producer:
    """Asynchronous Kafka producer backed by librdkafka.

    produce() enqueues messages locally and returns immediately; delivery
    results are reported through the on_delivery callback, which is invoked
    from poll() or flush(). The transactional methods require the
    'transactional.id' configuration property (see the Transaction Usage
    example below).
    """

    def __init__(self, conf):
        """
        Create a new Producer instance.

        Args:
            conf (dict): Configuration properties for the producer
                (e.g. 'bootstrap.servers', 'client.id')
        """

    def produce(self, topic, value=None, key=None, partition=-1, on_delivery=None, timestamp=0, headers=None):
        """
        Produce message to topic. Asynchronous: the message is queued
        locally and this call returns immediately.

        Args:
            topic (str): Topic to produce to
            value (bytes, str, optional): Message value
            key (bytes, str, optional): Message key for partitioning
            partition (int, optional): Specific partition (-1 for automatic)
            on_delivery (callable, optional): Delivery report callback,
                invoked as on_delivery(err, msg) from poll() or flush()
            timestamp (int, optional): Message timestamp (0 for current time)
            headers (dict, optional): Message headers
                NOTE(review): the upstream client also accepts a list of
                (key, value) tuples here — confirm before assuming dict-only

        Raises:
            BufferError: If local producer queue is full
            KafkaException: For other produce errors
        """

    def poll(self, timeout=-1):
        """
        Poll for events and call registered callbacks (e.g. delivery
        reports for previously produced messages).

        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)

        Returns:
            int: Number of events processed
        """

    def flush(self, timeout=-1):
        """
        Wait for all messages to be delivered, serving delivery callbacks
        while waiting.

        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)

        Returns:
            int: Number of messages still in queue (0 on success)
        """

    def purge(self, in_queue=True, in_flight=True, blocking=True):
        """
        Purge messages from internal queues.

        Args:
            in_queue (bool): Purge messages in local queue
            in_flight (bool): Purge messages in flight to broker
            blocking (bool): Block until purge is complete

        Returns:
            int: Number of messages purged
        """

    def abort_transaction(self, timeout=-1):
        """
        Abort ongoing transaction, discarding all messages produced since
        begin_transaction().

        Args:
            timeout (float): Maximum time to wait in seconds
        """

    def begin_transaction(self):
        """
        Begin a new transaction. init_transactions() must have been
        called first.
        """

    def commit_transaction(self, timeout=-1):
        """
        Commit current transaction, making the produced messages (and any
        offsets sent via send_offsets_to_transaction()) visible atomically.

        Args:
            timeout (float): Maximum time to wait in seconds
        """

    def init_transactions(self, timeout=-1):
        """
        Initialize transactions for this producer. Call once before the
        first begin_transaction(); requires 'transactional.id' in conf.

        Args:
            timeout (float): Maximum time to wait in seconds
        """

    def send_offsets_to_transaction(self, positions, group_metadata, timeout=-1):
        """
        Send consumer offsets to transaction so they commit atomically
        with it (consume-transform-produce pattern).

        Args:
            positions (list): List of TopicPartition objects with offsets
            group_metadata (ConsumerGroupMetadata): Consumer group metadata,
                as returned by Consumer.consumer_group_metadata()
            timeout (float): Maximum time to wait in seconds
        """

    def list_topics(self, topic=None, timeout=-1):
        """
        Get metadata for topics from the cluster.

        Args:
            topic (str, optional): Specific topic name to query
                (None for all topics)
            timeout (float): Maximum time to wait in seconds

        Returns:
            ClusterMetadata: Cluster and topic metadata
        """

Consumer

High-performance Kafka consumer with support for consumer groups, manual/automatic offset management, and rebalancing.

class Consumer:
    """Kafka consumer backed by librdkafka.

    Supports group-managed subscription (subscribe()) or manual partition
    assignment (assign()). Call poll()/consume() regularly, and close()
    when done to leave the consumer group cleanly.
    """

    def __init__(self, conf):
        """
        Create a new Consumer instance.

        Args:
            conf (dict): Configuration properties for the consumer
                (e.g. 'bootstrap.servers', 'group.id')
        """

    def subscribe(self, topics, listener=None):
        """
        Subscribe to list of topics for automatic partition assignment.
        Replaces any previous subscription.

        Args:
            topics (list): List of topic names to subscribe to
            listener (RebalanceCallback, optional): Rebalance callback
                NOTE(review): the upstream client takes separate
                on_assign/on_revoke/on_lost callables here — verify
        """

    def unsubscribe(self):
        """
        Unsubscribe from current topic subscription.
        """

    def assign(self, partitions):
        """
        Manually assign partitions to consume from, bypassing group
        rebalancing. An offset may be set on each TopicPartition.

        Args:
            partitions (list): List of TopicPartition objects
        """

    def assignment(self):
        """
        Get current partition assignment.

        Returns:
            list: List of assigned TopicPartition objects
        """

    def unassign(self):
        """
        Remove current partition assignment.
        """

    def poll(self, timeout=-1):
        """
        Poll for a single message; also serves registered callbacks.
        Check Message.error() on the result before using its payload.

        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)

        Returns:
            Message: Message object or None if timeout
        """

    def consume(self, num_messages=1, timeout=-1):
        """
        Consume multiple messages in one call.

        Args:
            num_messages (int): Maximum number of messages to return
            timeout (float): Maximum time to wait in seconds

        Returns:
            list: List of Message objects (may be shorter than requested)
        """

    def commit(self, message=None, offsets=None, asynchronous=True):
        """
        Commit message offset or specified offsets. `message` and
        `offsets` are alternatives; pass at most one.

        Args:
            message (Message, optional): Commit offset for this message
            offsets (list, optional): List of TopicPartition objects with offsets
            asynchronous (bool): Commit asynchronously if True

        Returns:
            list: Committed offsets if synchronous, None if asynchronous
        """

    def committed(self, partitions, timeout=-1):
        """
        Get committed offsets for partitions (queries the broker).

        Args:
            partitions (list): List of TopicPartition objects
            timeout (float): Maximum time to wait in seconds

        Returns:
            list: List of TopicPartition objects with committed offsets
        """

    def position(self, partitions):
        """
        Get current position (next fetch offset) for partitions.

        Args:
            partitions (list): List of TopicPartition objects

        Returns:
            list: List of TopicPartition objects with current positions
        """

    def seek(self, partition):
        """
        Seek to offset for partition. The partition must currently be
        assigned to this consumer.

        Args:
            partition (TopicPartition): Partition with offset to seek to
        """

    def pause(self, partitions):
        """
        Pause consumption for partitions (fetching stops until resume()).

        Args:
            partitions (list): List of TopicPartition objects to pause
        """

    def resume(self, partitions):
        """
        Resume consumption for previously paused partitions.

        Args:
            partitions (list): List of TopicPartition objects to resume
        """

    def get_watermark_offsets(self, partition, timeout=-1, cached=False):
        """
        Get low and high watermark offsets for partition.

        Args:
            partition (TopicPartition): Partition to query
            timeout (float): Maximum time to wait in seconds
            cached (bool): Use cached values if available instead of
                querying the broker

        Returns:
            tuple: (low_offset, high_offset)
        """

    def offsets_for_times(self, partitions, timeout=-1):
        """
        Get offsets for timestamps. Timestamps are passed in each
        TopicPartition's offset field.

        Args:
            partitions (list): List of TopicPartition objects with timestamps
            timeout (float): Maximum time to wait in seconds

        Returns:
            list: List of TopicPartition objects with offsets for timestamps
        """

    def close(self):
        """
        Close the consumer and leave consumer group. Call before
        discarding the instance so the group can rebalance promptly.
        """

    def store_offsets(self, message=None, offsets=None):
        """
        Store offset for message or specified offsets, to be committed
        later by the auto-commit mechanism.
        NOTE(review): typically requires 'enable.auto.offset.store' to be
        disabled in conf — verify against upstream docs.

        Args:
            message (Message, optional): Store offset for this message
            offsets (list, optional): List of TopicPartition objects with offsets
        """

    def incremental_assign(self, partitions):
        """
        Incrementally add partitions to the current assignment (used with
        cooperative rebalancing).

        Args:
            partitions (list): List of TopicPartition objects to add
        """

    def incremental_unassign(self, partitions):
        """
        Incrementally remove partitions from the current assignment.

        Args:
            partitions (list): List of TopicPartition objects to remove
        """

    def list_topics(self, topic=None, timeout=-1):
        """
        Get metadata for topics from the cluster.

        Args:
            topic (str, optional): Specific topic name to query
                (None for all topics)
            timeout (float): Maximum time to wait in seconds

        Returns:
            ClusterMetadata: Cluster and topic metadata
        """

    def consumer_group_metadata(self):
        """
        Get consumer group metadata for transactional operations (pass
        to Producer.send_offsets_to_transaction()).

        Returns:
            ConsumerGroupMetadata: Consumer group metadata object
        """

Message

Container for Kafka message data and metadata.

class Message:
    """Container for Kafka message data and metadata.

    Instances are returned by Consumer.poll()/consume() and passed to
    producer delivery callbacks. Always check error() before using the
    payload accessors.
    """

    def error(self):
        """
        Get message error.

        Returns:
            KafkaError: Error object or None if no error
        """

    def key(self):
        """
        Get message key.

        Returns:
            bytes: Message key or None
        """

    def value(self):
        """
        Get message value (payload).

        Returns:
            bytes: Message value or None
        """

    def topic(self):
        """
        Get message topic.

        Returns:
            str: Topic name
        """

    def partition(self):
        """
        Get message partition.

        Returns:
            int: Partition number
        """

    def offset(self):
        """
        Get message offset.

        Returns:
            int: Message offset
        """

    def timestamp(self):
        """
        Get message timestamp.

        Returns:
            tuple: (timestamp_type, timestamp) where timestamp_type is one of:
                   TIMESTAMP_NOT_AVAILABLE, TIMESTAMP_CREATE_TIME, TIMESTAMP_LOG_APPEND_TIME
        """

    def headers(self):
        """
        Get message headers.

        Returns:
            dict: Dictionary of header key-value pairs or None
                NOTE(review): the upstream client returns headers as a
                list of (key, value) tuples — verify the dict claim
        """

    def latency(self):
        """
        Get message latency (produce time to broker acknowledgement).

        Returns:
            float: Latency in seconds or None
        """

    def leader_epoch(self):
        """
        Get leader epoch for the message.

        Returns:
            int: Leader epoch or None
        """

    def set_key(self, key):
        """
        Set message key (e.g. from a deserializer).

        Args:
            key (bytes, str): New message key
        """

    def set_value(self, value):
        """
        Set message value (e.g. from a deserializer).

        Args:
            value (bytes, str): New message value
        """

    def set_headers(self, headers):
        """
        Set message headers.

        Args:
            headers (dict): Dictionary of header key-value pairs
        """

TopicPartition

Represents a Kafka topic partition with optional offset.

class TopicPartition:
    """Represents a Kafka topic partition with optional offset.

    Defines hashing, equality, and ordering, so instances can be stored
    in sets/dicts and sorted. Used throughout the Producer/Consumer APIs
    to address partitions and carry offsets.
    """

    def __init__(self, topic, partition=None, offset=None):
        """
        Create TopicPartition object.

        Args:
            topic (str): Topic name
            partition (int, optional): Partition number
            offset (int, optional): Offset value (also used to carry a
                timestamp for Consumer.offsets_for_times())
        """

    @property
    def topic(self):
        """
        Topic name.

        Returns:
            str: Topic name
        """

    @property
    def partition(self):
        """
        Partition number.

        Returns:
            int: Partition number
        """

    @property
    def offset(self):
        """
        Offset value.

        Returns:
            int: Offset value
        """

    @offset.setter
    def offset(self, value):
        """
        Set offset value.

        Args:
            value (int): New offset value
        """

    @property
    def metadata(self):
        """
        Partition metadata.

        Returns:
            str: Metadata string
        """

    @property
    def leader_epoch(self):
        """
        Leader epoch.

        Returns:
            int: Leader epoch or None
        """

    def __hash__(self):
        """Hash function for use in sets and dicts."""

    def __eq__(self, other):
        """Equality comparison."""

    def __lt__(self, other):
        """Less than comparison for sorting."""

    def __str__(self):
        """String representation."""

Uuid

Represents a UUID (Universally Unique Identifier).

class Uuid:
    """Represents a UUID (Universally Unique Identifier).

    Equality and hashing are defined, so instances can be used as dict
    keys or set members.
    """

    def __init__(self, uuid_str=None):
        """
        Create Uuid object.

        Args:
            uuid_str (str, optional): UUID string representation
        """

    def __str__(self):
        """
        Get string representation of UUID.

        Returns:
            str: UUID string
        """

    def __eq__(self, other):
        """Equality comparison."""

    def __hash__(self):
        """Hash function for use in sets and dicts."""

Usage Examples

Basic Producer Usage

from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'my-producer'
}

producer = Producer(conf)

def delivery_report(err, msg):
    """Called once for each message produced, from poll() or flush()."""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

# Produce messages asynchronously.
for i in range(10):
    # Use the documented `on_delivery` parameter name for the callback.
    producer.produce('my-topic',
                     key=f'key-{i}',
                     value=f'value-{i}',
                     on_delivery=delivery_report)
    # Serve delivery reports for earlier produce() calls; without this the
    # local queue can fill up (BufferError) and reports are deferred
    # until flush().
    producer.poll(0)

# Wait for all outstanding messages to be delivered
producer.flush()

Basic Consumer Usage

from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
    # Required for the _PARTITION_EOF branch below: partition-EOF events
    # are not delivered unless explicitly enabled (off by default in
    # confluent-kafka >= 1.0).
    'enable.partition.eof': True
}

consumer = Consumer(conf)
consumer.subscribe(['my-topic'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            # No message arrived within the timeout; keep polling.
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # Reached the current end of this partition (informational,
                # not a failure).
                print(f'End of partition {msg.topic()} [{msg.partition()}]')
            else:
                print(f'Error: {msg.error()}')
        else:
            print(f'Received: key={msg.key()}, value={msg.value()}, '
                  f'partition={msg.partition()}, offset={msg.offset()}')

finally:
    # Leave the consumer group cleanly.
    consumer.close()

Transaction Usage

from confluent_kafka import Producer

# Transactional producer configuration: a transactional.id is required
# and idempotence must be enabled for exactly-once delivery.
txn_conf = {
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'my-transactional-id',
    'enable.idempotence': True
}

producer = Producer(txn_conf)

# One-time setup: register the transactional.id with the cluster and
# fence any older producer instances using the same id.
producer.init_transactions()

try:
    producer.begin_transaction()

    # Every message produced here belongs to the same atomic transaction.
    for n in range(5):
        producer.produce('my-topic', f'transactional-message-{n}')

    producer.commit_transaction()
    print('Transaction committed successfully')

except Exception as e:
    print(f'Transaction failed: {e}')
    # Roll back everything produced since begin_transaction().
    producer.abort_transaction()

Manual Partition Assignment

from confluent_kafka import Consumer, TopicPartition

# Auto-commit is disabled: offsets are committed explicitly below.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'enable.auto.commit': False
})

# Bypass group-managed subscription: read two fixed partitions, each
# starting from an explicit offset.
consumer.assign([
    TopicPartition('my-topic', 0, offset=100),
    TopicPartition('my-topic', 1, offset=200)
])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue

        if not msg.error():
            print(f'Message: {msg.value()}')
            # Commit this message's offset explicitly.
            consumer.commit(message=msg)

finally:
    consumer.close()

Install with Tessl CLI

npx tessl i tessl/pypi-confluent-kafka

docs

admin-client.md

core-producer-consumer.md

error-handling.md

index.md

schema-registry.md

serialization.md

tile.json