Pure Python client for the Apache Kafka distributed stream processing system.

High-level consumer for consuming records from Kafka topics, with comprehensive support for consumer groups, automatic partition assignment, offset management, and rebalancing coordination.

The main consumer class provides a high-level interface for consuming records from Kafka topics. It supports both subscribe (automatic partition assignment) and assign (manual partition assignment) modes.
class KafkaConsumer:
    """High-level consumer for consuming records from Kafka topics.

    Supports both subscribe() (automatic partition assignment via a
    consumer group) and assign() (manual partition assignment) modes.

    NOTE: method bodies are interface stubs documenting the public API;
    they perform no I/O and implicitly return None.
    """

    def __init__(self, *topics, **configs):
        """
        Initialize Kafka consumer.

        Parameters:
        - *topics: str, optional topic names to subscribe to
        - **configs: consumer configuration options
            - bootstrap_servers: List[str], broker addresses
            - group_id: str, consumer group identifier
            - key_deserializer: Callable, key deserialization function
            - value_deserializer: Callable, value deserialization function
            - auto_offset_reset: str, 'earliest' or 'latest' for new groups
            - enable_auto_commit: bool, automatic offset commits (default: True)
            - auto_commit_interval_ms: int, auto-commit interval (default: 5000)
            - session_timeout_ms: int, session timeout (default: 10000)
            - heartbeat_interval_ms: int, heartbeat interval (default: 3000)
            - max_poll_records: int, max records per poll (default: 500)
            - fetch_min_bytes: int, minimum fetch size (default: 1)
            - fetch_max_wait_ms: int, max fetch wait time (default: 500)
            - max_partition_fetch_bytes: int, max bytes per partition (default: 1MB)
            - security_protocol: str, 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'
            - api_version: tuple, broker API version or 'auto'
        """

    def subscribe(self, topics=(), pattern=None, listener=None):
        """
        Subscribe to topics with automatic partition assignment.

        Parameters:
        - topics: List[str], topic names to subscribe to
        - pattern: str, regex pattern for topic matching
        - listener: ConsumerRebalanceListener, rebalance callback
        """

    def assign(self, partitions):
        """
        Manually assign specific partitions to consumer.

        Parameters:
        - partitions: List[TopicPartition], partitions to assign
        """

    def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
        """
        Fetch records from assigned partitions.

        Parameters:
        - timeout_ms: int, polling timeout in milliseconds
        - max_records: int, maximum records to return
        - update_offsets: bool, whether to update high-water mark

        Returns:
        - ConsumerRecords: mapping of TopicPartition to list of ConsumerRecord
        """

    def commit(self, offsets=None):
        """
        Commit offsets to Kafka.

        Parameters:
        - offsets: Dict[TopicPartition, OffsetAndMetadata], specific offsets
          to commit. If None, commits current position for all assigned
          partitions.
        """

    def commit_async(self, offsets=None, callback=None):
        """
        Asynchronous offset commit.

        Parameters:
        - offsets: Dict[TopicPartition, OffsetAndMetadata], offsets to commit
        - callback: Callable, completion callback function
        """

    def seek(self, partition, offset):
        """
        Seek to specific offset in partition.

        Parameters:
        - partition: TopicPartition, target partition
        - offset: int, target offset
        """

    def seek_to_beginning(self, *partitions):
        """
        Seek to beginning of partitions.

        Parameters:
        - *partitions: TopicPartition, partitions to seek (all assigned if none)
        """

    def seek_to_end(self, *partitions):
        """
        Seek to end of partitions.

        Parameters:
        - *partitions: TopicPartition, partitions to seek (all assigned if none)
        """

    def position(self, partition):
        """
        Get current position (next fetch offset) for partition.

        Parameters:
        - partition: TopicPartition, target partition

        Returns:
        - int: current position offset
        """

    def committed(self, partition):
        """
        Get last committed offset for partition.

        Parameters:
        - partition: TopicPartition, target partition

        Returns:
        - OffsetAndMetadata: last committed offset with metadata
        """

    def pause(self, *partitions):
        """
        Suspend fetching from partitions.

        Parameters:
        - *partitions: TopicPartition, partitions to pause
        """

    def resume(self, *partitions):
        """
        Resume fetching from partitions.

        Parameters:
        - *partitions: TopicPartition, partitions to resume
        """

    def paused(self):
        """
        Get currently paused partitions.

        Returns:
        - Set[TopicPartition]: paused partitions
        """

    def close(self, autocommit=True):
        """
        Close consumer and clean up resources.

        Parameters:
        - autocommit: bool, commit offsets before closing
        """

    def subscription(self):
        """
        Get current topic subscription.

        Returns:
        - Set[str]: subscribed topic names
        """

    def assignment(self):
        """
        Get current partition assignment.

        Returns:
        - Set[TopicPartition]: assigned partitions
        """

    def beginning_offsets(self, partitions):
        """
        Get earliest available offsets for partitions.

        Parameters:
        - partitions: List[TopicPartition], target partitions

        Returns:
        - Dict[TopicPartition, int]: earliest offsets
        """

    def end_offsets(self, partitions):
        """
        Get latest offsets for partitions.

        Parameters:
        - partitions: List[TopicPartition], target partitions

        Returns:
        - Dict[TopicPartition, int]: latest offsets
        """

    def offsets_for_times(self, timestamps):
        """
        Get offsets for specific timestamps.

        Parameters:
        - timestamps: Dict[TopicPartition, int], timestamp mapping

        Returns:
        - Dict[TopicPartition, OffsetAndTimestamp]: offset and timestamp info
        """

    def metrics(self):
        """
        Get consumer metrics.

        Returns:
        - Dict[str, float]: current metric values
        """


# Abstract base class for handling partition rebalancing events in consumer
# groups. Implement this interface to perform cleanup or initialization when
# partitions are assigned or revoked.
class ConsumerRebalanceListener:
    """Callback interface for partition rebalance events in a consumer group.

    Implement this interface to perform cleanup or initialization when
    partitions are assigned to or revoked from this consumer. The base
    implementations are no-ops.
    """

    def on_partitions_revoked(self, revoked):
        """
        Called before partitions are reassigned.

        Use this callback to commit offsets and clean up state
        for partitions that are being revoked.

        Parameters:
        - revoked: List[TopicPartition], partitions being revoked
        """

    def on_partitions_assigned(self, assigned):
        """
        Called after partitions are assigned.

        Use this callback to set up state or seek to specific
        offsets for newly assigned partitions.

        Parameters:
        - assigned: List[TopicPartition], partitions being assigned
        """


# Result of a consumer poll() operation containing records organized by
# partition.
class ConsumerRecords:
    """Result of a consumer poll() operation: records keyed by partition.

    The original skeleton left the container protocol methods unimplemented,
    so len()/bool() would raise TypeError (returning None from __len__ and
    __bool__). The behavior here implements exactly the documented contract.
    """

    def __init__(self, record_map):
        """
        Container for poll() results.

        Parameters:
        - record_map: Dict[TopicPartition, List[ConsumerRecord]]
        """
        # Copy to insulate the container from later mutation of the
        # caller's dict.
        self._record_map = dict(record_map)

    def __iter__(self):
        """Iterate over all records across all partitions."""
        for records in self._record_map.values():
            yield from records

    def __len__(self):
        """Total number of records across all partitions."""
        return sum(len(records) for records in self._record_map.values())

    def __bool__(self):
        """True if contains any records."""
        # any() short-circuits on the first non-empty partition list.
        return any(self._record_map.values())

    def records(self, partition):
        """
        Get records for specific partition.

        Parameters:
        - partition: TopicPartition, target partition

        Returns:
        - List[ConsumerRecord]: records for partition (empty list if the
          partition is not present)
        """
        return list(self._record_map.get(partition, []))

    def by_topic(self):
        """
        Group records by topic.

        Returns:
        - Dict[str, List[ConsumerRecord]]: records grouped by topic
        """
        grouped = {}
        for partition, records in self._record_map.items():
            # TopicPartition exposes the topic name as `.topic`.
            grouped.setdefault(partition.topic, []).extend(records)
        return grouped


# Individual record consumed from Kafka containing message data and metadata.
class ConsumerRecord:
    """Individual record consumed from Kafka: message data plus metadata.

    Annotations use PEP 585 builtin generics (list/tuple) instead of
    typing.List/Tuple, which this file never imported — the original
    annotations would raise NameError at class creation time.
    """

    topic: str                        # Topic name
    partition: int                    # Partition number
    offset: int                       # Message offset
    timestamp: int                    # Message timestamp (milliseconds)
    timestamp_type: int               # Timestamp type (0=CreateTime, 1=LogAppendTime)
    key: bytes                        # Message key (raw bytes)
    value: bytes                      # Message value (raw bytes)
    headers: list[tuple[str, bytes]]  # Message headers
    checksum: int                     # Message checksum
    serialized_key_size: int          # Serialized key size
    serialized_value_size: int        # Serialized value size
    leader_epoch: int                 # Leader epoch
# Example 1: basic consumption with automatic offset management.
# (The `from kafka import KafkaConsumer` line was fused onto the previous
# section during extraction; restored here so the example is self-contained.)
from kafka import KafkaConsumer
import json

# Create consumer with automatic offset management
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    auto_commit_interval_ms=1000,
)

# Process messages; iterating the consumer blocks until records arrive.
for message in consumer:
    print(f"Topic: {message.topic}")
    print(f"Partition: {message.partition}")
    print(f"Offset: {message.offset}")
    print(f"Key: {message.key}")
    print(f"Value: {message.value}")
    print(f"Timestamp: {message.timestamp}")

consumer.close()
# Example 2: manual partition assignment (no consumer group).
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id=None,  # No consumer group
    value_deserializer=lambda m: m.decode('utf-8'),
)

# Manually assign specific partitions
partitions = [
    TopicPartition('topic1', 0),
    TopicPartition('topic1', 1),
    TopicPartition('topic2', 0),
]
consumer.assign(partitions)

# Seek to specific positions
consumer.seek(TopicPartition('topic1', 0), 100)
consumer.seek_to_end(TopicPartition('topic1', 1))

# Poll for messages
while True:
    records = consumer.poll(timeout_ms=1000)
    for partition, messages in records.items():
        for message in messages:
            print(f"Partition {partition}: {message.value}")
# Example 3: manual offset commits after each processed message.
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='manual-commit-group',
    enable_auto_commit=False,  # Disable auto-commit
    auto_offset_reset='earliest',
)

try:
    while True:
        records = consumer.poll(timeout_ms=1000)
        for partition, messages in records.items():
            for message in messages:
                # Process message
                print(f"Processing: {message.value}")
                # Manually commit after processing; the committed offset is
                # the NEXT offset to consume, hence message.offset + 1.
                consumer.commit({
                    partition: OffsetAndMetadata(message.offset + 1, None)
                })
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
# Example 4: consumer group with a rebalance listener.
# (The `from kafka import KafkaConsumer` line was fused onto the previous
# section during extraction; restored here so the example is self-contained.)
from kafka import KafkaConsumer
from kafka.consumer import ConsumerRebalanceListener
import logging


class MyRebalanceListener(ConsumerRebalanceListener):
    """Commits offsets on revocation; logs assignment changes."""

    def __init__(self, consumer):
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        logging.info(f"Partitions revoked: {revoked}")
        # Commit current offsets before partitions are reassigned
        self.consumer.commit()

    def on_partitions_assigned(self, assigned):
        logging.info(f"Partitions assigned: {assigned}")
        # Could seek to specific offsets or perform other setup


consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='rebalance-group',
)
listener = MyRebalanceListener(consumer)
consumer.subscribe(['my-topic'], listener=listener)

for message in consumer:
    print(f"Received: {message.value}")
# Example 5: batch processing with manual commits.
# BUG FIX: this example used OffsetAndMetadata without importing it — no
# import (even the one fused onto the previous section) provided the name.
from kafka import KafkaConsumer, OffsetAndMetadata

consumer = KafkaConsumer(
    'batch-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='batch-group',
    max_poll_records=100,  # Process up to 100 records per poll
    enable_auto_commit=False,
)


def process_batch(records):
    """Process a batch of records together.

    Parameters:
    - records: List[ConsumerRecord], records from one poll across partitions

    Returns:
    - bool: True if the batch was processed successfully
    """
    batch_data = [record.value for record in records]
    # Simulate batch processing
    print(f"Processing batch of {len(batch_data)} records")
    # ... perform batch operation ...
    return True


try:
    while True:
        # Poll for a batch of records
        record_batch = consumer.poll(timeout_ms=5000, max_records=50)
        if record_batch:
            # Flatten records from all partitions, remembering the next
            # offset to commit per partition (last offset + 1).
            all_records = []
            last_offsets = {}
            for partition, records in record_batch.items():
                all_records.extend(records)
                if records:
                    last_offsets[partition] = records[-1].offset + 1
            # Process the batch
            if process_batch(all_records):
                # Commit offsets only for a successfully processed batch
                offset_data = {
                    partition: OffsetAndMetadata(offset, None)
                    for partition, offset in last_offsets.items()
                }
                consumer.commit(offset_data)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

# Install with Tessl CLI
npx tessl i tessl/pypi-kafka-python