Pure Python client for the Apache Kafka distributed stream processing system
```
npx @tessl/cli install tessl/pypi-kafka-python@2.2.0
```

A pure Python client library for the Apache Kafka distributed stream processing system. It provides comprehensive support for producers, consumers, and administrative operations, with compatibility across Kafka broker versions 0.8.0 to 2.6+.
Install with pip:

```
pip install kafka-python
```

Optional extras:

- `pip install kafka-python[crc32c]` for faster CRC32C validation
- `pip install kafka-python[lz4]` for LZ4 compression support
- `pip install kafka-python[snappy]` for Snappy compression support
- `pip install kafka-python[zstd]` for Zstandard compression support

Basic imports for common usage:
```python
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
```

Topic and partition management:

```python
from kafka import TopicPartition, OffsetAndMetadata
```

Consumer coordination:

```python
from kafka.consumer import ConsumerRebalanceListener
```

Administrative objects:

```python
from kafka.admin import NewTopic, NewPartitions, ConfigResource, ACL
```

Low-level client and connection:

```python
from kafka import KafkaClient, BrokerConnection
```

Serialization interfaces:

```python
from kafka import Serializer, Deserializer
```

Producer quickstart:

```python
from kafka import KafkaProducer
import json

# Configure producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send message
future = producer.send('my-topic', value={'message': 'Hello World'}, key='key1')

# Block for acknowledgment
record_metadata = future.get(timeout=10)
print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition}")

# Clean shutdown
producer.close()
```
Consumer quickstart:

```python
from kafka import KafkaConsumer
import json

# Configure consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    auto_offset_reset='earliest'
)

# Consume messages
for message in consumer:
    print(f"Received: key={message.key}, value={message.value}")
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}")

# Clean shutdown
consumer.close()
```
Admin client quickstart:

```python
from kafka import KafkaAdminClient
from kafka.admin import NewTopic

# Configure admin client
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create topic
topic = NewTopic(name='new-topic', num_partitions=3, replication_factor=1)
admin.create_topics([topic])

# List topics
topics = admin.list_topics()
print(f"Available topics: {list(topics)}")

# Clean shutdown
admin.close()
```

kafka-python follows a layered architecture designed for both simplicity and extensibility:
- `KafkaProducer`, `KafkaConsumer`, and `KafkaAdminClient` provide simple interfaces for common operations
- `KafkaClient` handles protocol communication and connection management
- `BrokerConnection` manages individual TCP connections with proper error handling and reconnection

This design enables everything from simple fire-and-forget message sending to complex transactional processing and cluster administration.
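As a rough sketch of that range (assuming a broker at `localhost:9092`), the same `send()` call can be used fire-and-forget or synchronously, depending on whether you wait on the returned future:

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Fire-and-forget: hand the record to the background I/O thread
# and continue without waiting for an acknowledgment
producer.send('my-topic', b'fire-and-forget')

# Synchronous: block on the returned future until the broker acks
metadata = producer.send('my-topic', b'synchronous').get(timeout=10)
print(f"Acked at offset {metadata.offset}")

producer.close()
```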
High-level producer for publishing records to Kafka topics with support for batching, compression, partitioning strategies, idempotent production, and transactional semantics.
```python
class KafkaProducer:
    def __init__(self, **configs): ...
    def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None, headers=None): ...
    def flush(self, timeout=None): ...
    def close(self, timeout=None): ...
```
High-level consumer for consuming records from Kafka topics with support for consumer groups, automatic partition assignment, offset management, and rebalancing coordination.

```python
class KafkaConsumer:
    def __init__(self, *topics, **configs): ...
    def subscribe(self, topics=(), pattern=None, listener=None): ...
    def assign(self, partitions): ...
    def poll(self, timeout_ms=0, max_records=None, update_offsets=True): ...
    def commit(self, offsets=None): ...
    def seek(self, partition, offset): ...
```
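A minimal sketch of manual assignment with batched polling and explicit commits; `process()` is a hypothetical placeholder for application logic:

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    enable_auto_commit=False,
)

# Manual partition assignment instead of group subscription
tp = TopicPartition('my-topic', 0)
consumer.assign([tp])
consumer.seek(tp, 0)  # rewind to the start of the partition

# poll() returns a dict mapping TopicPartition -> list of records
records = consumer.poll(timeout_ms=1000, max_records=100)
for partition, messages in records.items():
    for message in messages:
        process(message)  # hypothetical processing function

# Commit only after the batch has been processed
consumer.commit()
consumer.close()
```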
Rebalance callbacks can be supplied via `ConsumerRebalanceListener`:

```python
class ConsumerRebalanceListener:
    def on_partitions_revoked(self, revoked): ...
    def on_partitions_assigned(self, assigned): ...
```
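A sketch of a listener that commits offsets before partitions are revoked during a rebalance; the group and topic names are placeholders:

```python
from kafka import KafkaConsumer
from kafka.consumer import ConsumerRebalanceListener

class CommitOnRevoke(ConsumerRebalanceListener):
    def __init__(self, consumer):
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        # Flush processed offsets before ownership moves away
        self.consumer.commit()

    def on_partitions_assigned(self, assigned):
        print(f"Assigned: {assigned}")

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    enable_auto_commit=False,
)
consumer.subscribe(['my-topic'], listener=CommitOnRevoke(consumer))
```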
Administrative client for managing Kafka clusters, including topic operations, partition management, configuration changes, access control lists, and consumer group administration.

```python
class KafkaAdminClient:
    def __init__(self, **configs): ...
    def create_topics(self, topic_requests, timeout_ms=None, validate_only=False): ...
    def delete_topics(self, topics, timeout_ms=None): ...
    def list_topics(self, timeout_ms=None): ...
    def describe_topics(self, topics, timeout_ms=None): ...
    def create_partitions(self, partition_updates, timeout_ms=None, validate_only=False): ...
    def describe_configs(self, config_resources, timeout_ms=None): ...
    def alter_configs(self, config_resources, timeout_ms=None): ...
```
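A sketch of configuration inspection and partition growth, assuming a topic named `my-topic` already exists:

```python
from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType, NewPartitions

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Inspect the topic's current configuration
resource = ConfigResource(ConfigResourceType.TOPIC, 'my-topic')
configs = admin.describe_configs([resource])

# Change retention to one day
admin.alter_configs([ConfigResource(
    ConfigResourceType.TOPIC, 'my-topic',
    configs={'retention.ms': '86400000'},
)])

# Grow the topic to 6 partitions (the count can only increase)
admin.create_partitions({'my-topic': NewPartitions(total_count=6)})
admin.close()
```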
Core data structures for representing Kafka concepts, including topics, partitions, offsets, broker metadata, and consumer group information.

```python
TopicPartition = NamedTuple('TopicPartition', [('topic', str), ('partition', int)])
OffsetAndMetadata = NamedTuple('OffsetAndMetadata', [('offset', int), ('metadata', str), ('leader_epoch', int)])
BrokerMetadata = NamedTuple('BrokerMetadata', [('nodeId', int), ('host', str), ('port', int), ('rack', str)])
```
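These are plain NamedTuples, so they are hashable and work as dict keys, e.g. for offset commits. A brief sketch, assuming the three-field `OffsetAndMetadata` form shown above, with `-1` marking an unknown leader epoch:

```python
from kafka import TopicPartition, OffsetAndMetadata

tp = TopicPartition(topic='my-topic', partition=0)
om = OffsetAndMetadata(offset=42, metadata='batch-7', leader_epoch=-1)

# Hashable, so usable as dict keys in commit() calls
offsets = {tp: om}
print(tp.topic, tp.partition, om.offset)
```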
Comprehensive error handling, with over 100 exception classes mapping all Kafka protocol errors, client-side errors, and authorization failures, each carrying appropriate retry semantics.

```python
class KafkaError(Exception):
    retriable: bool
    invalid_metadata: bool

class KafkaTimeoutError(KafkaError): ...
class KafkaConnectionError(KafkaError): ...
class TopicAuthorizationFailedError(AuthorizationError): ...
```
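A sketch of typical error handling around a blocking send, using the `retriable` flag to decide whether to retry:

```python
from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('my-topic', b'payload')

try:
    metadata = future.get(timeout=10)
except KafkaTimeoutError:
    pass  # timed out waiting for the ack; safe to retry
except KafkaError as e:
    if e.retriable:
        pass  # transient failure: back off and resend
    else:
        raise  # permanent failure: surface it
```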
Low-level client for direct protocol communication and connection management, providing fine-grained control over cluster interactions and metadata handling.

```python
class KafkaClient:
    def __init__(self, **configs): ...
    def bootstrap_connected(self): ...
    def check_version(self, node_id=None, timeout=2, strict=False): ...
    def cluster(self): ...

class BrokerConnection:
    def __init__(self, host, port, **configs): ...
    def connect(self): ...
    def connected(self): ...
    def close(self): ...
```
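A minimal sketch probing connectivity and broker version through the low-level client:

```python
from kafka import KafkaClient

client = KafkaClient(bootstrap_servers=['localhost:9092'])

# True once the bootstrap handshake has succeeded
print(client.bootstrap_connected())

# Probe the broker's protocol version, e.g. (2, 6, 0)
print(client.check_version(timeout=5))
```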
Abstract base classes for implementing custom serializers and deserializers with pluggable serialization strategies.

```python
class Serializer:
    def serialize(self, topic, value): ...
    def close(self): ...

class Deserializer:
    def deserialize(self, topic, bytes_): ...
    def close(self): ...
```

kafka-python supports multiple authentication mechanisms:

- PLAINTEXT and SSL/TLS transport (via `security_protocol`)
- SASL/PLAIN
- SASL/GSSAPI (Kerberos)
- SASL/SCRAM-SHA-256 and SCRAM-SHA-512
- SASL/OAUTHBEARER
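A sketch of a SASL/SCRAM connection over TLS; the broker address, credentials, and CA path are placeholders:

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['broker1:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='user',    # placeholder credential
    sasl_plain_password='secret',  # placeholder credential
    ssl_cafile='/path/to/ca.pem',  # placeholder CA certificate
)
```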
Built-in support for multiple compression algorithms:

- gzip (no extra dependency)
- Snappy (`kafka-python[snappy]`)
- LZ4 (`kafka-python[lz4]`)
- Zstandard (`kafka-python[zstd]`)
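Compression is selected per producer via `compression_type`; a sketch using LZ4, which requires the matching extra (`pip install kafka-python[lz4]`):

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    compression_type='lz4',  # or 'gzip', 'snappy', 'zstd'
)
```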
Extensible serialization framework with abstract base classes for custom serializers and deserializers.
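A sketch of a custom serializer built on the abstract base class; kafka-python also accepts plain callables, as in the quickstart above:

```python
import json
from kafka import KafkaProducer, Serializer

class JsonSerializer(Serializer):
    def serialize(self, topic, value):
        # Topic-aware serialization hook; here the topic is ignored
        return json.dumps(value).encode('utf-8')

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=JsonSerializer(),
)
```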
```python
from typing import Any, Dict

# Producer configuration dictionary with string keys and various value types
ProducerConfig = Dict[str, Any]

# Consumer configuration dictionary with string keys and various value types
ConsumerConfig = Dict[str, Any]

# Admin client configuration dictionary with string keys and various value types
AdminConfig = Dict[str, Any]
```
```python
from typing import List, Tuple

# Producer message future returned by send()
class FutureRecordMetadata:
    def get(self, timeout=None): ...
    def add_callback(self, callback): ...
    def add_errback(self, errback): ...

# Consumer record received from poll()
class ConsumerRecord:
    topic: str
    partition: int
    offset: int
    timestamp: int
    timestamp_type: int
    key: bytes
    value: bytes
    headers: List[Tuple[str, bytes]]
```
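A sketch of asynchronous delivery handling with chained callbacks on the returned future:

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

def on_success(record_metadata):
    print(f"Delivered to offset {record_metadata.offset}")

def on_error(exc):
    print(f"Delivery failed: {exc}")

# Callbacks fire on the producer's I/O thread when the ack arrives
producer.send('my-topic', b'payload') \
    .add_callback(on_success) \
    .add_errback(on_error)
producer.flush()
```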