Confluent's Python client for Apache Kafka
npx @tessl/cli install tessl/pypi-confluent-kafka@2.11.00
# Confluent Kafka Python Client
Confluent's Python client for Apache Kafka, providing high-performance, feature-rich access to Kafka clusters. Built on librdkafka, it offers both low-level producer/consumer APIs and high-level serialization-aware APIs with Schema Registry integration for Avro, JSON Schema, and Protobuf.
## Package Information

- **Package Name**: confluent-kafka
- **Language**: Python
- **Installation**: `pip install confluent-kafka`
- **Version**: 2.11.1

## Core Imports
```python
from confluent_kafka import (
    Producer, Consumer, Message, TopicPartition, Uuid,
    Node, ConsumerGroupTopicPartitions, ConsumerGroupState, ConsumerGroupType,
    TopicCollection, TopicPartitionInfo, IsolationLevel,
    KafkaException, KafkaError, libversion, version,
    TIMESTAMP_NOT_AVAILABLE, TIMESTAMP_CREATE_TIME, TIMESTAMP_LOG_APPEND_TIME,
    OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED, OFFSET_INVALID
)

# Additional classes (not in __all__ but available)
from confluent_kafka import ThrottleEvent, ElectionType
```
For admin operations:
```python
from confluent_kafka.admin import AdminClient, NewTopic, ConfigResource
```
For high-level serialization-aware APIs:
```python
from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import StringSerializer, StringDeserializer
```
For Schema Registry integration:
```python
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
```
## Basic Usage
```python
from confluent_kafka import Producer, Consumer, KafkaError

# Basic Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

# Produce a message
producer.produce('my-topic', key='key1', value='Hello World', callback=delivery_report)
producer.flush()

# Basic Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['my-topic'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f'End of partition reached {msg.topic()} [{msg.partition()}]')
            else:
                print(f'Error: {msg.error()}')
        else:
            print(f'Received message: key={msg.key()}, value={msg.value()}')
finally:
    consumer.close()
```
## Architecture
The confluent-kafka library is built around a layered architecture:

- **C Extension Core (`cimpl`)**: Low-level librdkafka bindings providing high-performance Kafka operations
- **Python Wrappers**: Producer, Consumer, and admin clients with Pythonic interfaces
- **High-Level APIs**: SerializingProducer and DeserializingConsumer with pluggable serialization
- **Schema Registry Integration**: Seamless integration with Confluent Schema Registry for schema evolution
- **Serialization Framework**: Extensible serialization system with built-in support for common data types

This design provides both performance and flexibility, allowing users to choose between raw performance with the basic APIs or convenience with the high-level serialization-aware APIs.
## Capabilities
### Core Producer and Consumer
Fundamental Kafka producer and consumer functionality with support for all Kafka features including transactions, exactly-once semantics, and custom partitioning.
```python { .api }
class Producer:
    def __init__(self, conf): ...
    def produce(self, topic, value=None, key=None, partition=-1, on_delivery=None, timestamp=0, headers=None): ...
    def poll(self, timeout=-1): ...
    def flush(self, timeout=-1): ...
    def list_topics(self, topic=None, timeout=-1): ...

class Consumer:
    def __init__(self, conf): ...
    def subscribe(self, topics, listener=None): ...
    def poll(self, timeout=-1): ...
    def commit(self, message=None, offsets=None, asynchronous=True): ...
    def list_topics(self, topic=None, timeout=-1): ...
```
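
The producer also exposes librdkafka's transactional API for exactly-once pipelines. The sketch below shows the basic transaction lifecycle; the broker address, `transactional.id`, and topic name are illustrative values.

```python
from confluent_kafka import Producer, KafkaException

# Illustrative configuration: a transactional.id enables exactly-once semantics
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'example-transactional-producer'
})

producer.init_transactions()       # fence out older instances and register the transactional.id
producer.begin_transaction()
try:
    producer.produce('my-topic', key='key1', value='transactional message')
    producer.commit_transaction()  # atomically commits everything produced since begin_transaction()
except KafkaException:
    producer.abort_transaction()   # discard everything produced in this transaction
```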
[Core Producer and Consumer](./core-producer-consumer.md)
### Admin Client Operations
Comprehensive administrative operations for managing Kafka clusters including topics, partitions, configurations, ACLs, and consumer groups.
```python { .api }
class AdminClient:
    def __init__(self, conf): ...
    def create_topics(self, new_topics, **kwargs): ...
    def delete_topics(self, topics, **kwargs): ...
    def create_partitions(self, new_partitions, **kwargs): ...
    def describe_configs(self, resources, **kwargs): ...
    def alter_configs(self, resources, **kwargs): ...
```
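
Admin operations are asynchronous and return a dictionary of futures, one per requested resource. The sketch below creates a topic and waits for the result; the broker address, topic name, and partition/replication counts are illustrative.

```python
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# create_topics() returns {topic_name: future}
futures = admin.create_topics([NewTopic('example-topic', num_partitions=3, replication_factor=1)])

for topic, future in futures.items():
    try:
        future.result()  # None on success, raises on failure
        print(f'Topic {topic} created')
    except Exception as e:
        print(f'Failed to create topic {topic}: {e}')
```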
[Admin Client](./admin-client.md)
### Schema Registry Integration
Complete integration with Confluent Schema Registry supporting Avro, JSON Schema, and Protobuf with automatic schema evolution and compatibility checking.
```python { .api }
class SchemaRegistryClient:
    def __init__(self, conf): ...
    def register_schema(self, subject_name, schema, normalize_schemas=False): ...
    def get_latest_version(self, subject_name): ...
    def get_schema(self, schema_id, fetch_max_id=True): ...

class AvroSerializer:
    def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None): ...
    def __call__(self, obj, ctx): ...
```
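
A common pattern is to wire an `AvroSerializer` into a `SerializingProducer`. The sketch below registers the value schema on first use; the Schema Registry URL, schema, and topic name are illustrative.

```python
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

# Illustrative Avro schema for the message value
schema_str = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
"""

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
avro_serializer = AvroSerializer(schema_registry_client, schema_str)

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': avro_serializer
})

producer.produce('users', key='user-1', value={'name': 'Alice', 'age': 30})
producer.flush()
```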
[Schema Registry](./schema-registry.md)
### Serialization Framework
Pluggable serialization framework with built-in serializers for common data types and support for custom serialization logic.
```python { .api }
class SerializingProducer:
    def __init__(self, conf): ...
    def produce(self, topic, key=None, value=None, partition=-1, on_delivery=None, timestamp=0, headers=None): ...

class DeserializingConsumer:
    def __init__(self, conf): ...
    def poll(self, timeout=-1): ...

class StringSerializer:
    def __init__(self, codec='utf_8'): ...
    def __call__(self, obj, ctx=None): ...
```
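
Serializers and deserializers are passed through the `key.serializer`/`value.serializer` and `key.deserializer`/`value.deserializer` configuration properties. The sketch below uses the built-in string (de)serializers; the broker address, group id, and topic are illustrative.

```python
from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import StringSerializer, StringDeserializer

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': StringSerializer('utf_8')
})
producer.produce('my-topic', key='key1', value='hello')
producer.flush()

consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': StringDeserializer('utf_8')
})
consumer.subscribe(['my-topic'])

msg = consumer.poll(5.0)
if msg is not None and msg.error() is None:
    print(msg.key(), msg.value())  # already decoded to str by the deserializers
consumer.close()
```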
[Serialization Framework](./serialization.md)
### Error Handling
Comprehensive error handling with specific exception types for different failure modes and detailed error information.
```python { .api }
class KafkaException(Exception): ...
class KafkaError: ...
class ConsumeError(KafkaException): ...
class ProduceError(KafkaException): ...
class SerializationError(Exception): ...
```
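
`KafkaException` wraps a `KafkaError` that can be inspected to decide whether a failure is retriable or fatal. The sketch below shows this pattern; the broker address and topic are illustrative.

```python
from confluent_kafka import Producer, KafkaException

producer = Producer({'bootstrap.servers': 'localhost:9092'})

try:
    producer.produce('my-topic', value='payload')
    producer.flush()
except BufferError:
    # produce() raises BufferError when the local queue is full; poll() frees space
    producer.poll(1.0)
except KafkaException as e:
    err = e.args[0]  # the underlying KafkaError
    if err.retriable():
        print(f'Transient error, safe to retry: {err}')
    elif err.fatal():
        print(f'Fatal error, the client must be recreated: {err}')
    else:
        print(f'Error {err.code()} ({err.name()}): {err.str()}')
```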
[Error Handling](./error-handling.md)
### Additional Classes
#### ThrottleEvent
Contains details about throttled requests from Kafka brokers.
```python { .api }
class ThrottleEvent:
    def __init__(self, broker_name, broker_id, throttle_time):
        """
        Create ThrottleEvent instance.

        Args:
            broker_name (str): Hostname of the broker that throttled the request
            broker_id (int): Broker ID
            throttle_time (float): Throttle time in seconds
        """

    @property
    def broker_name(self):
        """Hostname of the broker that throttled the request."""

    @property
    def broker_id(self):
        """Broker ID."""

    @property
    def throttle_time(self):
        """Throttle time in seconds."""
```
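
`ThrottleEvent` instances are delivered to the callback registered under the `throttle_cb` configuration property. A minimal sketch, with an illustrative broker address and topic:

```python
from confluent_kafka import Producer

def on_throttle(throttle_event):
    print(f'Broker {throttle_event.broker_name} (id {throttle_event.broker_id}) '
          f'throttled the request for {throttle_event.throttle_time}s')

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'throttle_cb': on_throttle
})

producer.produce('my-topic', value='hello')
producer.poll(0)   # callbacks, including throttle_cb, are served from poll()/flush()
producer.flush()
```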
## Model Classes
Core data model classes for representing Kafka metadata and consumer group information.
```python { .api }
class Node:
    def __init__(self, id, host, port, rack=None):
        """
        Represents broker node information.

        Args:
            id (int): Node ID
            host (str): Hostname
            port (int): Port number
            rack (str, optional): Rack identifier
        """

class ConsumerGroupTopicPartitions:
    def __init__(self, group_id, topic_partitions=None):
        """
        Consumer group with topic partition information.

        Args:
            group_id (str): Consumer group ID
            topic_partitions (list, optional): List of TopicPartition objects
        """

class TopicCollection:
    def __init__(self, topic_names):
        """
        Collection of topic names.

        Args:
            topic_names (list): List of topic name strings
        """

class TopicPartitionInfo:
    def __init__(self, id, leader, replicas, isr):
        """
        Partition metadata information.

        Args:
            id (int): Partition ID
            leader (Node): Leader broker node
            replicas (list): List of replica nodes
            isr (list): List of in-sync replica nodes
        """

# Enumeration classes
class ConsumerGroupState:
    UNKNOWN = 0
    PREPARING_REBALANCING = 1
    COMPLETING_REBALANCING = 2
    STABLE = 3
    DEAD = 4
    EMPTY = 5

class ConsumerGroupType:
    UNKNOWN = 0
    CONSUMER = 1
    CLASSIC = 2

class IsolationLevel:
    READ_UNCOMMITTED = 0
    READ_COMMITTED = 1

class ElectionType:
    PREFERRED = 0
    UNCLEAN = 1
```
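
These classes mostly show up as inputs to and results from admin operations. A sketch, assuming `AdminClient.describe_topics()` is available (confluent-kafka 2.3+); the broker address and topic name are illustrative.

```python
from confluent_kafka import TopicCollection
from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# describe_topics() takes a TopicCollection and returns {topic_name: future}
futures = admin.describe_topics(TopicCollection(['my-topic']))

for topic, future in futures.items():
    description = future.result()
    for p in description.partitions:   # TopicPartitionInfo instances
        leader = p.leader              # a Node instance
        print(f'{topic} partition {p.id}: leader={leader.host}:{leader.port}, '
              f'replicas={len(p.replicas)}, isr={len(p.isr)}')
```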
## Constants
```python { .api }
# Timestamp types
TIMESTAMP_NOT_AVAILABLE = 0
TIMESTAMP_CREATE_TIME = 1
TIMESTAMP_LOG_APPEND_TIME = 2

# Offset constants
OFFSET_BEGINNING = -2
OFFSET_END = -1
OFFSET_STORED = -1000
OFFSET_INVALID = -1001
```
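
The offset constants can be used wherever a `TopicPartition` offset is expected, and the timestamp constants identify the type returned by `Message.timestamp()`. A minimal sketch with an illustrative broker address, topic, and partition:

```python
from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING, TIMESTAMP_CREATE_TIME

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group'
})

# Read partition 0 from the beginning, regardless of committed offsets
consumer.assign([TopicPartition('my-topic', 0, OFFSET_BEGINNING)])

msg = consumer.poll(5.0)
if msg is not None and msg.error() is None:
    ts_type, ts = msg.timestamp()
    if ts_type == TIMESTAMP_CREATE_TIME:
        print(f'Message created at {ts} ms since the epoch')
consumer.close()
```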
## Version Information
```python { .api }
def version():
    """
    Get confluent-kafka version.

    Returns:
        tuple: Version tuple (version_string, version_int)
    """

def libversion():
    """
    Get librdkafka version.

    Returns:
        tuple: Version tuple (version_string, version_int)
    """
```
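
A minimal sketch of reading both versions at runtime:

```python
from confluent_kafka import version, libversion

ver_str, ver_int = version()      # client version, e.g. ('2.11.1', ...)
lib_str, lib_int = libversion()   # bundled librdkafka version
print(f'confluent-kafka {ver_str}, librdkafka {lib_str}')
```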