Pure Python client for the Apache Kafka distributed stream processing system.

Core data structures for representing Kafka concepts, including topics, partitions, offsets, broker metadata, and consumer group information.

Fundamental structures for identifying Kafka topics and partitions.
class TopicPartition(NamedTuple):
    """Immutable identifier for a single partition of a Kafka topic.

    Attributes:
    - topic: str, topic name
    - partition: int, partition number (0-based)

    Usage:
        tp = TopicPartition('events', 0)
        tp = TopicPartition(topic='logs', partition=2)
    """
    topic: str
    partition: int


# Structures for managing consumer offsets and metadata.
class OffsetAndMetadata(NamedTuple):
    """Offset commit information for a topic-partition.

    Attributes:
    - offset: int, offset to commit
    - metadata: str, optional metadata string (max 4096 bytes)
    - leader_epoch: int, optional leader epoch used for fencing
    """
    offset: int
    metadata: str = None
    leader_epoch: int = None
class OffsetAndTimestamp(NamedTuple):
    """Pairing of a message offset with its timestamp.

    Attributes:
    - offset: int, message offset
    - timestamp: int, message timestamp in milliseconds
    - leader_epoch: int, optional leader epoch
    """
    offset: int
    timestamp: int
    leader_epoch: int = None


# Structures representing broker information and cluster topology.
class BrokerMetadata(NamedTuple):
    """Connection and placement information for a single broker.

    Attributes:
    - nodeId: int, unique broker node ID
    - host: str, broker hostname or IP
    - port: int, broker port number
    - rack: str, optional rack identifier for rack awareness (may be None)
    """
    nodeId: int
    host: str
    port: int
    rack: str = None
class PartitionMetadata(NamedTuple):
    """Complete metadata for one partition of a topic.

    Attributes:
    - topic: str, topic name
    - partition: int, partition number
    - leader: int, leader broker ID (-1 if no leader)
    - leader_epoch: int, current leader epoch
    - replicas: List[int], all replica broker IDs
    - isr: List[int], in-sync replica broker IDs
    - offline_replicas: List[int], offline replica broker IDs (may be None)
    - error: KafkaError, error information (None if no error)
    """
    topic: str
    partition: int
    leader: int
    leader_epoch: int
    replicas: list
    isr: list
    offline_replicas: list = None
    error: 'KafkaError' = None


# Structures for consumer group membership and coordination.
class MemberInformation(NamedTuple):
    """Details for a single member of a consumer group.

    Attributes:
    - member_id: str, member ID assigned by the group coordinator
    - client_id: str, client ID specified by the consumer
    - client_host: str, client hostname
    - member_metadata: bytes, serialized member metadata
    - member_assignment: bytes, serialized partition assignment
    """
    member_id: str
    client_id: str
    client_host: str
    member_metadata: bytes
    member_assignment: bytes
class GroupInformation(NamedTuple):
    """Complete description of a consumer group.

    Attributes:
    - error_code: int, operation error code (0 = success)
    - group: str, consumer group ID
    - state: str, group state ('Stable', 'Rebalancing', etc.)
    - protocol_type: str, protocol type (usually 'consumer')
    - protocol: str, partition assignment protocol name
    - members: List[MemberInformation], group members
    - authorized_operations: List[int], authorized operations (may be None)
    """
    error_code: int
    group: str
    state: str
    protocol_type: str
    protocol: str
    members: list
    authorized_operations: list = None


# Structures for configuration management and operational parameters.
class RetryOptions(NamedTuple):
    """Retry configuration for Kafka operations.

    Attributes:
    - limit: int, maximum number of retry attempts
    - backoff_ms: int, backoff interval between retries (milliseconds)
    - retry_on_timeouts: bool, whether to retry on timeout errors
    """
    limit: int = 3
    backoff_ms: int = 1000
    retry_on_timeouts: bool = True
class Node(NamedTuple):
    """Connection information for a cluster node.

    Attributes:
    - id: int, node identifier
    - host: str, hostname
    - port: int, port number
    - rack: str, optional rack identifier (may be None)
    """
    id: int
    host: str
    port: int
    rack: str = None


# Structures representing individual messages and record batches.
class ConsumerRecord:
    """A single message fetched from a Kafka topic-partition.

    Attributes:
    - topic: str, topic name
    - partition: int, partition number
    - offset: int, message offset
    - timestamp: int, message timestamp (milliseconds since epoch)
    - timestamp_type: int, timestamp type (0=CreateTime, 1=LogAppendTime)
    - key: bytes, message key (raw bytes, may be None)
    - value: bytes, message value (raw bytes, may be None)
    - headers: List[Tuple[str, bytes]], message headers (empty list if absent)
    - checksum: int, message checksum (may be None)
    - serialized_key_size: int, size of the serialized key (may be None)
    - serialized_value_size: int, size of the serialized value (may be None)
    - leader_epoch: int, leader epoch when the message was written (may be None)
    """

    def __init__(self, topic, partition, offset, timestamp, timestamp_type,
                 key, value, headers=None, checksum=None,
                 serialized_key_size=None, serialized_value_size=None,
                 leader_epoch=None):
        # Record identity within the cluster.
        self.topic = topic
        self.partition = partition
        self.offset = offset
        # Timing information attached by producer or broker.
        self.timestamp = timestamp
        self.timestamp_type = timestamp_type
        # Payload as raw bytes; deserialization is the caller's concern.
        self.key = key
        self.value = value
        # Normalize missing/empty headers to a fresh empty list.
        self.headers = headers if headers else []
        # Integrity and sizing metadata (optional).
        self.checksum = checksum
        self.serialized_key_size = serialized_key_size
        self.serialized_value_size = serialized_value_size
        self.leader_epoch = leader_epoch
class RecordMetadata:
    """Metadata the broker returns after a record is successfully produced.

    Attributes:
    - topic: str, topic name
    - partition: int, partition the record was written to
    - offset: int, offset assigned by the broker
    - timestamp: int, record timestamp (may be None)
    - checksum: int, record checksum (may be None)
    - serialized_key_size: int, size of the serialized key (may be None)
    - serialized_value_size: int, size of the serialized value (may be None)
    - leader_epoch: int, leader epoch at write time (may be None)
    """

    def __init__(self, topic, partition, offset, timestamp=None,
                 checksum=None, serialized_key_size=None,
                 serialized_value_size=None, leader_epoch=None):
        # Where the record landed.
        self.topic = topic
        self.partition = partition
        self.offset = offset
        # Optional broker-supplied details.
        self.timestamp = timestamp
        self.checksum = checksum
        self.serialized_key_size = serialized_key_size
        self.serialized_value_size = serialized_value_size
        self.leader_epoch = leader_epoch
class ConsumerRecords:
    """Collection of records returned by a consumer poll operation.

    Wraps a mapping of TopicPartition -> List[ConsumerRecord] and provides
    access by partition, by topic, or flat iteration over all records.
    """

    def __init__(self, record_map):
        """
        Initialize with mapping of TopicPartition to List[ConsumerRecord].

        Parameters:
        - record_map: Dict[TopicPartition, List[ConsumerRecord]]
        """
        self._record_map = record_map

    def __iter__(self):
        """Iterate over all records across all partitions."""
        for records in self._record_map.values():
            yield from records

    def __len__(self):
        """Total number of records across all partitions."""
        return sum(len(records) for records in self._record_map.values())

    def __bool__(self):
        """True if the collection contains any records."""
        return len(self) > 0

    def records(self, partition):
        """
        Get records for a specific partition.

        Parameters:
        - partition: TopicPartition

        Returns:
        - List[ConsumerRecord]: records for the partition ([] if none)
        """
        return self._record_map.get(partition, [])

    def by_topic(self):
        """
        Group records by topic.

        Returns:
        - Dict[str, List[ConsumerRecord]]: records grouped by topic name
        """
        topic_records = {}
        for tp, records in self._record_map.items():
            # setdefault avoids a separate membership check per partition.
            topic_records.setdefault(tp.topic, []).extend(records)
        return topic_records

    def partitions(self):
        """
        Get all partitions with records.

        Returns:
        - Set[TopicPartition]: partitions present in this collection
        """
        return set(self._record_map.keys())


# Example usage (requires kafka-python):
# from kafka import TopicPartition, KafkaConsumer, KafkaProducer
# Example: TopicPartition with consumer assignment and producer partition
# selection. (Import restored onto its own line; it was fused to the
# previous statement in the original.)
from kafka import TopicPartition, KafkaConsumer, KafkaProducer

# Create topic partition identifiers
tp1 = TopicPartition('events', 0)
tp2 = TopicPartition('events', 1)
tp3 = TopicPartition('logs', 0)

# Use with explicit consumer assignment (bypasses group rebalancing)
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
consumer.assign([tp1, tp2, tp3])

# Use with producer partition selection
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Send to specific partition
future = producer.send(tp1.topic, value=b'message', partition=tp1.partition)
metadata = future.get()
print(f"Sent to {metadata.topic} partition {metadata.partition} at offset {metadata.offset}")

# Check available partitions
available_partitions = producer.partitions_for(tp1.topic)
print(f"Available partitions for {tp1.topic}: {available_partitions}")

consumer.close()
producer.close()
# Example: manual offset management with OffsetAndMetadata.
# Fix: the original used time.time() without importing time.
import time

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='offset-demo',
    enable_auto_commit=False  # Manual offset management
)

# Subscribe to topic
consumer.subscribe(['events'])

# Poll for messages
records = consumer.poll(timeout_ms=5000)

# Process records and commit offsets manually
for partition, messages in records.items():
    for message in messages:
        # Process message
        print(f"Processing message: {message.value}")

        # Create offset metadata with custom information
        offset_metadata = OffsetAndMetadata(
            offset=message.offset + 1,  # Next offset to read
            metadata=f"processed_at_{int(time.time())}"
        )

        # Commit offset for this partition
        consumer.commit({partition: offset_metadata})

# Check committed offsets
for partition in consumer.assignment():
    committed = consumer.committed(partition)
    if committed:
        print(f"Partition {partition}: committed offset {committed.offset}, "
              f"metadata: {committed.metadata}")

consumer.close()
# Example: inspecting cluster topology (brokers, topics, partitions).
# (KafkaClient import restored onto its own line; it was fused to the
# previous statement in the original.)
from kafka.client_async import KafkaClient
from kafka import TopicPartition

client = KafkaClient(bootstrap_servers=['localhost:9092'])
try:
    # Wait for metadata to load
    client.poll(timeout_ms=5000)
    cluster = client.cluster

    # Examine broker information
    print("Cluster Brokers:")
    for broker in cluster.brokers():
        print(f" Broker {broker.nodeId}: {broker.host}:{broker.port}")
        if broker.rack:
            print(f" Rack: {broker.rack}")

    # Examine topic metadata
    print(f"\nTopics ({len(cluster.topics())}):")
    for topic in sorted(cluster.topics()):
        partitions = cluster.partitions_for_topic(topic)
        print(f" {topic}: {len(partitions)} partitions")

        # Get detailed partition information
        for partition_id in sorted(partitions):
            tp = TopicPartition(topic, partition_id)
            leader = cluster.leader_for_partition(tp)
            replicas = cluster.replicas_for_partition(tp)
            isr = cluster.in_sync_replicas_for_partition(tp)
            print(f" Partition {partition_id}:")
            print(f" Leader: {leader}")
            print(f" Replicas: {replicas}")
            print(f" ISR: {isr}")
finally:
    client.close()
# Example: listing and describing consumer groups via the admin client.
# (Import restored onto its own line; it was fused to the previous
# statement in the original.)
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
try:
    # List all consumer groups
    groups = admin.list_consumer_groups()
    print(f"Found {len(groups)} consumer groups:")

    group_ids = []
    for group in groups:
        print(f" {group.group}: {group.state}")
        group_ids.append(group.group)

    # Get detailed information for each group
    if group_ids:
        descriptions = admin.describe_consumer_groups(group_ids)
        for group_id, description in descriptions.items():
            print(f"\nGroup: {group_id}")
            print(f" State: {description.state}")
            print(f" Protocol: {description.partition_assignor}")
            print(f" Members: {len(description.members)}")

            for i, member in enumerate(description.members):
                print(f" Member {i+1}:")
                print(f" ID: {member.member_id}")
                print(f" Client: {member.client_id}")
                print(f" Host: {member.host}")

                # Parse assignment if available
                if hasattr(member, 'assignment') and member.assignment:
                    partitions = member.assignment.topic_partitions
                    print(f" Assigned partitions: {len(partitions)}")
                    for tp in sorted(partitions):
                        print(f" {tp.topic}[{tp.partition}]")
finally:
    admin.close()
# Example: batch record processing with per-partition statistics.
# Fix: the original used time.time() without importing time.
import json
import time
from collections import defaultdict

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    group_id='processor',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    max_poll_records=100
)

# Statistics tracking
partition_stats = defaultdict(lambda: {'count': 0, 'latest_offset': -1})
message_types = defaultdict(int)

try:
    while True:
        # Poll for batch of records
        record_batch = consumer.poll(timeout_ms=5000)
        if not record_batch:
            print("No new messages")
            continue

        print(f"Received {len(record_batch)} records")

        # Process records by partition to maintain ordering
        for partition, records in record_batch.items():
            print(f"\nProcessing {len(records)} records from {partition}")
            for record in records:
                # Update statistics
                partition_stats[partition]['count'] += 1
                partition_stats[partition]['latest_offset'] = record.offset

                # Analyze message content
                if isinstance(record.value, dict) and 'type' in record.value:
                    message_types[record.value['type']] += 1

                # Process message headers
                if record.headers:
                    print(f" Headers: {dict(record.headers)}")

                # Process message based on timestamp
                age_seconds = (time.time() * 1000 - record.timestamp) / 1000
                if age_seconds > 300:  # 5 minutes
                    print(f" Warning: Old message ({age_seconds:.1f}s old)")

        # Print periodic statistics
        print(f"\nPartition Statistics:")
        for partition, stats in partition_stats.items():
            print(f" {partition}: {stats['count']} messages, "
                  f"latest offset {stats['latest_offset']}")
        print(f"Message Types: {dict(message_types)}")
except KeyboardInterrupt:
    print("Shutting down...")
finally:
    consumer.close()

from collections import namedtuple
from kafka import TopicPartition
# Custom application-specific structures
# Application-specific record types built on collections.namedtuple.

# Envelope wrapping a payload with routing/tracing metadata.
MessageEnvelope = namedtuple(
    'MessageEnvelope',
    'correlation_id timestamp source data',
)

# Outcome of processing a single record.
ProcessingResult = namedtuple(
    'ProcessingResult',
    'success partition offset processing_time error',
)

# Lag snapshot for one partition.
PartitionLag = namedtuple(
    'PartitionLag',
    'partition current_offset high_water_mark lag',
)
def calculate_consumer_lag(consumer, partitions):
    """Return a list of PartitionLag entries for the given partitions.

    Lag is the high-water mark minus the current consumer position; -1
    marks partitions whose position or end offset could not be determined.
    """
    # Current consumer positions; -1 when the position is unavailable.
    positions = {}
    for tp in partitions:
        try:
            positions[tp] = consumer.position(tp)
        except Exception:
            # Best-effort: record the sentinel rather than aborting the scan.
            positions[tp] = -1

    # High water marks (log end offsets) per partition.
    end_offsets = consumer.end_offsets(partitions)

    # Assemble one PartitionLag entry per requested partition.
    result = []
    for tp in partitions:
        current = positions.get(tp, -1)
        high_water = end_offsets.get(tp, -1)
        lag = high_water - current if current >= 0 and high_water >= 0 else -1
        result.append(PartitionLag(
            partition=tp,
            current_offset=current,
            high_water_mark=high_water,
            lag=lag,
        ))
    return result
# Usage: report consumer lag for the current assignment.
# Fix: KafkaConsumer was used without an import in scope here.
from kafka import KafkaConsumer

consumer = KafkaConsumer('events', bootstrap_servers=['localhost:9092'])
consumer.poll(0)  # Initialize assignments

partitions = list(consumer.assignment())
lag_info = calculate_consumer_lag(consumer, partitions)

for lag in lag_info:
    print(f"Partition {lag.partition}: "
          f"current={lag.current_offset}, "
          f"high_water={lag.high_water_mark}, "
          f"lag={lag.lag}")

consumer.close()

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-kafka-python