tessl/pypi-kafka-python

Pure Python client for Apache Kafka distributed stream processing system

docs/producer.md

Producer API

High-level producer for publishing records to Kafka topics with comprehensive support for batching, compression, partitioning strategies, idempotent production, and transactional semantics.

Capabilities

KafkaProducer

Main producer class providing high-level interface for sending records to Kafka topics. Supports synchronous and asynchronous sending with configurable batching, compression, and delivery guarantees.

class KafkaProducer:
    def __init__(self, **configs):
        """
        Initialize Kafka producer.
        
        Configuration Parameters:
        - bootstrap_servers: List[str], broker addresses
        - key_serializer: Callable, key serialization function  
        - value_serializer: Callable, value serialization function
        - acks: int|str, acknowledgment policy (0, 1, or 'all'; -1 is equivalent to 'all')
        - retries: int, number of retries for failed sends (default: 2147483647)
        - batch_size: int, batch size in bytes (default: 16384)
        - linger_ms: int, time to wait for batching (default: 0)
        - buffer_memory: int, total memory for buffering (default: 33554432)
        - compression_type: str, 'none', 'gzip', 'snappy', 'lz4', 'zstd'
        - max_in_flight_requests_per_connection: int, max unacknowledged requests (default: 5)
        - request_timeout_ms: int, request timeout (default: 30000)
        - delivery_timeout_ms: int, delivery timeout (default: 120000)
        - max_request_size: int, max request size (default: 1048576)
        - send_buffer_bytes: int, TCP send buffer size (default: 131072)
        - receive_buffer_bytes: int, TCP receive buffer size (default: 32768)
        - security_protocol: str, 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'
        - api_version: tuple, broker API version or 'auto'
        - enable_idempotence: bool, enable idempotent producer (default: False)
        - transactional_id: str, transactional producer ID
        - partitioner: Callable, custom partitioner function
        - max_block_ms: int, max time to block on send (default: 60000)
        """
    
    def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None, headers=None):
        """
        Asynchronously send record to topic.
        
        Parameters:
        - topic: str, target topic name
        - value: any, message value (will be serialized)
        - key: any, message key (will be serialized)  
        - partition: int, specific partition (optional)
        - timestamp_ms: int, message timestamp in milliseconds
        - headers: List[Tuple[str, bytes]], message headers
        
        Returns:
        - FutureRecordMetadata: future representing the send result
        """
    
    def flush(self, timeout=None):
        """
        Force all buffered records to be sent immediately.
        
        Parameters:
        - timeout: float, max time to wait for completion (seconds)
        
        Raises:
        - KafkaTimeoutError: if timeout exceeded
        """
    
    def close(self, timeout=None):
        """
        Close producer and clean up resources.
        
        Parameters:
        - timeout: float, max time to wait for completion (seconds)
        """
    
    def partitions_for(self, topic):
        """
        Get available partitions for topic.
        
        Parameters:
        - topic: str, topic name
        
        Returns:
        - Set[int]: available partition numbers
        """
    
    def metrics(self):
        """
        Get producer metrics.
        
        Returns:
        - Dict[str, float]: current metric values
        """
    
    def bootstrap_connected(self):
        """
        Check if connected to at least one bootstrap server.
        
        Returns:
        - bool: True if connected
        """
    
    # Transactional methods (requires Kafka 0.11+)
    def init_transactions(self):
        """
        Initialize transactions for transactional producer.
        Must be called before any other transaction methods.
        
        Raises:
        - IllegalStateError: if transactional_id not configured
        - ProducerFencedError: if another producer with same ID is active
        """
    
    def begin_transaction(self):
        """
        Begin a new transaction.
        Must call init_transactions() first.
        
        Raises:
        - IllegalStateError: if no transaction initialized
        - ProducerFencedError: if producer is fenced
        """
    
    def send_offsets_to_transaction(self, offsets, consumer_group_id):
        """
        Add consumer offsets to current transaction.
        
        Parameters:
        - offsets: Dict[TopicPartition, OffsetAndMetadata], offsets to include
        - consumer_group_id: str, consumer group ID
        
        Raises:
        - IllegalStateError: if no active transaction
        - ProducerFencedError: if producer is fenced
        """
    
    def commit_transaction(self):
        """
        Commit the current transaction.
        
        Raises:
        - IllegalStateError: if no active transaction
        - ProducerFencedError: if producer is fenced
        """
    
    def abort_transaction(self):
        """
        Abort the current transaction.
        
        Raises:
        - IllegalStateError: if no active transaction
        - ProducerFencedError: if producer is fenced
        """

Future Record Metadata

Future object returned by producer.send() representing an asynchronous send operation.

class FutureRecordMetadata:
    def get(self, timeout=None):
        """
        Block until send completes and return metadata.
        
        Parameters:
        - timeout: float, max time to wait (seconds)
        
        Returns:
        - RecordMetadata: send result metadata
        
        Raises:
        - KafkaError: if send failed
        - KafkaTimeoutError: if timeout exceeded
        """
    
    def add_callback(self, callback):
        """
        Add success callback.
        
        Parameters:
        - callback: Callable[[RecordMetadata], None], success callback
        """
    
    def add_errback(self, errback):
        """
        Add error callback.
        
        Parameters:
        - errback: Callable[[Exception], None], error callback
        """
    
    def is_done(self):
        """
        Check if send operation completed.
        
        Returns:
        - bool: True if completed (success or failure)
        """
    
    def succeeded(self):
        """
        Check if send succeeded.
        
        Returns:
        - bool: True if succeeded
        """
    
    def failed(self):
        """
        Check if send failed.
        
        Returns:
        - bool: True if failed
        """
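
The three state predicates follow a simple contract: is_done() stays false while the send is in flight, and once the send resolves exactly one of succeeded() or failed() becomes true. A minimal stand-in sketch of that contract (FakeFuture is hypothetical, not part of kafka-python, and needs no broker):

```python
class FakeFuture:
    """Hypothetical stand-in illustrating FutureRecordMetadata's state contract."""
    def __init__(self):
        self._metadata = None
        self._exception = None
        self._done = False

    def resolve(self, metadata):
        # Simulates the broker acknowledging the record
        self._metadata, self._done = metadata, True

    def fail(self, exc):
        # Simulates a failed send
        self._exception, self._done = exc, True

    def is_done(self):
        return self._done

    def succeeded(self):
        return self._done and self._exception is None

    def failed(self):
        return self._done and self._exception is not None

f = FakeFuture()
assert not f.is_done()           # still in flight
f.resolve({'offset': 42})
assert f.succeeded() and not f.failed()
```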

Record Metadata

Metadata returned after successful record send containing partition and offset information.

class RecordMetadata:
    topic: str           # Topic name
    partition: int       # Partition number
    offset: int          # Record offset  
    timestamp: int       # Record timestamp
    checksum: int        # Record checksum
    serialized_key_size: int     # Serialized key size
    serialized_value_size: int   # Serialized value size
    leader_epoch: int    # Leader epoch

Partitioner Interface

Interface for custom partitioning strategies. The default partitioner uses a murmur2 hash of the serialized key for keyed messages and picks a random available partition for keyless messages.

class DefaultPartitioner:
    def __call__(self, key, all_partitions, available_partitions):
        """
        Select partition for message.
        
        Parameters:
        - key: bytes, message key (may be None)
        - all_partitions: List[int], all partition numbers
        - available_partitions: List[int], available partition numbers
        
        Returns:
        - int: selected partition number
        """

def murmur2(data):
    """
    Pure Python murmur2 hash implementation.
    
    Parameters:
    - data: bytes, data to hash
    
    Returns:
    - int: hash value
    """
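
Keyed partition selection pairs a non-negative hash of the serialized key with a modulo over the partition list. The sketch below shows that pattern in isolation; hashlib.md5 stands in for murmur2 so the example is self-contained (kafka-python itself uses murmur2 to stay compatible with the Java client's partitioning):

```python
import hashlib

def select_partition(key, all_partitions):
    """Map a message key deterministically onto a partition number.

    md5 is a stand-in here; the real DefaultPartitioner hashes with murmur2.
    """
    digest = int(hashlib.md5(key).hexdigest(), 16)
    # Mask to a non-negative value, then reduce modulo the partition count
    return all_partitions[(digest & 0x7fffffff) % len(all_partitions)]

partitions = [0, 1, 2, 3]
p1 = select_partition(b'user-123', partitions)
p2 = select_partition(b'user-123', partitions)
assert p1 == p2  # the same key always lands on the same partition
```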

Serializer Interface

Abstract base classes for custom key and value serialization.

class Serializer:
    def serialize(self, topic, value):
        """
        Serialize value to bytes.
        
        Parameters:
        - topic: str, topic name
        - value: any, value to serialize
        
        Returns:
        - bytes: serialized value
        """
    
    def close(self):
        """Clean up resources."""

Usage Examples

Basic Producer

from kafka import KafkaProducer
import json

# Create producer with JSON serialization
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send message asynchronously  
future = producer.send('my-topic', value={'message': 'Hello World'}, key='key1')

# Block for acknowledgment
try:
    record_metadata = future.get(timeout=10)
    print(f"Message sent to topic {record_metadata.topic} "
          f"partition {record_metadata.partition} "
          f"offset {record_metadata.offset}")
except Exception as e:
    print(f"Send failed: {e}")

producer.close()

Fire and Forget

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    acks=0  # Fire and forget
)

# Send many messages without waiting for acknowledgment
for i in range(1000):
    producer.send('events', value=f'Event {i}')

# Force send all buffered messages
producer.flush()
producer.close()

Synchronous Sending

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    acks='all',  # Wait for all replicas
    retries=3
)

def send_sync(topic, value, key=None):
    """Send message synchronously with error handling."""
    try:
        future = producer.send(topic, value=value, key=key)
        record_metadata = future.get(timeout=30)
        return record_metadata
    except KafkaError as e:
        print(f"Failed to send message: {e}")
        return None

# Send messages synchronously
metadata = send_sync('orders', 'Order #12345', key='customer-123')
if metadata:
    print(f"Order sent successfully to offset {metadata.offset}")

producer.close()

Asynchronous with Callbacks

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode
)

def on_success(record_metadata):
    print(f"Message sent successfully: topic={record_metadata.topic} "
          f"partition={record_metadata.partition} offset={record_metadata.offset}")

def on_error(exception):
    print(f"Message send failed: {exception}")

# Send with callbacks
for i in range(10):
    future = producer.send('async-topic', value=f'Message {i}')
    future.add_callback(on_success)
    future.add_errback(on_error)

# flush() blocks until all pending sends complete, so the callbacks have fired
producer.flush()
producer.close()

Batching and Compression

from kafka import KafkaProducer

# Configure for high throughput with batching and compression
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    
    # Batching configuration
    batch_size=32768,    # 32KB batches
    linger_ms=100,       # Wait up to 100ms to fill batch
    
    # Compression
    compression_type='lz4',
    
    # Memory and throughput
    buffer_memory=67108864,  # 64MB buffer
    max_in_flight_requests_per_connection=10
)

# Send many messages - will be batched and compressed
for i in range(10000):
    producer.send('high-throughput', value=f'Batch message {i}')

producer.flush()
producer.close()

Custom Partitioner

from kafka import KafkaProducer
import hashlib
import random

def custom_partitioner(key, all_partitions, available_partitions):
    """Custom partitioner: SHA-256 hash for keyed messages, random otherwise."""
    if key is None:
        # No key: pick a random available partition (mirrors the default behavior)
        return random.choice(available_partitions)
    else:
        # Hash-based partitioning keeps each key on a stable partition
        hash_value = int(hashlib.sha256(key).hexdigest(), 16)
        return all_partitions[hash_value % len(all_partitions)]

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    key_serializer=str.encode,
    partitioner=custom_partitioner
)

# Messages with same key will go to same partition
producer.send('partitioned-topic', key='user-123', value='Event A')
producer.send('partitioned-topic', key='user-123', value='Event B')
producer.send('partitioned-topic', key='user-456', value='Event C')

producer.close()

Idempotent Producer

from kafka import KafkaProducer

# Enable idempotence so broker-side deduplication absorbs internal retries
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    enable_idempotence=True,
    acks='all',  # Idempotence requires acks='all'
    retries=10,
    max_in_flight_requests_per_connection=5  # Must be <= 5 when idempotence is enabled
)

# Producer will automatically retry and deduplicate
for i in range(100):
    producer.send('exactly-once-topic', value=f'Idempotent message {i}')

producer.close()

Transactional Producer

from kafka import KafkaProducer, KafkaConsumer, TopicPartition, OffsetAndMetadata

# Configure transactional producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    transactional_id='my-transactional-id',  # Required for transactions
    enable_idempotence=True,  # Required for transactions
    acks='all'  # Required for transactions
)

# Initialize transactions (must be called once)
producer.init_transactions()

try:
    # Begin transaction
    producer.begin_transaction()
    
    # Send messages as part of transaction
    producer.send('orders', value='Order #1001')
    producer.send('inventory', value='Item #5001 reserved')
    
    # Include consumer offsets in transaction (consume-transform-produce pattern)
    consumer_offsets = {
        TopicPartition('input-topic', 0): OffsetAndMetadata(100, None)
    }
    producer.send_offsets_to_transaction(consumer_offsets, 'my-consumer-group')
    
    # Commit transaction - all messages become visible atomically
    producer.commit_transaction()
    print("Transaction committed successfully")
    
except Exception as e:
    print(f"Transaction failed: {e}")
    # Abort transaction - no messages become visible
    producer.abort_transaction()

finally:
    producer.close()

Custom Serializer

from kafka import KafkaProducer
from kafka.serializer import Serializer
import pickle
import json

class PickleSerializer(Serializer):
    """Custom serializer using pickle."""
    
    def serialize(self, topic, value):
        if value is None:
            return None
        return pickle.dumps(value)

class JSONSerializer(Serializer):
    """Custom JSON serializer with error handling."""
    
    def serialize(self, topic, value):
        if value is None:
            return None
        try:
            return json.dumps(value).encode('utf-8')
        except (TypeError, ValueError) as e:
            raise ValueError(f"Cannot serialize {value} to JSON: {e}")

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=str.encode,
    value_serializer=JSONSerializer()
)

# Send complex objects
producer.send('json-topic', 
              key='order-123',
              value={'order_id': 123, 'items': ['item1', 'item2'], 'total': 99.99})

producer.close()

Error Handling and Retries

from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError, MessageSizeTooLargeError
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    retries=5,
    retry_backoff_ms=1000,
    request_timeout_ms=30000
)

def robust_send(topic, value, key=None, max_retries=3):
    """Send with custom retry logic and error handling."""
    
    for attempt in range(max_retries + 1):
        try:
            future = producer.send(topic, value=value, key=key)
            record_metadata = future.get(timeout=30)
            return record_metadata
            
        except MessageSizeTooLargeError:
            print(f"Message too large for topic {topic}")
            return None
            
        except KafkaTimeoutError:
            print(f"Timeout on attempt {attempt + 1}")
            if attempt == max_retries:
                print("Max retries reached, giving up")
                return None
            time.sleep(2 ** attempt)  # Exponential backoff
            
        except KafkaError as e:
            print(f"Kafka error on attempt {attempt + 1}: {e}")
            if not e.retriable or attempt == max_retries:
                return None
            time.sleep(1)
    
    return None

# Use robust sending
result = robust_send('critical-topic', 'Important message')
if result:
    print(f"Message sent successfully: offset {result.offset}")
else:
    print("Failed to send message after all retries")

producer.close()

Performance Considerations

Throughput Optimization

from kafka import KafkaProducer

# High-throughput producer configuration
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    
    # Increase batch size for better throughput
    batch_size=65536,           # 64KB batches
    linger_ms=50,               # Small delay to fill batches
    
    # Use fast compression
    compression_type='lz4',     # Fast compression
    
    # Increase memory and concurrent requests
    buffer_memory=134217728,    # 128MB buffer
    max_in_flight_requests_per_connection=10,
    
    # Reduce acknowledgment requirements
    acks=1,                     # Only wait for leader
    
    # Increase network buffers
    send_buffer_bytes=262144,   # 256KB send buffer
    receive_buffer_bytes=65536  # 64KB receive buffer
)

Latency Optimization

from kafka import KafkaProducer

# Low-latency producer configuration
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    
    # Minimize batching delays
    batch_size=1,               # Effectively one record per batch
    linger_ms=0,                # No artificial delay before sending
    
    # No compression for speed
    compression_type='none',
    
    # Faster acknowledgments
    acks=1,
    
    # Reduce timeout values
    request_timeout_ms=10000,   # 10 second timeout
    delivery_timeout_ms=30000   # 30 second delivery timeout
)

Install with Tessl CLI

npx tessl i tessl/pypi-kafka-python
