Confluent's Python client for Apache Kafka

Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Comprehensive serialization framework providing pluggable serialization/deserialization with built-in support for common data types and high-level producer/consumer APIs with automatic serialization.
Abstract base classes for implementing custom serialization logic.
class Serializer:
    """Abstract base class for message serializers.

    Subclasses implement ``__call__`` to convert an object into bytes.
    """

    def __call__(self, obj, ctx=None):
        """Serialize an object.

        Args:
            obj: Object to serialize
            ctx (SerializationContext, optional): Serialization context

        Returns:
            bytes: Serialized data

        Raises:
            SerializationError: If serialization fails
        """
        raise NotImplementedError
class Deserializer:
    """Abstract base class for message deserializers.

    Subclasses implement ``__call__`` to convert bytes back into an object.
    """

    def __call__(self, value, ctx=None):
        """
        Deserialize data.

        Args:
            value (bytes): Data to deserialize
            ctx (SerializationContext, optional): Serialization context

        Returns:
            object: Deserialized object

        Raises:
            SerializationError: If deserialization fails
        """
        raise NotImplementedError


# Ready-to-use serializers for common data types.
class StringSerializer(Serializer):
    """Serializes str objects to bytes using a configurable character encoding."""

    def __init__(self, codec='utf_8'):
        """
        Create StringSerializer.

        Args:
            codec (str): Character encoding (default: 'utf_8')
        """

    def __call__(self, obj, ctx=None):
        """
        Serialize string to bytes.

        Args:
            obj (str): String to serialize
            ctx (SerializationContext, optional): Serialization context

        Returns:
            bytes: Encoded string (using the configured codec) or None if obj is None

        Raises:
            SerializationError: If obj is not a string
        """


class IntegerSerializer(Serializer):
    """Serializes int objects to 4-byte big-endian int32 bytes."""

    def __call__(self, obj, ctx=None):
        """
        Serialize integer to int32 bytes.

        Args:
            obj (int): Integer to serialize
            ctx (SerializationContext, optional): Serialization context

        Returns:
            bytes: 4-byte big-endian int32 or None if obj is None

        Raises:
            SerializationError: If obj is not an integer or out of int32 range
        """


class DoubleSerializer(Serializer):
    """Serializes float objects to 8-byte IEEE 754 binary64 bytes."""

    def __call__(self, obj, ctx=None):
        """
        Serialize float to IEEE 754 binary64 bytes.

        Args:
            obj (float): Float to serialize
            ctx (SerializationContext, optional): Serialization context

        Returns:
            bytes: 8-byte IEEE 754 binary64 or None if obj is None

        Raises:
            SerializationError: If obj is not a float
        """


# Ready-to-use deserializers for common data types.
class StringDeserializer(Deserializer):
    """Deserializes bytes to str using a configurable character encoding."""

    def __init__(self, codec='utf_8'):
        """
        Create StringDeserializer.

        Args:
            codec (str): Character encoding (default: 'utf_8')
        """

    def __call__(self, value, ctx=None):
        """
        Deserialize bytes to string.

        Args:
            value (bytes): Bytes to deserialize
            ctx (SerializationContext, optional): Serialization context

        Returns:
            str: Decoded string or None if value is None

        Raises:
            SerializationError: If decoding fails
        """


class IntegerDeserializer(Deserializer):
    """Deserializes 4-byte big-endian int32 bytes to int."""

    def __call__(self, value, ctx=None):
        """
        Deserialize int32 bytes to integer.

        Args:
            value (bytes): 4-byte big-endian int32
            ctx (SerializationContext, optional): Serialization context

        Returns:
            int: Deserialized integer or None if value is None

        Raises:
            SerializationError: If value is not 4 bytes
        """


class DoubleDeserializer(Deserializer):
    """Deserializes 8-byte IEEE 754 binary64 bytes to float."""

    def __call__(self, value, ctx=None):
        """
        Deserialize IEEE 754 binary64 bytes to float.

        Args:
            value (bytes): 8-byte IEEE 754 binary64
            ctx (SerializationContext, optional): Serialization context

        Returns:
            float: Deserialized float or None if value is None

        Raises:
            SerializationError: If value is not 8 bytes
        """


# Provides contextual information for serialization operations.
class SerializationContext:
    """Carries contextual information (topic, field, headers) about the
    message being serialized or deserialized."""

    def __init__(self, topic, field, headers=None):
        """
        Create SerializationContext.

        Args:
            topic (str): Topic name
            field (MessageField): Message field being serialized
            headers (dict, optional): Message headers
        """

    @property
    def topic(self):
        """
        Topic name.

        Returns:
            str: Topic name
        """

    @property
    def field(self):
        """
        Message field being serialized.

        Returns:
            MessageField: NONE, KEY, or VALUE
        """

    @property
    def headers(self):
        """
        Message headers.

        Returns:
            dict: Message headers or None
        """


class MessageField:
    """Enumerates which part of the message is being (de)serialized."""

    NONE = 0
    KEY = 1
    VALUE = 2


# High-level producer with pluggable serialization.
class SerializingProducer:
    """High-level producer that serializes keys and values automatically
    using the configured 'key.serializer' and 'value.serializer'."""

    def __init__(self, conf):
        """
        Create SerializingProducer.

        Args:
            conf (dict): Configuration including 'key.serializer' and 'value.serializer'
        """

    def produce(self, topic, key=None, value=None, partition=-1,
                on_delivery=None, timestamp=0, headers=None):
        """
        Produce message with automatic serialization.

        Args:
            topic (str): Topic to produce to
            key: Key object (will be serialized using key.serializer)
            value: Value object (will be serialized using value.serializer)
            partition (int, optional): Specific partition (-1 for automatic)
            on_delivery (callable, optional): Delivery report callback
            timestamp (int, optional): Message timestamp (0 for current time)
            headers (dict, optional): Message headers

        Raises:
            SerializationError: If serialization fails
            BufferError: If local producer queue is full
            KafkaException: For other produce errors
        """

    def poll(self, timeout=-1):
        """
        Poll for events and call registered callbacks.

        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)

        Returns:
            int: Number of events processed
        """

    def flush(self, timeout=-1):
        """
        Wait for all messages to be delivered.

        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)

        Returns:
            int: Number of messages still in queue (0 on success)
        """

    def purge(self, in_queue=True, in_flight=True, blocking=True):
        """
        Purge messages from internal queues.

        Args:
            in_queue (bool): Purge messages in local queue
            in_flight (bool): Purge messages in flight to broker
            blocking (bool): Block until purge is complete

        Returns:
            int: Number of messages purged
        """

    def abort_transaction(self, timeout=-1):
        """Abort ongoing transaction."""

    def begin_transaction(self):
        """Begin a new transaction."""

    def commit_transaction(self, timeout=-1):
        """Commit current transaction."""

    def init_transactions(self, timeout=-1):
        """Initialize transactions for this producer."""


# High-level consumer with pluggable deserialization.
class DeserializingConsumer:
    """High-level consumer that deserializes keys and values automatically
    using the configured 'key.deserializer' and 'value.deserializer'."""

    def __init__(self, conf):
        """
        Create DeserializingConsumer.

        Args:
            conf (dict): Configuration including 'key.deserializer' and 'value.deserializer'
        """

    def subscribe(self, topics, listener=None):
        """
        Subscribe to list of topics for automatic partition assignment.

        Args:
            topics (list): List of topic names to subscribe to
            listener (RebalanceCallback, optional): Rebalance callback
        """

    def poll(self, timeout=-1):
        """
        Poll for messages with automatic deserialization.

        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)

        Returns:
            Message: Message with deserialized key/value or None if timeout

        Note:
            If deserialization fails, the error is stored in the message
            and can be accessed via ConsumeError.
        """

    def consume(self, num_messages=1, timeout=-1):
        """
        Consume multiple messages (not implemented).

        Raises:
            NotImplementedError: This method is not supported
        """

    def assign(self, partitions):
        """
        Manually assign partitions to consume from.

        Args:
            partitions (list): List of TopicPartition objects
        """

    def assignment(self):
        """
        Get current partition assignment.

        Returns:
            list: List of assigned TopicPartition objects
        """

    def unassign(self):
        """Remove current partition assignment."""

    def commit(self, message=None, offsets=None, asynchronous=True):
        """
        Commit message offset or specified offsets.

        Args:
            message (Message, optional): Commit offset for this message
            offsets (list, optional): List of TopicPartition objects with offsets
            asynchronous (bool): Commit asynchronously if True

        Returns:
            list: Committed offsets if synchronous, None if asynchronous
        """

    def committed(self, partitions, timeout=-1):
        """
        Get committed offsets for partitions.

        Args:
            partitions (list): List of TopicPartition objects
            timeout (float): Maximum time to wait in seconds

        Returns:
            list: List of TopicPartition objects with committed offsets
        """

    def position(self, partitions):
        """
        Get current position (next fetch offset) for partitions.

        Args:
            partitions (list): List of TopicPartition objects

        Returns:
            list: List of TopicPartition objects with current positions
        """

    def seek(self, partition):
        """
        Seek to offset for partition.

        Args:
            partition (TopicPartition): Partition with offset to seek to
        """

    def pause(self, partitions):
        """
        Pause consumption for partitions.

        Args:
            partitions (list): List of TopicPartition objects to pause
        """

    def resume(self, partitions):
        """
        Resume consumption for partitions.

        Args:
            partitions (list): List of TopicPartition objects to resume
        """

    def close(self):
        """Close the consumer and leave consumer group."""

    def store_offsets(self, message=None, offsets=None):
        """
        Store offset for message or specified offsets.

        Args:
            message (Message, optional): Store offset for this message
            offsets (list, optional): List of TopicPartition objects with offsets
        """


class SerializationError(Exception):
    """Base class for serialization errors."""

    def __init__(self, message, inner_exception=None):
        """
        Create SerializationError.

        Args:
            message (str): Error message
            inner_exception (Exception, optional): Underlying exception
        """

    @property
    def inner_exception(self):
        """Underlying exception that caused the serialization error."""


# Example: implementing custom JSON serializers.
from confluent_kafka.serialization import Serializer, Deserializer, SerializationError
import json


class JSONSerializer(Serializer):
    """Custom JSON serializer (example)."""

    def __call__(self, obj, ctx=None):
        # None passes through unchanged (e.g. tombstone values).
        if obj is None:
            return None
        try:
            return json.dumps(obj).encode('utf-8')
        except Exception as e:
            # Chain the original exception so the root cause is preserved.
            raise SerializationError(f"Failed to serialize to JSON: {e}") from e


class JSONDeserializer(Deserializer):
    """Custom JSON deserializer (example)."""

    def __call__(self, value, ctx=None):
        if value is None:
            return None
        try:
            return json.loads(value.decode('utf-8'))
        except Exception as e:
            # Chain the original exception so the root cause is preserved.
            raise SerializationError(f"Failed to deserialize JSON: {e}") from e
# Use custom serializers
from confluent_kafka import SerializingProducer, DeserializingConsumer

producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': JSONSerializer()
}
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': JSONDeserializer(),
    'group.id': 'json-group',
    'auto.offset.reset': 'earliest'
}
producer = SerializingProducer(producer_conf)
consumer = DeserializingConsumer(consumer_conf)

# Example: using the built-in serializers.
from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import (
    StringSerializer, StringDeserializer,
    IntegerSerializer, IntegerDeserializer,
    DoubleSerializer, DoubleDeserializer
)

# Producer with different serializers for key and value
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': IntegerSerializer()
}
producer = SerializingProducer(producer_conf)

# Produce messages with automatic serialization
for i in range(10):
    producer.produce(
        'numbers-topic',
        key=f'key-{i}',   # String key
        value=i * 100     # Integer value
    )
producer.flush()

# Consumer with corresponding deserializers
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': IntegerDeserializer(),
    'group.id': 'numbers-group',
    'auto.offset.reset': 'earliest'
}
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe(['numbers-topic'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        # Key and value are automatically deserialized
        print(f"Key: {msg.key()} (type: {type(msg.key())})")
        print(f"Value: {msg.value()} (type: {type(msg.value())})")
finally:
    consumer.close()

# Example: using the serialization context.
from confluent_kafka.serialization import SerializationContext, MessageField
class ContextAwareSerializer(Serializer):
    """Serializer that uses serialization context."""

    def __call__(self, obj, ctx=None):
        if obj is None:
            return None
        # Use context information
        if ctx is not None:
            print(f"Serializing for topic: {ctx.topic}")
            print(f"Field: {ctx.field}")
            if ctx.headers:
                print(f"Headers: {ctx.headers}")
        # Different serialization based on field
        if ctx and ctx.field == MessageField.KEY:
            # Keys serialized as uppercase strings
            return str(obj).upper().encode('utf-8')
        else:
            # Values serialized as JSON
            return json.dumps(obj).encode('utf-8')


# The SerializingProducer automatically creates SerializationContext
# and passes it to serializers
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': ContextAwareSerializer(),
    'value.serializer': ContextAwareSerializer()
}
producer = SerializingProducer(producer_conf)
producer.produce('my-topic', key='mykey', value={'data': 'value'})

# Example: handling serialization errors.
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import SerializationError
from confluent_kafka.error import ProduceError, KeySerializationError, ValueSerializationError


class StrictIntegerSerializer(Serializer):
    """Integer serializer that raises errors for non-integers."""

    def __call__(self, obj, ctx=None):
        if obj is None:
            return None
        if not isinstance(obj, int):
            raise SerializationError(f"Expected int, got {type(obj)}")
        return obj.to_bytes(4, 'big', signed=True)


producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': StrictIntegerSerializer()
}
producer = SerializingProducer(producer_conf)


def delivery_callback(err, msg):
    """Delivery report callback: err is set on failure, msg on success."""
    if err is not None:
        if isinstance(err, ValueSerializationError):
            print(f"Value serialization failed: {err}")
        elif isinstance(err, KeySerializationError):
            print(f"Key serialization failed: {err}")
        else:
            print(f"Other error: {err}")
    else:
        print(f"Message delivered: {msg.topic()} [{msg.partition()}]")


try:
    # This will succeed
    # NOTE: SerializingProducer.produce takes the callback via 'on_delivery'
    # (see the documented produce() signature), not 'callback'.
    producer.produce('numbers', key='valid', value=42, on_delivery=delivery_callback)
    # This will fail due to serialization error
    producer.produce('numbers', key='invalid', value='not-a-number', on_delivery=delivery_callback)
except Exception as e:
    print(f"Immediate error: {e}")

producer.poll(0)  # Process delivery callbacks
producer.flush()

# Example: combining serializer and client configuration.
from confluent_kafka import SerializingProducer, DeserializingConsumer
# Producer with serializer-specific configuration
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': StringSerializer('ascii'),  # Different encoding
    # Standard producer settings
    'acks': 'all',
    'retries': 3,
    'batch.size': 16384,
    'linger.ms': 5
}

# Consumer with error handling configuration
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': StringDeserializer('utf_8'),
    # Standard consumer settings
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000
}

producer = SerializingProducer(producer_conf)
consumer = DeserializingConsumer(consumer_conf)

# Error handling in consumer
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Message error: {msg.error()}")
            continue
        # Process message
        try:
            key = msg.key()
            value = msg.value()
            # Process deserialized data
            print(f"Processed: key={key}, value={value}")
            # Manual commit after successful processing
            consumer.commit(message=msg)
        except Exception as e:
            print(f"Processing error: {e}")
            # Could skip this message or handle error differently
finally:
    consumer.close()

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-confluent-kafka