CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-kafka-python

Pure Python client for Apache Kafka distributed stream processing system

Pending
Overview
Eval results
Files

docs/consumer.md

Consumer API

High-level consumer for consuming records from Kafka topics with comprehensive support for consumer groups, automatic partition assignment, offset management, and rebalancing coordination.

Capabilities

KafkaConsumer

Main consumer class providing high-level interface for consuming records from Kafka topics. Supports both subscribe (automatic partition assignment) and assign (manual partition assignment) modes.

class KafkaConsumer:
    """
    High-level consumer for reading records from Kafka topics.

    Supports two mutually exclusive partition-assignment modes:
    subscribe() for group-managed, automatic assignment with rebalancing,
    and assign() for manual, static assignment. Offsets may be committed
    automatically (enable_auto_commit) or manually via commit()/commit_async().
    """

    def __init__(self, *topics, **configs):
        """
        Initialize Kafka consumer.
        
        Parameters:
        - *topics: str, optional topic names to subscribe to
        - **configs: consumer configuration options
          - bootstrap_servers: List[str], broker addresses  
          - group_id: str, consumer group identifier
          - key_deserializer: Callable, key deserialization function
          - value_deserializer: Callable, value deserialization function
          - auto_offset_reset: str, 'earliest' or 'latest' for new groups
          - enable_auto_commit: bool, automatic offset commits (default: True)
          - auto_commit_interval_ms: int, auto-commit interval (default: 5000)
          - session_timeout_ms: int, session timeout (default: 10000)
          - heartbeat_interval_ms: int, heartbeat interval (default: 3000)
          - max_poll_records: int, max records per poll (default: 500)
          - fetch_min_bytes: int, minimum fetch size (default: 1)
          - fetch_max_wait_ms: int, max fetch wait time (default: 500)
          - max_partition_fetch_bytes: int, max bytes per partition (default: 1MB)
          - security_protocol: str, 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'
          - api_version: tuple, broker API version or 'auto'
        """
    
    def subscribe(self, topics=(), pattern=None, listener=None):
        """
        Subscribe to topics with automatic partition assignment.
        
        Parameters:
        - topics: List[str], topic names to subscribe to
        - pattern: str, regex pattern for topic matching
        - listener: ConsumerRebalanceListener, rebalance callback
        """
    
    def assign(self, partitions):
        """
        Manually assign specific partitions to consumer.
        
        Parameters:
        - partitions: List[TopicPartition], partitions to assign
        """
    
    def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
        """
        Fetch records from assigned partitions.
        
        Parameters:
        - timeout_ms: int, polling timeout in milliseconds
        - max_records: int, maximum records to return
        - update_offsets: bool, whether to update high-water mark
        
        Returns:
        - ConsumerRecords: mapping of TopicPartition to list of ConsumerRecord
        """
    
    def commit(self, offsets=None):
        """
        Commit offsets to Kafka.
        
        Parameters:
        - offsets: Dict[TopicPartition, OffsetAndMetadata], specific offsets to commit
                   If None, commits current position for all assigned partitions
        """
    
    def commit_async(self, offsets=None, callback=None):
        """
        Asynchronous offset commit.
        
        Parameters:
        - offsets: Dict[TopicPartition, OffsetAndMetadata], offsets to commit
        - callback: Callable, completion callback function
        """
    
    def seek(self, partition, offset):
        """
        Seek to specific offset in partition.
        
        Parameters:
        - partition: TopicPartition, target partition
        - offset: int, target offset
        """
    
    def seek_to_beginning(self, *partitions):
        """
        Seek to beginning of partitions.
        
        Parameters:
        - *partitions: TopicPartition, partitions to seek (all assigned if none)
        """
    
    def seek_to_end(self, *partitions):
        """
        Seek to end of partitions.
        
        Parameters:
        - *partitions: TopicPartition, partitions to seek (all assigned if none)
        """
    
    def position(self, partition):
        """
        Get current position (next fetch offset) for partition.
        
        Parameters:
        - partition: TopicPartition, target partition
        
        Returns:
        - int: current position offset
        """
    
    def committed(self, partition):
        """
        Get last committed offset for partition.
        
        Parameters:
        - partition: TopicPartition, target partition
        
        Returns:
        - OffsetAndMetadata: last committed offset with metadata
        """
    
    def pause(self, *partitions):
        """
        Suspend fetching from partitions.
        
        Parameters:
        - *partitions: TopicPartition, partitions to pause
        """
    
    def resume(self, *partitions):
        """
        Resume fetching from partitions.
        
        Parameters:
        - *partitions: TopicPartition, partitions to resume
        """
    
    def paused(self):
        """
        Get currently paused partitions.
        
        Returns:
        - Set[TopicPartition]: paused partitions
        """
    
    def close(self, autocommit=True):
        """
        Close consumer and clean up resources.
        
        Parameters:
        - autocommit: bool, commit offsets before closing
        """
    
    def subscription(self):
        """
        Get current topic subscription.
        
        Returns:
        - Set[str]: subscribed topic names
        """
    
    def assignment(self):
        """
        Get current partition assignment.
        
        Returns:
        - Set[TopicPartition]: assigned partitions
        """
    
    def beginning_offsets(self, partitions):
        """
        Get earliest available offsets for partitions.
        
        Parameters:
        - partitions: List[TopicPartition], target partitions
        
        Returns:
        - Dict[TopicPartition, int]: earliest offsets
        """
    
    def end_offsets(self, partitions):
        """
        Get latest offsets for partitions.
        
        Parameters:
        - partitions: List[TopicPartition], target partitions
        
        Returns:
        - Dict[TopicPartition, int]: latest offsets
        """
    
    def offsets_for_times(self, timestamps):
        """
        Get offsets for specific timestamps.
        
        Parameters:
        - timestamps: Dict[TopicPartition, int], timestamp mapping
        
        Returns:
        - Dict[TopicPartition, OffsetAndTimestamp]: offset and timestamp info
        """
    
    def metrics(self):
        """
        Get consumer metrics.
        
        Returns:
        - Dict[str, float]: current metric values
        """

Consumer Rebalance Listener

Abstract base class for handling partition rebalancing events in consumer groups. Implement this interface to perform cleanup or initialization when partitions are assigned or revoked.

class ConsumerRebalanceListener:
    """
    Callback interface for consumer-group rebalance events.

    Pass an instance to KafkaConsumer.subscribe(listener=...).
    on_partitions_revoked() fires before a reassignment takes effect;
    on_partitions_assigned() fires after new partitions are owned.
    """

    def on_partitions_revoked(self, revoked):
        """
        Called before partitions are reassigned.
        
        Use this callback to commit offsets and clean up state
        for partitions that are being revoked.
        
        Parameters:
        - revoked: List[TopicPartition], partitions being revoked
        """
    
    def on_partitions_assigned(self, assigned):
        """
        Called after partitions are assigned.
        
        Use this callback to set up state or seek to specific
        offsets for newly assigned partitions.  
        
        Parameters:
        - assigned: List[TopicPartition], partitions being assigned
        """

Consumer Records

Result of consumer poll() operation containing records organized by partition.

class ConsumerRecords:
    """
    Container for the results of a KafkaConsumer.poll() call.

    Iterable as a flat sequence of ConsumerRecord objects, and also
    exposes the per-partition mapping view (records(), items()) that the
    usage examples rely on (``for partition, messages in records.items()``).
    """

    def __init__(self, record_map):
        """
        Container for poll() results.
        
        Parameters:
        - record_map: Dict[TopicPartition, List[ConsumerRecord]]
        """
    
    def __iter__(self):
        """Iterate over all records across all partitions."""
    
    def __len__(self):
        """Total number of records across all partitions."""
    
    def __bool__(self):
        """True if contains any records."""
    
    def items(self):
        """
        Iterate (partition, records) pairs, dict-style.

        This is the view used throughout the usage examples
        (``for partition, messages in records.items()``); it was
        missing from the documented interface.

        Returns:
        - Iterable[Tuple[TopicPartition, List[ConsumerRecord]]]
        """
    
    def records(self, partition):
        """
        Get records for specific partition.
        
        Parameters:
        - partition: TopicPartition, target partition
        
        Returns:
        - List[ConsumerRecord]: records for partition
        """
    
    def by_topic(self):
        """
        Group records by topic.
        
        Returns:
        - Dict[str, List[ConsumerRecord]]: records grouped by topic
        """

Consumer Record

Individual record consumed from Kafka containing message data and metadata.

class ConsumerRecord:
    """
    Single record consumed from Kafka: message payload plus broker metadata.
    """

    topic: str                    # Topic name
    partition: int               # Partition number  
    offset: int                  # Message offset
    timestamp: int               # Message timestamp (milliseconds)
    timestamp_type: int          # Timestamp type (0=CreateTime, 1=LogAppendTime)
    key: bytes                   # Message key (raw bytes; presumably the key_deserializer output when one is configured — see usage examples)
    value: bytes                 # Message value (raw bytes; presumably the value_deserializer output when one is configured — the examples deserialize to str/dict)
    headers: List[Tuple[str, bytes]]  # Message headers
    checksum: int                # Message checksum
    serialized_key_size: int     # Serialized key size
    serialized_value_size: int   # Serialized value size
    leader_epoch: int            # Leader epoch

Usage Examples

Basic Consumer Group

from kafka import KafkaConsumer
import json

# Create consumer with automatic offset management
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    auto_commit_interval_ms=1000
)

# Process messages until interrupted.  Iterating the consumer blocks
# indefinitely, so close() must live in a finally block — in the original
# example it sat after the loop and was unreachable.
try:
    for message in consumer:
        print(f"Topic: {message.topic}")
        print(f"Partition: {message.partition}")
        print(f"Offset: {message.offset}")
        print(f"Key: {message.key}")
        print(f"Value: {message.value}")
        print(f"Timestamp: {message.timestamp}")
except KeyboardInterrupt:
    pass
finally:
    # Releases network resources; with enable_auto_commit=True the final
    # offsets are committed on close.
    consumer.close()

Manual Partition Assignment

from kafka import KafkaConsumer, TopicPartition

# Standalone consumer: no group coordination, partitions chosen by hand.
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id=None,  # No consumer group
    value_deserializer=lambda m: m.decode('utf-8')
)

# Manually assign specific partitions
partitions = [
    TopicPartition(topic, part)
    for topic, part in (('topic1', 0), ('topic1', 1), ('topic2', 0))
]
consumer.assign(partitions)

# Position the consumer explicitly before fetching.
consumer.seek(TopicPartition('topic1', 0), 100)
consumer.seek_to_end(TopicPartition('topic1', 1))

# Poll for messages
while True:
    fetched = consumer.poll(timeout_ms=1000)
    for partition, batch in fetched.items():
        for message in batch:
            print(f"Partition {partition}: {message.value}")

Manual Offset Management

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='manual-commit-group',
    enable_auto_commit=False,  # Disable auto-commit
    auto_offset_reset='earliest'
)

try:
    while True:
        fetched = consumer.poll(timeout_ms=1000)

        for tp, batch in fetched.items():
            for message in batch:
                # Process message
                print(f"Processing: {message.value}")

                # Commit one past this record so a restart resumes
                # at the next unprocessed message.
                position = OffsetAndMetadata(message.offset + 1, None)
                consumer.commit({tp: position})

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Rebalance Listener

from kafka import KafkaConsumer
from kafka.consumer import ConsumerRebalanceListener
import logging


class MyRebalanceListener(ConsumerRebalanceListener):
    """Logs rebalance events and commits offsets before losing partitions."""

    def __init__(self, consumer):
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        logging.info(f"Partitions revoked: {revoked}")
        # Flush current positions before ownership moves to another member.
        self.consumer.commit()

    def on_partitions_assigned(self, assigned):
        logging.info(f"Partitions assigned: {assigned}")
        # Could seek to specific offsets or perform other setup


consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='rebalance-group'
)

consumer.subscribe(['my-topic'], listener=MyRebalanceListener(consumer))

for message in consumer:
    print(f"Received: {message.value}")

Batch Processing

# OffsetAndMetadata is needed by the commit step of this example;
# the original snippet omitted it from the import, causing a NameError.
from kafka import KafkaConsumer, OffsetAndMetadata

consumer = KafkaConsumer(
    'batch-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='batch-group',
    max_poll_records=100,  # Process up to 100 records per poll
    enable_auto_commit=False
)

def process_batch(records):
    """Process a batch of records together; returns True on success."""
    batch_data = [record.value for record in records]

    # Simulate batch processing
    print(f"Processing batch of {len(batch_data)} records")
    # ... perform batch operation ...

    return True

try:
    while True:
        # Pull up to 50 records, waiting at most 5 seconds.
        batch = consumer.poll(timeout_ms=5000, max_records=50)
        if not batch:
            continue

        # Flatten per-partition lists and remember, for each partition,
        # the offset to commit (one past the last record seen).
        flattened = []
        commit_positions = {}
        for tp, recs in batch.items():
            flattened.extend(recs)
            if recs:
                commit_positions[tp] = recs[-1].offset + 1

        # Commit only once the whole batch has been processed successfully.
        if process_batch(flattened):
            consumer.commit({
                tp: OffsetAndMetadata(pos, None)
                for tp, pos in commit_positions.items()
            })

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Install with Tessl CLI

npx tessl i tessl/pypi-kafka-python

docs

admin.md

consumer.md

errors.md

index.md

producer.md

structures.md

tile.json