"""Pure Python client for the Apache Kafka distributed stream processing system.

High-level producer for publishing records to Kafka topics, with comprehensive
support for batching, compression, partitioning strategies, idempotent
production, and transactional semantics. The main producer class provides a
high-level interface for sending records to Kafka topics, supporting
synchronous and asynchronous sending with configurable batching, compression,
and delivery guarantees.
"""
class KafkaProducer:
    """High-level interface for sending records to Kafka topics.

    Supports synchronous and asynchronous sending with configurable
    batching, compression, and delivery guarantees.
    """

    def __init__(self, **configs):
        """
        Initialize Kafka producer.

        Configuration Parameters:
        - bootstrap_servers: List[str], broker addresses
        - key_serializer: Callable, key serialization function
        - value_serializer: Callable, value serialization function
        - acks: str|int, acknowledgment policy ('0', '1', 'all'/-1)
        - retries: int, number of retries for failed sends (default: 2147483647)
        - batch_size: int, batch size in bytes (default: 16384)
        - linger_ms: int, time to wait for batching (default: 0)
        - buffer_memory: int, total memory for buffering (default: 33554432)
        - compression_type: str, 'none', 'gzip', 'snappy', 'lz4', 'zstd'
        - max_in_flight_requests_per_connection: int, max unacknowledged requests (default: 5)
        - request_timeout_ms: int, request timeout (default: 30000)
        - delivery_timeout_ms: int, delivery timeout (default: 120000)
        - max_request_size: int, max request size (default: 1048576)
        - send_buffer_bytes: int, TCP send buffer size (default: 131072)
        - receive_buffer_bytes: int, TCP receive buffer size (default: 32768)
        - security_protocol: str, 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'
        - api_version: tuple, broker API version or 'auto'
        - enable_idempotence: bool, enable idempotent producer (default: False)
        - transactional_id: str, transactional producer ID
        - partitioner: Callable, custom partitioner function
        - max_block_ms: int, max time to block on send (default: 60000)
        """

    def send(self, topic, value=None, key=None, partition=None,
             timestamp_ms=None, headers=None):
        """
        Asynchronously send record to topic.

        Parameters:
        - topic: str, target topic name
        - value: any, message value (will be serialized)
        - key: any, message key (will be serialized)
        - partition: int, specific partition (optional)
        - timestamp_ms: int, message timestamp in milliseconds
        - headers: List[Tuple[str, bytes]], message headers

        Returns:
        - FutureRecordMetadata: future representing the send result
        """

    def flush(self, timeout=None):
        """
        Force all buffered records to be sent immediately.

        Parameters:
        - timeout: float, max time to wait for completion (seconds)

        Raises:
        - KafkaTimeoutError: if timeout exceeded
        """

    def close(self, timeout=None):
        """
        Close producer and clean up resources.

        Parameters:
        - timeout: float, max time to wait for completion (seconds)
        """

    def partitions_for(self, topic):
        """
        Get available partitions for topic.

        Parameters:
        - topic: str, topic name

        Returns:
        - Set[int]: available partition numbers
        """

    def metrics(self):
        """
        Get producer metrics.

        Returns:
        - Dict[str, float]: current metric values
        """

    def bootstrap_connected(self):
        """
        Check if connected to at least one bootstrap server.

        Returns:
        - bool: True if connected
        """

    # Transactional methods (requires Kafka 0.11+)

    def init_transactions(self):
        """
        Initialize transactions for transactional producer.

        Must be called before any other transaction methods.

        Raises:
        - IllegalStateError: if transactional_id not configured
        - ProducerFencedError: if another producer with same ID is active
        """

    def begin_transaction(self):
        """
        Begin a new transaction.

        Must call init_transactions() first.

        Raises:
        - IllegalStateError: if no transaction initialized
        - ProducerFencedError: if producer is fenced
        """

    def send_offsets_to_transaction(self, offsets, consumer_group_id):
        """
        Add consumer offsets to current transaction.

        Parameters:
        - offsets: Dict[TopicPartition, OffsetAndMetadata], offsets to include
        - consumer_group_id: str, consumer group ID

        Raises:
        - IllegalStateError: if no active transaction
        - ProducerFencedError: if producer is fenced
        """

    def commit_transaction(self):
        """
        Commit the current transaction.

        Raises:
        - IllegalStateError: if no active transaction
        - ProducerFencedError: if producer is fenced
        """

    def abort_transaction(self):
        """
        Abort the current transaction.

        Raises:
        - IllegalStateError: if no active transaction
        - ProducerFencedError: if producer is fenced
        """
class FutureRecordMetadata:
    """Future object returned by producer.send() representing an
    asynchronous send operation."""

    def get(self, timeout=None):
        """
        Block until send completes and return metadata.

        Parameters:
        - timeout: float, max time to wait (seconds)

        Returns:
        - RecordMetadata: send result metadata

        Raises:
        - KafkaError: if send failed
        - KafkaTimeoutError: if timeout exceeded
        """

    def add_callback(self, callback):
        """
        Add success callback.

        Parameters:
        - callback: Callable[[RecordMetadata], None], success callback
        """

    def add_errback(self, errback):
        """
        Add error callback.

        Parameters:
        - errback: Callable[[Exception], None], error callback
        """

    def is_done(self):
        """
        Check if send operation completed.

        Returns:
        - bool: True if completed (success or failure)
        """

    def succeeded(self):
        """
        Check if send succeeded.

        Returns:
        - bool: True if succeeded
        """

    def failed(self):
        """
        Check if send failed.

        Returns:
        - bool: True if failed
        """
class RecordMetadata:
    """Metadata returned after a successful record send, containing
    partition and offset information."""

    topic: str                  # Topic name
    partition: int              # Partition number
    offset: int                 # Record offset
    timestamp: int              # Record timestamp
    checksum: int               # Record checksum
    serialized_key_size: int    # Serialized key size
    serialized_value_size: int  # Serialized value size
    leader_epoch: int           # Leader epoch
class DefaultPartitioner:
    """Interface for custom partitioning strategies.

    The default partitioner uses murmur2 hash for keyed messages and
    round-robin for keyless messages.
    """

    def __call__(self, key, all_partitions, available_partitions):
        """
        Select partition for message.

        Parameters:
        - key: bytes, message key (may be None)
        - all_partitions: List[int], all partition numbers
        - available_partitions: List[int], available partition numbers

        Returns:
        - int: selected partition number
        """
def murmur2(data):
    """
    Pure Python murmur2 hash implementation.

    Parameters:
    - data: bytes, data to hash

    Returns:
    - int: hash value
    """
class Serializer:
    """Abstract base class for custom key and value serialization."""

    def serialize(self, topic, value):
        """
        Serialize value to bytes.

        Parameters:
        - topic: str, topic name
        - value: any, value to serialize

        Returns:
        - bytes: serialized value
        """

    def close(self):
        """Clean up resources."""
# Example: producer with JSON serialization and blocking acknowledgment.
from kafka import KafkaProducer
import json

# Create producer with JSON serialization
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send message asynchronously
future = producer.send('my-topic', value={'message': 'Hello World'}, key='key1')

# Block for acknowledgment
try:
    record_metadata = future.get(timeout=10)
    print(f"Message sent to topic {record_metadata.topic} "
          f"partition {record_metadata.partition} "
          f"offset {record_metadata.offset}")
except Exception as e:
    print(f"Send failed: {e}")

producer.close()
# Example: fire-and-forget sending with acks=0 (no delivery guarantee).
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    acks=0  # Fire and forget
)

# Send many messages without waiting for acknowledgment
for i in range(1000):
    producer.send('events', value=f'Event {i}')

# Force send all buffered messages
producer.flush()
producer.close()
# Example: synchronous sending with error handling (acks='all').
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    acks='all',  # Wait for all replicas
    retries=3
)

def send_sync(topic, value, key=None):
    """Send message synchronously with error handling."""
    try:
        future = producer.send(topic, value=value, key=key)
        record_metadata = future.get(timeout=30)
        return record_metadata
    except KafkaError as e:
        print(f"Failed to send message: {e}")
        return None

# Send messages synchronously
metadata = send_sync('orders', 'Order #12345', key='customer-123')
if metadata:
    print(f"Order sent successfully to offset {metadata.offset}")

producer.close()
# Example: asynchronous sending with success/error callbacks.
from kafka import KafkaProducer
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode
)

def on_success(record_metadata):
    print(f"Message sent successfully: topic={record_metadata.topic} "
          f"partition={record_metadata.partition} offset={record_metadata.offset}")

def on_error(exception):
    print(f"Message send failed: {exception}")

# Send with callbacks
for i in range(10):
    future = producer.send('async-topic', value=f'Message {i}')
    future.add_callback(on_success)
    future.add_errback(on_error)

# Give time for callbacks to complete
time.sleep(2)
producer.close()
# Example: high-throughput producer using batching and compression.
from kafka import KafkaProducer

# Configure for high throughput with batching and compression
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    # Batching configuration
    batch_size=32768,  # 32KB batches
    linger_ms=100,     # Wait up to 100ms to fill batch
    # Compression
    compression_type='lz4',
    # Memory and throughput
    buffer_memory=67108864,  # 64MB buffer
    max_in_flight_requests_per_connection=10
)

# Send many messages - will be batched and compressed
for i in range(10000):
    producer.send('high-throughput', value=f'Batch message {i}')

producer.flush()
producer.close()
# Example: custom partitioner plugged into the producer.
from kafka import KafkaProducer
import hashlib
import time  # BUGFIX: required by the keyless branch below; was missing

def custom_partitioner(key, all_partitions, available_partitions):
    """Custom partitioner using SHA-256 hash.

    Keyed messages deterministically map to the same partition; keyless
    messages are spread pseudo-randomly across available partitions.
    """
    if key is None:
        # Pseudo round-robin for messages without keys
        partition = hash(time.time()) % len(available_partitions)
        return available_partitions[partition]
    else:
        # Hash-based partitioning for keyed messages
        hash_value = int(hashlib.sha256(key).hexdigest(), 16)
        return all_partitions[hash_value % len(all_partitions)]

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    key_serializer=str.encode,
    partitioner=custom_partitioner
)

# Messages with same key will go to same partition
producer.send('partitioned-topic', key='user-123', value='Event A')
producer.send('partitioned-topic', key='user-123', value='Event B')
producer.send('partitioned-topic', key='user-456', value='Event C')
producer.close()
# Example: idempotent producer for exactly-once delivery semantics.
from kafka import KafkaProducer

# Enable idempotence for exactly-once semantics
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    enable_idempotence=True,
    acks='all',
    retries=10,
    max_in_flight_requests_per_connection=1  # Required for idempotence
)

# Producer will automatically retry and deduplicate
for i in range(100):
    producer.send('exactly-once-topic', value=f'Idempotent message {i}')

producer.close()
# Example: transactional producer (consume-transform-produce pattern).
from kafka import KafkaProducer, KafkaConsumer, TopicPartition, OffsetAndMetadata

# Configure transactional producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    transactional_id='my-transactional-id',  # Required for transactions
    enable_idempotence=True,                 # Required for transactions
    acks='all'                               # Required for transactions
)

# Initialize transactions (must be called once)
producer.init_transactions()

try:
    # Begin transaction
    producer.begin_transaction()
    # Send messages as part of transaction
    producer.send('orders', value='Order #1001')
    producer.send('inventory', value='Item #5001 reserved')
    # Include consumer offsets in transaction (consume-transform-produce pattern)
    consumer_offsets = {
        TopicPartition('input-topic', 0): OffsetAndMetadata(100, None)
    }
    producer.send_offsets_to_transaction(consumer_offsets, 'my-consumer-group')
    # Commit transaction - all messages become visible atomically
    producer.commit_transaction()
    print("Transaction committed successfully")
except Exception as e:
    print(f"Transaction failed: {e}")
    # Abort transaction - no messages become visible
    producer.abort_transaction()
finally:
    producer.close()
# Example: custom value serializers built on the Serializer base class.
from kafka import KafkaProducer
from kafka.serializer import Serializer
import pickle
import json

class PickleSerializer(Serializer):
    """Custom serializer using pickle."""

    def serialize(self, topic, value):
        if value is None:
            return None
        return pickle.dumps(value)

class JSONSerializer(Serializer):
    """Custom JSON serializer with error handling."""

    def serialize(self, topic, value):
        if value is None:
            return None
        try:
            return json.dumps(value).encode('utf-8')
        except (TypeError, ValueError) as e:
            raise ValueError(f"Cannot serialize {value} to JSON: {e}")

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=str.encode,
    value_serializer=JSONSerializer()
)

# Send complex objects
producer.send('json-topic',
              key='order-123',
              value={'order_id': 123, 'items': ['item1', 'item2'], 'total': 99.99})
producer.close()
# Example: robust sending with application-level retry and backoff.
from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError, MessageSizeTooLargeError
import time  # BUGFIX: required for the backoff sleeps below; was missing

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    retries=5,
    retry_backoff_ms=1000,
    request_timeout_ms=30000
)

def robust_send(topic, value, key=None, max_retries=3):
    """Send with custom retry logic and error handling."""
    for attempt in range(max_retries + 1):
        try:
            future = producer.send(topic, value=value, key=key)
            record_metadata = future.get(timeout=30)
            return record_metadata
        except MessageSizeTooLargeError:
            print(f"Message too large for topic {topic}")
            return None
        except KafkaTimeoutError:
            print(f"Timeout on attempt {attempt + 1}")
            if attempt == max_retries:
                print("Max retries reached, giving up")
                return None
            time.sleep(2 ** attempt)  # Exponential backoff
        except KafkaError as e:
            print(f"Kafka error on attempt {attempt + 1}: {e}")
            if not e.retriable or attempt == max_retries:
                return None
            time.sleep(1)
    return None

# Use robust sending
result = robust_send('critical-topic', 'Important message')
if result:
    print(f"Message sent successfully: offset {result.offset}")
else:
    print("Failed to send message after all retries")

producer.close()
# High-throughput producer configuration
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    # Increase batch size for better throughput
    batch_size=65536,  # 64KB batches
    linger_ms=50,      # Small delay to fill batches
    # Use fast compression
    compression_type='lz4',  # Fast compression
    # Increase memory and concurrent requests
    buffer_memory=134217728,  # 128MB buffer
    max_in_flight_requests_per_connection=10,
    # Reduce acknowledgment requirements
    acks=1,  # Only wait for leader
    # Increase network buffers
    send_buffer_bytes=262144,    # 256KB send buffer
    receive_buffer_bytes=65536   # 64KB receive buffer
)
# Low-latency producer configuration
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    # Minimize batching delays
    batch_size=1,  # Send immediately
    linger_ms=0,   # No delay
    # No compression for speed
    compression_type='none',
    # Faster acknowledgments
    acks=1,
    # Reduce timeout values
    request_timeout_ms=10000,   # 10 second timeout
    delivery_timeout_ms=30000   # 30 second delivery timeout
)
# Install with the Tessl CLI: npx tessl i tessl/pypi-kafka-python