CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-confluent-kafka

Confluent's Python client for Apache Kafka

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

serialization.mddocs/

Serialization Framework

Comprehensive serialization framework providing pluggable serialization/deserialization with built-in support for common data types and high-level producer/consumer APIs with automatic serialization.

Capabilities

Base Serializer/Deserializer Classes

Abstract base classes for implementing custom serialization logic.

class Serializer:
    def __call__(self, obj, ctx=None):
        """
        Serialize an object.
        
        Args:
            obj: Object to serialize
            ctx (SerializationContext, optional): Serialization context
            
        Returns:
            bytes: Serialized data
            
        Raises:
            SerializationError: If serialization fails
        """
        raise NotImplementedError

class Deserializer:
    def __call__(self, value, ctx=None):
        """
        Deserialize data.
        
        Args:
            value (bytes): Data to deserialize
            ctx (SerializationContext, optional): Serialization context
            
        Returns:
            object: Deserialized object
            
        Raises:
            SerializationError: If deserialization fails
        """
        raise NotImplementedError

Built-in Serializers

Ready-to-use serializers for common data types.

StringSerializer

class StringSerializer(Serializer):
    def __init__(self, codec='utf_8'):
        """
        Create StringSerializer.
        
        Args:
            codec (str): Character encoding (default: 'utf_8')
        """

    def __call__(self, obj, ctx=None):
        """
        Serialize string to bytes.
        
        Args:
            obj (str): String to serialize
            ctx (SerializationContext, optional): Serialization context
            
        Returns:
            bytes: UTF-8 encoded string or None if obj is None
            
        Raises:
            SerializationError: If obj is not a string
        """

IntegerSerializer

class IntegerSerializer(Serializer):
    def __call__(self, obj, ctx=None):
        """
        Serialize integer to int32 bytes.
        
        Args:
            obj (int): Integer to serialize
            ctx (SerializationContext, optional): Serialization context
            
        Returns:
            bytes: 4-byte big-endian int32 or None if obj is None
            
        Raises:
            SerializationError: If obj is not an integer or out of int32 range
        """

DoubleSerializer

class DoubleSerializer(Serializer):
    def __call__(self, obj, ctx=None):
        """
        Serialize float to IEEE 754 binary64 bytes.
        
        Args:
            obj (float): Float to serialize
            ctx (SerializationContext, optional): Serialization context
            
        Returns:
            bytes: 8-byte IEEE 754 binary64 or None if obj is None
            
        Raises:
            SerializationError: If obj is not a float
        """

Built-in Deserializers

Ready-to-use deserializers for common data types.

StringDeserializer

class StringDeserializer(Deserializer):
    def __init__(self, codec='utf_8'):
        """
        Create StringDeserializer.
        
        Args:
            codec (str): Character encoding (default: 'utf_8')
        """

    def __call__(self, value, ctx=None):
        """
        Deserialize bytes to string.
        
        Args:
            value (bytes): Bytes to deserialize
            ctx (SerializationContext, optional): Serialization context
            
        Returns:
            str: Decoded string or None if value is None
            
        Raises:
            SerializationError: If decoding fails
        """

IntegerDeserializer

class IntegerDeserializer(Deserializer):
    def __call__(self, value, ctx=None):
        """
        Deserialize int32 bytes to integer.
        
        Args:
            value (bytes): 4-byte big-endian int32
            ctx (SerializationContext, optional): Serialization context
            
        Returns:
            int: Deserialized integer or None if value is None
            
        Raises:
            SerializationError: If value is not 4 bytes
        """

DoubleDeserializer

class DoubleDeserializer(Deserializer):
    def __call__(self, value, ctx=None):
        """
        Deserialize IEEE 754 binary64 bytes to float.
        
        Args:
            value (bytes): 8-byte IEEE 754 binary64
            ctx (SerializationContext, optional): Serialization context
            
        Returns:
            float: Deserialized float or None if value is None
            
        Raises:
            SerializationError: If value is not 8 bytes
        """

Serialization Context

Provides contextual information for serialization operations.

class SerializationContext:
    def __init__(self, topic, field, headers=None):
        """
        Create SerializationContext.
        
        Args:
            topic (str): Topic name
            field (MessageField): Message field being serialized
            headers (dict, optional): Message headers
        """

    @property
    def topic(self):
        """
        Topic name.
        
        Returns:
            str: Topic name
        """

    @property
    def field(self):
        """
        Message field being serialized.
        
        Returns:
            MessageField: NONE, KEY, or VALUE
        """

    @property
    def headers(self):
        """
        Message headers.
        
        Returns:
            dict: Message headers or None
        """

MessageField

class MessageField:
    NONE = 0
    KEY = 1
    VALUE = 2

High-Level Producer/Consumer APIs

SerializingProducer

High-level producer with pluggable serialization.

class SerializingProducer:
    def __init__(self, conf):
        """
        Create SerializingProducer.
        
        Args:
            conf (dict): Configuration including 'key.serializer' and 'value.serializer'
        """

    def produce(self, topic, key=None, value=None, partition=-1, on_delivery=None, timestamp=0, headers=None):
        """
        Produce message with automatic serialization.
        
        Args:
            topic (str): Topic to produce to
            key: Key object (will be serialized using key.serializer)
            value: Value object (will be serialized using value.serializer)
            partition (int, optional): Specific partition (-1 for automatic)
            on_delivery (callable, optional): Delivery report callback
            timestamp (int, optional): Message timestamp (0 for current time)
            headers (dict, optional): Message headers
            
        Raises:
            SerializationError: If serialization fails
            BufferError: If local producer queue is full
            KafkaException: For other produce errors
        """

    def poll(self, timeout=-1):
        """
        Poll for events and call registered callbacks.
        
        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)
            
        Returns:
            int: Number of events processed
        """

    def flush(self, timeout=-1):
        """
        Wait for all messages to be delivered.
        
        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)
            
        Returns:
            int: Number of messages still in queue (0 on success)
        """

    def purge(self, in_queue=True, in_flight=True, blocking=True):
        """
        Purge messages from internal queues.
        
        Args:
            in_queue (bool): Purge messages in local queue
            in_flight (bool): Purge messages in flight to broker
            blocking (bool): Block until purge is complete
            
        Returns:
            int: Number of messages purged
        """

    def abort_transaction(self, timeout=-1):
        """Abort ongoing transaction."""

    def begin_transaction(self):
        """Begin a new transaction."""

    def commit_transaction(self, timeout=-1):
        """Commit current transaction."""

    def init_transactions(self, timeout=-1):
        """Initialize transactions for this producer."""

DeserializingConsumer

High-level consumer with pluggable deserialization.

class DeserializingConsumer:
    def __init__(self, conf):
        """
        Create DeserializingConsumer.
        
        Args:
            conf (dict): Configuration including 'key.deserializer' and 'value.deserializer'
        """

    def subscribe(self, topics, listener=None):
        """
        Subscribe to list of topics for automatic partition assignment.
        
        Args:
            topics (list): List of topic names to subscribe to
            listener (RebalanceCallback, optional): Rebalance callback
        """

    def poll(self, timeout=-1):
        """
        Poll for messages with automatic deserialization.
        
        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)
            
        Returns:
            Message: Message with deserialized key/value or None if timeout
            
        Note:
            If deserialization fails, the error is stored in the message
            and can be accessed via ConsumeError.
        """

    def consume(self, num_messages=1, timeout=-1):
        """
        Consume multiple messages (not implemented).
        
        Raises:
            NotImplementedError: This method is not supported
        """

    def assign(self, partitions):
        """
        Manually assign partitions to consume from.
        
        Args:
            partitions (list): List of TopicPartition objects
        """

    def assignment(self):
        """
        Get current partition assignment.
        
        Returns:
            list: List of assigned TopicPartition objects
        """

    def unassign(self):
        """Remove current partition assignment."""

    def commit(self, message=None, offsets=None, asynchronous=True):
        """
        Commit message offset or specified offsets.
        
        Args:
            message (Message, optional): Commit offset for this message
            offsets (list, optional): List of TopicPartition objects with offsets
            asynchronous (bool): Commit asynchronously if True
            
        Returns:
            list: Committed offsets if synchronous, None if asynchronous
        """

    def committed(self, partitions, timeout=-1):
        """
        Get committed offsets for partitions.
        
        Args:
            partitions (list): List of TopicPartition objects
            timeout (float): Maximum time to wait in seconds
            
        Returns:
            list: List of TopicPartition objects with committed offsets
        """

    def position(self, partitions):
        """
        Get current position (next fetch offset) for partitions.
        
        Args:
            partitions (list): List of TopicPartition objects
            
        Returns:
            list: List of TopicPartition objects with current positions
        """

    def seek(self, partition):
        """
        Seek to offset for partition.
        
        Args:
            partition (TopicPartition): Partition with offset to seek to
        """

    def pause(self, partitions):
        """
        Pause consumption for partitions.
        
        Args:
            partitions (list): List of TopicPartition objects to pause
        """

    def resume(self, partitions):
        """
        Resume consumption for partitions.
        
        Args:
            partitions (list): List of TopicPartition objects to resume
        """

    def close(self):
        """Close the consumer and leave consumer group."""

    def store_offsets(self, message=None, offsets=None):
        """
        Store offset for message or specified offsets.
        
        Args:
            message (Message, optional): Store offset for this message
            offsets (list, optional): List of TopicPartition objects with offsets
        """

Error Classes

class SerializationError(Exception):
    """Base class for serialization errors."""
    
    def __init__(self, message, inner_exception=None):
        """
        Create SerializationError.
        
        Args:
            message (str): Error message
            inner_exception (Exception, optional): Underlying exception
        """

    @property
    def inner_exception(self):
        """Underlying exception that caused the serialization error."""

Usage Examples

Custom Serializer/Deserializer

from confluent_kafka.serialization import (
    Serializer, Deserializer, SerializationError,
    StringSerializer, StringDeserializer
)
import json

class JSONSerializer(Serializer):
    """Custom JSON serializer."""
    
    def __call__(self, obj, ctx=None):
        if obj is None:
            return None
        try:
            return json.dumps(obj).encode('utf-8')
        except Exception as e:
            raise SerializationError(f"Failed to serialize to JSON: {e}")

class JSONDeserializer(Deserializer):
    """Custom JSON deserializer."""
    
    def __call__(self, value, ctx=None):
        if value is None:
            return None
        try:
            return json.loads(value.decode('utf-8'))
        except Exception as e:
            raise SerializationError(f"Failed to deserialize JSON: {e}")

# Use custom serializers
from confluent_kafka import SerializingProducer, DeserializingConsumer

producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': JSONSerializer()
}

consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': JSONDeserializer(),
    'group.id': 'json-group',
    'auto.offset.reset': 'earliest'
}

producer = SerializingProducer(producer_conf)
consumer = DeserializingConsumer(consumer_conf)

Using Built-in Serializers

from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import (
    StringSerializer, StringDeserializer,
    IntegerSerializer, IntegerDeserializer,
    DoubleSerializer, DoubleDeserializer
)

# Producer with different serializers for key and value
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': IntegerSerializer()
}

producer = SerializingProducer(producer_conf)

# Produce messages with automatic serialization
for i in range(10):
    producer.produce(
        'numbers-topic',
        key=f'key-{i}',  # String key
        value=i * 100    # Integer value
    )

producer.flush()

# Consumer with corresponding deserializers
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': IntegerDeserializer(),
    'group.id': 'numbers-group',
    'auto.offset.reset': 'earliest'
}

consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe(['numbers-topic'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        
        # Key and value are automatically deserialized
        print(f"Key: {msg.key()} (type: {type(msg.key())})")
        print(f"Value: {msg.value()} (type: {type(msg.value())})")

finally:
    consumer.close()

Serialization Context Usage

import json

from confluent_kafka.serialization import Serializer, SerializationContext, MessageField

class ContextAwareSerializer(Serializer):
    """Serializer that uses serialization context."""
    
    def __call__(self, obj, ctx=None):
        if obj is None:
            return None
            
        # Use context information
        if ctx is not None:
            print(f"Serializing for topic: {ctx.topic}")
            print(f"Field: {ctx.field}")
            if ctx.headers:
                print(f"Headers: {ctx.headers}")
        
        # Different serialization based on field
        if ctx and ctx.field == MessageField.KEY:
            # Keys serialized as uppercase strings
            return str(obj).upper().encode('utf-8')
        else:
            # Values serialized as JSON
            return json.dumps(obj).encode('utf-8')

# The SerializingProducer automatically creates SerializationContext
# and passes it to serializers
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': ContextAwareSerializer(),
    'value.serializer': ContextAwareSerializer()
}

producer = SerializingProducer(producer_conf)
producer.produce('my-topic', key='mykey', value={'data': 'value'})

Error Handling in Serialization

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import (
    Serializer, SerializationError, StringSerializer
)
from confluent_kafka.error import ProduceError, KeySerializationError, ValueSerializationError

class StrictIntegerSerializer(Serializer):
    """Integer serializer that raises errors for non-integers."""
    
    def __call__(self, obj, ctx=None):
        if obj is None:
            return None
        if not isinstance(obj, int):
            raise SerializationError(f"Expected int, got {type(obj)}")
        return obj.to_bytes(4, 'big', signed=True)

producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': StrictIntegerSerializer()
}

producer = SerializingProducer(producer_conf)

def delivery_callback(err, msg):
    if err is not None:
        if isinstance(err, ValueSerializationError):
            print(f"Value serialization failed: {err}")
        elif isinstance(err, KeySerializationError):
            print(f"Key serialization failed: {err}")
        else:
            print(f"Other error: {err}")
    else:
        print(f"Message delivered: {msg.topic()} [{msg.partition()}]")

try:
    # This will succeed
    producer.produce('numbers', key='valid', value=42, on_delivery=delivery_callback)

    # This fails: serialization errors are raised synchronously from produce()
    producer.produce('numbers', key='invalid', value='not-a-number', on_delivery=delivery_callback)
    
except Exception as e:
    print(f"Immediate error: {e}")

producer.poll(0)  # Process delivery callbacks
producer.flush()

Advanced Configuration

from confluent_kafka import SerializingProducer, DeserializingConsumer

# Producer with serializer-specific configuration
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': StringSerializer('ascii'),  # Different encoding
    
    # Standard producer settings
    'acks': 'all',
    'retries': 3,
    'batch.size': 16384,
    'linger.ms': 5
}

# Consumer with error handling configuration
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': StringDeserializer('utf_8'),
    
    # Standard consumer settings
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000
}

producer = SerializingProducer(producer_conf)
consumer = DeserializingConsumer(consumer_conf)

# Error handling in consumer
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
            
        if msg.error():
            print(f"Message error: {msg.error()}")
            continue
            
        # Process message
        try:
            key = msg.key()
            value = msg.value()
            
            # Process deserialized data
            print(f"Processed: key={key}, value={value}")
            
            # Manual commit after successful processing
            consumer.commit(message=msg)
            
        except Exception as e:
            print(f"Processing error: {e}")
            # Could skip this message or handle error differently
            
finally:
    consumer.close()

Install with Tessl CLI

npx tessl i tessl/pypi-confluent-kafka

docs

admin-client.md

core-producer-consumer.md

error-handling.md

index.md

schema-registry.md

serialization.md

tile.json