CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-confluent-kafka

Confluent's Python client for Apache Kafka

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/schema-registry.md

Schema Registry Integration

Complete integration with Confluent Schema Registry supporting schema evolution, compatibility checking, and automatic serialization/deserialization for Avro, JSON Schema, and Protobuf formats.

Capabilities

SchemaRegistryClient

Synchronous client for interacting with Confluent Schema Registry.

class SchemaRegistryClient:
    """
    Synchronous client for interacting with Confluent Schema Registry.

    Covers schema registration and lookup, subject and version deletion,
    and compatibility configuration and testing. Every method other than
    the constructor documents SchemaRegistryError as its failure signal.
    """

    def __init__(self, conf):
        """
        Create SchemaRegistryClient instance.

        Args:
            conf (dict): Configuration properties, including 'url'
                (e.g. {'url': 'http://localhost:8081'}) and optional
                auth settings
        """

    def register_schema(self, subject_name, schema, normalize_schemas=False):
        """
        Register a schema for the specified subject.

        Args:
            subject_name (str): Subject name for the schema
            schema (Schema): Schema object to register
            normalize_schemas (bool): Whether to normalize the schema
                (default False)

        Returns:
            int: Schema ID assigned by registry

        Raises:
            SchemaRegistryError: If registration fails
        """

    def get_latest_version(self, subject_name):
        """
        Get the latest schema version for a subject.

        Args:
            subject_name (str): Subject name

        Returns:
            RegisteredSchema: Latest registered schema

        Raises:
            SchemaRegistryError: If subject not found
        """

    def get_version(self, subject_name, version):
        """
        Get a specific schema version for a subject.

        Args:
            subject_name (str): Subject name
            version (int): Schema version number

        Returns:
            RegisteredSchema: Registered schema at specified version

        Raises:
            SchemaRegistryError: If schema version not found
        """

    def get_schema(self, schema_id, fetch_max_id=True):
        """
        Get schema by ID.

        Args:
            schema_id (int): Schema ID
            fetch_max_id (bool): Whether to fetch maximum schema ID
                (default True). NOTE(review): the effect of this flag on
                the returned Schema is not described here -- confirm
                against the client implementation.

        Returns:
            Schema: Schema object

        Raises:
            SchemaRegistryError: If schema not found
        """

    def get_subjects(self):
        """
        Get list of all subjects.

        Returns:
            list: List of subject names

        Raises:
            SchemaRegistryError: If request fails
        """

    def delete_subject(self, subject_name, permanent=False):
        """
        Delete a subject.

        Args:
            subject_name (str): Subject name to delete
            permanent (bool): Whether to permanently delete (default
                False). NOTE(review): presumably False means a soft
                delete -- confirm.

        Returns:
            list: List of deleted version numbers

        Raises:
            SchemaRegistryError: If deletion fails
        """

    def delete_version(self, subject_name, version, permanent=False):
        """
        Delete a specific version of a subject.

        Args:
            subject_name (str): Subject name
            version (int): Version number to delete
            permanent (bool): Whether to permanently delete (default
                False)

        Returns:
            int: Deleted version number

        Raises:
            SchemaRegistryError: If deletion fails
        """

    def get_compatibility(self, subject_name=None):
        """
        Get compatibility level for subject or global default.

        Args:
            subject_name (str, optional): Subject name; None (the
                default) selects the global setting

        Returns:
            str: Compatibility level

        Raises:
            SchemaRegistryError: If request fails
        """

    def set_compatibility(self, subject_name=None, level=None):
        """
        Set compatibility level for subject or global default.

        Args:
            subject_name (str, optional): Subject name; None (the
                default) selects the global setting
            level (str): Compatibility level to set. NOTE(review): the
                signature defaults this to None; presumably a value must
                still be supplied -- confirm.

        Returns:
            str: Updated compatibility level

        Raises:
            SchemaRegistryError: If update fails
        """

    def test_compatibility(self, subject_name, schema, version='latest'):
        """
        Test schema compatibility with subject.

        Args:
            subject_name (str): Subject name
            schema (Schema): Schema to test
            version (str|int): Version to test against; defaults to
                'latest'

        Returns:
            bool: True if compatible, False otherwise

        Raises:
            SchemaRegistryError: If test fails
        """

AsyncSchemaRegistryClient

Asynchronous client for Schema Registry operations.

class AsyncSchemaRegistryClient:
    """
    Asynchronous client for Schema Registry operations.

    The schema methods are coroutines (`async def`) and must be awaited.
    Each is described as the async version of the SchemaRegistryClient
    method with the same name and signature.
    """

    def __init__(self, conf):
        """
        Create AsyncSchemaRegistryClient instance.

        Args:
            conf (dict): Configuration properties
        """

    async def register_schema(self, subject_name, schema, normalize_schemas=False):
        """
        Async version of register_schema.

        Args:
            subject_name (str): Subject name for the schema
            schema (Schema): Schema object to register
            normalize_schemas (bool): Whether to normalize the schema
                (default False)

        Returns:
            int: Schema ID
        """

    async def get_latest_version(self, subject_name):
        """
        Async version of get_latest_version.

        Args:
            subject_name (str): Subject name

        Returns:
            RegisteredSchema: Latest registered schema
        """

    async def get_schema(self, schema_id, fetch_max_id=True):
        """
        Async version of get_schema.

        Args:
            schema_id (int): Schema ID
            fetch_max_id (bool): Whether to fetch maximum schema ID
                (default True)

        Returns:
            Schema: Schema object
        """

    async def close(self):
        """Close the async client and clean up resources."""

Schema Classes

Schema

Represents a schema with its type and definition.

class Schema:
    """
    Represents a schema with its type and definition.

    Exposes the constructor arguments as read-only properties and
    supports equality and hashing.
    """

    def __init__(self, schema_str, schema_type, references=None):
        """
        Create Schema object.

        Args:
            schema_str (str): Schema definition string
            schema_type (str): Schema type ('AVRO', 'JSON', 'PROTOBUF')
            references (list, optional): List of schema references
                (default None)
        """

    @property
    def schema_str(self):
        """Schema definition string."""

    @property
    def schema_type(self):
        """Schema type ('AVRO', 'JSON', 'PROTOBUF')."""

    @property
    def references(self):
        """Schema references, as passed to the constructor."""

    def __eq__(self, other):
        """
        Equality comparison.

        NOTE(review): which fields take part in the comparison is not
        stated here -- confirm before relying on it.
        """

    def __hash__(self):
        """Hash for use in sets and dicts."""

RegisteredSchema

Schema with registry metadata.

class RegisteredSchema:
    """
    Schema with registry metadata.

    Bundles a Schema with the ID, version and subject under which it is
    registered. This is the type documented as returned by
    SchemaRegistryClient.get_latest_version() and get_version().
    """

    def __init__(self, schema_id, schema, version, subject):
        """
        Create RegisteredSchema object.

        Args:
            schema_id (int): Schema ID in registry
            schema (Schema): Schema object
            version (int): Schema version
            subject (str): Subject name
        """

    @property
    def schema_id(self):
        """Schema ID in the registry (int)."""

    @property
    def schema(self):
        """Schema object (Schema)."""

    @property
    def version(self):
        """Schema version (int)."""

    @property
    def subject(self):
        """Subject name (str)."""

Avro Serialization

AvroSerializer

Serializes Python objects to Avro binary format with Schema Registry integration.

class AvroSerializer:
    """
    Serializes Python objects to Avro binary format with Schema Registry
    integration.

    Instances are callable, so one can be supplied as the
    'value.serializer' entry of a SerializingProducer configuration.
    """

    def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None):
        """
        Create AvroSerializer.

        Args:
            schema_registry_client (SchemaRegistryClient): Registry client
            schema_str (str): Avro schema definition
            to_dict (callable, optional): Function to convert object to
                dict. NOTE(review): the callable's expected signature is
                not stated here -- confirm.
            conf (dict, optional): Serializer configuration
        """

    def __call__(self, obj, ctx):
        """
        Serialize object to Avro bytes.

        Args:
            obj: Object to serialize
            ctx (SerializationContext): Serialization context

        Returns:
            bytes: Serialized data with schema ID prefix

        Raises:
            SerializationError: If serialization fails
        """

AvroDeserializer

Deserializes Avro binary data to Python objects using Schema Registry.

class AvroDeserializer:
    """
    Deserializes Avro binary data to Python objects using Schema Registry.

    Instances are callable, so one can be supplied as the
    'value.deserializer' entry of a DeserializingConsumer configuration.
    """

    def __init__(self, schema_registry_client, schema_str=None, from_dict=None, return_record_name=False):
        """
        Create AvroDeserializer.

        Args:
            schema_registry_client (SchemaRegistryClient): Registry client
            schema_str (str, optional): Reader schema definition
                (default None)
            from_dict (callable, optional): Function to convert dict to
                object. NOTE(review): the callable's expected signature
                is not stated here -- confirm.
            return_record_name (bool): Whether to return record name
                (default False)
        """

    def __call__(self, value, ctx):
        """
        Deserialize Avro bytes to object.

        Args:
            value (bytes): Serialized data with schema ID prefix
            ctx (SerializationContext): Serialization context

        Returns:
            object: Deserialized object

        Raises:
            SerializationError: If deserialization fails
        """

JSON Schema Serialization

JSONSerializer

Serializes Python objects to JSON with Schema Registry integration.

class JSONSerializer:
    """
    Serializes Python objects to JSON with Schema Registry integration.

    NOTE(review): the positional order of `schema_registry_client` and
    `schema_str` shown here should be checked against the installed
    confluent-kafka release before calling positionally.
    """

    def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None):
        """
        Create JSONSerializer.

        Args:
            schema_registry_client (SchemaRegistryClient): Registry client
            schema_str (str): JSON schema definition
            to_dict (callable, optional): Function to convert object to dict
            conf (dict, optional): Serializer configuration
        """

    def __call__(self, obj, ctx):
        """
        Serialize object to JSON bytes.

        Args:
            obj: Object to serialize
            ctx (SerializationContext): Serialization context

        Returns:
            bytes: Serialized JSON data with schema ID prefix

        Raises:
            SerializationError: If serialization fails
        """

JSONDeserializer

Deserializes JSON data to Python objects using Schema Registry.

class JSONDeserializer:
    """
    Deserializes JSON data to Python objects using Schema Registry.

    NOTE(review): the positional order of the constructor arguments shown
    here should be checked against the installed confluent-kafka release
    before calling positionally.
    """

    def __init__(self, schema_registry_client, schema_str=None, from_dict=None):
        """
        Create JSONDeserializer.

        Args:
            schema_registry_client (SchemaRegistryClient): Registry client
            schema_str (str, optional): JSON schema definition
                (default None)
            from_dict (callable, optional): Function to convert dict to
                object
        """

    def __call__(self, value, ctx):
        """
        Deserialize JSON bytes to object.

        Args:
            value (bytes): Serialized JSON data with schema ID prefix
            ctx (SerializationContext): Serialization context

        Returns:
            object: Deserialized object

        Raises:
            SerializationError: If deserialization fails
        """

Protobuf Serialization

ProtobufSerializer

Serializes Protobuf messages with Schema Registry integration.

class ProtobufSerializer:
    """
    Serializes Protobuf messages with Schema Registry integration.

    Unlike the Avro and JSON serializers, the constructor takes the
    message class first and no schema string.
    """

    def __init__(self, msg_type, schema_registry_client, conf=None):
        """
        Create ProtobufSerializer.

        Args:
            msg_type: Protobuf message class (e.g. a class from a
                generated *_pb2 module)
            schema_registry_client (SchemaRegistryClient): Registry client
            conf (dict, optional): Serializer configuration
        """

    def __call__(self, obj, ctx):
        """
        Serialize Protobuf message to bytes.

        Args:
            obj: Protobuf message instance
            ctx (SerializationContext): Serialization context

        Returns:
            bytes: Serialized data with schema ID prefix

        Raises:
            SerializationError: If serialization fails
        """

ProtobufDeserializer

Deserializes Protobuf binary data using Schema Registry.

class ProtobufDeserializer:
    """
    Deserializes Protobuf binary data using Schema Registry.

    NOTE(review): the constructor's parameter order shown here should be
    checked against the installed confluent-kafka release before calling
    positionally.
    """

    def __init__(self, msg_type, schema_registry_client, conf=None):
        """
        Create ProtobufDeserializer.

        Args:
            msg_type: Protobuf message class (e.g. a class from a
                generated *_pb2 module)
            schema_registry_client (SchemaRegistryClient): Registry client
            conf (dict, optional): Deserializer configuration
        """

    def __call__(self, value, ctx):
        """
        Deserialize Protobuf bytes to message.

        Args:
            value (bytes): Serialized data with schema ID prefix
            ctx (SerializationContext): Serialization context

        Returns:
            object: Deserialized Protobuf message

        Raises:
            SerializationError: If deserialization fails
        """

Subject Naming Strategies

Functions for generating subject names for schemas.

def topic_subject_name_strategy(ctx, record_name):
    """
    Create subject name as {topic}-{key|value}.

    The suffix depends on whether a message key or a message value is
    being serialized; record_name is accepted only so that all naming
    strategies share one signature.

    Args:
        ctx (SerializationContext): Context with topic and field info
        record_name (str): Record name (unused)

    Returns:
        str: Subject name in format 'topic-key' or 'topic-value'
    """

def topic_record_subject_name_strategy(ctx, record_name):
    """
    Create subject name as {topic}-{record_name}.

    Shares its (ctx, record_name) signature with the other naming
    strategies.

    Args:
        ctx (SerializationContext): Context with topic info
        record_name (str): Record name from schema

    Returns:
        str: Subject name in format 'topic-recordname'
    """

def record_subject_name_strategy(ctx, record_name):
    """
    Create subject name as {record_name}.

    The topic plays no part in the result; ctx is accepted only so that
    all naming strategies share one signature.

    Args:
        ctx (SerializationContext): Context (unused)
        record_name (str): Record name from schema

    Returns:
        str: Subject name as record name
    """

Schema ID Serialization

Functions for handling schema ID serialization in messages.

def prefix_schema_id_serializer(schema_id, data):
    """
    Serialize schema ID into payload prefix.

    Args:
        schema_id (int): Schema ID to serialize
        data (bytes): Message payload

    Returns:
        bytes: Data with 5-byte schema ID prefix. NOTE(review): the byte
            layout of the prefix is not described here -- confirm
            against the wire-format documentation.
    """

def header_schema_id_serializer(schema_id, data):
    """
    Serialize schema ID into message headers.

    In contrast to prefix_schema_id_serializer, the ID is carried in the
    headers rather than in the payload.

    Args:
        schema_id (int): Schema ID to serialize
        data (bytes): Message payload

    Returns:
        tuple: (data, headers_dict). NOTE(review): the header key used
            for the ID is not stated here -- confirm.
    """

def prefix_schema_id_deserializer(data):
    """
    Deserialize schema ID from payload prefix.

    Counterpart of prefix_schema_id_serializer.

    Args:
        data (bytes): Data with schema ID prefix

    Returns:
        tuple: (schema_id, payload), where payload is presumably the
            data with the prefix removed -- confirm
    """

def dual_schema_id_deserializer(data, headers=None):
    """
    Deserialize schema ID from headers or payload prefix.

    NOTE(review): which source takes precedence when both are present is
    not stated here -- presumably headers first, then the payload
    prefix; confirm.

    Args:
        data (bytes): Message payload
        headers (dict, optional): Message headers (default None)

    Returns:
        tuple: (schema_id, payload)
    """

Configuration Classes

ServerConfig

Schema Registry server configuration.

class ServerConfig:
    """
    Schema Registry server configuration.

    Only the two read-only properties below are documented here; no
    constructor is shown.
    """

    @property
    def compatibility_level(self):
        """Default compatibility level."""

    @property
    def mode(self):
        """Schema Registry mode."""

Enumeration Classes

class ConfigCompatibilityLevel:
    """String constants naming the compatibility levels; each value equals its name."""

    BACKWARD = "BACKWARD"
    BACKWARD_TRANSITIVE = "BACKWARD_TRANSITIVE"
    FORWARD = "FORWARD"
    FORWARD_TRANSITIVE = "FORWARD_TRANSITIVE"
    FULL = "FULL"
    FULL_TRANSITIVE = "FULL_TRANSITIVE"
    # NONE is the string "NONE", not Python's None.
    NONE = "NONE"

class RuleKind:
    """String constants naming the two rule kinds; each value equals its name."""

    CONDITION = "CONDITION"
    TRANSFORM = "TRANSFORM"

class RuleMode:
    """String constants naming the rule modes; each value equals its name."""

    UPGRADE = "UPGRADE"
    DOWNGRADE = "DOWNGRADE"
    # presumably UPGRADE and DOWNGRADE combined -- confirm
    UPDOWN = "UPDOWN"
    WRITE = "WRITE"
    READ = "READ"
    # presumably WRITE and READ combined -- confirm
    WRITEREAD = "WRITEREAD"

Usage Examples

Basic Avro Serialization

from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
# StringDeserializer is needed for the consumer's key deserializer below.
from confluent_kafka.serialization import (
    MessageField,
    SerializationContext,
    StringDeserializer,
    StringSerializer,
)

# Schema Registry client
schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Avro schema shared by the serializer (writer) and deserializer (reader)
value_schema_str = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
"""

# Create serializer
avro_serializer = AvroSerializer(schema_registry_client, value_schema_str)

# Producer configuration: string keys, Avro-encoded values
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': avro_serializer
}

producer = SerializingProducer(producer_conf)

# Produce message; no to_dict was given, so the value is passed as a dict
user_record = {'name': 'Alice', 'age': 30}
producer.produce(topic='users', key='user1', value=user_record)
# Block until the message has been delivered (or has failed)
producer.flush()

# Consumer configuration
avro_deserializer = AvroDeserializer(schema_registry_client, value_schema_str)
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': avro_deserializer,
    'group.id': 'user-group',
    'auto.offset.reset': 'earliest'
}

consumer = DeserializingConsumer(consumer_conf)
try:
    consumer.subscribe(['users'])

    # poll() returns None when nothing arrives within the 1.0 s timeout
    msg = consumer.poll(1.0)
    if msg is not None:
        user_object = msg.value()
        print(f"User: {user_object['name']}, Age: {user_object['age']}")
finally:
    # Always close the consumer, even if polling or processing raises.
    consumer.close()

Schema Evolution Example

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

# Subject under which both schema versions are registered
subject = 'users-value'

# Version 1 of the User record: name and age only
initial_schema_str = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
"""

# Version 2 adds an optional, null-defaulted email field
evolved_schema_str = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
"""

# Register the initial schema
initial_schema = Schema(initial_schema_str, schema_type='AVRO')
schema_id = schema_registry_client.register_schema(subject, initial_schema)
print(f"Registered schema with ID: {schema_id}")

# Check the evolved schema against the subject before registering it
evolved_schema = Schema(evolved_schema_str, schema_type='AVRO')
compatible = schema_registry_client.test_compatibility(subject, evolved_schema)
print(f"Schema is compatible: {compatible}")

# Only register the new version when the registry accepts it
if compatible:
    new_schema_id = schema_registry_client.register_schema(subject, evolved_schema)
    print(f"Registered evolved schema with ID: {new_schema_id}")

JSON Schema Example

from confluent_kafka.schema_registry.json_schema import JSONSerializer, JSONDeserializer

# NOTE: this snippet does not create `schema_registry_client`; it reuses the
# client built in the earlier examples.

# JSON schema: an object with a required string `name` and a required
# non-negative integer `age`
json_schema_str = """
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "name": {"type": "string"},
    "age": {"type": "integer", "minimum": 0}
  },
  "required": ["name", "age"]
}
"""

# The same schema string is given to both the serializer and the deserializer
json_serializer = JSONSerializer(schema_registry_client, json_schema_str)
json_deserializer = JSONDeserializer(schema_registry_client, json_schema_str)

# Use with SerializingProducer/DeserializingConsumer
# (only the configuration dicts are built here; no producer or consumer is
# constructed, and no key serializer/deserializer is configured)
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'value.serializer': json_serializer
}

consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'value.deserializer': json_deserializer,
    'group.id': 'json-group',
    'auto.offset.reset': 'earliest'
}

Protobuf Example

from confluent_kafka.schema_registry.protobuf import ProtobufSerializer, ProtobufDeserializer
import user_pb2  # Generated from .proto file

# NOTE: `schema_registry_client` and `SerializingProducer` are not created or
# imported in this snippet; they come from the earlier examples.

# Protobuf serializer/deserializer
# (the deserializer is created for illustration only; it is not used below)
protobuf_serializer = ProtobufSerializer(user_pb2.User, schema_registry_client)
protobuf_deserializer = ProtobufDeserializer(user_pb2.User, schema_registry_client)

# Create protobuf message
# assumes the .proto defines User with `name` and `age` fields -- the .proto
# itself is not shown
user = user_pb2.User()
user.name = "Bob"
user.age = 25

# Use with producer/consumer
# (no key serializer is configured, and the message is produced without a key)
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'value.serializer': protobuf_serializer
}

producer = SerializingProducer(producer_conf)
producer.produce('users-protobuf', value=user)
# Block until the message has been delivered (or has failed)
producer.flush()

Install with Tessl CLI

npx tessl i tessl/pypi-confluent-kafka

docs

admin-client.md

core-producer-consumer.md

error-handling.md

index.md

schema-registry.md

serialization.md

tile.json