# Pure Python client for Apache Kafka with producer/consumer APIs and admin
# capabilities.
#
# Core data structures for representing Kafka concepts including topic
# partitions, offsets, broker metadata, and consumer group information.
from collections import namedtuple

# Data structures for identifying topics and partitions within a Kafka cluster.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
"""
A topic and partition tuple.

Args:
    topic (str): Topic name
    partition (int): Partition number

Usage:
    tp = TopicPartition('my-topic', 0)
    print(tp.topic)      # 'my-topic'
    print(tp.partition)  # 0
"""

# Data structures for representing offsets with associated metadata.
# An immutable (offset, metadata) pair used when committing offsets.
OffsetAndMetadata = namedtuple("OffsetAndMetadata", "offset metadata")
"""
Offset with commit metadata.

Args:
    offset (int): The offset to be committed
    metadata (str): Non-null metadata

Usage:
    oam = OffsetAndMetadata(100, "my-metadata")
    print(oam.offset)    # 100
    print(oam.metadata)  # "my-metadata"
"""
OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", ["offset", "timestamp"])
"""
Offset with associated timestamp.

Args:
    offset (int): An offset
    timestamp (int): The timestamp associated to the offset

Usage:
    oat = OffsetAndTimestamp(100, 1640995200000)
    print(oat.offset)     # 100
    print(oat.timestamp)  # 1640995200000
"""

# Data structures for representing broker and cluster information.
class BrokerMetadata:
    """Metadata describing a single Kafka broker in the cluster."""

    def __init__(self, nodeId: int, host: str, port: int, rack: str = None):
        """
        Kafka broker metadata.

        Args:
            nodeId (int): Broker node ID
            host (str): Broker hostname
            port (int): Broker port
            rack (str): Broker rack identifier (optional)
        """
        # Bug fix: attributes were previously only annotated, never assigned,
        # so instances ended up with no data.
        self.nodeId = nodeId  # Broker node ID
        self.host = host      # Broker hostname
        self.port = port      # Broker port
        self.rack = rack      # Broker rack (optional, may be None)
class PartitionMetadata:
    """Metadata for one topic partition as reported by the cluster."""

    def __init__(self, topic: str, partition: int, leader: int, replicas: list, isr: list, error: int = None):
        """
        Partition metadata from cluster.

        Args:
            topic (str): Topic name
            partition (int): Partition number
            leader (int): Leader broker node ID
            replicas (list): List of replica broker node IDs
            isr (list): List of in-sync replica broker node IDs
            error (int): Error code (optional)
        """
        # Bug fix: attributes were previously only annotated, never assigned.
        self.topic = topic          # Topic name
        self.partition = partition  # Partition number
        self.leader = leader        # Leader broker node ID
        self.replicas = replicas    # Replica broker node IDs
        self.isr = isr              # In-sync replica broker node IDs
        self.error = error          # Error code (None when no error reported)

# Data structures for representing consumer group state and member information.
class MemberInformation:
    """Information about one member of a consumer group."""

    def __init__(self, member_id: str, client_id: str, client_host: str, member_metadata: bytes, member_assignment: bytes):
        """
        Consumer group member information.

        Args:
            member_id (str): Member identifier
            client_id (str): Client identifier
            client_host (str): Client hostname
            member_metadata (bytes): Member metadata
            member_assignment (bytes): Member assignment data
        """
        # Bug fix: attributes were previously only annotated, never assigned.
        self.member_id = member_id                  # Member identifier
        self.client_id = client_id                  # Client identifier
        self.client_host = client_host              # Client hostname
        self.member_metadata = member_metadata      # Raw member metadata
        self.member_assignment = member_assignment  # Raw member assignment
class GroupInformation:
    """Describes a consumer group: state, protocol, and its members."""

    def __init__(self, error_code: int, group: str, state: str, protocol_type: str, protocol: str, members: list, authorized_operations: set = None):
        """
        Consumer group information.

        Args:
            error_code (int): Error code
            group (str): Group ID
            state (str): Group state
            protocol_type (str): Protocol type
            protocol (str): Protocol name
            members (list): List of MemberInformation objects
            authorized_operations (set): Authorized operations (optional)
        """
        # Bug fix: attributes were previously only annotated, never assigned.
        self.error_code = error_code        # Error code
        self.group = group                  # Group ID
        self.state = state                  # Group state
        self.protocol_type = protocol_type  # Protocol type
        self.protocol = protocol            # Protocol name
        self.members = members              # List of MemberInformation
        self.authorized_operations = authorized_operations  # May be None

# Data structures for producer retry and configuration options.
class RetryOptions:
    """Retry policy for the async producer."""

    def __init__(self, limit: int, backoff_ms: int, retry_on_timeouts: bool = True):
        """
        Retry policy configuration for async producer.

        Args:
            limit (int): Maximum retry attempts
            backoff_ms (int): Backoff time between retries in milliseconds
            retry_on_timeouts (bool): Whether to retry on timeout errors
        """
        # Bug fix: attributes were previously only annotated, never assigned.
        self.limit = limit                          # Maximum retry attempts
        self.backoff_ms = backoff_ms                # Backoff in milliseconds
        self.retry_on_timeouts = retry_on_timeouts  # Retry on timeouts

# Data structures for record metadata and timing information.
class RecordMetadata:
    """Metadata returned for a record successfully produced to Kafka."""

    def __init__(self, topic: str, partition: int, offset: int, timestamp: int = None, checksum: int = None, serialized_key_size: int = None, serialized_value_size: int = None):
        """
        Metadata for a produced record.

        Args:
            topic (str): Topic name
            partition (int): Partition number
            offset (int): Record offset
            timestamp (int): Record timestamp (optional)
            checksum (int): Record checksum (optional)
            serialized_key_size (int): Key size in bytes (optional)
            serialized_value_size (int): Value size in bytes (optional)
        """
        # Bug fix: attributes were previously only annotated, never assigned.
        self.topic = topic          # Topic name
        self.partition = partition  # Partition number
        self.offset = offset        # Record offset
        self.timestamp = timestamp  # Record timestamp (None if unknown)
        self.checksum = checksum    # Record checksum (None if unavailable)
        self.serialized_key_size = serialized_key_size      # Key size (bytes)
        self.serialized_value_size = serialized_value_size  # Value size (bytes)
class ConsumerRecord:
    """A single record consumed from a Kafka topic partition."""

    def __init__(self, topic: str, partition: int, offset: int, timestamp: int, timestamp_type: int, key: bytes, value: bytes, headers: list = None, checksum: int = None, serialized_key_size: int = None, serialized_value_size: int = None):
        """
        Record consumed from Kafka.

        Args:
            topic (str): Topic name
            partition (int): Partition number
            offset (int): Record offset
            timestamp (int): Record timestamp
            timestamp_type (int): Timestamp type
            key (bytes): Record key
            value (bytes): Record value
            headers (list): List of (key, value) header tuples (optional)
            checksum (int): Record checksum (optional)
            serialized_key_size (int): Key size in bytes (optional)
            serialized_value_size (int): Value size in bytes (optional)
        """
        # Bug fix: attributes were previously only annotated, never assigned.
        self.topic = topic                    # Topic name
        self.partition = partition            # Partition number
        self.offset = offset                  # Record offset
        self.timestamp = timestamp            # Record timestamp
        self.timestamp_type = timestamp_type  # Timestamp type
        self.key = key                        # Record key
        self.value = value                    # Record value
        self.headers = headers                # Header tuples (None if absent)
        self.checksum = checksum              # Record checksum (optional)
        self.serialized_key_size = serialized_key_size      # Key size (bytes)
        self.serialized_value_size = serialized_value_size  # Value size (bytes)

# (TopicPartition is defined above in this module; the original re-import of
# kafka.structs.TopicPartition here was redundant.)
# Example: identifying topics and partitions with TopicPartition.
from kafka import KafkaConsumer

# Create topic partition identifiers
partition_0 = TopicPartition('my-topic', 0)
partition_1 = TopicPartition('my-topic', 1)

# Use in consumer assignment
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
consumer.assign([partition_0, partition_1])

# Use in offset management
consumer.seek(partition_0, 1000)
current_position = consumer.position(partition_0)

# TopicPartition objects are hashable
partition_set = {partition_0, partition_1}
partition_dict = {partition_0: 'data for partition 0'}

# Imports used by the next example (manual offset commits).
from kafka.structs import TopicPartition, OffsetAndMetadata
# Example: manual offset commits with OffsetAndMetadata.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=False
)
partition = TopicPartition('my-topic', 0)

# Manual offset commits with metadata
offset_metadata = OffsetAndMetadata(1500, 'processed batch 123')
consumer.commit({partition: offset_metadata})

# Check committed offset
committed = consumer.committed(partition, metadata=True)
print(f"Committed offset: {committed.offset}, metadata: {committed.metadata}")

# Get offsets with timestamps
from kafka.structs import OffsetAndTimestamp
# Note: OffsetAndTimestamp is typically returned by offset-by-timestamp queries

# Import used by the next example (cluster metadata).
from kafka import KafkaAdminClient
# Example: inspecting broker and partition metadata.
from kafka.structs import BrokerMetadata

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Get cluster metadata
# NOTE(review): this example assumes list_topics() returns an object with
# `.brokers` and `.topics` mappings — verify against the installed client API.
metadata = admin.list_topics()

# Access broker information
for broker_id, broker in metadata.brokers.items():
    print(f"Broker {broker.nodeId}: {broker.host}:{broker.port}")
    if broker.rack:
        print(f" Rack: {broker.rack}")

# Access partition metadata
for topic_name, topic_metadata in metadata.topics.items():
    for partition in topic_metadata.partitions.values():
        print(f"Topic {partition.topic} partition {partition.partition}:")
        print(f" Leader: {partition.leader}")
        print(f" Replicas: {partition.replicas}")
        print(f" ISR: {partition.isr}")

# Import used by the next example (consumer group details).
from kafka import KafkaAdminClient
# Example: describing consumer groups and their members.
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Get consumer group details
# NOTE(review): this example assumes describe_consumer_groups() returns a
# mapping of group id -> GroupInformation — verify against the client API.
group_details = admin.describe_consumer_groups(['my-consumer-group'])
for group_id, group_info in group_details.items():
    print(f"Group: {group_info.group}")
    print(f"State: {group_info.state}")
    print(f"Protocol: {group_info.protocol}")
    print("Members:")
    for member in group_info.members:
        print(f" Member ID: {member.member_id}")
        print(f" Client ID: {member.client_id}")
        print(f" Host: {member.client_host}")

# Import used by the next example (producing records).
from kafka import KafkaProducer
# Example: inspecting RecordMetadata after producing a record.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Send message and get metadata
future = producer.send('my-topic', key=b'key1', value=b'value1')
record_metadata = future.get(timeout=10)
print(f"Record sent to:")
print(f" Topic: {record_metadata.topic}")
print(f" Partition: {record_metadata.partition}")
print(f" Offset: {record_metadata.offset}")
print(f" Timestamp: {record_metadata.timestamp}")
print(f" Key size: {record_metadata.serialized_key_size} bytes")
print(f" Value size: {record_metadata.serialized_value_size} bytes")

# Import used by the next example (consuming records).
from kafka import KafkaConsumer
# Example: inspecting ConsumerRecord fields while consuming.
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group'
)
for message in consumer:
    print(f"Consumed record:")
    print(f" Topic: {message.topic}")
    print(f" Partition: {message.partition}")
    print(f" Offset: {message.offset}")
    print(f" Timestamp: {message.timestamp}")
    print(f" Key: {message.key}")
    print(f" Value: {message.value}")
    # Process headers if present
    if message.headers:
        print(" Headers:")
        for header_key, header_value in message.headers:
            print(f" {header_key}: {header_value}")

# Imports used by the next example (equality and hashing).
from kafka.structs import TopicPartition, OffsetAndMetadata
# TopicPartition equality and hashing
tp1 = TopicPartition('topic-a', 0)
tp2 = TopicPartition('topic-a', 0)
tp3 = TopicPartition('topic-a', 1)
print(tp1 == tp2)  # True
print(tp1 == tp3)  # False

# Use in sets and dictionaries
partitions = {tp1, tp2, tp3}  # Only 2 unique partitions
print(len(partitions))  # 2

# Offset mapping
offsets = {
    tp1: OffsetAndMetadata(1000, 'first partition'),
    tp3: OffsetAndMetadata(2000, 'second partition')
}
for partition, offset_data in offsets.items():
    print(f"{partition.topic}:{partition.partition} = {offset_data.offset}")

# Install with Tessl CLI
#   npx tessl i tessl/pypi-kafka-python-ng