# Consumer API

High-level consumer for consuming records from Kafka topics, with comprehensive support for consumer groups, automatic partition assignment, offset management, and rebalancing coordination.

## Capabilities

### KafkaConsumer

Main consumer class providing a high-level interface for consuming records from Kafka topics. Supports both subscribe (automatic partition assignment) and assign (manual partition assignment) modes.

```python { .api }
class KafkaConsumer:
    def __init__(self, *topics, **configs):
        """
        Initialize Kafka consumer.

        Parameters:
        - *topics: str, optional topic names to subscribe to
        - **configs: consumer configuration options
          - bootstrap_servers: List[str], broker addresses
          - group_id: str, consumer group identifier
          - key_deserializer: Callable, key deserialization function
          - value_deserializer: Callable, value deserialization function
          - auto_offset_reset: str, 'earliest' or 'latest' for new groups
          - enable_auto_commit: bool, automatic offset commits (default: True)
          - auto_commit_interval_ms: int, auto-commit interval (default: 5000)
          - session_timeout_ms: int, session timeout (default: 10000)
          - heartbeat_interval_ms: int, heartbeat interval (default: 3000)
          - max_poll_records: int, max records per poll (default: 500)
          - fetch_min_bytes: int, minimum fetch size (default: 1)
          - fetch_max_wait_ms: int, max fetch wait time (default: 500)
          - max_partition_fetch_bytes: int, max bytes per partition (default: 1MB)
          - security_protocol: str, 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'
          - api_version: tuple, broker API version or 'auto'
        """

    def subscribe(self, topics=(), pattern=None, listener=None):
        """
        Subscribe to topics with automatic partition assignment.

        Parameters:
        - topics: List[str], topic names to subscribe to
        - pattern: str, regex pattern for topic matching
        - listener: ConsumerRebalanceListener, rebalance callback
        """

    def assign(self, partitions):
        """
        Manually assign specific partitions to consumer.

        Parameters:
        - partitions: List[TopicPartition], partitions to assign
        """

    def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
        """
        Fetch records from assigned partitions.

        Parameters:
        - timeout_ms: int, polling timeout in milliseconds
        - max_records: int, maximum records to return
        - update_offsets: bool, whether to update high-water mark

        Returns:
        - ConsumerRecords: mapping of TopicPartition to list of ConsumerRecord
        """

    def commit(self, offsets=None):
        """
        Commit offsets to Kafka.

        Parameters:
        - offsets: Dict[TopicPartition, OffsetAndMetadata], specific offsets to commit.
          If None, commits current position for all assigned partitions.
        """

    def commit_async(self, offsets=None, callback=None):
        """
        Asynchronous offset commit.

        Parameters:
        - offsets: Dict[TopicPartition, OffsetAndMetadata], offsets to commit
        - callback: Callable, completion callback function
        """

    def seek(self, partition, offset):
        """
        Seek to specific offset in partition.

        Parameters:
        - partition: TopicPartition, target partition
        - offset: int, target offset
        """

    def seek_to_beginning(self, *partitions):
        """
        Seek to beginning of partitions.

        Parameters:
        - *partitions: TopicPartition, partitions to seek (all assigned if none)
        """

    def seek_to_end(self, *partitions):
        """
        Seek to end of partitions.

        Parameters:
        - *partitions: TopicPartition, partitions to seek (all assigned if none)
        """

    def position(self, partition):
        """
        Get current position (next fetch offset) for partition.

        Parameters:
        - partition: TopicPartition, target partition

        Returns:
        - int: current position offset
        """

    def committed(self, partition):
        """
        Get last committed offset for partition.

        Parameters:
        - partition: TopicPartition, target partition

        Returns:
        - OffsetAndMetadata: last committed offset with metadata
        """

    def pause(self, *partitions):
        """
        Suspend fetching from partitions.

        Parameters:
        - *partitions: TopicPartition, partitions to pause
        """

    def resume(self, *partitions):
        """
        Resume fetching from partitions.

        Parameters:
        - *partitions: TopicPartition, partitions to resume
        """

    def paused(self):
        """
        Get currently paused partitions.

        Returns:
        - Set[TopicPartition]: paused partitions
        """

    def close(self, autocommit=True):
        """
        Close consumer and clean up resources.

        Parameters:
        - autocommit: bool, commit offsets before closing
        """

    def subscription(self):
        """
        Get current topic subscription.

        Returns:
        - Set[str]: subscribed topic names
        """

    def assignment(self):
        """
        Get current partition assignment.

        Returns:
        - Set[TopicPartition]: assigned partitions
        """

    def beginning_offsets(self, partitions):
        """
        Get earliest available offsets for partitions.

        Parameters:
        - partitions: List[TopicPartition], target partitions

        Returns:
        - Dict[TopicPartition, int]: earliest offsets
        """

    def end_offsets(self, partitions):
        """
        Get latest offsets for partitions.

        Parameters:
        - partitions: List[TopicPartition], target partitions

        Returns:
        - Dict[TopicPartition, int]: latest offsets
        """

    def offsets_for_times(self, timestamps):
        """
        Get offsets for specific timestamps.

        Parameters:
        - timestamps: Dict[TopicPartition, int], timestamp mapping

        Returns:
        - Dict[TopicPartition, OffsetAndTimestamp]: offset and timestamp info
        """

    def metrics(self):
        """
        Get consumer metrics.

        Returns:
        - Dict[str, float]: current metric values
        """
```
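
The offset-inspection methods above (`position`, `committed`, `beginning_offsets`, `end_offsets`) are not covered by the usage examples later on this page, so the following is a minimal sketch of how they fit together. The broker address, topic name, and group id are placeholders, and the partition is assigned manually so the calls work without waiting for a group rebalance.

```python
from kafka import KafkaConsumer, TopicPartition

# Placeholder broker, topic, and group id for illustration only
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='offset-inspection-group',
    enable_auto_commit=False
)

tp = TopicPartition('my-topic', 0)
consumer.assign([tp])

# Earliest and latest offsets currently available on the broker
earliest = consumer.beginning_offsets([tp])[tp]
latest = consumer.end_offsets([tp])[tp]
print(f"Available offsets: {earliest}..{latest}")

# Last committed offset for this group (None if nothing has been committed yet)
print(f"Committed: {consumer.committed(tp)}")

# Position of the next record poll() would return
print(f"Position: {consumer.position(tp)}")

# Rewind to the earliest available offset for a full replay
consumer.seek(tp, earliest)

consumer.close()
```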

### Consumer Rebalance Listener

Abstract base class for handling partition rebalancing events in consumer groups. Implement this interface to perform cleanup or initialization when partitions are assigned or revoked.

```python { .api }
class ConsumerRebalanceListener:
    def on_partitions_revoked(self, revoked):
        """
        Called before partitions are reassigned.

        Use this callback to commit offsets and clean up state
        for partitions that are being revoked.

        Parameters:
        - revoked: List[TopicPartition], partitions being revoked
        """

    def on_partitions_assigned(self, assigned):
        """
        Called after partitions are assigned.

        Use this callback to set up state or seek to specific
        offsets for newly assigned partitions.

        Parameters:
        - assigned: List[TopicPartition], partitions being assigned
        """
```

### Consumer Records

Result of a consumer poll() operation, containing records organized by partition.

```python { .api }
class ConsumerRecords:
    def __init__(self, record_map):
        """
        Container for poll() results.

        Parameters:
        - record_map: Dict[TopicPartition, List[ConsumerRecord]]
        """

    def __iter__(self):
        """Iterate over all records across all partitions."""

    def __len__(self):
        """Total number of records across all partitions."""

    def __bool__(self):
        """True if contains any records."""

    def records(self, partition):
        """
        Get records for specific partition.

        Parameters:
        - partition: TopicPartition, target partition

        Returns:
        - List[ConsumerRecord]: records for partition
        """

    def by_topic(self):
        """
        Group records by topic.

        Returns:
        - Dict[str, List[ConsumerRecord]]: records grouped by topic
        """
```
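
A short sketch of working with this container, assuming `poll()` returns a `ConsumerRecords` instance as documented above; the topics, broker address, and group id are placeholders.

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders', 'payments',
    bootstrap_servers=['localhost:9092'],
    group_id='records-demo-group'
)

batch = consumer.poll(timeout_ms=1000)

if batch:  # __bool__: any records at all?
    print(f"Fetched {len(batch)} records")  # __len__: total across partitions

    # Iterate every record regardless of partition
    for record in batch:
        print(record.topic, record.partition, record.offset)

    # Or group the same batch by topic
    for topic, records in batch.by_topic().items():
        print(f"{topic}: {len(records)} records")

consumer.close()
```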

### Consumer Record

Individual record consumed from Kafka, containing message data and metadata.

```python { .api }
class ConsumerRecord:
    topic: str                        # Topic name
    partition: int                    # Partition number
    offset: int                       # Message offset
    timestamp: int                    # Message timestamp (milliseconds)
    timestamp_type: int               # Timestamp type (0=CreateTime, 1=LogAppendTime)
    key: bytes                        # Message key (raw bytes)
    value: bytes                      # Message value (raw bytes)
    headers: List[Tuple[str, bytes]]  # Message headers
    checksum: int                     # Message checksum
    serialized_key_size: int          # Serialized key size
    serialized_value_size: int        # Serialized value size
    leader_epoch: int                 # Leader epoch
```
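
Fields are plain attributes, so per-record metadata such as headers and the timestamp type can be read directly. A brief sketch; the `trace-id` header, the topic, and the deserializers are illustrative assumptions.

```python
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    group_id='record-metadata-group',
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

for record in consumer:
    # timestamp_type: 0 = CreateTime (set by producer), 1 = LogAppendTime (set by broker)
    ts_kind = 'CreateTime' if record.timestamp_type == 0 else 'LogAppendTime'
    print(f"{record.topic}[{record.partition}]@{record.offset} {ts_kind}={record.timestamp}")

    # Headers arrive as (str, bytes) tuples; decode only the values you need
    headers = {name: value for name, value in record.headers}
    trace_id = headers.get('trace-id')  # hypothetical header
    print(f"key={record.key} trace_id={trace_id} value_size={record.serialized_value_size}")
```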

## Usage Examples

### Basic Consumer Group

```python
from kafka import KafkaConsumer
import json

# Create consumer with automatic offset management
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    auto_commit_interval_ms=1000
)

# Process messages until interrupted, then close cleanly
try:
    for message in consumer:
        print(f"Topic: {message.topic}")
        print(f"Partition: {message.partition}")
        print(f"Offset: {message.offset}")
        print(f"Key: {message.key}")
        print(f"Value: {message.value}")
        print(f"Timestamp: {message.timestamp}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```

### Manual Partition Assignment

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id=None,  # No consumer group
    value_deserializer=lambda m: m.decode('utf-8')
)

# Manually assign specific partitions
partitions = [
    TopicPartition('topic1', 0),
    TopicPartition('topic1', 1),
    TopicPartition('topic2', 0)
]
consumer.assign(partitions)

# Seek to specific positions
consumer.seek(TopicPartition('topic1', 0), 100)
consumer.seek_to_end(TopicPartition('topic1', 1))

# Poll for messages
while True:
    records = consumer.poll(timeout_ms=1000)
    for partition, messages in records.items():
        for message in messages:
            print(f"Partition {partition}: {message.value}")
```
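
Manual assignment also pairs with `offsets_for_times()` when the starting point is a timestamp rather than a known offset. A sketch under the same placeholder broker and topic; a partition with no record at or after the requested timestamp maps to `None`, so the code falls back to the end of the log.

```python
import time
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id=None,
    value_deserializer=lambda m: m.decode('utf-8')
)

tp = TopicPartition('topic1', 0)
consumer.assign([tp])

# Find the first offset whose timestamp is >= one hour ago
one_hour_ago_ms = int(time.time() * 1000) - 60 * 60 * 1000
offsets = consumer.offsets_for_times({tp: one_hour_ago_ms})

if offsets.get(tp) is not None:
    consumer.seek(tp, offsets[tp].offset)
else:
    # Nothing that recent in the partition; start from the end of the log
    consumer.seek_to_end(tp)

for message in consumer:
    print(message.offset, message.value)
```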

### Manual Offset Management

```python
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='manual-commit-group',
    enable_auto_commit=False,  # Disable auto-commit
    auto_offset_reset='earliest'
)

try:
    while True:
        records = consumer.poll(timeout_ms=1000)

        for partition, messages in records.items():
            for message in messages:
                # Process message
                print(f"Processing: {message.value}")

                # Manually commit after processing
                consumer.commit({
                    partition: OffsetAndMetadata(message.offset + 1, None)
                })

except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```
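
When commit latency matters, `commit_async()` can replace the blocking `commit()` inside the poll loop, with one synchronous commit on shutdown so nothing is lost in flight. A sketch with placeholder names; it assumes the callback is invoked with the committed offsets and either a response or an exception on failure.

```python
from kafka import KafkaConsumer

def on_commit(offsets, response_or_error):
    # Report failed commits; successful commits need no action here
    if isinstance(response_or_error, Exception):
        print(f"Commit failed for {offsets}: {response_or_error}")

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='async-commit-group',
    enable_auto_commit=False
)

try:
    while True:
        records = consumer.poll(timeout_ms=1000)
        for partition, messages in records.items():
            for message in messages:
                print(f"Processing: {message.value}")

        if records:
            # Commit the positions advanced by this poll without blocking
            consumer.commit_async(callback=on_commit)
except KeyboardInterrupt:
    pass
finally:
    # Final synchronous commit before closing
    consumer.commit()
    consumer.close()
```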

### Rebalance Listener

```python
from kafka import KafkaConsumer, ConsumerRebalanceListener
import logging

class MyRebalanceListener(ConsumerRebalanceListener):
    def __init__(self, consumer):
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        logging.info(f"Partitions revoked: {revoked}")
        # Commit current offsets before partitions are reassigned
        self.consumer.commit()

    def on_partitions_assigned(self, assigned):
        logging.info(f"Partitions assigned: {assigned}")
        # Could seek to specific offsets or perform other setup

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='rebalance-group'
)

listener = MyRebalanceListener(consumer)
consumer.subscribe(['my-topic'], listener=listener)

for message in consumer:
    print(f"Received: {message.value}")
```
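
### Flow Control with Pause and Resume

Rebalancing is not the only reason fetching stops: `pause()` and `resume()` give per-partition flow control when a downstream sink cannot keep up, while the consumer keeps polling and stays in its group. The sketch below throttles on a simple in-memory backlog; the threshold values, topic, and group id are illustrative.

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='flow-control-group'
)

backlog = []
MAX_BACKLOG = 1000

while True:
    records = consumer.poll(timeout_ms=500)
    for partition, messages in records.items():
        backlog.extend(messages)

    if len(backlog) >= MAX_BACKLOG and not consumer.paused():
        # Downstream is falling behind: stop fetching from every assigned partition
        consumer.pause(*consumer.assignment())
    elif len(backlog) < MAX_BACKLOG // 2 and consumer.paused():
        # Enough headroom again: resume the partitions that were paused
        consumer.resume(*consumer.paused())

    # Drain part of the backlog each iteration (placeholder for real work)
    for message in backlog[:100]:
        print(f"Processing: {message.value}")
    del backlog[:100]
```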

### Batch Processing

```python
from kafka import KafkaConsumer, OffsetAndMetadata

consumer = KafkaConsumer(
    'batch-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='batch-group',
    max_poll_records=100,  # Process up to 100 records per poll
    enable_auto_commit=False
)

def process_batch(records):
    """Process a batch of records together."""
    batch_data = []
    for record in records:
        batch_data.append(record.value)

    # Simulate batch processing
    print(f"Processing batch of {len(batch_data)} records")
    # ... perform batch operation ...

    return True

try:
    while True:
        # Poll for a batch of records
        record_batch = consumer.poll(timeout_ms=5000, max_records=50)

        if record_batch:
            # Flatten records from all partitions
            all_records = []
            last_offsets = {}

            for partition, records in record_batch.items():
                all_records.extend(records)
                if records:
                    last_offsets[partition] = records[-1].offset + 1

            # Process the batch
            if process_batch(all_records):
                # Commit offsets for successfully processed batch
                offset_data = {
                    partition: OffsetAndMetadata(offset, None)
                    for partition, offset in last_offsets.items()
                }
                consumer.commit(offset_data)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```