Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities
—
High-level producer for publishing records to Kafka topics with automatic batching, compression, partitioning, and retry logic. The KafkaProducer is thread-safe and designed for high-throughput scenarios.
Main producer class for sending records to Kafka topics. Provides asynchronous sending with futures, automatic batching, and configurable retry logic.
class KafkaProducer:
def __init__(self, **configs):
"""
Create a KafkaProducer instance.
Args:
**configs: Producer configuration options including:
bootstrap_servers (list): List of Kafka brokers
key_serializer (callable): Function to serialize keys
value_serializer (callable): Function to serialize values
acks (int|str): Acknowledgment requirements (0, 1, 'all')
retries (int): Number of retry attempts
batch_size (int): Batch size in bytes
linger_ms (int): Time to wait for batching
buffer_memory (int): Total memory for buffering
compression_type (str): Compression algorithm ('gzip', 'snappy', 'lz4', 'zstd')
max_in_flight_requests_per_connection (int): Max unacknowledged requests
request_timeout_ms (int): Request timeout
retry_backoff_ms (int): Retry backoff time
client_id (str): Client identifier
security_protocol (str): Security protocol ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
ssl_context: SSL context for encrypted connections
sasl_mechanism (str): SASL mechanism ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512', 'GSSAPI', 'OAUTHBEARER')
sasl_plain_username (str): Username for PLAIN SASL
sasl_plain_password (str): Password for PLAIN SASL
"""
def send(self, topic: str, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
"""
Send a record to the specified topic.
Args:
topic (str): The topic to send the record to
value: The record value (will be serialized using value_serializer)
key: The record key (will be serialized using key_serializer)
headers (list): List of (key, value) header tuples
partition (int): Specific partition to send to (optional)
timestamp_ms (int): Record timestamp in milliseconds (optional)
Returns:
FutureRecordMetadata: Future that resolves to RecordMetadata when send completes
"""
def flush(self, timeout=None):
"""
Flush all pending records, blocking until complete.
Args:
timeout (float): Maximum time to wait in seconds
"""
def close(self, timeout=None):
"""
Close the producer and release resources.
Args:
timeout (float): Maximum time to wait for pending sends
"""
def partitions_for(self, topic: str):
"""
Get available partitions for a topic.
Args:
topic (str): Topic name
Returns:
set: Set of available partition numbers
"""
def bootstrap_connected(self):
"""
Check if producer has established bootstrap connection.
Returns:
bool: True if connected to at least one bootstrap server
"""
def metrics(self, raw=False):
"""
Get producer performance metrics.
Args:
raw (bool): If True, return raw metrics dict. If False, return formatted metrics.
Returns:
dict: Producer performance metrics including send rates, batch sizes,
buffer usage, and request latencies
"""The send() method returns future objects that can be used to handle asynchronous results.
class FutureRecordMetadata:
    """Future returned by KafkaProducer.send(); resolves to RecordMetadata."""

    def get(self, timeout=None):
        """
        Get the RecordMetadata result, blocking if necessary.

        Args:
            timeout (float): Maximum time to wait in seconds

        Returns:
            RecordMetadata: Metadata about the sent record

        Raises:
            KafkaError: If send failed
        """

    def add_callback(self, callback):
        """
        Add callback function to be called when send completes.

        Args:
            callback (callable): Function called with (metadata, exception)
        """

    def add_errback(self, errback):
        """
        Add error callback for send failures.

        Args:
            errback (callable): Function called with exception on failure
        """

    def is_done(self):
        """
        Check if the send operation is complete.

        Returns:
            bool: True if send is complete (success or failure)
        """
class RecordMetadata:
    """Metadata describing a record acknowledged by the broker."""
    topic: str                   # Topic name
    partition: int               # Partition number
    offset: int                  # Record offset in partition
    timestamp: int               # Record timestamp
    checksum: int                # Record checksum
    serialized_key_size: int     # Serialized key size in bytes
    serialized_value_size: int   # Serialized value size in bytes

from kafka import KafkaProducer
import json

# Create producer with JSON serialization
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: str(k).encode('utf-8')
)

# Send a message
future = producer.send('my-topic', key='user123', value={'action': 'login'})

# Block until send completes and get metadata
metadata = future.get(timeout=10)
print(f"Sent to partition {metadata.partition} at offset {metadata.offset}")

producer.close()

from kafka import KafkaProducer
# Success callback: invoked with the RecordMetadata of the delivered record.
def on_success(metadata):
    print(f"Message sent to {metadata.topic}:{metadata.partition}:{metadata.offset}")

# Error callback: invoked with the exception when delivery fails.
def on_error(exception):
    print(f"Send failed: {exception}")

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Send with callbacks
future = producer.send('my-topic', b'message-value')
future.add_callback(on_success)
future.add_errback(on_error)

producer.flush()
producer.close()

from kafka import KafkaProducer
# Configure for high throughput
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    batch_size=16384,          # 16KB batches
    linger_ms=10,              # Wait 10ms for batching
    compression_type='lz4',    # Compress batches
    buffer_memory=33554432,    # 32MB buffer
    max_in_flight_requests_per_connection=5
)

# Send many messages quickly
for i in range(1000):
    producer.send('high-volume-topic', f'message-{i}'.encode())

producer.flush()
producer.close()

from kafka import KafkaProducer
# Authenticated, encrypted connection via SASL over SSL
producer = KafkaProducer(
    bootstrap_servers=['secure-broker:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='myuser',
    sasl_plain_password='mypassword',
    ssl_check_hostname=True,
    ssl_cafile='ca-cert.pem'
)

producer.send('secure-topic', b'encrypted message')
producer.close()

from kafka import KafkaProducer
from kafka.partitioner import DefaultPartitioner
class CustomPartitioner:
def __init__(self):
self.default = DefaultPartitioner()
def partition(self, topic, key, all_partitions, available_partitions):
# Custom logic here
if key and key.startswith(b'priority-'):
return 0 # Send priority messages to partition 0
return self.default.partition(topic, key, all_partitions, available_partitions)
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
partitioner=CustomPartitioner()
)Install with Tessl CLI
npx tessl i tessl/pypi-kafka-python-ng