Search (Ctrl+K)
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-kafka-python-ng

Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities

Pending
Overview
Eval results
Files

docs/structs.md

Data Structures

Core data structures for representing Kafka concepts including topic partitions, offsets, broker metadata, and consumer group information.

Imports

from collections import namedtuple

Capabilities

Topic and Partition Identifiers

Data structures for identifying topics and partitions within a Kafka cluster.

TopicPartition = namedtuple("TopicPartition", "topic partition")
"""
Identifies one partition of one topic within a Kafka cluster.

Args:
    topic (str): Topic name
    partition (int): Partition number

Usage:
    tp = TopicPartition('my-topic', 0)
    print(tp.topic)     # 'my-topic'
    print(tp.partition) # 0
"""

Offset and Metadata

Data structures for representing offsets with associated metadata.

OffsetAndMetadata = namedtuple("OffsetAndMetadata", "offset metadata")
"""
An offset to commit, paired with its commit-metadata string.

Args:
    offset (int): The offset to be committed
    metadata (str): Non-null metadata

Usage:
    oam = OffsetAndMetadata(100, "my-metadata")
    print(oam.offset)   # 100
    print(oam.metadata) # "my-metadata"
"""

OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", "offset timestamp")
"""
An offset together with the timestamp it corresponds to.

Args:
    offset (int): An offset
    timestamp (int): The timestamp associated to the offset

Usage:
    oat = OffsetAndTimestamp(100, 1640995200000)
    print(oat.offset)    # 100
    print(oat.timestamp) # 1640995200000
"""

Broker and Cluster Metadata

Data structures for representing broker and cluster information.

class BrokerMetadata:
    """Metadata describing a single Kafka broker."""

    nodeId: int  # Broker node ID
    host: str    # Broker hostname
    port: int    # Broker port
    rack: str    # Broker rack identifier; None when no rack is assigned

    def __init__(self, nodeId: int, host: str, port: int, rack: str = None):
        """
        Kafka broker metadata.

        Args:
            nodeId (int): Broker node ID
            host (str): Broker hostname
            port (int): Broker port
            rack (str): Broker rack identifier (optional)
        """
        # The original stub discarded its arguments, so instances never had
        # the documented attributes; store them so broker.host etc. work.
        self.nodeId = nodeId
        self.host = host
        self.port = port
        self.rack = rack

class PartitionMetadata:
    """Metadata for one partition as reported by the cluster."""

    topic: str      # Topic name
    partition: int  # Partition number
    leader: int     # Leader broker node ID
    replicas: list  # Replica broker node IDs
    isr: list       # In-sync replica broker node IDs
    error: int      # Error code; None when the broker reported no error

    def __init__(self, topic: str, partition: int, leader: int, replicas: list, isr: list, error: int = None):
        """
        Partition metadata from cluster.

        Args:
            topic (str): Topic name
            partition (int): Partition number
            leader (int): Leader broker node ID
            replicas (list): List of replica broker node IDs
            isr (list): List of in-sync replica broker node IDs
            error (int): Error code (optional)
        """
        # The original stub discarded its arguments; store them so the
        # documented attribute access works.
        self.topic = topic
        self.partition = partition
        self.leader = leader
        self.replicas = replicas
        self.isr = isr
        self.error = error

Consumer Group Information

Data structures for representing consumer group state and member information.

class MemberInformation:
    """Information about one member of a consumer group."""

    member_id: str            # Member identifier
    client_id: str            # Client identifier
    client_host: str          # Client hostname
    member_metadata: bytes    # Raw member metadata
    member_assignment: bytes  # Raw member assignment data

    def __init__(self, member_id: str, client_id: str, client_host: str, member_metadata: bytes, member_assignment: bytes):
        """
        Consumer group member information.

        Args:
            member_id (str): Member identifier
            client_id (str): Client identifier
            client_host (str): Client hostname
            member_metadata (bytes): Member metadata
            member_assignment (bytes): Member assignment data
        """
        # The original stub discarded its arguments; store them so the
        # documented attribute access works.
        self.member_id = member_id
        self.client_id = client_id
        self.client_host = client_host
        self.member_metadata = member_metadata
        self.member_assignment = member_assignment

class GroupInformation:
    """Description of a consumer group and its members."""

    error_code: int             # Error code from the broker response
    group: str                  # Group ID
    state: str                  # Group state
    protocol_type: str          # Protocol type
    protocol: str               # Protocol name
    members: list               # List of MemberInformation objects
    authorized_operations: set  # Authorized operations; None when not requested

    def __init__(self, error_code: int, group: str, state: str, protocol_type: str, protocol: str, members: list, authorized_operations: set = None):
        """
        Consumer group information.

        Args:
            error_code (int): Error code
            group (str): Group ID
            state (str): Group state
            protocol_type (str): Protocol type
            protocol (str): Protocol name
            members (list): List of MemberInformation objects
            authorized_operations (set): Authorized operations (optional)
        """
        # The original stub discarded its arguments; store them so the
        # documented attribute access works.
        self.error_code = error_code
        self.group = group
        self.state = state
        self.protocol_type = protocol_type
        self.protocol = protocol
        self.members = members
        self.authorized_operations = authorized_operations

Producer Configuration

Data structures for producer retry and configuration options.

class RetryOptions:
    """Retry policy configuration for the async producer."""

    limit: int               # Maximum retry attempts
    backoff_ms: int          # Backoff time between retries in milliseconds
    retry_on_timeouts: bool  # Whether timeout errors are retried

    def __init__(self, limit: int, backoff_ms: int, retry_on_timeouts: bool = True):
        """
        Retry policy configuration for async producer.

        Args:
            limit (int): Maximum retry attempts
            backoff_ms (int): Backoff time between retries in milliseconds
            retry_on_timeouts (bool): Whether to retry on timeout errors
        """
        # The original stub discarded its arguments; store them so the
        # documented attribute access works.
        self.limit = limit
        self.backoff_ms = backoff_ms
        self.retry_on_timeouts = retry_on_timeouts

Record Metadata and Timestamps

Data structures for record metadata and timing information.

class RecordMetadata:
    """Metadata the broker returns for a successfully produced record."""

    topic: str                  # Topic name
    partition: int              # Partition number
    offset: int                 # Record offset
    timestamp: int              # Record timestamp; None when not provided
    checksum: int               # Record checksum; None when not provided
    serialized_key_size: int    # Key size in bytes; None when not provided
    serialized_value_size: int  # Value size in bytes; None when not provided

    def __init__(self, topic: str, partition: int, offset: int, timestamp: int = None, checksum: int = None, serialized_key_size: int = None, serialized_value_size: int = None):
        """
        Metadata for a produced record.

        Args:
            topic (str): Topic name
            partition (int): Partition number
            offset (int): Record offset
            timestamp (int): Record timestamp (optional)
            checksum (int): Record checksum (optional)
            serialized_key_size (int): Key size in bytes (optional)
            serialized_value_size (int): Value size in bytes (optional)
        """
        # The original stub discarded its arguments; store them so the
        # documented attribute access works.
        self.topic = topic
        self.partition = partition
        self.offset = offset
        self.timestamp = timestamp
        self.checksum = checksum
        self.serialized_key_size = serialized_key_size
        self.serialized_value_size = serialized_value_size

class ConsumerRecord:
    """A single record consumed from Kafka."""

    topic: str                  # Topic name
    partition: int              # Partition number
    offset: int                 # Record offset
    timestamp: int              # Record timestamp
    timestamp_type: int         # Timestamp type
    key: bytes                  # Record key
    value: bytes                # Record value
    headers: list               # (key, value) header tuples; None when absent
    checksum: int               # Record checksum; None when absent
    serialized_key_size: int    # Key size in bytes; None when absent
    serialized_value_size: int  # Value size in bytes; None when absent

    def __init__(self, topic: str, partition: int, offset: int, timestamp: int, timestamp_type: int, key: bytes, value: bytes, headers: list = None, checksum: int = None, serialized_key_size: int = None, serialized_value_size: int = None):
        """
        Record consumed from Kafka.

        Args:
            topic (str): Topic name
            partition (int): Partition number
            offset (int): Record offset
            timestamp (int): Record timestamp
            timestamp_type (int): Timestamp type
            key (bytes): Record key
            value (bytes): Record value
            headers (list): List of (key, value) header tuples (optional)
            checksum (int): Record checksum (optional)
            serialized_key_size (int): Key size in bytes (optional)
            serialized_value_size (int): Value size in bytes (optional)
        """
        # The original stub discarded its arguments; store them so the
        # documented attribute access works.
        self.topic = topic
        self.partition = partition
        self.offset = offset
        self.timestamp = timestamp
        self.timestamp_type = timestamp_type
        self.key = key
        self.value = value
        self.headers = headers
        self.checksum = checksum
        self.serialized_key_size = serialized_key_size
        self.serialized_value_size = serialized_value_size

Usage Examples

Working with TopicPartition

from kafka.structs import TopicPartition
from kafka import KafkaConsumer

# Create topic partition identifiers
partition_0 = TopicPartition('my-topic', 0)
partition_1 = TopicPartition('my-topic', 1)

# Use in consumer assignment
# NOTE(review): example assumes a broker is reachable at localhost:9092
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
consumer.assign([partition_0, partition_1])

# Use in offset management
consumer.seek(partition_0, 1000)  # move fetch position to offset 1000
current_position = consumer.position(partition_0)

# TopicPartition objects are hashable (namedtuples), so they work as
# set members and dict keys
partition_set = {partition_0, partition_1}
partition_dict = {partition_0: 'data for partition 0'}

Offset and Metadata Management

from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka import KafkaConsumer

# Auto-commit is disabled so offsets are committed explicitly below
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=False
)

partition = TopicPartition('my-topic', 0)

# Manual offset commits with metadata
offset_metadata = OffsetAndMetadata(1500, 'processed batch 123')
consumer.commit({partition: offset_metadata})

# Check committed offset
# NOTE(review): metadata=True presumably returns an OffsetAndMetadata rather
# than a bare int — confirm against the installed kafka-python-ng version
committed = consumer.committed(partition, metadata=True)
print(f"Committed offset: {committed.offset}, metadata: {committed.metadata}")

# Get offsets with timestamps
from kafka.structs import OffsetAndTimestamp
# Note: OffsetAndTimestamp is typically returned by offset-by-timestamp queries

Broker and Cluster Information

from kafka import KafkaAdminClient
from kafka.structs import BrokerMetadata

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Get cluster metadata
# NOTE(review): assumes list_topics() returns an object exposing .brokers and
# .topics mappings — confirm; some client versions return only topic names
metadata = admin.list_topics()

# Access broker information
for broker_id, broker in metadata.brokers.items():
    print(f"Broker {broker.nodeId}: {broker.host}:{broker.port}")
    if broker.rack:  # rack is None unless the cluster assigns racks
        print(f"  Rack: {broker.rack}")

# Access partition metadata
for topic_name, topic_metadata in metadata.topics.items():
    for partition in topic_metadata.partitions.values():
        print(f"Topic {partition.topic} partition {partition.partition}:")
        print(f"  Leader: {partition.leader}")
        print(f"  Replicas: {partition.replicas}")
        print(f"  ISR: {partition.isr}")

Consumer Group Information

from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Get consumer group details
group_details = admin.describe_consumer_groups(['my-consumer-group'])

# NOTE(review): assumes the return value is a mapping of group id ->
# GroupInformation — confirm; some client versions return a plain list
for group_id, group_info in group_details.items():
    print(f"Group: {group_info.group}")
    print(f"State: {group_info.state}")
    print(f"Protocol: {group_info.protocol}")
    
    print("Members:")
    for member in group_info.members:
        print(f"  Member ID: {member.member_id}")
        print(f"  Client ID: {member.client_id}")
        print(f"  Host: {member.client_host}")

Record Metadata Handling

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Send message and get metadata
future = producer.send('my-topic', key=b'key1', value=b'value1')
# Blocks until the broker acknowledges the send, or raises after 10 seconds
record_metadata = future.get(timeout=10)

print(f"Record sent to:")
print(f"  Topic: {record_metadata.topic}")
print(f"  Partition: {record_metadata.partition}")
print(f"  Offset: {record_metadata.offset}")
print(f"  Timestamp: {record_metadata.timestamp}")
print(f"  Key size: {record_metadata.serialized_key_size} bytes")
print(f"  Value size: {record_metadata.serialized_value_size} bytes")

Consumer Record Processing

from kafka import KafkaConsumer

# NOTE(review): the iteration below blocks indefinitely waiting for records;
# example assumes a broker is reachable at localhost:9092
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group'
)

for message in consumer:
    print(f"Consumed record:")
    print(f"  Topic: {message.topic}")
    print(f"  Partition: {message.partition}")
    print(f"  Offset: {message.offset}")
    print(f"  Timestamp: {message.timestamp}")
    print(f"  Key: {message.key}")
    print(f"  Value: {message.value}")
    
    # Process headers if present
    if message.headers:
        print("  Headers:")
        for header_key, header_value in message.headers:
            print(f"    {header_key}: {header_value}")

Data Structure Comparisons and Collections

from kafka.structs import TopicPartition, OffsetAndMetadata

# TopicPartition equality and hashing
# (namedtuples compare field-by-field and hash like plain tuples)
tp1 = TopicPartition('topic-a', 0)
tp2 = TopicPartition('topic-a', 0)
tp3 = TopicPartition('topic-a', 1)

print(tp1 == tp2)  # True
print(tp1 == tp3)  # False

# Use in sets and dictionaries
partitions = {tp1, tp2, tp3}  # Only 2 unique partitions
print(len(partitions))  # 2

# Offset mapping
offsets = {
    tp1: OffsetAndMetadata(1000, 'first partition'),
    tp3: OffsetAndMetadata(2000, 'second partition')
}

for partition, offset_data in offsets.items():
    print(f"{partition.topic}:{partition.partition} = {offset_data.offset}")

Install with Tessl CLI

npx tessl i tessl/pypi-kafka-python-ng

docs

admin.md

connection.md

consumer.md

errors.md

index.md

producer.md

serialization.md

structs.md

tile.json