Confluent's Python client for Apache Kafka
npx @tessl/cli install tessl/pypi-confluent-kafka@2.11.0

Confluent's Python client for Apache Kafka, providing high-performance, feature-rich access to Kafka clusters. Built on librdkafka, it offers both low-level producer/consumer APIs and high-level serialization-aware APIs with Schema Registry integration for Avro, JSON Schema, and Protobuf.
pip install confluent-kafka

Core imports:

from confluent_kafka import (
    Producer, Consumer, Message, TopicPartition, Uuid,
    Node, ConsumerGroupTopicPartitions, ConsumerGroupState, ConsumerGroupType,
    TopicCollection, TopicPartitionInfo, IsolationLevel,
    KafkaException, KafkaError, libversion, version,
    TIMESTAMP_NOT_AVAILABLE, TIMESTAMP_CREATE_TIME, TIMESTAMP_LOG_APPEND_TIME,
    OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED, OFFSET_INVALID
)
# Additional classes (not in __all__ but available)
from confluent_kafka import ThrottleEvent, ElectionType

For admin operations:
from confluent_kafka.admin import AdminClient, NewTopic, ConfigResource

For high-level serialization-aware APIs:
from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import StringSerializer, StringDeserializer

For Schema Registry integration:
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer

A basic producer and consumer look like this:

from confluent_kafka import Producer, Consumer, KafkaError
# Basic Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
# Produce a message
producer.produce('my-topic', key='key1', value='Hello World', callback=delivery_report)
producer.flush()
# Basic Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['my-topic'])
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f'End of partition reached {msg.topic()} [{msg.partition()}]')
            else:
                print(f'Error: {msg.error()}')
        else:
            print(f'Received message: key={msg.key()}, value={msg.value()}')
finally:
    consumer.close()

The confluent-kafka library is built around a layered architecture. At its base is the C extension module (cimpl), which provides the low-level librdkafka bindings for high-performance Kafka operations; the serialization-aware clients, Schema Registry integration, and admin APIs are Python layers on top of it. This design provides both performance and flexibility, allowing users to choose between raw performance with the basic APIs or convenience with the high-level serialization-aware APIs.
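The practical difference shows up at the call site: the basic Producer hands your bytes/str straight to librdkafka, while the serialization-aware client runs configured serializers first. A minimal sketch of the two styles (broker address and topic name are placeholders):

from confluent_kafka import Producer, SerializingProducer
from confluent_kafka.serialization import StringSerializer

# Low-level client: values are passed through as bytes/str
raw_producer = Producer({'bootstrap.servers': 'localhost:9092'})
raw_producer.produce('my-topic', key=b'key1', value=b'raw bytes')
raw_producer.flush()

# Serialization-aware client: serializers are part of the configuration
ser_producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': StringSerializer('utf_8'),
})
ser_producer.produce('my-topic', key='key1', value='Hello World')
ser_producer.flush()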
Fundamental Kafka producer and consumer functionality with support for all Kafka features including transactions, exactly-once semantics, and custom partitioning; a transactional example follows the class outline below.
class Producer:
    def __init__(self, conf): ...
    def produce(self, topic, value=None, key=None, partition=-1, on_delivery=None, timestamp=0, headers=None): ...
    def poll(self, timeout=-1): ...
    def flush(self, timeout=-1): ...
    def list_topics(self, topic=None, timeout=-1): ...

class Consumer:
    def __init__(self, conf): ...
    def subscribe(self, topics, listener=None): ...
    def poll(self, timeout=-1): ...
    def commit(self, message=None, offsets=None, asynchronous=True): ...
    def list_topics(self, topic=None, timeout=-1): ...
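The transactional API mentioned above lives on the same Producer class. A minimal sketch, assuming a local broker and a placeholder transactional.id:

from confluent_kafka import Producer, KafkaException

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'my-transactional-id',
})

producer.init_transactions()       # registers the transactional.id with the coordinator
producer.begin_transaction()
try:
    for i in range(3):
        producer.produce('my-topic', key=str(i), value=f'message {i}')
    producer.commit_transaction()  # all three messages become visible atomically
except KafkaException:
    producer.abort_transaction()   # read_committed consumers never see aborted messages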
Comprehensive administrative operations for managing Kafka clusters including topics, partitions, configurations, ACLs, and consumer groups; see the topic-creation example after the outline.

class AdminClient:
    def __init__(self, conf): ...
    def create_topics(self, new_topics, **kwargs): ...
    def delete_topics(self, topics, **kwargs): ...
    def create_partitions(self, new_partitions, **kwargs): ...
    def describe_configs(self, resources, **kwargs): ...
    def alter_configs(self, resources, **kwargs): ...
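Admin calls are asynchronous and return futures keyed by the resource name. A sketch of creating a topic and waiting for the result (broker address, topic name, and counts are placeholders):

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

futures = admin.create_topics([NewTopic('my-new-topic', num_partitions=3, replication_factor=1)])

for topic, future in futures.items():
    try:
        future.result()  # None on success, raises KafkaException on failure
        print(f'Topic {topic} created')
    except Exception as e:
        print(f'Failed to create topic {topic}: {e}')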
Complete integration with Confluent Schema Registry supporting Avro, JSON Schema, and Protobuf with automatic schema evolution and compatibility checking.

class SchemaRegistryClient:
    def __init__(self, conf): ...
    def register_schema(self, subject_name, schema, normalize_schemas=False): ...
    def get_latest_version(self, subject_name): ...
    def get_schema(self, schema_id, fetch_max_id=True): ...

class AvroSerializer:
    def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None): ...
    def __call__(self, obj, ctx): ...
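Putting the pieces together: an AvroSerializer plugged into a SerializingProducer registers the schema with Schema Registry and encodes each dict according to it. A sketch, assuming a registry at http://localhost:8081 and a made-up User record schema:

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

schema_str = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
"""

sr_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
avro_serializer = AvroSerializer(sr_client, schema_str)

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': avro_serializer,
})

producer.produce('users', key='user-1', value={'name': 'Alice', 'age': 30})
producer.flush()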
Pluggable serialization framework with built-in serializers for common data types and support for custom serialization logic.

class SerializingProducer:
    def __init__(self, conf): ...
    def produce(self, topic, key=None, value=None, partition=-1, on_delivery=None, timestamp=0, headers=None): ...

class DeserializingConsumer:
    def __init__(self, conf): ...
    def poll(self, timeout=-1): ...

class StringSerializer:
    def __init__(self, codec='utf_8'): ...
    def __call__(self, obj, ctx=None): ...
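The consuming side mirrors this: deserializers are configured once and poll() returns already-decoded keys and values. A sketch using the built-in string deserializer (broker, group, and topic are placeholders):

from confluent_kafka import DeserializingConsumer
from confluent_kafka.serialization import StringDeserializer

consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': StringDeserializer('utf_8'),
})
consumer.subscribe(['my-topic'])

msg = consumer.poll(1.0)
if msg is not None and not msg.error():
    # key() and value() are already str objects, not raw bytes
    print(msg.key(), msg.value())
consumer.close()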
Comprehensive error handling with specific exception types for different failure modes and detailed error information.

class KafkaException(Exception): ...
class KafkaError: ...
class ConsumeError(KafkaException): ...
class ProduceError(KafkaException): ...
class SerializationError(Exception): ...
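In practice most produce-side failures surface either as a BufferError (the local produce queue is full) or as a KafkaException wrapping a KafkaError that carries the code and description. A sketch of handling both:

from confluent_kafka import Producer, KafkaException

producer = Producer({'bootstrap.servers': 'localhost:9092'})

try:
    producer.produce('my-topic', value='Hello World')
except BufferError:
    # Local queue is full: serve delivery callbacks, then retry
    producer.poll(1.0)
    producer.produce('my-topic', value='Hello World')
except KafkaException as e:
    err = e.args[0]  # the underlying KafkaError
    print(err.code(), err.str(), 'retriable:', err.retriable())

producer.flush()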
Contains details about throttled requests from Kafka brokers.

class ThrottleEvent:
    def __init__(self, broker_name, broker_id, throttle_time):
        """
        Create ThrottleEvent instance.

        Args:
            broker_name (str): Hostname of the broker that throttled the request
            broker_id (int): Broker ID
            throttle_time (float): Throttle time in seconds
        """

    @property
    def broker_name(self):
        """Hostname of the broker that throttled the request."""

    @property
    def broker_id(self):
        """Broker ID."""

    @property
    def throttle_time(self):
        """Throttle time in seconds."""
Core data model classes for representing Kafka metadata and consumer group information.

class Node:
    def __init__(self, id, host, port, rack=None):
        """
        Represents broker node information.

        Args:
            id (int): Node ID
            host (str): Hostname
            port (int): Port number
            rack (str, optional): Rack identifier
        """

class ConsumerGroupTopicPartitions:
    def __init__(self, group_id, topic_partitions=None):
        """
        Consumer group with topic partition information.

        Args:
            group_id (str): Consumer group ID
            topic_partitions (list, optional): List of TopicPartition objects
        """

class TopicCollection:
    def __init__(self, topic_names):
        """
        Collection of topic names.

        Args:
            topic_names (list): List of topic name strings
        """

class TopicPartitionInfo:
    def __init__(self, id, leader, replicas, isr):
        """
        Partition metadata information.

        Args:
            id (int): Partition ID
            leader (Node): Leader broker node
            replicas (list): List of replica nodes
            isr (list): List of in-sync replica nodes
        """
# Enumeration classes
class ConsumerGroupState:
    UNKNOWN = 0
    PREPARING_REBALANCING = 1
    COMPLETING_REBALANCING = 2
    STABLE = 3
    DEAD = 4
    EMPTY = 5

class ConsumerGroupType:
    UNKNOWN = 0
    CONSUMER = 1
    CLASSIC = 2

class IsolationLevel:
    READ_UNCOMMITTED = 0
    READ_COMMITTED = 1

class ElectionType:
    PREFERRED = 0
    UNCLEAN = 1
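The enumerations are mostly consumed by admin operations; for instance, ConsumerGroupState can filter AdminClient.list_consumer_groups results. A sketch, assuming that admin API:

from confluent_kafka import ConsumerGroupState
from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Only return groups that are currently STABLE
future = admin.list_consumer_groups(states={ConsumerGroupState.STABLE})
result = future.result()

for group in result.valid:
    print(group.group_id, group.state)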
# Timestamp types
TIMESTAMP_NOT_AVAILABLE = 0
TIMESTAMP_CREATE_TIME = 1
TIMESTAMP_LOG_APPEND_TIME = 2
# Offset constants
OFFSET_BEGINNING = -2
OFFSET_END = -1
OFFSET_STORED = -1000
OFFSET_INVALID = -1001
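The offset constants are used wherever a TopicPartition offset is expected, and the timestamp constants describe the first element of Message.timestamp(). A sketch replaying a partition from the beginning (broker, topic, and group are placeholders):

from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING, TIMESTAMP_CREATE_TIME

consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'replay-group'})

# Assign an explicit starting offset instead of subscribing
consumer.assign([TopicPartition('my-topic', 0, OFFSET_BEGINNING)])

msg = consumer.poll(5.0)
if msg is not None and not msg.error():
    ts_type, ts = msg.timestamp()  # (timestamp type, milliseconds since epoch)
    if ts_type == TIMESTAMP_CREATE_TIME:
        print('producer-assigned timestamp:', ts)

consumer.close()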
"""
Get confluent-kafka version.
Returns:
tuple: Version tuple (version_string, version_int)
"""
def libversion():
"""
Get librdkafka version.
Returns:
tuple: Version tuple (version_string, version_int)
"""