CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-kafka-python-ng

Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities

Pending
Overview
Eval results
Files

docs/producer.md

Producer Operations

High-level producer for publishing records to Kafka topics with automatic batching, compression, partitioning, and retry logic. The KafkaProducer is thread-safe and designed for high-throughput scenarios.

Capabilities

KafkaProducer

Main producer class for sending records to Kafka topics. Provides asynchronous sending with futures, automatic batching, and configurable retry logic.

class KafkaProducer:
    """High-level, thread-safe producer for publishing records to Kafka.

    Sends are asynchronous: send() buffers the record and returns a future;
    batching, compression, partitioning, and retries are handled internally.
    (Documentation stub — method bodies are intentionally empty.)
    """

    def __init__(self, **configs):
        """
        Create a KafkaProducer instance.
        
        Args:
            **configs: Producer configuration options including:
                bootstrap_servers (list): List of Kafka brokers
                key_serializer (callable): Function to serialize keys  
                value_serializer (callable): Function to serialize values
                acks (int|str): Acknowledgment requirements (0, 1, 'all')
                retries (int): Number of retry attempts
                batch_size (int): Batch size in bytes
                linger_ms (int): Time to wait for batching
                buffer_memory (int): Total memory for buffering
                compression_type (str): Compression algorithm ('gzip', 'snappy', 'lz4', 'zstd')
                max_in_flight_requests_per_connection (int): Max unacknowledged requests
                request_timeout_ms (int): Request timeout
                retry_backoff_ms (int): Retry backoff time
                client_id (str): Client identifier
                security_protocol (str): Security protocol ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
                ssl_context: SSL context for encrypted connections
                sasl_mechanism (str): SASL mechanism ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512', 'GSSAPI', 'OAUTHBEARER')
                sasl_plain_username (str): Username for PLAIN SASL
                sasl_plain_password (str): Password for PLAIN SASL
        """
        
    def send(self, topic: str, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
        """
        Send a record to the specified topic (asynchronous; never blocks on the network).
        
        Args:
            topic (str): The topic to send the record to
            value: The record value (will be serialized using value_serializer)
            key: The record key (will be serialized using key_serializer) 
            headers (list): List of (key, value) header tuples
                (keys are str, values bytes — verify against broker message format)
            partition (int): Specific partition to send to (optional; otherwise
                the configured partitioner chooses one)
            timestamp_ms (int): Record timestamp in milliseconds (optional)
            
        Returns:
            FutureRecordMetadata: Future that resolves to RecordMetadata when send completes
        """
        
    def flush(self, timeout=None) -> None:
        """
        Flush all pending records, blocking until every buffered send completes.
        
        Args:
            timeout (float): Maximum time to wait in seconds
        """
        
    def close(self, timeout=None) -> None:
        """
        Close the producer and release resources (pending sends are flushed first).
        
        Args:
            timeout (float): Maximum time to wait for pending sends
        """
        
    def partitions_for(self, topic: str) -> set:
        """
        Get available partitions for a topic.
        
        Args:
            topic (str): Topic name
            
        Returns:
            set: Set of available partition numbers
        """
        
    def bootstrap_connected(self) -> bool:
        """
        Check if producer has established bootstrap connection.
        
        Returns:
            bool: True if connected to at least one bootstrap server
        """
        
    def metrics(self, raw=False) -> dict:
        """
        Get producer performance metrics.
        
        Args:
            raw (bool): If True, return raw metrics dict. If False, return formatted metrics.
            
        Returns:
            dict: Producer performance metrics including send rates, batch sizes,
                 buffer usage, and request latencies
        """

Future Objects

The send() method returns future objects that can be used to handle asynchronous results.

class FutureRecordMetadata:
    """Future returned by KafkaProducer.send().

    Resolves to a RecordMetadata once the broker acknowledges the record,
    or fails with a KafkaError. (Documentation stub.)
    """

    def get(self, timeout=None):
        """
        Get the RecordMetadata result, blocking if necessary.
        
        Args:
            timeout (float): Maximum time to wait in seconds
            
        Returns:
            RecordMetadata: Metadata about the sent record
            
        Raises:
            KafkaError: If send failed
        """
        
    def add_callback(self, callback):
        """
        Add callback function to be called when the send completes successfully.
        
        Args:
            callback (callable): Function called with the RecordMetadata result
                on success. Failures are delivered to errbacks instead
                (see add_errback) — matching the single-argument callbacks
                in the usage examples.
        """
        
    def add_errback(self, errback):
        """
        Add error callback for send failures.
        
        Args:
            errback (callable): Function called with exception on failure
        """
        
    def is_done(self):
        """
        Check if the send operation is complete.
        
        Returns:
            bool: True if send is complete (success or failure)
        """

class RecordMetadata:
    """Broker-assigned metadata describing a successfully sent record."""
    topic: str           # Topic the record was written to
    partition: int       # Partition the record landed in
    offset: int         # Record offset within that partition
    timestamp: int      # Record timestamp (milliseconds)
    checksum: int       # Record checksum (may be unavailable/None on newer
                        # message formats — verify for your broker version)
    serialized_key_size: int    # Serialized key size in bytes
    serialized_value_size: int  # Serialized value size in bytes

Usage Examples

Basic Producer Usage

from kafka import KafkaProducer
import json

# Serializer setup: values become JSON bytes, keys become UTF-8 bytes.
settings = {
    'bootstrap_servers': ['localhost:9092'],
    'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
    'key_serializer': lambda k: str(k).encode('utf-8'),
}
producer = KafkaProducer(**settings)

# send() returns immediately with a future; nothing blocks on the network here.
future = producer.send('my-topic', key='user123', value={'action': 'login'})

# Wait up to 10 seconds for the broker acknowledgment, then inspect it.
metadata = future.get(timeout=10)
print(f"Sent to partition {metadata.partition} at offset {metadata.offset}")

producer.close()

Producer with Callbacks

from kafka import KafkaProducer

def on_success(metadata):
    """Success handler: receives the RecordMetadata of the acknowledged send."""
    print(f"Message sent to {metadata.topic}:{metadata.partition}:{metadata.offset}")

def on_error(exception):
    """Failure handler: receives the exception that caused the send to fail."""
    print(f"Send failed: {exception}")

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Attach the handlers to the future returned by the asynchronous send.
outcome = producer.send('my-topic', b'message-value')
outcome.add_callback(on_success)
outcome.add_errback(on_error)

# Drain buffered records before shutting the producer down.
producer.flush()
producer.close()

High-Throughput Configuration

from kafka import KafkaProducer

# Throughput-oriented settings: larger batches, short linger, compressed wire format.
throughput_config = {
    'bootstrap_servers': ['localhost:9092'],
    'batch_size': 16384,            # accumulate up to 16KB per batch
    'linger_ms': 10,                # allow 10ms for batches to fill
    'compression_type': 'lz4',      # compress each batch
    'buffer_memory': 33554432,      # 32MB total buffering
    'max_in_flight_requests_per_connection': 5,
}
producer = KafkaProducer(**throughput_config)

# Fire off a burst of messages; each send() only enqueues into the buffer.
for n in range(1000):
    producer.send('high-volume-topic', f'message-{n}'.encode())

producer.flush()
producer.close()

Secure Producer (SSL + SASL)

from kafka import KafkaProducer

# TLS-encrypted connection authenticated with SASL/SCRAM credentials.
secure_config = {
    'bootstrap_servers': ['secure-broker:9093'],
    'security_protocol': 'SASL_SSL',       # TLS transport + SASL auth
    'sasl_mechanism': 'SCRAM-SHA-256',
    'sasl_plain_username': 'myuser',
    'sasl_plain_password': 'mypassword',
    'ssl_check_hostname': True,            # verify broker certificate hostname
    'ssl_cafile': 'ca-cert.pem',           # CA bundle used to validate the broker
}
producer = KafkaProducer(**secure_config)

producer.send('secure-topic', b'encrypted message')
producer.close()

Custom Partitioning

from kafka import KafkaProducer
from kafka.partitioner import DefaultPartitioner

class CustomPartitioner:
    """Route records with a b'priority-' key prefix to partition 0.

    KafkaProducer invokes the configured ``partitioner`` as a *callable*
    with ``(serialized_key, all_partitions, available_partitions)`` — it
    never calls a ``partition(topic, ...)`` method, and DefaultPartitioner
    is likewise a plain callable.  The routing logic therefore lives in
    ``__call__``; ``partition`` is retained for any code that called the
    original method directly.
    """

    def __init__(self):
        self.default = DefaultPartitioner()

    def __call__(self, key, all_partitions, available_partitions):
        # Entry point used by KafkaProducer (no topic argument is supplied).
        return self.partition(None, key, all_partitions, available_partitions)

    def partition(self, topic, key, all_partitions, available_partitions):
        # Priority messages are pinned to partition 0.
        if key and key.startswith(b'priority-'):
            return 0
        # DefaultPartitioner is itself callable with (key, all, available).
        return self.default(key, all_partitions, available_partitions)

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    partitioner=CustomPartitioner()
)

Install with Tessl CLI

npx tessl i tessl/pypi-kafka-python-ng

docs

admin.md

connection.md

consumer.md

errors.md

index.md

producer.md

serialization.md

structs.md

tile.json