CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-kafka-python-ng

Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities

Pending
Overview
Eval results
Files

docs/consumer.md

Consumer Operations

High-level consumer for consuming records from Kafka topics with automatic group coordination, partition assignment, and offset management. Supports both subscription-based consumption and manual partition assignment.

Capabilities

KafkaConsumer

Main consumer class for consuming records from Kafka topics. Provides automatic group coordination, offset management, and flexible consumption patterns.

class KafkaConsumer:
    """
    High-level Kafka consumer with group coordination and offset management.

    Supports two mutually exclusive consumption modes: subscription-based
    (subscribe()) with automatic partition assignment, or manual assignment
    (assign()). Instances are iterable: iterating yields ConsumerRecord
    objects one at a time (see the usage examples below this class).
    """

    def __init__(self, *topics, **configs):
        """
        Create a KafkaConsumer instance.
        
        Args:
            *topics: Topics to subscribe to initially
            **configs: Consumer configuration options including:
                bootstrap_servers (list): List of Kafka brokers
                group_id (str): Consumer group identifier
                key_deserializer (callable): Function to deserialize keys
                value_deserializer (callable): Function to deserialize values
                auto_offset_reset (str): What to do when no offset ('earliest', 'latest', 'none')
                enable_auto_commit (bool): Whether to auto-commit offsets
                auto_commit_interval_ms (int): Auto-commit interval
                session_timeout_ms (int): Group session timeout
                heartbeat_interval_ms (int): Heartbeat interval
                max_poll_records (int): Maximum records per poll
                max_poll_interval_ms (int): Maximum time between polls
                client_id (str): Client identifier
                security_protocol (str): Security protocol
                ssl_context: SSL context
                sasl_mechanism (str): SASL mechanism
                consumer_timeout_ms (int): Consumer timeout
        """
        
    def subscribe(self, topics=None, pattern=None, listener=None):
        """
        Subscribe to topics or topic pattern.
        
        Args:
            topics (list): List of topic names to subscribe to
            pattern (str): Regex pattern for topic matching
            listener (ConsumerRebalanceListener): Rebalance event listener
        """
        
    def unsubscribe(self):
        """Unsubscribe from all topics."""
        
    def assign(self, partitions):
        """
        Manually assign partitions to consumer.
        
        Args:
            partitions (list): List of TopicPartition objects
        """
        
    def assignment(self):
        """
        Get current partition assignment.
        
        Returns:
            set: Set of TopicPartition objects currently assigned
        """
        
    def subscription(self):
        """
        Get current topic subscription.
        
        Returns:
            set: Set of subscribed topic names
        """
        
    def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
        """
        Fetch records from Kafka.
        
        Args:
            timeout_ms (int): Maximum time to wait for records
                (NOTE(review): the default 0 presumably returns immediately
                with whatever records are already fetched — confirm against
                the client documentation)
            max_records (int): Maximum number of records to return
            update_offsets (bool): Whether to update fetch positions
            
        Returns:
            dict: Dictionary mapping TopicPartition to list of ConsumerRecord
        """
        
    def commit(self, offsets=None):
        """
        Commit offsets synchronously.
        
        Args:
            offsets (dict): Dictionary mapping TopicPartition to OffsetAndMetadata
        """
        
    def commit_async(self, offsets=None, callback=None):
        """
        Commit offsets asynchronously.
        
        Args:
            offsets (dict): Dictionary mapping TopicPartition to OffsetAndMetadata
            callback (callable): Callback function for commit result
        """
        
    def committed(self, partition, metadata=False):
        """
        Get committed offset for partition.
        
        Args:
            partition (TopicPartition): Partition to check
            metadata (bool): Whether to return metadata with offset
            
        Returns:
            int or OffsetAndMetadata: Committed offset
        """
        
    def position(self, partition):
        """
        Get current position (next fetch offset) for partition.
        
        Args:
            partition (TopicPartition): Partition to check
            
        Returns:
            int: Current fetch position
        """
        
    def seek(self, partition, offset):
        """
        Seek to specific offset for partition.
        
        Args:
            partition (TopicPartition): Partition to seek
            offset (int): Offset to seek to
        """
        
    def seek_to_beginning(self, *partitions):
        """
        Seek to beginning of partitions.
        
        Args:
            *partitions: TopicPartition objects (empty = all assigned)
        """
        
    def seek_to_end(self, *partitions):
        """
        Seek to end of partitions.
        
        Args:
            *partitions: TopicPartition objects (empty = all assigned)
        """
        
    def pause(self, *partitions):
        """
        Pause consumption from partitions.
        
        Args:
            *partitions: TopicPartition objects to pause
        """
        
    def resume(self, *partitions):
        """
        Resume consumption from previously paused partitions.
        
        Args:
            *partitions: TopicPartition objects to resume
        """
        
    def paused(self):
        """
        Get currently paused partitions.
        
        Returns:
            set: Set of paused TopicPartition objects
        """
        
    def topics(self):
        """
        Get available topics from cluster.
        
        Returns:
            set: Set of available topic names
        """
        
    def partitions_for_topic(self, topic):
        """
        Get available partitions for topic.
        
        Args:
            topic (str): Topic name
            
        Returns:
            set: Set of partition numbers for topic
        """
        
    def beginning_offsets(self, partitions):
        """
        Get earliest available offsets for partitions.
        
        Args:
            partitions (list): List of TopicPartition objects
            
        Returns:
            dict: Dictionary mapping TopicPartition to offset
        """
        
    def end_offsets(self, partitions):
        """
        Get latest available offsets for partitions.
        
        Args:
            partitions (list): List of TopicPartition objects
            
        Returns:
            dict: Dictionary mapping TopicPartition to offset
        """
        
    def close(self):
        """Close the consumer and release resources."""
        
    def highwater(self, partition):
        """
        Get high watermark offset for partition.
        
        Args:
            partition (TopicPartition): Partition to check
            
        Returns:
            int: High watermark offset
        """
        
    def bootstrap_connected(self):
        """
        Check if consumer has established bootstrap connection.
        
        Returns:
            bool: True if connected to at least one bootstrap server
        """
        
    def offsets_for_times(self, timestamps):
        """
        Get offsets for given timestamps.
        
        Args:
            timestamps (dict): Dictionary mapping TopicPartition to timestamp
            
        Returns:
            dict: Dictionary mapping TopicPartition to OffsetAndTimestamp
        """
        
    def metrics(self, raw=False):
        """
        Get consumer performance metrics.
        
        Args:
            raw (bool): If True, return raw metrics dict
            
        Returns:
            dict: Consumer performance metrics including fetch rates, lag, and timing
        """

Consumer Records and Metadata

Data structures returned by the consumer for representing consumed records.

class ConsumerRecord:
    """
    A single record returned by the consumer.

    The bytes annotations on key/value describe the raw wire form; when a
    key_deserializer/value_deserializer is configured on the consumer, these
    fields hold the deserializer's return value instead.
    """
    topic: str          # Topic name
    partition: int      # Partition number
    offset: int         # Record offset
    timestamp: int      # Record timestamp  
    timestamp_type: int # Timestamp type
    key: bytes         # Record key (deserialized if deserializer provided)
    value: bytes       # Record value (deserialized if deserializer provided)
    headers: list      # List of (key, value) header tuples
    checksum: int      # Record checksum
    serialized_key_size: int    # Key size in bytes
    serialized_value_size: int  # Value size in bytes

class ConsumerRebalanceListener:
    """
    Callback interface for consumer-group rebalance events.

    Pass an instance via KafkaConsumer.subscribe(..., listener=...) to be
    notified when partitions are revoked from or assigned to this consumer.
    """

    def on_partitions_revoked(self, revoked):
        """
        Called when partitions are revoked from consumer.
        
        Args:
            revoked (list): List of TopicPartition objects being revoked
        """
        
    def on_partitions_assigned(self, assigned):
        """
        Called when new partitions are assigned to consumer.
        
        Args:
            assigned (list): List of TopicPartition objects being assigned
        """

Usage Examples

Basic Consumer Usage

from kafka import KafkaConsumer
import json

# Decode each raw payload as UTF-8 JSON before it reaches application code.
def decode_json(raw):
    return json.loads(raw.decode('utf-8'))

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    auto_offset_reset='earliest',
    value_deserializer=decode_json,
)

# Iterating the consumer yields one ConsumerRecord at a time, indefinitely.
for message in consumer:
    print(f"Topic: {message.topic}")
    print(f"Partition: {message.partition}")
    print(f"Offset: {message.offset}")
    print(f"Value: {message.value}")

Manual Offset Management

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='manual-commit-group',
    enable_auto_commit=False  # Disable auto-commit
)

consumer.subscribe(['my-topic'])

try:
    while True:
        # Poll for messages
        message_batch = consumer.poll(timeout_ms=1000)

        for topic_partition, messages in message_batch.items():
            for message in messages:
                # Process message
                print(f"Processing: {message.value}")

                # Manually commit after processing. Committed offset is the
                # NEXT offset to read, hence offset + 1.
                consumer.commit({
                    topic_partition: OffsetAndMetadata(message.offset + 1, "processed")
                })

except KeyboardInterrupt:
    pass
finally:
    # Fix: the original closed the consumer only on KeyboardInterrupt, so any
    # other exception leaked sockets and left the group without a clean exit.
    consumer.close()

Manual Partition Assignment

from kafka import KafkaConsumer, TopicPartition

# group_id=None opts out of group coordination; partition ownership is
# managed entirely by hand via assign().
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id=None  # No group coordination
)

# Manually assign specific partitions
assigned = [
    TopicPartition(topic, part)
    for topic, part in (('topic-1', 0), ('topic-1', 1), ('topic-2', 0))
]
consumer.assign(assigned)

# Position each partition explicitly before consuming.
consumer.seek(TopicPartition('topic-1', 0), 100)
consumer.seek_to_beginning(TopicPartition('topic-1', 1))

for message in consumer:
    print(f"Manual assignment: {message.topic}:{message.partition}:{message.offset}")

Consumer with Rebalance Listener

from kafka import KafkaConsumer
from kafka.consumer.subscription_state import ConsumerRebalanceListener

class RebalanceListener(ConsumerRebalanceListener):
    """Commits offsets on revocation and rewinds newly assigned partitions."""

    def __init__(self, consumer):
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        print(f"Partitions revoked: {revoked}")
        # Flush our progress before ownership moves to another consumer.
        self.consumer.commit()

    def on_partitions_assigned(self, assigned):
        print(f"Partitions assigned: {assigned}")
        # Rewind each newly owned partition to its earliest offset.
        for tp in assigned:
            self.consumer.seek_to_beginning(tp)

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='rebalance-group'
)

consumer.subscribe(['my-topic'], listener=RebalanceListener(consumer))

for message in consumer:
    print(f"Consumed: {message.value}")

Batch Processing

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'batch-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='batch-processor',
    max_poll_records=100,  # Get up to 100 records per poll
    enable_auto_commit=False
)

try:
    while True:
        # Get batch of messages
        message_batch = consumer.poll(timeout_ms=5000, max_records=100)

        if not message_batch:
            continue

        # Process batch
        batch_count = 0
        for topic_partition, messages in message_batch.items():
            for message in messages:
                # Process message
                batch_count += 1

        print(f"Processed batch of {batch_count} messages")

        # Commit only after the whole batch succeeded (at-least-once delivery).
        consumer.commit()
finally:
    # Fix: the original infinite loop had no shutdown path, so the consumer
    # was never closed on Ctrl-C or error — release sockets and leave the
    # group cleanly.
    consumer.close()

Secure Consumer (SSL + SASL)

from kafka import KafkaConsumer

# SASL/SCRAM authentication carried over TLS; the CA file verifies the broker.
security_options = {
    'security_protocol': 'SASL_SSL',
    'sasl_mechanism': 'SCRAM-SHA-256',
    'sasl_plain_username': 'myuser',
    'sasl_plain_password': 'mypassword',
    'ssl_check_hostname': True,
    'ssl_cafile': 'ca-cert.pem',
}

consumer = KafkaConsumer(
    'secure-topic',
    bootstrap_servers=['secure-broker:9093'],
    group_id='secure-group',
    **security_options,
)

for message in consumer:
    print(f"Secure message: {message.value}")

Install with Tessl CLI

npx tessl i tessl/pypi-kafka-python-ng

docs

admin.md

connection.md

consumer.md

errors.md

index.md

producer.md

serialization.md

structs.md

tile.json