Confluent Kafka Python Client

Confluent's Python client for Apache Kafka, providing high-performance, feature-rich access to Kafka clusters. Built on librdkafka, it offers both low-level producer/consumer APIs and high-level serialization-aware APIs with Schema Registry integration for Avro, JSON Schema, and Protobuf.

Package Information

  • Package Name: confluent-kafka
  • Language: Python
  • Installation: pip install confluent-kafka
  • Version: 2.11.1

Core Imports

from confluent_kafka import (
    Producer, Consumer, Message, TopicPartition, Uuid,
    Node, ConsumerGroupTopicPartitions, ConsumerGroupState, ConsumerGroupType,
    TopicCollection, TopicPartitionInfo, IsolationLevel,
    KafkaException, KafkaError, libversion, version,
    TIMESTAMP_NOT_AVAILABLE, TIMESTAMP_CREATE_TIME, TIMESTAMP_LOG_APPEND_TIME,
    OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED, OFFSET_INVALID
)

# Additional classes (not in __all__ but available)
from confluent_kafka import ThrottleEvent, ElectionType

For admin operations:

from confluent_kafka.admin import AdminClient, NewTopic, ConfigResource

For high-level serialization-aware APIs:

from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import StringSerializer, StringDeserializer

For Schema Registry integration:

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer

Basic Usage

from confluent_kafka import Producer, Consumer, KafkaError

# Basic Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

# Produce a message asynchronously; delivery_report is invoked on completion
producer.produce('my-topic', key='key1', value='Hello World', callback=delivery_report)
producer.flush()  # block until outstanding messages are delivered and callbacks served

# Basic Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['my-topic'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            # _PARTITION_EOF is only reported when 'enable.partition.eof'
            # is set to True in the consumer configuration
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f'End of partition reached {msg.topic()} [{msg.partition()}]')
            else:
                print(f'Error: {msg.error()}')
        else:
            print(f'Received message: key={msg.key()}, value={msg.value()}')
finally:
    consumer.close()

Architecture

The confluent-kafka library is built around a layered architecture:

  • C Extension Core (cimpl): Low-level librdkafka bindings providing high-performance Kafka operations
  • Python Wrappers: Producer, Consumer, and admin clients with Pythonic interfaces
  • High-Level APIs: SerializingProducer and DeserializingConsumer with pluggable serialization
  • Schema Registry Integration: Seamless integration with Confluent Schema Registry for schema evolution
  • Serialization Framework: Extensible serialization system with built-in support for common data types

This design provides both performance and flexibility, allowing users to choose between raw performance with the basic APIs or convenience with the high-level serialization-aware APIs.
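For example, a minimal sketch of the high-level path (assuming a broker at localhost:9092): the SerializingProducer applies configured serializers to every key and value, where the basic Producer sends raw bytes.

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer

# High-level producer: serializers configured up front are applied to
# each key/value passed to produce(). Broker address is a placeholder.
producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': StringSerializer('utf_8')
})
producer.produce('my-topic', key='key1', value='Hello World')
producer.flush()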

Capabilities

Core Producer and Consumer

Fundamental Kafka producer and consumer functionality, including support for transactions, exactly-once semantics, and custom partitioning (a transactional sketch follows the stubs below).

class Producer:
    def __init__(self, conf): ...
    def produce(self, topic, value=None, key=None, partition=-1, on_delivery=None, timestamp=0, headers=None): ...
    def poll(self, timeout=-1): ...
    def flush(self, timeout=-1): ...
    def list_topics(self, topic=None, timeout=-1): ...

class Consumer:
    def __init__(self, conf): ...
    def subscribe(self, topics, listener=None): ...
    def poll(self, timeout=-1): ...
    def commit(self, message=None, offsets=None, asynchronous=True): ...
    def list_topics(self, topic=None, timeout=-1): ...
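The stubs above omit the transactional methods (init_transactions(), begin_transaction(), commit_transaction(), abort_transaction()). A minimal exactly-once sketch, assuming a transaction-capable cluster and a placeholder transactional.id:

from confluent_kafka import Producer, KafkaException

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'my-transactional-id'  # must be unique per producer instance
})

producer.init_transactions()       # register with the transaction coordinator
producer.begin_transaction()
try:
    producer.produce('my-topic', key='key1', value='Hello World')
    producer.commit_transaction()  # atomically commits all produced messages
except KafkaException:
    producer.abort_transaction()   # discard the transaction on failure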

See docs/core-producer-consumer.md for details.

Admin Client Operations

Comprehensive administrative operations for managing Kafka clusters including topics, partitions, configurations, ACLs, and consumer groups.

class AdminClient:
    def __init__(self, conf): ...
    def create_topics(self, new_topics, **kwargs): ...
    def delete_topics(self, topics, **kwargs): ...
    def create_partitions(self, new_partitions, **kwargs): ...
    def describe_configs(self, resources, **kwargs): ...
    def alter_configs(self, resources, **kwargs): ...
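A brief sketch of topic creation, assuming a local broker; each admin operation returns a dict of futures keyed by resource:

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# create_topics() returns a dict mapping topic name -> future
futures = admin.create_topics([NewTopic('my-topic', num_partitions=3, replication_factor=1)])
for topic, future in futures.items():
    try:
        future.result()  # returns None on success
        print(f'Created topic {topic}')
    except Exception as e:
        print(f'Failed to create topic {topic}: {e}')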

See docs/admin-client.md for details.

Schema Registry Integration

Complete integration with Confluent Schema Registry supporting Avro, JSON Schema, and Protobuf with automatic schema evolution and compatibility checking.

class SchemaRegistryClient:
    def __init__(self, conf): ...
    def register_schema(self, subject_name, schema, normalize_schemas=False): ...
    def get_latest_version(self, subject_name): ...
    def get_schema(self, schema_id, fetch_max_id=True): ...

class AvroSerializer:
    def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None): ...
    def __call__(self, obj, ctx): ...
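A sketch of Avro-serialized production, assuming a Schema Registry at http://localhost:8081 and an illustrative one-field record schema:

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

schema_str = """
{
  "type": "record",
  "name": "User",
  "fields": [{"name": "name", "type": "string"}]
}
"""

sr_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
avro_serializer = AvroSerializer(sr_client, schema_str)

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': avro_serializer
})
producer.produce('users', key='user1', value={'name': 'Alice'})
producer.flush()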

See docs/schema-registry.md for details.

Serialization Framework

Pluggable serialization framework with built-in serializers for common data types and support for custom serialization logic.

class SerializingProducer:
    def __init__(self, conf): ...
    def produce(self, topic, key=None, value=None, partition=-1, on_delivery=None, timestamp=0, headers=None): ...

class DeserializingConsumer:  
    def __init__(self, conf): ...
    def poll(self, timeout=-1): ...

class StringSerializer:
    def __init__(self, codec='utf_8'): ...
    def __call__(self, obj, ctx=None): ...
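The consuming counterpart, sketched with the built-in string deserializers (broker and topic names are placeholders):

from confluent_kafka import DeserializingConsumer
from confluent_kafka.serialization import StringDeserializer

consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': StringDeserializer('utf_8')
})
consumer.subscribe(['my-topic'])

msg = consumer.poll(1.0)
if msg is not None and msg.error() is None:
    # key() and value() return already-deserialized str objects here
    print(f'key={msg.key()}, value={msg.value()}')
consumer.close()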

See docs/serialization.md for details.

Error Handling

Comprehensive error handling with specific exception types for different failure modes and detailed error information.

class KafkaException(Exception): ...
class KafkaError: ...
class ConsumeError(KafkaException): ...
class ProduceError(KafkaException): ...
class SerializationError(Exception): ...
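A minimal sketch of inspecting errors on the produce path: the built-in BufferError signals a full local queue, while KafkaException wraps a KafkaError describing client-side or broker-side failures.

from confluent_kafka import Producer, KafkaException

producer = Producer({'bootstrap.servers': 'localhost:9092'})

try:
    producer.produce('my-topic', value='payload')
except BufferError:
    # Local producer queue is full: serve delivery callbacks, then retry
    producer.poll(1.0)
except KafkaException as e:
    err = e.args[0]  # the underlying KafkaError
    print(f'{err.name()}: {err.str()} (retriable={err.retriable()})')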

See docs/error-handling.md for details.

Additional Classes

ThrottleEvent

Contains details about throttled requests from Kafka brokers.

class ThrottleEvent:
    def __init__(self, broker_name, broker_id, throttle_time):
        """
        Create ThrottleEvent instance.
        
        Args:
            broker_name (str): Hostname of the broker that throttled the request
            broker_id (int): Broker ID
            throttle_time (float): Throttle time in seconds
        """

    @property
    def broker_name(self):
        """Hostname of the broker that throttled the request."""

    @property
    def broker_id(self):
        """Broker ID."""

    @property
    def throttle_time(self):
        """Throttle time in seconds."""

Model Classes

Core data model classes for representing Kafka metadata and consumer group information.

class Node:
    def __init__(self, id, host, port, rack=None):
        """
        Represents broker node information.
        
        Args:
            id (int): Node ID
            host (str): Hostname
            port (int): Port number
            rack (str, optional): Rack identifier
        """

class ConsumerGroupTopicPartitions:
    def __init__(self, group_id, topic_partitions=None):
        """
        Consumer group with topic partition information.
        
        Args:
            group_id (str): Consumer group ID
            topic_partitions (list, optional): List of TopicPartition objects
        """

class TopicCollection:
    def __init__(self, topic_names):
        """
        Collection of topic names.
        
        Args:
            topic_names (list): List of topic name strings
        """

class TopicPartitionInfo:
    def __init__(self, id, leader, replicas, isr):
        """
        Partition metadata information.
        
        Args:
            id (int): Partition ID
            leader (Node): Leader broker node
            replicas (list): List of replica nodes
            isr (list): List of in-sync replica nodes
        """

# Enumeration classes
class ConsumerGroupState:
    UNKNOWN = 0
    PREPARING_REBALANCING = 1
    COMPLETING_REBALANCING = 2
    STABLE = 3
    DEAD = 4
    EMPTY = 5

class ConsumerGroupType:
    UNKNOWN = 0
    CONSUMER = 1
    CLASSIC = 2

class IsolationLevel:
    READ_UNCOMMITTED = 0
    READ_COMMITTED = 1

class ElectionType:
    PREFERRED = 0
    UNCLEAN = 1
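These enumerations are consumed mainly by the admin APIs. For instance, a sketch of filtering consumer groups by state, assuming a broker that supports the ListConsumerGroups API:

from confluent_kafka import ConsumerGroupState
from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# list_consumer_groups() returns a future; 'states' filters the listing
future = admin.list_consumer_groups(states={ConsumerGroupState.STABLE})
result = future.result()
for group in result.valid:
    print(group.group_id, group.state)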

Constants

# Timestamp types
TIMESTAMP_NOT_AVAILABLE = 0
TIMESTAMP_CREATE_TIME = 1
TIMESTAMP_LOG_APPEND_TIME = 2

# Offset constants
OFFSET_BEGINNING = -2
OFFSET_END = -1
OFFSET_STORED = -1000
OFFSET_INVALID = -1001
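The offset constants are used with TopicPartition; for example, a sketch that replays a partition from the start:

from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group'
})

# Manual assignment starting from the earliest available offset
consumer.assign([TopicPartition('my-topic', 0, OFFSET_BEGINNING)])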

Version Information

def version():
    """
    Get confluent-kafka version.
    
    Returns:
        tuple: Version tuple (version_string, version_int)
    """

def libversion():
    """
    Get librdkafka version.
    
    Returns:
        tuple: Version tuple (version_string, version_int)
    """

Documentation Files

  • docs/admin-client.md
  • docs/core-producer-consumer.md
  • docs/error-handling.md
  • docs/index.md
  • docs/schema-registry.md
  • docs/serialization.md