Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities
—
Abstract base classes for implementing custom key and value serializers/deserializers that convert between Python objects and bytes. These foundation classes underpin custom data-transformation logic in Kafka producers and consumers.
class Serializer:
    """Abstract base for key and value serializers.

    A serializer turns a Python object into the raw bytes that are
    transmitted to Kafka. Subclasses override :meth:`serialize`; any
    resource cleanup belongs in :meth:`close`.
    """

    def serialize(self, topic: str, value):
        """Convert *value* into bytes destined for *topic*.

        Args:
            topic (str): Destination topic; allows topic-specific encoding.
            value: Python object to encode.

        Returns:
            bytes: Encoded payload, or None when *value* is None.

        Raises:
            SerializationError: If encoding fails.
        """

    def close(self):
        """Release any resources held by this serializer.

        Invoked when the owning producer/consumer is closed.
        """
class Deserializer:
    """
    Abstract base class for key and value deserializers.

    Deserializers convert bytes received from Kafka back to Python objects.
    Subclasses override :meth:`deserialize`; any resource cleanup belongs
    in :meth:`close`.
    """

    def deserialize(self, topic: str, bytes_: bytes):
        """
        Deserialize bytes to a Python object.

        Args:
            topic (str): Topic name (can be used for topic-specific deserialization)
            bytes_ (bytes): Bytes to deserialize

        Returns:
            object: Deserialized Python object, or None if bytes_ is None

        Raises:
            SerializationError: If deserialization fails
        """

    def close(self):
        """
        Close the deserializer and clean up resources.

        Called when the consumer is closed.
        """

# Exception class for serialization-related errors.
class SerializationError(KafkaError):
    """Error occurred during serialization or deserialization."""


from kafka.serializer import Serializer, Deserializer
class StringSerializer(Serializer):
    """Serializer that encodes values as text using a configurable codec."""

    def __init__(self, encoding='utf-8'):
        # Codec used for str -> bytes conversion (default UTF-8).
        self.encoding = encoding

    def serialize(self, topic, value):
        """Encode *value*: bytes pass through, str is encoded,
        anything else goes through str() first. None stays None."""
        if value is None:
            return None
        if isinstance(value, bytes):
            return value
        text = value if isinstance(value, str) else str(value)
        return text.encode(self.encoding)

    def close(self):
        """No resources to release."""
        pass
class StringDeserializer(Deserializer):
    """Deserializer that decodes raw bytes into text."""

    def __init__(self, encoding='utf-8'):
        # Codec used for bytes -> str conversion (default UTF-8).
        self.encoding = encoding

    def deserialize(self, topic, bytes_):
        """Decode *bytes_* to str; None passes through unchanged."""
        return None if bytes_ is None else bytes_.decode(self.encoding)

    def close(self):
        """No resources to release."""
        pass
# Usage with producer/consumer: plug the string (de)serializers into the
# kafka-python client so keys and values are handled transparently.
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=StringSerializer(),
    value_serializer=StringSerializer()
)

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    key_deserializer=StringDeserializer(),
    value_deserializer=StringDeserializer()
)

import json
from kafka.serializer import Serializer, Deserializer
from kafka.errors import SerializationError
class JSONSerializer(Serializer):
    """Serializer that emits values as JSON-encoded bytes."""

    def __init__(self, encoding='utf-8'):
        # Codec applied to the JSON text (default UTF-8).
        self.encoding = encoding

    def serialize(self, topic, value):
        """Encode *value* as JSON bytes; None passes through unchanged."""
        if value is None:
            return None
        try:
            encoded = json.dumps(value).encode(self.encoding)
        except (TypeError, ValueError) as e:
            # json.dumps raises TypeError for unserializable objects.
            raise SerializationError(f"JSON serialization failed: {e}")
        return encoded

    def close(self):
        """No resources to release."""
        pass
class JSONDeserializer(Deserializer):
    """Deserializer that parses JSON-encoded bytes into Python objects."""

    def __init__(self, encoding='utf-8'):
        # Codec used to decode the incoming bytes (default UTF-8).
        self.encoding = encoding

    def deserialize(self, topic, bytes_):
        """Parse JSON from *bytes_*; None passes through unchanged."""
        if bytes_ is None:
            return None
        try:
            decoded = json.loads(bytes_.decode(self.encoding))
        except (ValueError, UnicodeDecodeError) as e:
            raise SerializationError(f"JSON deserialization failed: {e}")
        return decoded

    def close(self):
        """No resources to release."""
        pass
# Usage: send and receive Python dicts as JSON.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=JSONSerializer()
)

# Send Python objects as JSON
producer.send('events', {
    'user_id': 123,
    'action': 'login',
    'timestamp': '2024-01-01T12:00:00Z'
})

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=JSONDeserializer()
)

for message in consumer:
    event = message.value  # Already deserialized to Python dict
    print(f"User {event['user_id']} performed {event['action']}")

import avro.schema
import avro.io
import io
from kafka.serializer import Serializer, Deserializer
from kafka.errors import SerializationError
class AvroSerializer(Serializer):
    """Serializer that writes records using a fixed Avro schema."""

    def __init__(self, schema_str):
        # Parse the schema once up front; reused for every record.
        self.schema = avro.schema.parse(schema_str)

    def serialize(self, topic, value):
        """Binary-encode *value* against the schema; None passes through."""
        if value is None:
            return None
        try:
            buffer = io.BytesIO()
            datum_writer = avro.io.DatumWriter(self.schema)
            datum_writer.write(value, avro.io.BinaryEncoder(buffer))
            return buffer.getvalue()
        except Exception as e:
            raise SerializationError(f"Avro serialization failed: {e}")

    def close(self):
        """No resources to release."""
        pass
class AvroDeserializer(Deserializer):
    """Deserializer that reads Avro binary records with a fixed schema."""

    def __init__(self, schema_str):
        # Parse the schema once up front; reused for every record.
        self.schema = avro.schema.parse(schema_str)

    def deserialize(self, topic, bytes_):
        """Decode an Avro binary record from *bytes_*; None passes through."""
        if bytes_ is None:
            return None
        try:
            datum_reader = avro.io.DatumReader(self.schema)
            decoder = avro.io.BinaryDecoder(io.BytesIO(bytes_))
            return datum_reader.read(decoder)
        except Exception as e:
            raise SerializationError(f"Avro deserialization failed: {e}")

    def close(self):
        """No resources to release."""
        pass
# Schema definition
schema_str = """
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"}
]
}
"""

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=AvroSerializer(schema_str)
)

# Send Avro record
producer.send('users', {
    'id': 123,
    'name': 'Alice',
    'email': 'alice@example.com'
})

from kafka.serializer import Serializer, Deserializer
from kafka.errors import SerializationError
# Assuming you have generated protobuf classes
class ProtobufSerializer(Serializer):
    """Serializer for generated protobuf message classes.

    Accepts either an instance of the configured protobuf class or a
    plain dict whose keys map to scalar protobuf fields.
    """

    def __init__(self, protobuf_class):
        # Generated message class used to build/serialize records.
        self.protobuf_class = protobuf_class

    def serialize(self, topic, value):
        """Serialize *value* to the protobuf wire format; None passes through."""
        if value is None:
            return None
        try:
            if not isinstance(value, self.protobuf_class):
                # Convert dict to protobuf object (scalar fields only).
                message = self.protobuf_class()
                for name, item in value.items():
                    setattr(message, name, item)
                value = message
            return value.SerializeToString()
        except Exception as e:
            raise SerializationError(f"Protobuf serialization failed: {e}")

    def close(self):
        """No resources to release."""
        pass
class ProtobufDeserializer(Deserializer):
    """Deserializer that parses bytes into a protobuf message instance."""

    def __init__(self, protobuf_class):
        # Generated message class instantiated for every record.
        self.protobuf_class = protobuf_class

    def deserialize(self, topic, bytes_):
        """Parse *bytes_* into a new protobuf message; None passes through."""
        if bytes_ is None:
            return None
        try:
            pb_obj = self.protobuf_class()
            pb_obj.ParseFromString(bytes_)
            return pb_obj
        except Exception as e:
            raise SerializationError(f"Protobuf deserialization failed: {e}")

    def close(self):
        """No resources to release."""
        pass

from kafka.serializer import Serializer, Deserializer
import json
class TopicAwareJSONSerializer(Serializer):
    """JSON serializer that validates required payload fields per topic.

    NOTE(review): this snippet references SerializationError, which must
    be imported from kafka.errors — confirm the import in real usage.
    """

    def __init__(self):
        # Required payload keys, keyed by topic name; topics not listed
        # here are serialized without validation.
        self.topic_schemas = {
            'user-events': ['user_id', 'action', 'timestamp'],
            'order-events': ['order_id', 'status', 'amount'],
        }

    def serialize(self, topic, value):
        """Validate required fields for *topic*, then JSON-encode *value*."""
        if value is None:
            return None
        # Validate required fields for topic (no-op for unknown topics).
        for field in self.topic_schemas.get(topic, ()):
            if field not in value:
                raise SerializationError(f"Missing required field '{field}' for topic '{topic}'")
        return json.dumps(value).encode('utf-8')

    def close(self):
        """No resources to release."""
        pass
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=TopicAwareJSONSerializer()
)

# This will validate that required fields are present
producer.send('user-events', {
    'user_id': 123,
    'action': 'login',
    'timestamp': '2024-01-01T12:00:00Z'
})

import gzip
import json
from kafka.serializer import Serializer, Deserializer
class CompressedJSONSerializer(Serializer):
    """JSON serializer that gzips payloads above a size threshold.

    The first output byte is a framing marker: 0x01 means the rest is
    gzip-compressed JSON, 0x00 means plain JSON bytes.
    """

    def __init__(self, compression_threshold=1024):
        # Payloads strictly larger than this many bytes are compressed.
        self.compression_threshold = compression_threshold

    def serialize(self, topic, value):
        """JSON-encode *value*, compressing large payloads; None passes through."""
        if value is None:
            return None
        payload = json.dumps(value).encode('utf-8')
        if len(payload) > self.compression_threshold:
            # Add compression marker before the gzip stream.
            return b'\x01' + gzip.compress(payload)
        # No compression marker.
        return b'\x00' + payload

    def close(self):
        """No resources to release."""
        pass
class CompressedJSONDeserializer(Deserializer):
    """Counterpart to CompressedJSONSerializer: strips the 1-byte framing
    marker and decompresses when it indicates gzip."""

    def deserialize(self, topic, bytes_):
        """Decode a framed (marker + JSON) payload; None/empty yields None."""
        if bytes_ is None or len(bytes_) == 0:
            return None
        # Check compression marker: first byte 1 = gzip, anything else = plain.
        if bytes_[0] == 1:  # Compressed
            json_bytes = gzip.decompress(bytes_[1:])
        else:  # Uncompressed
            json_bytes = bytes_[1:]
        return json.loads(json_bytes.decode('utf-8'))

    def close(self):
        """No resources to release."""
        pass

from kafka.serializer import Serializer, Deserializer
from kafka.errors import SerializationError
import json
import logging
logger = logging.getLogger(__name__)
class RobustJSONSerializer(Serializer):
    """JSON serializer with an optional str() fallback.

    With strict=True (default) an encoding failure raises
    SerializationError; with strict=False the value is stringified
    instead and a warning is logged.
    """

    def __init__(self, encoding='utf-8', strict=True):
        self.encoding = encoding
        self.strict = strict

    def serialize(self, topic, value):
        """JSON-encode *value*; on failure either raise or fall back to str()."""
        if value is None:
            return None
        try:
            return json.dumps(value, ensure_ascii=False).encode(self.encoding)
        except (TypeError, ValueError) as e:
            if self.strict:
                raise SerializationError(f"JSON serialization failed for topic '{topic}': {e}")
            # Fallback: serialize as string
            logger.warning(f"JSON serialization failed for topic '{topic}', falling back to string: {e}")
            return str(value).encode(self.encoding)

    def close(self):
        """No resources to release."""
        pass
class RobustJSONDeserializer(Deserializer):
    """JSON deserializer with an optional raw-string fallback.

    With strict=True (default) a decoding failure raises
    SerializationError; with strict=False the raw bytes are decoded with
    replacement characters and returned as a string, with a warning logged.
    """

    def __init__(self, encoding='utf-8', strict=True):
        self.encoding = encoding
        self.strict = strict

    def deserialize(self, topic, bytes_):
        """Parse JSON from *bytes_*; on failure either raise or return raw text."""
        if bytes_ is None:
            return None
        try:
            return json.loads(bytes_.decode(self.encoding))
        except (ValueError, UnicodeDecodeError) as e:
            if self.strict:
                raise SerializationError(f"JSON deserialization failed for topic '{topic}': {e}")
            # Fallback: return raw string
            logger.warning(f"JSON deserialization failed for topic '{topic}', returning raw string: {e}")
            return bytes_.decode(self.encoding, errors='replace')

    def close(self):
        """No resources to release."""
        pass

from kafka import KafkaProducer, KafkaConsumer
import json
import pickle
# Simple lambda serializers
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: str(k).encode('utf-8') if k is not None else None,
    value_serializer=lambda v: json.dumps(v).encode('utf-8') if v is not None else None
)

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    key_deserializer=lambda k: k.decode('utf-8') if k is not None else None,
    value_deserializer=lambda v: json.loads(v.decode('utf-8')) if v is not None else None
)

# Pickle serialization for Python objects (use with caution)
# SECURITY: pickle.loads executes arbitrary code during unpickling —
# never consume pickle-encoded messages from an untrusted topic.
pickle_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: pickle.dumps(v) if v is not None else None
)

pickle_consumer = KafkaConsumer(
    'pickle-topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda v: pickle.loads(v) if v is not None else None
)

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-kafka-python-ng