CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-kafka-python

Pure Python client for Apache Kafka distributed stream processing system

Pending
Overview
Eval results
Files

structures.mddocs/

Data Structures

Core data structures for representing Kafka concepts including topics, partitions, offsets, broker metadata, and consumer group information.

Capabilities

Topic and Partition Identifiers

Fundamental structures for identifying Kafka topics and partitions.

TopicPartition = NamedTuple('TopicPartition', [
    ('topic', str),      # Topic name
    ('partition', int)   # Partition number
])

def TopicPartition(topic, partition):
    """
    Create topic-partition identifier.
    
    Parameters:
    - topic: str, topic name
    - partition: int, partition number (0-based)
    
    Returns:
    - TopicPartition: immutable topic-partition tuple
    """

# Usage examples:
# tp = TopicPartition('events', 0)
# tp = TopicPartition(topic='logs', partition=2)

Offset Management

Structures for managing consumer offsets and metadata.

OffsetAndMetadata = NamedTuple('OffsetAndMetadata', [
    ('offset', int),        # Offset to commit
    ('metadata', str),      # Optional metadata string
    ('leader_epoch', int)   # Leader epoch (optional)
])

def OffsetAndMetadata(offset, metadata=None, leader_epoch=None):
    """
    Create offset commit information.
    
    Parameters:
    - offset: int, offset to commit
    - metadata: str, optional metadata (max 4096 bytes)
    - leader_epoch: int, optional leader epoch for fencing
    
    Returns:
    - OffsetAndMetadata: offset commit information
    """

OffsetAndTimestamp = NamedTuple('OffsetAndTimestamp', [
    ('offset', int),        # Message offset
    ('timestamp', int),     # Message timestamp (milliseconds)
    ('leader_epoch', int)   # Leader epoch
])

def OffsetAndTimestamp(offset, timestamp, leader_epoch=None):
    """
    Create offset-timestamp pair.
    
    Parameters:
    - offset: int, message offset
    - timestamp: int, message timestamp in milliseconds
    - leader_epoch: int, optional leader epoch
    
    Returns:
    - OffsetAndTimestamp: offset with timestamp information
    """

Broker and Cluster Metadata

Structures representing broker information and cluster topology.

BrokerMetadata = NamedTuple('BrokerMetadata', [
    ('nodeId', int),    # Broker node ID
    ('host', str),      # Broker hostname
    ('port', int),      # Broker port
    ('rack', str)       # Rack identifier (may be None)
])

def BrokerMetadata(nodeId, host, port, rack=None):
    """
    Create broker metadata.
    
    Parameters:
    - nodeId: int, unique broker ID
    - host: str, broker hostname or IP
    - port: int, broker port number
    - rack: str, optional rack identifier for rack awareness
    
    Returns:
    - BrokerMetadata: broker information
    """

PartitionMetadata = NamedTuple('PartitionMetadata', [
    ('topic', str),              # Topic name
    ('partition', int),          # Partition number
    ('leader', int),             # Leader broker ID
    ('leader_epoch', int),       # Leader epoch
    ('replicas', list),          # List of replica broker IDs
    ('isr', list),              # In-sync replica broker IDs
    ('offline_replicas', list),  # Offline replica broker IDs
    ('error', 'KafkaError')     # Error information (None if no error)
])

def PartitionMetadata(topic, partition, leader, leader_epoch, 
                     replicas, isr, offline_replicas=None, error=None):
    """
    Create partition metadata.
    
    Parameters:
    - topic: str, topic name
    - partition: int, partition number
    - leader: int, leader broker ID (-1 if no leader)
    - leader_epoch: int, current leader epoch
    - replicas: List[int], all replica broker IDs
    - isr: List[int], in-sync replica broker IDs
    - offline_replicas: List[int], offline replica broker IDs
    - error: KafkaError, error information if any
    
    Returns:
    - PartitionMetadata: complete partition information
    """

Consumer Group Information

Structures for consumer group membership and coordination.

MemberInformation = NamedTuple('MemberInformation', [
    ('member_id', str),         # Member ID assigned by coordinator
    ('client_id', str),         # Client ID specified by consumer
    ('client_host', str),       # Client hostname
    ('member_metadata', bytes), # Member metadata (serialized)
    ('member_assignment', bytes) # Member assignment (serialized)
])

def MemberInformation(member_id, client_id, client_host, 
                     member_metadata, member_assignment):
    """
    Create consumer group member information.
    
    Parameters:
    - member_id: str, unique member identifier
    - client_id: str, client identifier
    - client_host: str, client hostname  
    - member_metadata: bytes, serialized member metadata
    - member_assignment: bytes, serialized partition assignment
    
    Returns:
    - MemberInformation: consumer group member details
    """

GroupInformation = NamedTuple('GroupInformation', [
    ('error_code', int),            # Error code (0 = success)
    ('group', str),                 # Group ID
    ('state', str),                 # Group state
    ('protocol_type', str),         # Protocol type (usually 'consumer')
    ('protocol', str),              # Assignment protocol name
    ('members', list),              # List of MemberInformation
    ('authorized_operations', list) # List of authorized operations
])

def GroupInformation(error_code, group, state, protocol_type, protocol, 
                    members, authorized_operations=None):
    """
    Create consumer group information.
    
    Parameters:
    - error_code: int, operation error code
    - group: str, consumer group ID
    - state: str, group state ('Stable', 'Rebalancing', etc.)
    - protocol_type: str, protocol type
    - protocol: str, partition assignment protocol
    - members: List[MemberInformation], group members
    - authorized_operations: List[int], authorized operations
    
    Returns:
    - GroupInformation: complete group details
    """

Configuration and Operational Data

Structures for configuration management and operational parameters.

RetryOptions = NamedTuple('RetryOptions', [
    ('limit', int),           # Maximum retry attempts
    ('backoff_ms', int),      # Backoff interval in milliseconds
    ('retry_on_timeouts', bool) # Whether to retry on timeout errors
])

def RetryOptions(limit=3, backoff_ms=1000, retry_on_timeouts=True):
    """
    Create retry configuration.
    
    Parameters:
    - limit: int, maximum number of retry attempts
    - backoff_ms: int, backoff interval between retries
    - retry_on_timeouts: bool, retry on timeout errors
    
    Returns:  
    - RetryOptions: retry configuration
    """

Node = NamedTuple('Node', [
    ('id', int),        # Node ID
    ('host', str),      # Hostname
    ('port', int),      # Port number
    ('rack', str)       # Rack (may be None)
])

def Node(id, host, port, rack=None):
    """
    Create node information.
    
    Parameters:
    - id: int, node identifier
    - host: str, hostname
    - port: int, port number
    - rack: str, optional rack identifier
    
    Returns:
    - Node: node connection information
    """

Record and Message Data

Structures representing individual messages and record batches.

class ConsumerRecord:
    """
    Individual record consumed from Kafka.
    
    Attributes:
    - topic: str, topic name
    - partition: int, partition number
    - offset: int, message offset
    - timestamp: int, message timestamp (milliseconds since epoch)
    - timestamp_type: int, timestamp type (0=CreateTime, 1=LogAppendTime)
    - key: bytes, message key (raw bytes, may be None)
    - value: bytes, message value (raw bytes, may be None)
    - headers: List[Tuple[str, bytes]], message headers
    - checksum: int, message checksum
    - serialized_key_size: int, size of serialized key
    - serialized_value_size: int, size of serialized value
    - leader_epoch: int, leader epoch when message was written
    """
    
    def __init__(self, topic, partition, offset, timestamp, timestamp_type,
                 key, value, headers=None, checksum=None, 
                 serialized_key_size=None, serialized_value_size=None,
                 leader_epoch=None):
        self.topic = topic
        self.partition = partition
        self.offset = offset
        self.timestamp = timestamp
        self.timestamp_type = timestamp_type
        self.key = key
        self.value = value
        self.headers = headers or []
        self.checksum = checksum
        self.serialized_key_size = serialized_key_size
        self.serialized_value_size = serialized_value_size
        self.leader_epoch = leader_epoch

class RecordMetadata:
    """
    Metadata returned after successful record production.
    
    Attributes:
    - topic: str, topic name
    - partition: int, partition number
    - offset: int, assigned offset
    - timestamp: int, record timestamp
    - checksum: int, record checksum
    - serialized_key_size: int, size of serialized key
    - serialized_value_size: int, size of serialized value
    - leader_epoch: int, leader epoch
    """
    
    def __init__(self, topic, partition, offset, timestamp=None,
                 checksum=None, serialized_key_size=None, 
                 serialized_value_size=None, leader_epoch=None):
        self.topic = topic
        self.partition = partition
        self.offset = offset
        self.timestamp = timestamp
        self.checksum = checksum
        self.serialized_key_size = serialized_key_size
        self.serialized_value_size = serialized_value_size
        self.leader_epoch = leader_epoch

class ConsumerRecords:
    """
    Collection of records returned by consumer poll operation.
    
    Provides various ways to access records: by partition, by topic,
    or iterate over all records.
    """
    
    def __init__(self, record_map):
        """
        Initialize with mapping of TopicPartition to List[ConsumerRecord].
        
        Parameters:
        - record_map: Dict[TopicPartition, List[ConsumerRecord]]
        """
        self._record_map = record_map
    
    def __iter__(self):
        """Iterate over all records across all partitions."""
        for records in self._record_map.values():
            for record in records:
                yield record
    
    def __len__(self):
        """Total number of records across all partitions."""
        return sum(len(records) for records in self._record_map.values())
    
    def __bool__(self):
        """True if contains any records."""
        return len(self) > 0
    
    def records(self, partition):
        """
        Get records for specific partition.
        
        Parameters:
        - partition: TopicPartition
        
        Returns:
        - List[ConsumerRecord]: records for partition
        """
        return self._record_map.get(partition, [])
    
    def by_topic(self):
        """
        Group records by topic.
        
        Returns:
        - Dict[str, List[ConsumerRecord]]: records grouped by topic
        """
        topic_records = {}
        for tp, records in self._record_map.items():
            if tp.topic not in topic_records:
                topic_records[tp.topic] = []
            topic_records[tp.topic].extend(records)
        return topic_records
    
    def partitions(self):
        """
        Get all partitions with records.
        
        Returns:
        - Set[TopicPartition]: partitions with records
        """
        return set(self._record_map.keys())

Usage Examples

Working with Topic Partitions

from kafka import TopicPartition, KafkaConsumer, KafkaProducer

# Create topic partition identifiers
tp1 = TopicPartition('events', 0)
tp2 = TopicPartition('events', 1)
tp3 = TopicPartition('logs', 0)

# Use with consumer assignment
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
consumer.assign([tp1, tp2, tp3])

# Use with producer partition selection
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Send to specific partition
future = producer.send(tp1.topic, value=b'message', partition=tp1.partition)
metadata = future.get()

print(f"Sent to {metadata.topic} partition {metadata.partition} at offset {metadata.offset}")

# Check available partitions
available_partitions = producer.partitions_for(tp1.topic)
print(f"Available partitions for {tp1.topic}: {available_partitions}")

consumer.close()
producer.close()

Offset Management

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='offset-demo',
    enable_auto_commit=False  # Manual offset management
)

# Subscribe to topic
consumer.subscribe(['events'])

# Poll for messages
records = consumer.poll(timeout_ms=5000)

# Process records and commit offsets manually
for partition, messages in records.items():
    for message in messages:
        # Process message
        print(f"Processing message: {message.value}")
        
        # Create offset metadata with custom information
        offset_metadata = OffsetAndMetadata(
            offset=message.offset + 1,  # Next offset to read
            metadata=f"processed_at_{int(time.time())}"
        )
        
        # Commit offset for this partition
        consumer.commit({partition: offset_metadata})

# Check committed offsets
for partition in consumer.assignment():
    committed = consumer.committed(partition)
    if committed:
        print(f"Partition {partition}: committed offset {committed.offset}, "
              f"metadata: {committed.metadata}")

consumer.close()

Cluster Metadata Inspection

from kafka.client_async import KafkaClient
from kafka import TopicPartition

client = KafkaClient(bootstrap_servers=['localhost:9092'])

try:
    # Wait for metadata to load
    client.poll(timeout_ms=5000)
    
    cluster = client.cluster
    
    # Examine broker information
    print("Cluster Brokers:")
    for broker in cluster.brokers():
        print(f"  Broker {broker.nodeId}: {broker.host}:{broker.port}")
        if broker.rack:
            print(f"    Rack: {broker.rack}")
    
    # Examine topic metadata
    print(f"\nTopics ({len(cluster.topics())}):")
    for topic in sorted(cluster.topics()):
        partitions = cluster.partitions_for_topic(topic)
        print(f"  {topic}: {len(partitions)} partitions")
        
        # Get detailed partition information
        for partition_id in sorted(partitions):
            tp = TopicPartition(topic, partition_id)
            leader = cluster.leader_for_partition(tp)
            replicas = cluster.replicas_for_partition(tp)
            isr = cluster.in_sync_replicas_for_partition(tp)
            
            print(f"    Partition {partition_id}:")
            print(f"      Leader: {leader}")
            print(f"      Replicas: {replicas}")  
            print(f"      ISR: {isr}")

finally:
    client.close()

Consumer Group Analysis

from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # List all consumer groups
    groups = admin.list_consumer_groups()
    print(f"Found {len(groups)} consumer groups:")
    
    group_ids = []
    for group in groups:
        print(f"  {group.group}: {group.state}")
        group_ids.append(group.group)
    
    # Get detailed information for each group
    if group_ids:
        descriptions = admin.describe_consumer_groups(group_ids)
        
        for group_id, description in descriptions.items():
            print(f"\nGroup: {group_id}")
            print(f"  State: {description.state}")
            print(f"  Protocol: {description.partition_assignor}")
            print(f"  Members: {len(description.members)}")
            
            for i, member in enumerate(description.members):
                print(f"    Member {i+1}:")
                print(f"      ID: {member.member_id}")
                print(f"      Client: {member.client_id}")
                print(f"      Host: {member.host}")
                
                # Parse assignment if available
                if hasattr(member, 'assignment') and member.assignment:
                    partitions = member.assignment.topic_partitions
                    print(f"      Assigned partitions: {len(partitions)}")
                    for tp in sorted(partitions):
                        print(f"        {tp.topic}[{tp.partition}]")

finally:
    admin.close()

Record Processing Patterns

from kafka import KafkaConsumer
import json
from collections import defaultdict

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    group_id='processor',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    max_poll_records=100
)

# Statistics tracking
partition_stats = defaultdict(lambda: {'count': 0, 'latest_offset': -1})
message_types = defaultdict(int)

try:
    while True:
        # Poll for batch of records
        record_batch = consumer.poll(timeout_ms=5000)
        
        if not record_batch:
            print("No new messages")
            continue
        
        print(f"Received {len(record_batch)} records")
        
        # Process records by partition to maintain ordering
        for partition, records in record_batch.items():
            print(f"\nProcessing {len(records)} records from {partition}")
            
            for record in records:
                # Update statistics
                partition_stats[partition]['count'] += 1
                partition_stats[partition]['latest_offset'] = record.offset
                
                # Analyze message content
                if isinstance(record.value, dict) and 'type' in record.value:
                    message_types[record.value['type']] += 1
                
                # Process message headers
                if record.headers:
                    print(f"  Headers: {dict(record.headers)}")
                
                # Process message based on timestamp
                age_seconds = (time.time() * 1000 - record.timestamp) / 1000
                if age_seconds > 300:  # 5 minutes
                    print(f"  Warning: Old message ({age_seconds:.1f}s old)")
        
        # Print periodic statistics  
        print(f"\nPartition Statistics:")
        for partition, stats in partition_stats.items():
            print(f"  {partition}: {stats['count']} messages, "
                  f"latest offset {stats['latest_offset']}")
        
        print(f"Message Types: {dict(message_types)}")

except KeyboardInterrupt:
    print("Shutting down...")

finally:
    consumer.close()

Custom Data Structures

from collections import namedtuple
from kafka import TopicPartition

# Custom application-specific structures
MessageEnvelope = namedtuple('MessageEnvelope', [
    'correlation_id', 'timestamp', 'source', 'data'
])

ProcessingResult = namedtuple('ProcessingResult', [
    'success', 'partition', 'offset', 'processing_time', 'error'
])

PartitionLag = namedtuple('PartitionLag', [
    'partition', 'current_offset', 'high_water_mark', 'lag'
])

def calculate_consumer_lag(consumer, partitions):
    """Calculate lag for consumer partitions."""
    
    # Get current positions
    current_offsets = {}
    for partition in partitions:
        try:
            current_offsets[partition] = consumer.position(partition)
        except Exception:
            current_offsets[partition] = -1
    
    # Get high water marks
    end_offsets = consumer.end_offsets(partitions)
    
    # Calculate lag
    lag_info = []
    for partition in partitions:
        current = current_offsets.get(partition, -1)
        high_water = end_offsets.get(partition, -1)
        
        if current >= 0 and high_water >= 0:
            lag = high_water - current
        else:
            lag = -1  # Unknown
        
        lag_info.append(PartitionLag(
            partition=partition,
            current_offset=current,
            high_water_mark=high_water,
            lag=lag
        ))
    
    return lag_info

# Usage
consumer = KafkaConsumer('events', bootstrap_servers=['localhost:9092'])
consumer.poll(0)  # Initialize assignments

partitions = list(consumer.assignment())
lag_info = calculate_consumer_lag(consumer, partitions)

for lag in lag_info:
    print(f"Partition {lag.partition}: "
          f"current={lag.current_offset}, "
          f"high_water={lag.high_water_mark}, "
          f"lag={lag.lag}")

consumer.close()

Install with Tessl CLI

npx tessl i tessl/pypi-kafka-python

docs

admin.md

consumer.md

errors.md

index.md

producer.md

structures.md

tile.json