tessl/pypi-confluent-kafka

Confluent's Python client for Apache Kafka

Describes pkg:pypi/confluent-kafka@2.11.x

To install, run

npx @tessl/cli install tessl/pypi-confluent-kafka@2.11.0


Confluent Kafka Python Client

Confluent's Python client for Apache Kafka, providing high-performance, feature-rich access to Kafka clusters. Built on librdkafka, it offers both low-level producer/consumer APIs and high-level serialization-aware APIs with Schema Registry integration for Avro, JSON Schema, and Protobuf.

Package Information

  • Package Name: confluent-kafka
  • Language: Python
  • Installation: pip install confluent-kafka
  • Version: 2.11.1

Core Imports

from confluent_kafka import (
    Producer, Consumer, Message, TopicPartition, Uuid,
    Node, ConsumerGroupTopicPartitions, ConsumerGroupState, ConsumerGroupType,
    TopicCollection, TopicPartitionInfo, IsolationLevel,
    KafkaException, KafkaError, libversion, version,
    TIMESTAMP_NOT_AVAILABLE, TIMESTAMP_CREATE_TIME, TIMESTAMP_LOG_APPEND_TIME,
    OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED, OFFSET_INVALID
)

# Additional classes (not in __all__ but available)
from confluent_kafka import ThrottleEvent, ElectionType

For admin operations:

from confluent_kafka.admin import AdminClient, NewTopic, ConfigResource

For high-level serialization-aware APIs:

from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import StringSerializer, StringDeserializer

For Schema Registry integration:

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer

Basic Usage

from confluent_kafka import Producer, Consumer, KafkaError

# Basic Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

# Produce a message
producer.produce('my-topic', key='key1', value='Hello World', callback=delivery_report)
producer.flush()

# Basic Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['my-topic'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f'End of partition reached {msg.topic()} [{msg.partition()}]')
            else:
                print(f'Error: {msg.error()}')
        else:
            print(f'Received message: key={msg.key()}, value={msg.value()}')
finally:
    consumer.close()

Architecture

The confluent-kafka library is built around a layered architecture:

  • C Extension Core (cimpl): Low-level librdkafka bindings providing high-performance Kafka operations
  • Python Wrappers: Producer, Consumer, and admin clients with Pythonic interfaces
  • High-Level APIs: SerializingProducer and DeserializingConsumer with pluggable serialization
  • Schema Registry Integration: Seamless integration with Confluent Schema Registry for schema evolution
  • Serialization Framework: Extensible serialization system with built-in support for common data types

This design provides both performance and flexibility, allowing users to choose between raw performance with the basic APIs or convenience with the high-level serialization-aware APIs.
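
The sketch below contrasts the two levels; it is illustrative only, and the broker address and topic name are placeholders:

from confluent_kafka import Producer, SerializingProducer
from confluent_kafka.serialization import StringSerializer

# Low-level API: values are passed through as bytes/str, no serialization hooks
raw_producer = Producer({'bootstrap.servers': 'localhost:9092'})  # placeholder broker
raw_producer.produce('my-topic', value=b'raw bytes')
raw_producer.flush()

# High-level API: serializers are part of the configuration and run for every message
serializing_producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': StringSerializer('utf_8'),
})
serializing_producer.produce('my-topic', key='key1', value='Hello World')
serializing_producer.flush()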

Capabilities

Core Producer and Consumer

Fundamental Kafka producer and consumer functionality with support for all Kafka features including transactions, exactly-once semantics, and custom partitioning.

class Producer:
    def __init__(self, conf): ...
    def produce(self, topic, value=None, key=None, partition=-1, on_delivery=None, timestamp=0, headers=None): ...
    def poll(self, timeout=-1): ...
    def flush(self, timeout=-1): ...
    def list_topics(self, topic=None, timeout=-1): ...

class Consumer:
    def __init__(self, conf): ...
    def subscribe(self, topics, listener=None): ...
    def poll(self, timeout=-1): ...
    def commit(self, message=None, offsets=None, asynchronous=True): ...
    def list_topics(self, topic=None, timeout=-1): ...

Core Producer and Consumer
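
The description above mentions transactions and exactly-once semantics; the following is a minimal, illustrative sketch of the transactional producer API (broker address, topic name, and transactional.id are placeholders):

from confluent_kafka import Producer, KafkaException

producer = Producer({
    'bootstrap.servers': 'localhost:9092',       # placeholder broker
    'transactional.id': 'my-transactional-id',   # must be unique per producer instance
})

producer.init_transactions(10)  # block up to 10s while registering with the coordinator
producer.begin_transaction()
try:
    producer.produce('my-topic', key='key1', value='sent within a transaction')
    producer.commit_transaction()
except KafkaException:
    # A real application would check whether the error is retriable or requires abort
    producer.abort_transaction()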

Admin Client Operations

Comprehensive administrative operations for managing Kafka clusters including topics, partitions, configurations, ACLs, and consumer groups.

class AdminClient:
    def __init__(self, conf): ...
    def create_topics(self, new_topics, **kwargs): ...
    def delete_topics(self, topics, **kwargs): ...
    def create_partitions(self, new_partitions, **kwargs): ...
    def describe_configs(self, resources, **kwargs): ...
    def alter_configs(self, resources, **kwargs): ...

Admin Client
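
A minimal sketch of topic creation with the AdminClient; the broker address and topic name are placeholders:

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})  # placeholder broker

# create_topics() returns a dict mapping each topic name to a future
futures = admin.create_topics([NewTopic('my-new-topic', num_partitions=3, replication_factor=1)])

for topic, future in futures.items():
    try:
        future.result()  # None on success, raises KafkaException on failure
        print(f'Topic {topic} created')
    except Exception as e:
        print(f'Failed to create topic {topic}: {e}')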

Schema Registry Integration

Complete integration with Confluent Schema Registry supporting Avro, JSON Schema, and Protobuf with automatic schema evolution and compatibility checking.

class SchemaRegistryClient:
    def __init__(self, conf): ...
    def register_schema(self, subject_name, schema, normalize_schemas=False): ...
    def get_latest_version(self, subject_name): ...
    def get_schema(self, schema_id, fetch_max_id=True): ...

class AvroSerializer:
    def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None): ...
    def __call__(self, obj, ctx): ...

Schema Registry
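
A minimal sketch of producing Avro-encoded records through the Schema Registry; the registry URL, broker address, topic, and the User schema are placeholders:

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

# Illustrative Avro schema
schema_str = """
{
  "type": "record",
  "name": "User",
  "fields": [{"name": "name", "type": "string"}]
}
"""

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})  # placeholder URL
avro_serializer = AvroSerializer(schema_registry_client, schema_str)

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': avro_serializer,
})

# The value dict must conform to the Avro schema registered above
producer.produce('users', key='user-1', value={'name': 'Alice'})
producer.flush()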

Serialization Framework

Pluggable serialization framework with built-in serializers for common data types and support for custom serialization logic.

class SerializingProducer:
    def __init__(self, conf): ...
    def produce(self, topic, key=None, value=None, partition=-1, on_delivery=None, timestamp=0, headers=None): ...

class DeserializingConsumer:  
    def __init__(self, conf): ...
    def poll(self, timeout=-1): ...

class StringSerializer:
    def __init__(self, codec='utf_8'): ...
    def __call__(self, obj, ctx=None): ...

Serialization Framework
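
A minimal sketch of a DeserializingConsumer configured with string deserializers; broker address, group id, and topic are placeholders:

from confluent_kafka import DeserializingConsumer
from confluent_kafka.serialization import StringDeserializer

consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': StringDeserializer('utf_8'),
})

consumer.subscribe(['my-topic'])

msg = consumer.poll(1.0)
if msg is not None and msg.error() is None:
    # key() and value() have already been run through the configured deserializers
    print(f'key={msg.key()} value={msg.value()}')

consumer.close()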

Error Handling

Comprehensive error handling with specific exception types for different failure modes and detailed error information.

class KafkaException(Exception): ...
class KafkaError: ...
class ConsumeError(KafkaException): ...
class ProduceError(KafkaException): ...
class SerializationError(Exception): ...

Error Handling
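
A sketch of common error-handling patterns (inspecting a KafkaError in a delivery callback, unwrapping a KafkaException, handling a full local queue); broker address and topic are placeholders:

from confluent_kafka import Producer, KafkaException

producer = Producer({'bootstrap.servers': 'localhost:9092'})  # placeholder broker

def on_delivery(err, msg):
    # err is a KafkaError or None; it exposes code(), name(), str(), retriable(), fatal()
    if err is not None:
        print(f'Delivery failed: {err.name()} ({err.code()}): {err.str()}')

try:
    producer.produce('my-topic', value='payload', on_delivery=on_delivery)
    producer.flush(10)
except KafkaException as e:
    # KafkaException carries the underlying KafkaError as its first argument
    kafka_error = e.args[0]
    print(f'Kafka error: {kafka_error.code()} - {kafka_error.str()}')
except BufferError:
    # produce() raises BufferError when the local producer queue is full
    print('Local queue full; call poll() to serve delivery callbacks and retry')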

Additional Classes

ThrottleEvent

Contains details about throttled requests from Kafka brokers.

class ThrottleEvent:
    def __init__(self, broker_name, broker_id, throttle_time):
        """
        Create ThrottleEvent instance.
        
        Args:
            broker_name (str): Hostname of the broker that throttled the request
            broker_id (int): Broker ID
            throttle_time (float): Throttle time in seconds
        """

    @property
    def broker_name(self):
        """Hostname of the broker that throttled the request."""

    @property
    def broker_id(self):
        """Broker ID."""

    @property
    def throttle_time(self):
        """Throttle time in seconds."""

Model Classes

Core data model classes for representing Kafka metadata and consumer group information.

class Node:
    def __init__(self, id, host, port, rack=None):
        """
        Represents broker node information.
        
        Args:
            id (int): Node ID
            host (str): Hostname
            port (int): Port number
            rack (str, optional): Rack identifier
        """

class ConsumerGroupTopicPartitions:
    def __init__(self, group_id, topic_partitions=None):
        """
        Consumer group with topic partition information.
        
        Args:
            group_id (str): Consumer group ID
            topic_partitions (list, optional): List of TopicPartition objects
        """

class TopicCollection:
    def __init__(self, topic_names):
        """
        Collection of topic names.
        
        Args:
            topic_names (list): List of topic name strings
        """

class TopicPartitionInfo:
    def __init__(self, id, leader, replicas, isr):
        """
        Partition metadata information.
        
        Args:
            id (int): Partition ID
            leader (Node): Leader broker node
            replicas (list): List of replica nodes
            isr (list): List of in-sync replica nodes
        """

# Enumeration classes
class ConsumerGroupState:
    UNKNOWN = 0
    PREPARING_REBALANCING = 1
    COMPLETING_REBALANCING = 2
    STABLE = 3
    DEAD = 4
    EMPTY = 5

class ConsumerGroupType:
    UNKNOWN = 0
    CONSUMER = 1
    CLASSIC = 2

class IsolationLevel:
    READ_UNCOMMITTED = 0
    READ_COMMITTED = 1

class ElectionType:
    PREFERRED = 0
    UNCLEAN = 1
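
A minimal sketch using ConsumerGroupState to filter groups through the AdminClient (broker address is a placeholder):

from confluent_kafka import ConsumerGroupState
from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})  # placeholder broker

# list_consumer_groups() returns a future; states filters by ConsumerGroupState
future = admin.list_consumer_groups(states={ConsumerGroupState.STABLE})
result = future.result()

for group in result.valid:
    print(group.group_id, group.state)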

Constants

# Timestamp types
TIMESTAMP_NOT_AVAILABLE = 0
TIMESTAMP_CREATE_TIME = 1
TIMESTAMP_LOG_APPEND_TIME = 2

# Offset constants
OFFSET_BEGINNING = -2
OFFSET_END = -1
OFFSET_STORED = -1000
OFFSET_INVALID = -1001
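
A minimal sketch using the offset and timestamp constants with manual partition assignment; broker address, group id, and topic are placeholders:

from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING, TIMESTAMP_NOT_AVAILABLE

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
})

# Manually assign partition 0 of the topic, starting from the earliest available offset
consumer.assign([TopicPartition('my-topic', 0, OFFSET_BEGINNING)])

msg = consumer.poll(1.0)
if msg is not None and msg.error() is None:
    # timestamp() returns (timestamp_type, timestamp); the type matches one of the constants
    ts_type, ts = msg.timestamp()
    if ts_type != TIMESTAMP_NOT_AVAILABLE:
        print(f'message timestamp: {ts}')

consumer.close()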

Version Information

def version():
    """
    Get confluent-kafka version.
    
    Returns:
        tuple: Version tuple (version_string, version_int)
    """

def libversion():
    """
    Get librdkafka version.
    
    Returns:
        tuple: Version tuple (version_string, version_int)
    """