Confluent's Python client for Apache Kafka

Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Complete integration with Confluent Schema Registry supporting schema evolution, compatibility checking, and automatic serialization/deserialization for Avro, JSON Schema, and Protobuf formats.
Synchronous client for interacting with Confluent Schema Registry.
class SchemaRegistryClient:
    """Synchronous client for interacting with Confluent Schema Registry.

    NOTE(review): API documentation stub — method bodies are intentionally
    empty; see the Schema Registry REST API for the backing endpoints.
    """

    def __init__(self, conf):
        """
        Create SchemaRegistryClient instance.

        Args:
            conf (dict): Configuration properties including 'url' and
                optional auth settings.
        """

    def register_schema(self, subject_name, schema, normalize_schemas=False):
        """
        Register a schema for the specified subject.

        Args:
            subject_name (str): Subject name for the schema.
            schema (Schema): Schema object to register.
            normalize_schemas (bool): Whether to normalize the schema.

        Returns:
            int: Schema ID assigned by registry.

        Raises:
            SchemaRegistryError: If registration fails.
        """

    def get_latest_version(self, subject_name):
        """
        Get the latest schema version for a subject.

        Args:
            subject_name (str): Subject name.

        Returns:
            RegisteredSchema: Latest registered schema.

        Raises:
            SchemaRegistryError: If subject not found.
        """

    def get_version(self, subject_name, version):
        """
        Get a specific schema version for a subject.

        Args:
            subject_name (str): Subject name.
            version (int): Schema version number.

        Returns:
            RegisteredSchema: Registered schema at specified version.

        Raises:
            SchemaRegistryError: If schema version not found.
        """

    def get_schema(self, schema_id, fetch_max_id=True):
        """
        Get schema by ID.

        Args:
            schema_id (int): Schema ID.
            fetch_max_id (bool): Whether to fetch maximum schema ID.

        Returns:
            Schema: Schema object.

        Raises:
            SchemaRegistryError: If schema not found.
        """

    def get_subjects(self):
        """
        Get list of all subjects.

        Returns:
            list: List of subject names.

        Raises:
            SchemaRegistryError: If request fails.
        """

    def delete_subject(self, subject_name, permanent=False):
        """
        Delete a subject.

        Args:
            subject_name (str): Subject name to delete.
            permanent (bool): Whether to permanently (hard) delete.

        Returns:
            list: List of deleted version numbers.

        Raises:
            SchemaRegistryError: If deletion fails.
        """

    def delete_version(self, subject_name, version, permanent=False):
        """
        Delete a specific version of a subject.

        Args:
            subject_name (str): Subject name.
            version (int): Version number to delete.
            permanent (bool): Whether to permanently (hard) delete.

        Returns:
            int: Deleted version number.

        Raises:
            SchemaRegistryError: If deletion fails.
        """

    def get_compatibility(self, subject_name=None):
        """
        Get compatibility level for subject or global default.

        Args:
            subject_name (str, optional): Subject name (None for global).

        Returns:
            str: Compatibility level.

        Raises:
            SchemaRegistryError: If request fails.
        """

    def set_compatibility(self, subject_name=None, level=None):
        """
        Set compatibility level for subject or global default.

        Args:
            subject_name (str, optional): Subject name (None for global).
            level (str): Compatibility level to set.

        Returns:
            str: Updated compatibility level.

        Raises:
            SchemaRegistryError: If update fails.
        """

    def test_compatibility(self, subject_name, schema, version='latest'):
        """
        Test schema compatibility with subject.

        Args:
            subject_name (str): Subject name.
            schema (Schema): Schema to test.
            version (str|int): Version to test against.

        Returns:
            bool: True if compatible, False otherwise.

        Raises:
            SchemaRegistryError: If test fails.
        """


# Asynchronous client for Schema Registry operations.
class AsyncSchemaRegistryClient:
    """Asynchronous client for Schema Registry operations.

    NOTE(review): API documentation stub — coroutine bodies are intentionally
    empty; they mirror the synchronous SchemaRegistryClient API.
    """

    def __init__(self, conf):
        """
        Create AsyncSchemaRegistryClient instance.

        Args:
            conf (dict): Configuration properties.
        """

    async def register_schema(self, subject_name, schema, normalize_schemas=False):
        """
        Async version of register_schema.

        Returns:
            int: Schema ID.
        """

    async def get_latest_version(self, subject_name):
        """
        Async version of get_latest_version.

        Returns:
            RegisteredSchema: Latest registered schema.
        """

    async def get_schema(self, schema_id, fetch_max_id=True):
        """
        Async version of get_schema.

        Returns:
            Schema: Schema object.
        """

    async def close(self):
        """Close the async client and cleanup resources."""


# Represents a schema with its type and definition.
class Schema:
    """Represents a schema with its type and definition.

    Value object: two Schema instances compare equal when definition, type,
    and references match, and equal instances hash identically so they can
    be used in sets and as dict keys.
    """

    def __init__(self, schema_str, schema_type, references=None):
        """
        Create Schema object.

        Args:
            schema_str (str): Schema definition string.
            schema_type (str): Schema type ('AVRO', 'JSON', 'PROTOBUF').
            references (list, optional): List of schema references.
        """
        self._schema_str = schema_str
        self._schema_type = schema_type
        # Normalize here rather than using a mutable default argument.
        self._references = list(references) if references is not None else []

    @property
    def schema_str(self):
        """Schema definition string."""
        return self._schema_str

    @property
    def schema_type(self):
        """Schema type."""
        return self._schema_type

    @property
    def references(self):
        """Schema references."""
        return self._references

    def __eq__(self, other):
        """Equality comparison on definition, type, and references."""
        if not isinstance(other, Schema):
            return NotImplemented
        return (self._schema_str == other._schema_str
                and self._schema_type == other._schema_type
                and self._references == other._references)

    def __hash__(self):
        """Hash for use in sets and dicts.

        References are excluded (lists are unhashable); equal objects still
        hash equal, which is all the hash contract requires.
        """
        return hash((self._schema_str, self._schema_type))


# Schema with registry metadata.
class RegisteredSchema:
    """Schema with registry metadata (ID, version, and subject)."""

    def __init__(self, schema_id, schema, version, subject):
        """
        Create RegisteredSchema object.

        Args:
            schema_id (int): Schema ID in registry.
            schema (Schema): Schema object.
            version (int): Schema version.
            subject (str): Subject name.
        """
        self._schema_id = schema_id
        self._schema = schema
        self._version = version
        self._subject = subject

    @property
    def schema_id(self):
        """Schema ID."""
        return self._schema_id

    @property
    def schema(self):
        """Schema object."""
        return self._schema

    @property
    def version(self):
        """Schema version."""
        return self._version

    @property
    def subject(self):
        """Subject name."""
        return self._subject


# Serializes Python objects to Avro binary format with Schema Registry integration.
class AvroSerializer:
    """Serializes Python objects to Avro binary format with Schema Registry
    integration.

    NOTE(review): API documentation stub — bodies intentionally empty.
    """

    def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None):
        """
        Create AvroSerializer.

        Args:
            schema_registry_client (SchemaRegistryClient): Registry client.
            schema_str (str): Avro schema definition.
            to_dict (callable, optional): Function to convert object to dict.
            conf (dict, optional): Serializer configuration.
        """

    def __call__(self, obj, ctx):
        """
        Serialize object to Avro bytes.

        Args:
            obj: Object to serialize.
            ctx (SerializationContext): Serialization context.

        Returns:
            bytes: Serialized data with schema ID prefix.

        Raises:
            SerializationError: If serialization fails.
        """


# Deserializes Avro binary data to Python objects using Schema Registry.
class AvroDeserializer:
    """Deserializes Avro binary data to Python objects using Schema Registry.

    NOTE(review): API documentation stub — bodies intentionally empty.
    """

    def __init__(self, schema_registry_client, schema_str=None, from_dict=None, return_record_name=False):
        """
        Create AvroDeserializer.

        Args:
            schema_registry_client (SchemaRegistryClient): Registry client.
            schema_str (str, optional): Reader schema definition.
            from_dict (callable, optional): Function to convert dict to object.
            return_record_name (bool): Whether to return record name.
        """

    def __call__(self, value, ctx):
        """
        Deserialize Avro bytes to object.

        Args:
            value (bytes): Serialized data with schema ID prefix.
            ctx (SerializationContext): Serialization context.

        Returns:
            object: Deserialized object.

        Raises:
            SerializationError: If deserialization fails.
        """


# Serializes Python objects to JSON with Schema Registry integration.
class JSONSerializer:
    """Serializes Python objects to JSON with Schema Registry integration.

    NOTE(review): API documentation stub — bodies intentionally empty.
    """

    def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None):
        """
        Create JSONSerializer.

        Args:
            schema_registry_client (SchemaRegistryClient): Registry client.
            schema_str (str): JSON schema definition.
            to_dict (callable, optional): Function to convert object to dict.
            conf (dict, optional): Serializer configuration.
        """

    def __call__(self, obj, ctx):
        """
        Serialize object to JSON bytes.

        Args:
            obj: Object to serialize.
            ctx (SerializationContext): Serialization context.

        Returns:
            bytes: Serialized JSON data with schema ID prefix.

        Raises:
            SerializationError: If serialization fails.
        """


# Deserializes JSON data to Python objects using Schema Registry.
class JSONDeserializer:
    """Deserializes JSON data to Python objects using Schema Registry.

    NOTE(review): API documentation stub — bodies intentionally empty.
    """

    def __init__(self, schema_registry_client, schema_str=None, from_dict=None):
        """
        Create JSONDeserializer.

        Args:
            schema_registry_client (SchemaRegistryClient): Registry client.
            schema_str (str, optional): JSON schema definition.
            from_dict (callable, optional): Function to convert dict to object.
        """

    def __call__(self, value, ctx):
        """
        Deserialize JSON bytes to object.

        Args:
            value (bytes): Serialized JSON data with schema ID prefix.
            ctx (SerializationContext): Serialization context.

        Returns:
            object: Deserialized object.

        Raises:
            SerializationError: If deserialization fails.
        """


# Serializes Protobuf messages with Schema Registry integration.
class ProtobufSerializer:
    """Serializes Protobuf messages with Schema Registry integration.

    NOTE(review): API documentation stub — bodies intentionally empty.
    """

    def __init__(self, msg_type, schema_registry_client, conf=None):
        """
        Create ProtobufSerializer.

        Args:
            msg_type: Protobuf message class.
            schema_registry_client (SchemaRegistryClient): Registry client.
            conf (dict, optional): Serializer configuration.
        """

    def __call__(self, obj, ctx):
        """
        Serialize Protobuf message to bytes.

        Args:
            obj: Protobuf message instance.
            ctx (SerializationContext): Serialization context.

        Returns:
            bytes: Serialized data with schema ID prefix.

        Raises:
            SerializationError: If serialization fails.
        """


# Deserializes Protobuf binary data using Schema Registry.
class ProtobufDeserializer:
    """Deserializes Protobuf binary data using Schema Registry.

    NOTE(review): API documentation stub — bodies intentionally empty.
    """

    def __init__(self, msg_type, schema_registry_client, conf=None):
        """
        Create ProtobufDeserializer.

        Args:
            msg_type: Protobuf message class.
            schema_registry_client (SchemaRegistryClient): Registry client.
            conf (dict, optional): Deserializer configuration.
        """

    def __call__(self, value, ctx):
        """
        Deserialize Protobuf bytes to message.

        Args:
            value (bytes): Serialized data with schema ID prefix.
            ctx (SerializationContext): Serialization context.

        Returns:
            object: Deserialized Protobuf message.

        Raises:
            SerializationError: If deserialization fails.
        """


# Functions for generating subject names for schemas.
def topic_subject_name_strategy(ctx, record_name):
    """
    Create subject name as {topic}-{key|value}.

    Args:
        ctx (SerializationContext): Context with topic and field info.
        record_name (str): Record name (unused).

    Returns:
        str: Subject name in format 'topic-key' or 'topic-value'.
    """
    # ctx.field carries the message field being serialized ('key' or 'value').
    return "{}-{}".format(ctx.topic, ctx.field)


def topic_record_subject_name_strategy(ctx, record_name):
    """
    Create subject name as {topic}-{record_name}.

    Args:
        ctx (SerializationContext): Context with topic info.
        record_name (str): Record name from schema.

    Returns:
        str: Subject name in format 'topic-recordname'.
    """
    return "{}-{}".format(ctx.topic, record_name)


def record_subject_name_strategy(ctx, record_name):
    """
    Create subject name as {record_name}.

    Args:
        ctx (SerializationContext): Context (unused).
        record_name (str): Record name from schema.

    Returns:
        str: Subject name as record name.
    """
    return record_name


# Functions for handling schema ID serialization in messages.
def prefix_schema_id_serializer(schema_id, data):
    """
    Serialize schema ID into payload prefix.

    Uses the Confluent wire format: one zero "magic" byte followed by the
    schema ID as a 4-byte big-endian unsigned integer.

    Args:
        schema_id (int): Schema ID to serialize.
        data (bytes): Message payload.

    Returns:
        bytes: Data with 5-byte schema ID prefix.
    """
    import struct
    return struct.pack('>bI', 0, schema_id) + data


def header_schema_id_serializer(schema_id, data):
    """
    Serialize schema ID into message headers.

    NOTE(review): stub — the header key used by the framework is not visible
    from this file; confirm the key name before implementing.

    Args:
        schema_id (int): Schema ID to serialize.
        data (bytes): Message payload.

    Returns:
        tuple: (data, headers_dict)
    """


def prefix_schema_id_deserializer(data):
    """
    Deserialize schema ID from payload prefix.

    Args:
        data (bytes): Data with schema ID prefix.

    Returns:
        tuple: (schema_id, payload)

    Raises:
        ValueError: If the payload is shorter than the 5-byte prefix.
    """
    import struct
    if len(data) < 5:
        raise ValueError("Expected at least 5 bytes of schema ID prefix")
    _magic, schema_id = struct.unpack('>bI', data[:5])
    return schema_id, data[5:]


def dual_schema_id_deserializer(data, headers=None):
    """
    Deserialize schema ID from headers or payload prefix.

    NOTE(review): the header lookup key is not visible from this file, so
    headers are currently ignored and the prefix path is always used —
    confirm the key name before wiring up the header branch.

    Args:
        data (bytes): Message payload.
        headers (dict, optional): Message headers.

    Returns:
        tuple: (schema_id, payload)
    """
    return prefix_schema_id_deserializer(data)


# Schema Registry server configuration.
class ServerConfig:
    """Schema Registry server configuration.

    NOTE(review): API documentation stub — property bodies intentionally empty.
    """

    @property
    def compatibility_level(self):
        """Default compatibility level."""

    @property
    def mode(self):
        """Schema Registry mode."""


class ConfigCompatibilityLevel:
    """String constants for the Schema Registry compatibility levels."""

    BACKWARD = "BACKWARD"
    BACKWARD_TRANSITIVE = "BACKWARD_TRANSITIVE"
    FORWARD = "FORWARD"
    FORWARD_TRANSITIVE = "FORWARD_TRANSITIVE"
    FULL = "FULL"
    FULL_TRANSITIVE = "FULL_TRANSITIVE"
    NONE = "NONE"
class RuleKind:
    """String constants for the kinds of data-contract rules."""

    CONDITION = "CONDITION"
    TRANSFORM = "TRANSFORM"


class RuleMode:
    """String constants for the processing modes of data-contract rules."""

    UPGRADE = "UPGRADE"
    DOWNGRADE = "DOWNGRADE"
    UPDOWN = "UPDOWN"
    WRITE = "WRITE"
    READ = "READ"
    WRITEREAD = "WRITEREAD"


from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
# Fix: StringDeserializer is used in the consumer config below but was not
# imported, which raised NameError when this example ran.
from confluent_kafka.serialization import (
    StringSerializer,
    StringDeserializer,
    SerializationContext,
    MessageField,
)

# Schema Registry client
schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Avro schema for the message value
value_schema_str = """
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}
"""

# Create serializer
avro_serializer = AvroSerializer(schema_registry_client, value_schema_str)

# Producer configuration
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': avro_serializer
}
producer = SerializingProducer(producer_conf)

# Produce message
user_record = {'name': 'Alice', 'age': 30}
producer.produce(topic='users', key='user1', value=user_record)
producer.flush()

# Consumer configuration
avro_deserializer = AvroDeserializer(schema_registry_client, value_schema_str)
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': avro_deserializer,
    'group.id': 'user-group',
    'auto.offset.reset': 'earliest'
}
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe(['users'])
msg = consumer.poll(1.0)
if msg is not None:
    user_object = msg.value()
    print(f"User: {user_object['name']}, Age: {user_object['age']}")
consumer.close()

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
# Connect to the local Schema Registry.
registry = SchemaRegistryClient({'url': 'http://localhost:8081'})

# Register the initial (v1) schema for the subject.
v1_schema = Schema("""
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}
""", schema_type='AVRO')
schema_id = registry.register_schema('users-value', v1_schema)
print(f"Registered schema with ID: {schema_id}")

# Evolve the schema: add an optional field with a null default so older
# records remain readable.
v2_schema = Schema("""
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
        {"name": "email", "type": ["null", "string"], "default": null}
    ]
}
""", schema_type='AVRO')

# Verify compatibility before registering the new version.
compatible = registry.test_compatibility('users-value', v2_schema)
print(f"Schema is compatible: {compatible}")
if compatible:
    new_schema_id = registry.register_schema('users-value', v2_schema)
    print(f"Registered evolved schema with ID: {new_schema_id}")

from confluent_kafka.schema_registry.json_schema import JSONSerializer, JSONDeserializer
# JSON Schema definition for the message value.
json_schema_str = """
{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "integer", "minimum": 0}
    },
    "required": ["name", "age"]
}
"""

# Build the value serializer/deserializer pair backed by the registry.
json_serializer = JSONSerializer(schema_registry_client, json_schema_str)
json_deserializer = JSONDeserializer(schema_registry_client, json_schema_str)

# Wire them into SerializingProducer / DeserializingConsumer configs.
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'value.serializer': json_serializer
}
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'value.deserializer': json_deserializer,
    'group.id': 'json-group',
    'auto.offset.reset': 'earliest'
}

from confluent_kafka.schema_registry.protobuf import ProtobufSerializer, ProtobufDeserializer
import user_pb2  # Generated from the .proto file

# Protobuf serializer/deserializer backed by the registry.
protobuf_serializer = ProtobufSerializer(user_pb2.User, schema_registry_client)
protobuf_deserializer = ProtobufDeserializer(user_pb2.User, schema_registry_client)

# Build the protobuf message to send.
user_msg = user_pb2.User()
user_msg.name = "Bob"
user_msg.age = 25

# Produce it with a SerializingProducer.
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'value.serializer': protobuf_serializer
}
producer = SerializingProducer(producer_conf)
producer.produce('users-protobuf', value=user_msg)
producer.flush()

# Install with Tessl CLI
npx tessl i tessl/pypi-confluent-kafka