# kafka-python

A pure Python client library for the Apache Kafka distributed stream processing system. It provides comprehensive support for producers, consumers, and administrative operations, with compatibility across Kafka broker versions 0.8.0 through 2.6+.

## Version Compatibility

- **Core Features**: Compatible with Kafka 0.8.0+
- **Consumer Groups**: Requires Kafka 0.9.0+ for coordinated consumer groups
- **Idempotent Producer**: Requires Kafka 0.11.0+
- **Transactional Producer**: Requires Kafka 0.11.0+
- **Admin API**: Requires Kafka 0.10.1.0+
- **Headers Support**: Requires Kafka 0.11.0+
- **Exactly-Once Semantics**: Requires Kafka 0.11.0+

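By default the client probes the broker to detect the protocol version; when that probe is undesirable (for example, behind a proxy that interferes with version negotiation), the `api_version` setting can pin it explicitly. A minimal configuration sketch, assuming a broker reachable at `localhost:9092`:

```python
from kafka import KafkaProducer

# Pin the protocol version instead of letting the client auto-detect it.
# (0, 11) corresponds to the headers / idempotence era in the table above.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    api_version=(0, 11),
)
```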
## Package Information

- **Package Name**: kafka-python
- **Language**: Python
- **Installation**: `pip install kafka-python`
- **Optional Dependencies**:
  - `pip install kafka-python[crc32c]` - Faster CRC32C validation
  - `pip install kafka-python[lz4]` - LZ4 compression support
  - `pip install kafka-python[snappy]` - Snappy compression support
  - `pip install kafka-python[zstd]` - Zstandard compression support

## Core Imports

Basic imports for common usage:

```python
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
```

Topic and partition management:

```python
from kafka import TopicPartition, OffsetAndMetadata
```

Consumer coordination:

```python
from kafka.consumer import ConsumerRebalanceListener
```

Administrative objects:

```python
from kafka.admin import NewTopic, NewPartitions, ConfigResource, ACL
```

Low-level client and connection:

```python
from kafka import KafkaClient, BrokerConnection
```

Serialization interfaces:

```python
from kafka import Serializer, Deserializer
```

## Basic Usage

### Simple Producer

```python
from kafka import KafkaProducer
import json

# Configure producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send message
future = producer.send('my-topic', value={'message': 'Hello World'}, key='key1')

# Block for acknowledgment
record_metadata = future.get(timeout=10)
print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition}")

# Clean shutdown
producer.close()
```

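Blocking on `future.get()` after every send limits throughput. For a non-blocking pattern, callbacks can be attached to the future instead; they fire once the broker acknowledges. A sketch under the same `localhost:9092` assumption as above:

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

def on_success(record_metadata):
    print(f"Delivered to {record_metadata.topic}[{record_metadata.partition}] @ {record_metadata.offset}")

def on_error(exc):
    print(f"Delivery failed: {exc}")

# Non-blocking send: callbacks run when the acknowledgment arrives
future = producer.send('my-topic', b'payload')
future.add_callback(on_success)
future.add_errback(on_error)

# Flush outstanding batches so callbacks fire before shutdown
producer.flush()
producer.close()
```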
### Simple Consumer

```python
from kafka import KafkaConsumer
import json

# Configure consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000  # stop iterating after 10s without messages
)

# Consume messages
for message in consumer:
    print(f"Received: key={message.key}, value={message.value}")
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}")

# Clean shutdown
consumer.close()
```

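When processing must complete before offsets are recorded, auto-commit can be disabled in favor of explicit commits after each polled batch. A sketch, assuming the same topic and group as above:

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    enable_auto_commit=False  # commit manually after processing
)

for _ in range(10):  # bounded loop for illustration
    batch = consumer.poll(timeout_ms=1000)
    for tp, records in batch.items():
        for record in records:
            print(f"{tp.topic}[{tp.partition}] @ {record.offset}: {record.value}")
    if batch:
        consumer.commit()  # synchronous commit of the offsets just polled

consumer.close()
```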
### Administrative Operations

```python
from kafka import KafkaAdminClient
from kafka.admin import NewTopic

# Configure admin client
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create topic
topic = NewTopic(name='new-topic', num_partitions=3, replication_factor=1)
admin.create_topics([topic])

# List topics
topics = admin.list_topics()
print(f"Available topics: {list(topics)}")

# Clean shutdown
admin.close()
```

## Architecture

kafka-python follows a layered architecture designed for both simplicity and extensibility:

- **High-Level APIs**: `KafkaProducer`, `KafkaConsumer`, and `KafkaAdminClient` provide simple interfaces for common operations
- **Low-Level Client**: `KafkaClient` handles protocol communication and connection management
- **Connection Layer**: `BrokerConnection` manages individual TCP connections with error handling and reconnection
- **Protocol Layer**: Complete implementation of the Kafka wire protocol with support for all API versions
- **Pluggable Components**: Serializers, partitioners, and metrics collectors can be customized or replaced

This design supports everything from simple fire-and-forget message sending to complex transactional processing and cluster administration.

## Capabilities

### Message Production

High-level producer for publishing records to Kafka topics, with support for batching, compression, partitioning strategies, idempotent production, and transactional semantics.

```python { .api }
class KafkaProducer:
    def __init__(self, **configs): ...
    def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None, headers=None): ...
    def flush(self, timeout=None): ...
    def close(self, timeout=None): ...
```

[Producer API](./producer.md)

### Message Consumption

High-level consumer for consuming records from Kafka topics, with support for consumer groups, automatic partition assignment, offset management, and rebalancing coordination.

```python { .api }
class KafkaConsumer:
    def __init__(self, *topics, **configs): ...
    def subscribe(self, topics=(), pattern=None, listener=None): ...
    def assign(self, partitions): ...
    def poll(self, timeout_ms=0, max_records=None, update_offsets=True): ...
    def commit(self, offsets=None): ...
    def seek(self, partition, offset): ...
```

```python { .api }
class ConsumerRebalanceListener:
    def on_partitions_revoked(self, revoked): ...
    def on_partitions_assigned(self, assigned): ...
```

[Consumer API](./consumer.md)

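A `ConsumerRebalanceListener` subclass can, for example, commit offsets just before partitions are revoked so another group member does not reprocess them. A sketch (the class name and topic are illustrative; assumes a broker at `localhost:9092`):

```python
from kafka import KafkaConsumer
from kafka.consumer import ConsumerRebalanceListener

class CommitOnRevoke(ConsumerRebalanceListener):
    """Commit processed offsets before partitions move to another member."""

    def __init__(self, consumer):
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        self.consumer.commit()  # avoid reprocessing after the rebalance

    def on_partitions_assigned(self, assigned):
        print(f"Assigned partitions: {assigned}")

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group'
)
consumer.subscribe(['my-topic'], listener=CommitOnRevoke(consumer))
```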
### Cluster Administration

Administrative client for managing Kafka clusters, including topic operations, partition management, configuration changes, access control lists, and consumer group administration.

```python { .api }
class KafkaAdminClient:
    def __init__(self, **configs): ...
    def create_topics(self, topic_requests, timeout_ms=None, validate_only=False): ...
    def delete_topics(self, topics, timeout_ms=None): ...
    def list_topics(self, timeout_ms=None): ...
    def describe_topics(self, topics, timeout_ms=None): ...
    def create_partitions(self, partition_updates, timeout_ms=None, validate_only=False): ...
    def describe_configs(self, config_resources, timeout_ms=None): ...
    def alter_configs(self, config_resources, timeout_ms=None): ...
```

[Administrative API](./admin.md)

### Data Structures and Metadata

Core data structures for representing Kafka concepts, including topics, partitions, offsets, broker metadata, and consumer group information.

```python { .api }
TopicPartition = NamedTuple('TopicPartition', [('topic', str), ('partition', int)])
OffsetAndMetadata = NamedTuple('OffsetAndMetadata', [('offset', int), ('metadata', str), ('leader_epoch', int)])
BrokerMetadata = NamedTuple('BrokerMetadata', [('nodeId', int), ('host', str), ('port', int), ('rack', str)])
```

[Data Structures](./structures.md)

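Because these structures are plain namedtuples, they are hashable and can key the offset mapping that `consumer.commit()` accepts. A self-contained sketch using stdlib stand-ins that mirror the definitions above (real code would import them from `kafka` instead):

```python
from collections import namedtuple

# Stand-ins mirroring the definitions above; real code would use
# `from kafka import TopicPartition, OffsetAndMetadata` instead.
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
OffsetAndMetadata = namedtuple('OffsetAndMetadata', ['offset', 'metadata', 'leader_epoch'])

tp = TopicPartition('my-topic', 0)
meta = OffsetAndMetadata(offset=42, metadata='', leader_epoch=-1)

# The {TopicPartition: OffsetAndMetadata} shape expected by consumer.commit()
offsets = {tp: meta}
print(offsets[TopicPartition('my-topic', 0)].offset)  # → 42
```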
### Error Handling and Exceptions

Comprehensive error handling with over 100 exception classes mapping all Kafka protocol errors, client-side errors, and authorization failures, each with appropriate retry semantics.

```python { .api }
class KafkaError(Exception):
    retriable: bool
    invalid_metadata: bool

class KafkaTimeoutError(KafkaError): ...
class KafkaConnectionError(KafkaError): ...
class TopicAuthorizationFailedError(AuthorizationError): ...
```

[Error Handling](./errors.md)

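The `retriable` flag distinguishes transient failures from permanent ones, which makes generic retry handling straightforward. A sketch, assuming a local broker:

```python
from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], retries=5)

try:
    producer.send('my-topic', b'payload').get(timeout=10)
except KafkaTimeoutError:
    print("Broker did not acknowledge within the timeout")
except KafkaError as exc:
    if exc.retriable:
        print(f"Transient error, safe to retry: {exc}")
    else:
        raise  # permanent error: surface it
finally:
    producer.close()
```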
### Low-Level Client and Connection Management

Low-level client for direct protocol communication and connection management, providing fine-grained control over cluster interactions and metadata handling.

```python { .api }
class KafkaClient:
    def __init__(self, **configs): ...
    def bootstrap_connected(self): ...
    def check_version(self, node_id=None, timeout=2, strict=False): ...
    def cluster(self): ...

class BrokerConnection:
    def __init__(self, host, port, **configs): ...
    def connect(self): ...
    def connected(self): ...
    def close(self): ...
```

### Serialization Framework

Abstract base classes for implementing custom serializers and deserializers with pluggable serialization strategies.

```python { .api }
class Serializer:
    def serialize(self, topic, value): ...
    def close(self): ...

class Deserializer:
    def deserialize(self, topic, bytes_): ...
    def close(self): ...
```

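A minimal JSON codec implementing these interfaces. The classes below are self-contained stand-ins for illustration; real code would subclass `kafka.Serializer` and `kafka.Deserializer` and pass instances via the producer's `value_serializer` and consumer's `value_deserializer` configs:

```python
import json

# JSON codec matching the Serializer/Deserializer interface above.
# (Sketch: real code would subclass kafka.Serializer / kafka.Deserializer.)
class JsonSerializer:
    def serialize(self, topic, value):
        return json.dumps(value).encode('utf-8')

    def close(self):
        pass

class JsonDeserializer:
    def deserialize(self, topic, bytes_):
        return json.loads(bytes_.decode('utf-8'))

    def close(self):
        pass

ser, de = JsonSerializer(), JsonDeserializer()
payload = ser.serialize('my-topic', {'id': 1})
round_tripped = de.deserialize('my-topic', payload)
print(round_tripped)  # → {'id': 1}
```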
## Authentication and Security

kafka-python supports multiple authentication mechanisms:

- **SASL/PLAIN**: Simple username/password authentication
- **SASL/SCRAM**: Challenge-response authentication with SHA-256/SHA-512
- **SASL/GSSAPI**: Kerberos authentication (Unix systems)
- **SASL/OAUTHBEARER**: OAuth 2.0 bearer token authentication
- **AWS MSK IAM**: AWS-specific IAM authentication for Amazon MSK
- **SSL/TLS**: Encrypted connections with certificate validation

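For example, connecting with SASL/SCRAM over TLS combines the `security_protocol`, `sasl_mechanism`, and credential settings. A configuration sketch (the broker address, credentials, and CA path are placeholders):

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['broker.example.com:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='user',
    sasl_plain_password='secret',
    ssl_cafile='/path/to/ca.pem'
)
```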
## Compression and Serialization

Built-in support for multiple compression algorithms:

- **gzip**: Standard compression with configurable levels
- **snappy**: Fast compression optimized for speed
- **lz4**: High-speed compression with Kafka-specific framing
- **zstd**: Modern compression with excellent compression ratios

An extensible serialization framework provides abstract base classes for custom serializers and deserializers.

## Types

### Core Configuration Types

```python { .api }
# Producer configuration dictionary with string keys and heterogeneous value types
ProducerConfig = Dict[str, Any]

# Consumer configuration dictionary with string keys and heterogeneous value types
ConsumerConfig = Dict[str, Any]

# Admin client configuration dictionary with string keys and heterogeneous value types
AdminConfig = Dict[str, Any]
```

### Message Types

```python { .api }
# Producer message future returned by send()
class FutureRecordMetadata:
    def get(self, timeout=None): ...
    def add_callback(self, callback): ...
    def add_errback(self, errback): ...

# Consumer record received from poll()
class ConsumerRecord:
    topic: str
    partition: int
    offset: int
    timestamp: int
    timestamp_type: int
    key: bytes
    value: bytes
    headers: List[Tuple[str, bytes]]
```