Pure Python client for Apache Kafka with producer/consumer APIs and admin capabilities
npx @tessl/cli install tessl/pypi-kafka-python-ng@2.2.00
# Kafka Python NG

A pure Python client library for Apache Kafka that provides high-level producer and consumer APIs, admin functionality, and full protocol support. It is designed for compatibility with Kafka brokers from version 0.8.0 to 2.6+ and offers Pythonic interfaces for distributed stream processing applications.

## Package Information

- **Package Name**: kafka-python-ng
- **Language**: Python
- **Installation**: `pip install kafka-python-ng`
- **Requirements**: Python >= 3.8

## Core Imports

```python
import kafka
```

For specific functionality:

```python
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
```

For data structures and error handling:

```python
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.errors import KafkaError
from kafka.consumer.subscription_state import ConsumerRebalanceListener
from kafka.conn import BrokerConnection
from kafka.serializer import Serializer, Deserializer
```

For admin operations:

```python
from kafka.admin import NewTopic, NewPartitions, ConfigResource, ConfigResourceType
```

## Basic Usage

### Producer

```python
from kafka import KafkaProducer
import json

# Create producer with JSON serialization
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send messages
producer.send('my-topic', {'key': 'value'})
producer.flush()
producer.close()
```

### Consumer

```python
from kafka import KafkaConsumer
import json

# Create consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Consume messages
for message in consumer:
    print(f"Topic: {message.topic}, Value: {message.value}")
```

### Admin Client

```python
from kafka import KafkaAdminClient
from kafka.admin import NewTopic

# Create admin client
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create topics
topic = NewTopic(name='test-topic', num_partitions=3, replication_factor=1)
admin.create_topics([topic])
```

## Architecture

The kafka-python-ng library is organized around three main client types:

- **KafkaProducer**: Thread-safe producer for publishing records with batching, compression, and retry logic
- **KafkaConsumer**: High-level consumer with automatic group coordination, partition assignment, and offset management
- **KafkaAdminClient**: Administrative operations for topics, configs, consumer groups, and ACLs

The library uses an async I/O foundation with selector-based networking, automatic metadata management, and comprehensive error handling. It supports all major Kafka features including SASL authentication, SSL encryption, multiple compression formats, and transactional semantics.

## Capabilities

### Producer API

High-level producer for publishing records to Kafka topics with automatic batching, compression, partitioning, and retry logic. Supports asynchronous sending with futures and callback handling.

```python { .api }
class KafkaProducer:
    def __init__(self, **configs): ...
    def send(self, topic: str, value=None, key=None, headers=None, partition=None, timestamp_ms=None): ...
    def flush(self, timeout=None): ...
    def close(self, timeout=None): ...
```

[Producer Operations](./producer.md)
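
As a minimal sketch of the asynchronous path described above: `send()` returns a future, and callbacks can be attached to it with `add_callback` and `add_errback` (both listed under `FutureRecordMetadata` in the Types section). The helper names `on_send_success`, `on_send_error`, and `send_with_callbacks` are hypothetical, and the sketch assumes the metadata passed to the success callback exposes `topic`, `partition`, and `offset`.

```python
import logging

log = logging.getLogger("producer-example")  # hypothetical logger name


def on_send_success(record_metadata):
    """Called when the broker acknowledges the record."""
    # Assumption: the metadata object exposes topic, partition and offset.
    log.info("delivered to %s[%s] at offset %s",
             record_metadata.topic, record_metadata.partition, record_metadata.offset)


def on_send_error(exc):
    """Called when the send ultimately fails."""
    log.error("send failed: %r", exc)


def send_with_callbacks(producer, topic, value, key=None):
    """Send one record without blocking and attach both callbacks.

    `producer` is expected to be a KafkaProducer, or any object whose
    send() returns a future with add_callback/add_errback.
    """
    future = producer.send(topic, value=value, key=key)
    future.add_callback(on_send_success)
    future.add_errback(on_send_error)
    return future
```

With a real producer, `send_with_callbacks(producer, 'my-topic', {'key': 'value'})` returns immediately, and a later `producer.flush()` waits for outstanding sends. Callbacks should stay short, since they typically run on the producer's background I/O thread rather than the calling thread.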
117
118
### Consumer API
119
120
High-level consumer for consuming records from Kafka topics with automatic group coordination, partition assignment, and offset management. Supports both subscription-based and manual partition assignment.
121
122
```python { .api }
123
class KafkaConsumer:
124
def __init__(self, *topics, **configs): ...
125
def subscribe(self, topics, pattern=None, listener=None): ...
126
def poll(self, timeout_ms=0, max_records=None, update_offsets=True): ...
127
def commit(self, offsets=None): ...
128
def close(self): ...
129
```
130
131
[Consumer Operations](./consumer.md)
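
The `poll()` and `commit()` methods above can be combined into a manual-commit loop. The sketch below is one possible shape, not the library's prescribed pattern. `process_batch` and `handler` are hypothetical names, and the sketch assumes the consumer was created with a `group_id` and `enable_auto_commit=False`, and that `poll()` returns a mapping from partition to a list of records.

```python
def process_batch(consumer, handler, timeout_ms=1000, max_records=None):
    """Poll once, pass every record to `handler`, then commit.

    Returns the number of records handled. Offsets are committed only
    after the whole batch has been handled, so a crash mid-batch leads
    to reprocessing rather than skipped records (at-least-once).
    """
    # Assumption: poll() returns {TopicPartition: [record, ...]}.
    batches = consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
    handled = 0
    for _partition, records in batches.items():
        for record in records:
            handler(record)
            handled += 1
    if handled:
        # Called without arguments, commit() commits the consumer's
        # current position for its assigned partitions.
        consumer.commit()
    return handled
```

Calling `process_batch(consumer, print)` in a `while True:` loop gives roughly the same behaviour as iterating over the consumer, but with explicit control over when offsets are committed.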
132
133
### Admin API
134
135
Administrative client for managing Kafka cluster resources including topics, consumer groups, configurations, and access control lists (ACLs).
136
137
```python { .api }
138
class KafkaAdminClient:
139
def __init__(self, **configs): ...
140
def create_topics(self, topic_list, timeout_ms=None, validate_only=False): ...
141
def delete_topics(self, topics, timeout_ms=None): ...
142
def describe_configs(self, config_resources, timeout_ms=None, include_synonyms=False): ...
143
```
144
145
[Administrative Operations](./admin.md)
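
The `validate_only` flag on `create_topics` suggests a dry-run step before the real request. The following is a hedged sketch of that idea: `create_topics_checked` is a hypothetical helper, and it assumes both that the broker is recent enough to honour `validate_only` and that a failed validation surfaces as an exception.

```python
def create_topics_checked(admin, new_topics, timeout_ms=10000):
    """Validate a topic-creation request first, then submit it for real.

    `admin` is expected to behave like KafkaAdminClient and `new_topics`
    to be a list of NewTopic objects.
    """
    # Dry run: ask the broker to check the request without creating anything.
    # Assumption: an invalid request raises here, so the second call is skipped.
    admin.create_topics(new_topics, timeout_ms=timeout_ms, validate_only=True)
    # Real request, reached only if the dry run did not raise.
    return admin.create_topics(new_topics, timeout_ms=timeout_ms, validate_only=False)
```

For example, `create_topics_checked(admin, [NewTopic(name='test-topic', num_partitions=3, replication_factor=1)])` would reuse the objects from the Admin Client example above.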
146
147
### Configuration and Connection Management
148
149
Connection management, SSL/SASL authentication, and client configuration options for connecting to Kafka clusters with various security configurations.
150
151
```python { .api }
152
class BrokerConnection:
153
def __init__(self, host, port, **configs): ...
154
def connect(self, timeout=None): ...
155
def close(self): ...
156
```
157
158
[Connection and Configuration](./connection.md)
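
Security settings are passed to the clients as keyword configs. The sketch below builds such a dictionary for SASL/PLAIN over TLS. The helper name `sasl_ssl_config` is hypothetical, and the keys (`security_protocol`, `sasl_mechanism`, `sasl_plain_username`, `sasl_plain_password`, `ssl_cafile`) are assumed to match the names the clients accept; they should be checked against the configuration reference linked above.

```python
def sasl_ssl_config(bootstrap_servers, username, password, cafile=None):
    """Build keyword configs for a SASL/PLAIN connection over TLS.

    The returned dict is meant to be unpacked into a client constructor.
    """
    configs = {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SASL_SSL",   # SASL authentication over an encrypted channel
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": username,
        "sasl_plain_password": password,
    }
    if cafile is not None:
        # Optional CA bundle used to verify the broker certificate.
        configs["ssl_cafile"] = cafile
    return configs
```

A client could then be created with, for example, `KafkaProducer(**sasl_ssl_config(['broker.example:9093'], 'user', 'secret'))`, where the host and credentials are placeholders.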
159
160
### Data Structures and Types
161
162
Core data structures for representing Kafka concepts including topic partitions, offsets, broker metadata, and consumer group information.
163
164
```python { .api }
165
class TopicPartition:
166
def __init__(self, topic: str, partition: int): ...
167
168
class OffsetAndMetadata:
169
def __init__(self, offset: int, metadata: str): ...
170
```
171
172
[Data Structures](./structs.md)
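
A common use of these two types is building the `offsets` argument for `KafkaConsumer.commit()`. The sketch below defines local stand-ins with the same fields as the `namedtuple` definitions in the Types section so that it runs without the library; in real code they would be imported from `kafka.structs`. `next_offsets` is a hypothetical helper, and it follows the usual Kafka convention that the committed offset is the offset of the next record to read.

```python
from collections import namedtuple

# Local stand-ins mirroring the documented fields (assumption: real code
# would import these from kafka.structs instead).
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata"])


def next_offsets(records):
    """Map each partition to the offset that should be committed.

    `records` is any iterable of objects with topic, partition and
    offset attributes, for example consumed records.
    """
    offsets = {}
    for record in records:
        tp = TopicPartition(record.topic, record.partition)
        candidate = record.offset + 1  # commit the next offset to read
        current = offsets.get(tp)
        # Keep the highest offset seen per partition.
        if current is None or candidate > current.offset:
            offsets[tp] = OffsetAndMetadata(candidate, "")
    return offsets
```

The resulting dictionary has the shape `{TopicPartition: OffsetAndMetadata}`, which is what `commit(offsets=...)` is expected to take.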
173
174
### Error Handling
175
176
Comprehensive exception hierarchy for handling various Kafka-related errors including network issues, protocol errors, and authentication failures.
177
178
```python { .api }
179
class KafkaError(RuntimeError):
180
retriable: bool
181
invalid_metadata: bool
182
183
class NoBrokersAvailable(KafkaError): ...
184
class CommitFailedError(KafkaError): ...
185
```
186
187
[Error Handling](./errors.md)
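
The `retriable` flag on `KafkaError` can drive a simple retry policy. The sketch below is an illustration under stated assumptions, not the library's own retry logic: `call_with_retries` is a hypothetical helper, and the fallback `KafkaError` class exists only so the example runs when the library is not installed.

```python
import time

try:
    from kafka.errors import KafkaError
except ImportError:
    # Stand-in mirroring the attributes listed above, used only when the
    # library is not installed.
    class KafkaError(RuntimeError):
        retriable = False
        invalid_metadata = False


def call_with_retries(operation, attempts=3, backoff_s=0.5):
    """Run `operation`, retrying only errors flagged as retriable.

    Non-retriable errors, and the last failed attempt, are re-raised.
    """
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except KafkaError as exc:
            if not exc.retriable or attempt == attempts:
                raise
            time.sleep(backoff_s * attempt)  # simple linear backoff
```

A typical call might be `call_with_retries(lambda: producer.send('my-topic', payload).get(timeout=10))`. The producer already retries internally according to its own configuration, so an outer loop like this is mostly useful around blocking calls such as `get()`.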
188
189
### Serialization
190
191
Abstract base classes for implementing custom key and value serializers/deserializers for converting between Python objects and bytes.
192
193
```python { .api }
194
class Serializer:
195
def serialize(self, topic: str, value) -> bytes: ...
196
def close(self): ...
197
198
class Deserializer:
199
def deserialize(self, topic: str, bytes_: bytes): ...
200
def close(self): ...
201
```
202
203
[Serialization](./serialization.md)
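
A minimal sketch of a JSON serializer pair built on these base classes follows. `JsonSerializer` and `JsonDeserializer` are hypothetical names, and the fallback base classes exist only so the example runs when the library is not installed. Passing `None` through unchanged is a design choice made here so that null payloads are not turned into the JSON string `null`.

```python
import json

try:
    from kafka.serializer import Serializer, Deserializer
except ImportError:
    # Stand-ins with the documented interface, used only when the
    # library is not installed.
    class Serializer:
        def serialize(self, topic, value):
            raise NotImplementedError

        def close(self):
            pass

    class Deserializer:
        def deserialize(self, topic, bytes_):
            raise NotImplementedError

        def close(self):
            pass


class JsonSerializer(Serializer):
    """Encode Python objects as UTF-8 JSON bytes."""

    def serialize(self, topic, value):
        if value is None:
            return None  # leave null payloads untouched
        return json.dumps(value).encode("utf-8")


class JsonDeserializer(Deserializer):
    """Decode UTF-8 JSON bytes back into Python objects."""

    def deserialize(self, topic, bytes_):
        if bytes_ is None:
            return None
        return json.loads(bytes_.decode("utf-8"))
```

Assuming the clients accept `Serializer` and `Deserializer` instances as well as plain callables, `JsonSerializer()` could be passed as `value_serializer` and `JsonDeserializer()` as `value_deserializer`, in place of the lambdas shown under Basic Usage.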
204
205
## Types
206
207
```python { .api }
208
# Core data structures
209
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
210
OffsetAndMetadata = namedtuple('OffsetAndMetadata', ['offset', 'metadata'])
211
BrokerMetadata = namedtuple('BrokerMetadata', ['nodeId', 'host', 'port', 'rack'])
212
PartitionMetadata = namedtuple('PartitionMetadata', ['topic', 'partition', 'leader', 'replicas', 'isr', 'error'])
213
OffsetAndTimestamp = namedtuple('OffsetAndTimestamp', ['offset', 'timestamp'])
214
215
# Admin types
216
class NewTopic:
217
def __init__(self, name: str, num_partitions: int, replication_factor: int, **configs): ...
218
219
class NewPartitions:
220
def __init__(self, total_count: int, new_assignments=None): ...
221
222
class ConfigResource:
223
def __init__(self, resource_type, name: str): ...
224
225
# Consumer callback interface
226
class ConsumerRebalanceListener:
227
def on_partitions_revoked(self, revoked): ...
228
def on_partitions_assigned(self, assigned): ...
229
230
# Connection management
231
class BrokerConnection:
232
def __init__(self, host: str, port: int, **configs): ...
233
def connect(self, timeout=None): ...
234
def close(self): ...
235
236
# Serialization interfaces
237
class Serializer:
238
def serialize(self, topic: str, value) -> bytes: ...
239
def close(self): ...
240
241
class Deserializer:
242
def deserialize(self, topic: str, bytes_: bytes): ...
243
def close(self): ...
244
245
# Future type for async operations
246
class FutureRecordMetadata:
247
def get(self, timeout=None): ...
248
def is_done(self) -> bool: ...
249
def add_callback(self, callback): ...
250
def add_errback(self, errback): ...
251
```