Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities.

High-level consumer for consuming records from Kafka topics with automatic group coordination, partition assignment, and offset management. Supports both subscription-based consumption and manual partition assignment.

Main consumer class for consuming records from Kafka topics. Provides automatic group coordination, offset management, and flexible consumption patterns.
class KafkaConsumer:
    """Consume records from Kafka topics.

    Provides automatic group coordination, offset management, and flexible
    consumption patterns (subscription-based or manual partition assignment).
    """

    def __init__(self, *topics, **configs):
        """
        Create a KafkaConsumer instance.

        Args:
            *topics: Topics to subscribe to initially
            **configs: Consumer configuration options including:
                bootstrap_servers (list): List of Kafka brokers
                group_id (str): Consumer group identifier
                key_deserializer (callable): Function to deserialize keys
                value_deserializer (callable): Function to deserialize values
                auto_offset_reset (str): What to do when no offset ('earliest', 'latest', 'none')
                enable_auto_commit (bool): Whether to auto-commit offsets
                auto_commit_interval_ms (int): Auto-commit interval
                session_timeout_ms (int): Group session timeout
                heartbeat_interval_ms (int): Heartbeat interval
                max_poll_records (int): Maximum records per poll
                max_poll_interval_ms (int): Maximum time between polls
                client_id (str): Client identifier
                security_protocol (str): Security protocol
                ssl_context: SSL context
                sasl_mechanism (str): SASL mechanism
                consumer_timeout_ms (int): Consumer timeout
        """

    def subscribe(self, topics=None, pattern=None, listener=None):
        """
        Subscribe to topics or topic pattern.

        Args:
            topics (list): List of topic names to subscribe to
            pattern (str): Regex pattern for topic matching
            listener (ConsumerRebalanceListener): Rebalance event listener
        """

    def unsubscribe(self):
        """Unsubscribe from all topics."""

    def assign(self, partitions):
        """
        Manually assign partitions to consumer.

        Args:
            partitions (list): List of TopicPartition objects
        """

    def assignment(self):
        """
        Get current partition assignment.

        Returns:
            set: Set of TopicPartition objects currently assigned
        """

    def subscription(self):
        """
        Get current topic subscription.

        Returns:
            set: Set of subscribed topic names
        """

    def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
        """
        Fetch records from Kafka.

        Args:
            timeout_ms (int): Maximum time to wait for records
            max_records (int): Maximum number of records to return
            update_offsets (bool): Whether to update fetch positions

        Returns:
            dict: Dictionary mapping TopicPartition to list of ConsumerRecord
        """

    def commit(self, offsets=None):
        """
        Commit offsets synchronously.

        Args:
            offsets (dict): Dictionary mapping TopicPartition to OffsetAndMetadata
        """

    def commit_async(self, offsets=None, callback=None):
        """
        Commit offsets asynchronously.

        Args:
            offsets (dict): Dictionary mapping TopicPartition to OffsetAndMetadata
            callback (callable): Callback function for commit result
        """

    def committed(self, partition, metadata=False):
        """
        Get committed offset for partition.

        Args:
            partition (TopicPartition): Partition to check
            metadata (bool): Whether to return metadata with offset

        Returns:
            int or OffsetAndMetadata: Committed offset
        """

    def position(self, partition):
        """
        Get current position (next fetch offset) for partition.

        Args:
            partition (TopicPartition): Partition to check

        Returns:
            int: Current fetch position
        """

    def seek(self, partition, offset):
        """
        Seek to specific offset for partition.

        Args:
            partition (TopicPartition): Partition to seek
            offset (int): Offset to seek to
        """

    def seek_to_beginning(self, *partitions):
        """
        Seek to beginning of partitions.

        Args:
            *partitions: TopicPartition objects (empty = all assigned)
        """

    def seek_to_end(self, *partitions):
        """
        Seek to end of partitions.

        Args:
            *partitions: TopicPartition objects (empty = all assigned)
        """

    def pause(self, *partitions):
        """
        Pause consumption from partitions.

        Args:
            *partitions: TopicPartition objects to pause
        """

    def resume(self, *partitions):
        """
        Resume consumption from previously paused partitions.

        Args:
            *partitions: TopicPartition objects to resume
        """

    def paused(self):
        """
        Get currently paused partitions.

        Returns:
            set: Set of paused TopicPartition objects
        """

    def topics(self):
        """
        Get available topics from cluster.

        Returns:
            set: Set of available topic names
        """

    def partitions_for_topic(self, topic):
        """
        Get available partitions for topic.

        Args:
            topic (str): Topic name

        Returns:
            set: Set of partition numbers for topic
        """

    def beginning_offsets(self, partitions):
        """
        Get earliest available offsets for partitions.

        Args:
            partitions (list): List of TopicPartition objects

        Returns:
            dict: Dictionary mapping TopicPartition to offset
        """

    def end_offsets(self, partitions):
        """
        Get latest available offsets for partitions.

        Args:
            partitions (list): List of TopicPartition objects

        Returns:
            dict: Dictionary mapping TopicPartition to offset
        """

    def close(self):
        """Close the consumer and release resources."""

    def highwater(self, partition):
        """
        Get high watermark offset for partition.

        Args:
            partition (TopicPartition): Partition to check

        Returns:
            int: High watermark offset
        """

    def bootstrap_connected(self):
        """
        Check if consumer has established bootstrap connection.

        Returns:
            bool: True if connected to at least one bootstrap server
        """

    def offsets_for_times(self, timestamps):
        """
        Get offsets for given timestamps.

        Args:
            timestamps (dict): Dictionary mapping TopicPartition to timestamp

        Returns:
            dict: Dictionary mapping TopicPartition to OffsetAndTimestamp
        """

    def metrics(self, raw=False):
        """
        Get consumer performance metrics.

        Args:
            raw (bool): If True, return raw metrics dict

        Returns:
            dict: Consumer performance metrics including fetch rates, lag, and timing
        """


# Data structures returned by the consumer for representing consumed records.
class ConsumerRecord:
    """A single record consumed from Kafka (topic/partition/offset plus payload)."""

    topic: str                   # Topic name
    partition: int               # Partition number
    offset: int                  # Record offset
    timestamp: int               # Record timestamp
    timestamp_type: int          # Timestamp type
    key: bytes                   # Record key (deserialized if deserializer provided)
    value: bytes                 # Record value (deserialized if deserializer provided)
    headers: list                # List of (key, value) header tuples
    checksum: int                # Record checksum
    serialized_key_size: int     # Key size in bytes
    serialized_value_size: int   # Value size in bytes
class ConsumerRebalanceListener:
    """Callback interface invoked on consumer group rebalance events."""

    def on_partitions_revoked(self, revoked):
        """
        Called when partitions are revoked from consumer.

        Args:
            revoked (list): List of TopicPartition objects being revoked
        """

    def on_partitions_assigned(self, assigned):
        """
        Called when new partitions are assigned to consumer.

        Args:
            assigned (list): List of TopicPartition objects being assigned
        """


from kafka import KafkaConsumer
import json

# Create consumer with JSON deserialization
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Consume messages
for message in consumer:
    print(f"Topic: {message.topic}")
    print(f"Partition: {message.partition}")
    print(f"Offset: {message.offset}")
    print(f"Value: {message.value}")

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='manual-commit-group',
    enable_auto_commit=False  # Disable auto-commit
)
consumer.subscribe(['my-topic'])

try:
    while True:
        # Poll for messages
        message_batch = consumer.poll(timeout_ms=1000)
        for topic_partition, messages in message_batch.items():
            for message in messages:
                # Process message
                print(f"Processing: {message.value}")
                # Manually commit after processing
                consumer.commit({
                    topic_partition: OffsetAndMetadata(message.offset + 1, "processed")
                })
except KeyboardInterrupt:
    consumer.close()

from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id=None  # No group coordination
)

# Manually assign specific partitions
partitions = [
    TopicPartition('topic-1', 0),
    TopicPartition('topic-1', 1),
    TopicPartition('topic-2', 0)
]
consumer.assign(partitions)

# Seek to specific offsets
consumer.seek(TopicPartition('topic-1', 0), 100)
consumer.seek_to_beginning(TopicPartition('topic-1', 1))

for message in consumer:
    print(f"Manual assignment: {message.topic}:{message.partition}:{message.offset}")

from kafka import KafkaConsumer
from kafka.consumer.subscription_state import ConsumerRebalanceListener


class RebalanceListener(ConsumerRebalanceListener):
    """Rebalance listener that commits on revoke and rewinds on assign."""

    def __init__(self, consumer):
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        print(f"Partitions revoked: {revoked}")
        # Commit current offsets before rebalance
        self.consumer.commit()

    def on_partitions_assigned(self, assigned):
        print(f"Partitions assigned: {assigned}")
        # Reset to beginning for new partitions
        for partition in assigned:
            self.consumer.seek_to_beginning(partition)


consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='rebalance-group'
)
listener = RebalanceListener(consumer)
consumer.subscribe(['my-topic'], listener=listener)

for message in consumer:
    print(f"Consumed: {message.value}")

from kafka import KafkaConsumer
consumer = KafkaConsumer(
    'batch-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='batch-processor',
    max_poll_records=100,  # Get up to 100 records per poll
    enable_auto_commit=False
)

while True:
    # Get batch of messages
    message_batch = consumer.poll(timeout_ms=5000, max_records=100)
    if not message_batch:
        continue

    # Process batch
    batch_count = 0
    for topic_partition, messages in message_batch.items():
        for message in messages:
            # Process message
            batch_count += 1

    print(f"Processed batch of {batch_count} messages")
    # Commit batch
    consumer.commit()

from kafka import KafkaConsumer
consumer = KafkaConsumer(
    'secure-topic',
    bootstrap_servers=['secure-broker:9093'],
    group_id='secure-group',
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='myuser',
    sasl_plain_password='mypassword',
    ssl_check_hostname=True,
    ssl_cafile='ca-cert.pem'
)

for message in consumer:
    print(f"Secure message: {message.value}")

# Install with Tessl CLI
npx tessl i tessl/pypi-kafka-python-ng