CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-kafka-python-ng

Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities

Pending
Overview
Eval results
Files

docs/serialization.md

Serialization

Abstract base classes for implementing custom key and value serializers/deserializers for converting between Python objects and bytes. Provides the foundation for data transformation in Kafka producers and consumers.

Capabilities

Abstract Base Classes

Foundation classes for implementing custom serialization logic.

class Serializer:
    """
    Base class for key and value serializers.

    Subclasses implement ``serialize`` to turn Python objects into the
    bytes that are transmitted to Kafka.
    """

    def serialize(self, topic: str, value):
        """
        Convert *value* into bytes destined for *topic*.

        Args:
            topic (str): Destination topic; enables topic-specific encoding.
            value: Python object to serialize.

        Returns:
            bytes: Encoded payload, or None when *value* is None.

        Raises:
            SerializationError: When the value cannot be encoded.
        """

    def close(self):
        """
        Release any resources held by the serializer.

        Invoked when the owning producer/consumer shuts down.
        """

class Deserializer:
    """
    Base class for key and value deserializers.

    Subclasses implement ``deserialize`` to turn raw bytes received from
    Kafka back into Python objects.
    """

    def deserialize(self, topic: str, bytes_: bytes):
        """
        Convert *bytes_* received from *topic* into a Python object.

        Args:
            topic (str): Source topic; enables topic-specific decoding.
            bytes_ (bytes): Raw payload to decode.

        Returns:
            object: Decoded value, or None when *bytes_* is None.

        Raises:
            SerializationError: When the payload cannot be decoded.
        """

    def close(self):
        """
        Release any resources held by the deserializer.

        Invoked when the owning consumer shuts down.
        """

Serialization Errors

Exception class for serialization-related errors.

# NOTE(review): KafkaError presumably comes from kafka.errors (not shown in
# this document) — confirm the base class when wiring up real imports.
class SerializationError(KafkaError):
    """Error occurred during serialization or deserialization."""

Usage Examples

String Serialization

from kafka.serializer import Serializer, Deserializer

class StringSerializer(Serializer):
    """Serializer that encodes str (or any object via str()) to bytes."""

    def __init__(self, encoding='utf-8'):
        self.encoding = encoding

    def serialize(self, topic, value):
        # None (tombstones / absent keys) passes through untouched,
        # and already-encoded bytes are forwarded as-is.
        if value is None:
            return None
        if isinstance(value, bytes):
            return value
        text = value if isinstance(value, str) else str(value)
        return text.encode(self.encoding)

    def close(self):
        # Stateless: nothing to release.
        pass

class StringDeserializer(Deserializer):
    """Deserializer that decodes Kafka payload bytes back into str."""

    def __init__(self, encoding='utf-8'):
        self.encoding = encoding

    def deserialize(self, topic, bytes_):
        # Tombstones / absent keys arrive as None and stay None.
        return None if bytes_ is None else bytes_.decode(self.encoding)

    def close(self):
        # Stateless: nothing to release.
        pass

# Usage with producer/consumer
from kafka import KafkaProducer, KafkaConsumer

# Both keys and values are encoded with the StringSerializer defined above.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=StringSerializer(),
    value_serializer=StringSerializer()
)

# The consumer mirrors the producer: received bytes are decoded back to str.
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    key_deserializer=StringDeserializer(),
    value_deserializer=StringDeserializer()
)

JSON Serialization

import json
from kafka.serializer import Serializer, Deserializer
from kafka.errors import SerializationError

class JSONSerializer(Serializer):
    """Serializer that encodes JSON-compatible Python objects to bytes.

    Args:
        encoding (str): Text encoding applied to the JSON string
            (default ``'utf-8'``).
    """

    def __init__(self, encoding='utf-8'):
        self.encoding = encoding

    def serialize(self, topic, value):
        """Encode *value* as a JSON document.

        Returns None when *value* is None.

        Raises:
            SerializationError: When *value* is not JSON-serializable;
                the underlying error is chained as the cause.
        """
        if value is None:
            return None
        try:
            return json.dumps(value).encode(self.encoding)
        except (TypeError, ValueError) as e:
            # "from e" keeps the original traceback visible as the cause.
            raise SerializationError(f"JSON serialization failed: {e}") from e

    def close(self):
        """No resources to release."""
        pass

class JSONDeserializer(Deserializer):
    """Deserializer that parses JSON-encoded Kafka payloads.

    Args:
        encoding (str): Text encoding of the incoming bytes
            (default ``'utf-8'``).
    """

    def __init__(self, encoding='utf-8'):
        self.encoding = encoding

    def deserialize(self, topic, bytes_):
        """Decode *bytes_* and parse it as JSON.

        Returns None when *bytes_* is None.

        Raises:
            SerializationError: On malformed JSON or undecodable bytes;
                the underlying error is chained as the cause.
        """
        if bytes_ is None:
            return None
        try:
            return json.loads(bytes_.decode(self.encoding))
        except (ValueError, UnicodeDecodeError) as e:
            # "from e" keeps the parse error visible as the cause.
            raise SerializationError(f"JSON deserialization failed: {e}") from e

    def close(self):
        """No resources to release."""
        pass

# Usage
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=JSONSerializer()
)

# Send Python objects as JSON
# Any json.dumps-compatible structure works (dicts, lists, scalars).
producer.send('events', {
    'user_id': 123,
    'action': 'login',
    'timestamp': '2024-01-01T12:00:00Z'
})

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=JSONDeserializer()
)

for message in consumer:
    event = message.value  # Already deserialized to Python dict
    print(f"User {event['user_id']} performed {event['action']}")

Avro Serialization

import avro.schema
import avro.io
import io
from kafka.serializer import Serializer, Deserializer
from kafka.errors import SerializationError

class AvroSerializer(Serializer):
    """Serializer that writes records using the Avro binary encoding.

    Args:
        schema_str (str): Avro schema as a JSON string; parsed once here
            and reused for every record.
    """

    def __init__(self, schema_str):
        self.schema = avro.schema.parse(schema_str)

    def serialize(self, topic, value):
        """Encode *value* (a dict matching the schema) to Avro bytes.

        Returns None for None input.

        Raises:
            SerializationError: On any encoder failure; the underlying
                error is chained as the cause.
        """
        if value is None:
            return None
        try:
            writer = avro.io.DatumWriter(self.schema)
            bytes_writer = io.BytesIO()
            encoder = avro.io.BinaryEncoder(bytes_writer)
            writer.write(value, encoder)
            return bytes_writer.getvalue()
        except Exception as e:
            # Broad catch is deliberate: any avro failure becomes a
            # SerializationError; "from e" preserves the cause.
            raise SerializationError(f"Avro serialization failed: {e}") from e

    def close(self):
        """No resources to release."""
        pass

class AvroDeserializer(Deserializer):
    """Deserializer that reads Avro binary records.

    Args:
        schema_str (str): Avro schema as a JSON string; parsed once here
            and reused for every record.
    """

    def __init__(self, schema_str):
        self.schema = avro.schema.parse(schema_str)

    def deserialize(self, topic, bytes_):
        """Decode Avro-encoded *bytes_* into a Python record.

        Returns None for None input.

        Raises:
            SerializationError: On any decoder failure; the underlying
                error is chained as the cause.
        """
        if bytes_ is None:
            return None
        try:
            reader = avro.io.DatumReader(self.schema)
            bytes_reader = io.BytesIO(bytes_)
            decoder = avro.io.BinaryDecoder(bytes_reader)
            return reader.read(decoder)
        except Exception as e:
            # Broad catch is deliberate: any avro failure becomes a
            # SerializationError; "from e" preserves the cause.
            raise SerializationError(f"Avro deserialization failed: {e}") from e

    def close(self):
        """No resources to release."""
        pass

# Schema definition
# A flat Avro record: every field is required and non-nullable.
schema_str = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}
"""

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=AvroSerializer(schema_str)
)

# Send Avro record
# The dict keys must match the schema's field names exactly.
producer.send('users', {
    'id': 123,
    'name': 'Alice',
    'email': 'alice@example.com'
})

Protobuf Serialization

from kafka.serializer import Serializer, Deserializer
from kafka.errors import SerializationError
# Assuming you have generated protobuf classes

class ProtobufSerializer(Serializer):
    """Serializer that encodes protobuf messages to wire format.

    Accepts either an instance of the configured message class or a
    plain dict of field values.

    Args:
        protobuf_class: Generated protobuf message class, used to build
            a message when a dict is supplied.
    """

    def __init__(self, protobuf_class):
        self.protobuf_class = protobuf_class

    def serialize(self, topic, value):
        """Encode *value* with ``SerializeToString``.

        Returns None for None input.

        Raises:
            SerializationError: When encoding fails; the underlying
                error is chained as the cause.
        """
        if value is None:
            return None
        try:
            if isinstance(value, self.protobuf_class):
                return value.SerializeToString()
            # Build a message from a plain dict. NOTE(review): setattr only
            # covers scalar fields — nested/repeated fields need a real
            # message instance; confirm against the generated class.
            pb_obj = self.protobuf_class()
            for field, val in value.items():
                setattr(pb_obj, field, val)
            return pb_obj.SerializeToString()
        except Exception as e:
            raise SerializationError(f"Protobuf serialization failed: {e}") from e

    def close(self):
        """No resources to release."""
        pass

class ProtobufDeserializer(Deserializer):
    """Deserializer that parses protobuf-encoded payloads.

    Args:
        protobuf_class: Generated protobuf message class used to parse
            each payload.
    """

    def __init__(self, protobuf_class):
        self.protobuf_class = protobuf_class

    def deserialize(self, topic, bytes_):
        """Parse *bytes_* into an instance of the configured message class.

        Returns None for None input.

        Raises:
            SerializationError: When parsing fails; the underlying error
                is chained as the cause.
        """
        if bytes_ is None:
            return None
        try:
            pb_obj = self.protobuf_class()
            pb_obj.ParseFromString(bytes_)
            return pb_obj
        except Exception as e:
            raise SerializationError(f"Protobuf deserialization failed: {e}") from e

    def close(self):
        """No resources to release."""
        pass

Topic-Specific Serialization

import json

# SerializationError is raised by TopicAwareJSONSerializer below; without
# this import the example fails with NameError when validation rejects a
# message.
from kafka.errors import SerializationError
from kafka.serializer import Serializer, Deserializer

class TopicAwareJSONSerializer(Serializer):
    """JSON serializer that validates per-topic required fields."""

    def __init__(self):
        # Required keys for topics we know about; unknown topics are
        # serialized without validation.
        self.topic_schemas = {
            'user-events': ['user_id', 'action', 'timestamp'],
            'order-events': ['order_id', 'status', 'amount'],
        }

    def serialize(self, topic, value):
        if value is None:
            return None

        # Reject messages missing any field the topic requires.
        for required in self.topic_schemas.get(topic, []):
            if required not in value:
                raise SerializationError(f"Missing required field '{required}' for topic '{topic}'")

        return json.dumps(value).encode('utf-8')

    def close(self):
        # Stateless: nothing to release.
        pass

# One serializer instance handles every topic this producer sends to.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=TopicAwareJSONSerializer()
)

# This will validate that required fields are present
# ('user-events' requires user_id, action and timestamp).
producer.send('user-events', {
    'user_id': 123,
    'action': 'login',
    'timestamp': '2024-01-01T12:00:00Z'
})

Compression-Aware Serialization

import gzip
import json
from kafka.serializer import Serializer, Deserializer

class CompressedJSONSerializer(Serializer):
    """JSON serializer that gzips large payloads.

    A one-byte prefix records whether the payload is compressed
    (b'\\x01') or plain (b'\\x00') so the matching deserializer can tell
    them apart.
    """

    def __init__(self, compression_threshold=1024):
        self.compression_threshold = compression_threshold

    def serialize(self, topic, value):
        if value is None:
            return None

        payload = json.dumps(value).encode('utf-8')

        # Only pay the gzip cost when the JSON is large enough to benefit.
        if len(payload) > self.compression_threshold:
            return b'\x01' + gzip.compress(payload)
        return b'\x00' + payload

    def close(self):
        # Stateless: nothing to release.
        pass

class CompressedJSONDeserializer(Deserializer):
    """Counterpart to CompressedJSONSerializer: strips the marker byte,
    gunzips when flagged, then parses the JSON."""

    def deserialize(self, topic, bytes_):
        # None or an empty payload carries no document.
        if not bytes_:
            return None

        marker, payload = bytes_[0], bytes_[1:]
        if marker == 1:  # gzip-compressed payload
            payload = gzip.decompress(payload)

        return json.loads(payload.decode('utf-8'))

    def close(self):
        # Stateless: nothing to release.
        pass

Error Handling in Serializers

from kafka.serializer import Serializer, Deserializer
from kafka.errors import SerializationError
import json
import logging

logger = logging.getLogger(__name__)

class RobustJSONSerializer(Serializer):
    """JSON serializer with a configurable failure mode.

    Args:
        encoding (str): Text encoding for the JSON output (default utf-8).
        strict (bool): When True, serialization failures raise
            SerializationError; when False, the value is stringified
            instead and a warning is logged.
    """

    def __init__(self, encoding='utf-8', strict=True):
        self.encoding = encoding
        self.strict = strict

    def serialize(self, topic, value):
        """Encode *value* as JSON, or fall back to str() in non-strict mode.

        Returns None for None input.

        Raises:
            SerializationError: In strict mode, when *value* is not
                JSON-serializable (original error chained as the cause).
        """
        if value is None:
            return None

        try:
            return json.dumps(value, ensure_ascii=False).encode(self.encoding)
        except (TypeError, ValueError) as e:
            if self.strict:
                # Chain the cause so the JSON error stays in the traceback.
                raise SerializationError(
                    f"JSON serialization failed for topic '{topic}': {e}"
                ) from e
            # Best-effort mode: log (lazy %-args) and degrade to str().
            logger.warning(
                "JSON serialization failed for topic '%s', falling back to string: %s",
                topic, e,
            )
            return str(value).encode(self.encoding)

    def close(self):
        """No resources to release."""
        pass

class RobustJSONDeserializer(Deserializer):
    """JSON deserializer with a configurable failure mode.

    Args:
        encoding (str): Expected text encoding of incoming bytes.
        strict (bool): When True, parse failures raise SerializationError;
            when False, the raw (replacement-decoded) string is returned
            and a warning is logged.
    """

    def __init__(self, encoding='utf-8', strict=True):
        self.encoding = encoding
        self.strict = strict

    def deserialize(self, topic, bytes_):
        """Parse *bytes_* as JSON, or return raw text in non-strict mode.

        Returns None for None input.

        Raises:
            SerializationError: In strict mode, on malformed JSON or
                undecodable bytes (original error chained as the cause).
        """
        if bytes_ is None:
            return None

        try:
            return json.loads(bytes_.decode(self.encoding))
        except (ValueError, UnicodeDecodeError) as e:
            if self.strict:
                # Chain the cause so the parse error stays in the traceback.
                raise SerializationError(
                    f"JSON deserialization failed for topic '{topic}': {e}"
                ) from e
            # Best-effort mode: log (lazy %-args) and return the text with
            # undecodable bytes replaced rather than failing.
            logger.warning(
                "JSON deserialization failed for topic '%s', returning raw string: %s",
                topic, e,
            )
            return bytes_.decode(self.encoding, errors='replace')

    def close(self):
        """No resources to release."""
        pass

Lambda Function Serializers

from kafka import KafkaProducer, KafkaConsumer
import json
import pickle

# Simple lambda serializers
# Plain callables work in place of Serializer/Deserializer subclasses for
# small transformations; each returns None for None input so tombstone
# messages pass through unchanged.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: str(k).encode('utf-8') if k is not None else None,
    value_serializer=lambda v: json.dumps(v).encode('utf-8') if v is not None else None
)

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    key_deserializer=lambda k: k.decode('utf-8') if k is not None else None,
    value_deserializer=lambda v: json.loads(v.decode('utf-8')) if v is not None else None
)

# Pickle serialization for Python objects (use with caution)
# WARNING: pickle.loads can execute arbitrary code embedded in the payload —
# only consume pickle topics when every producer is trusted.
pickle_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: pickle.dumps(v) if v is not None else None
)

pickle_consumer = KafkaConsumer(
    'pickle-topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda v: pickle.loads(v) if v is not None else None
)

Install with Tessl CLI

npx tessl i tessl/pypi-kafka-python-ng

docs

admin.md

connection.md

consumer.md

errors.md

index.md

producer.md

serialization.md

structs.md

tile.json