0
# Consumer Operations
1
2
High-level consumer for consuming records from Kafka topics with automatic group coordination, partition assignment, and offset management. Supports both subscription-based consumption and manual partition assignment.
3
4
## Capabilities
5
6
### KafkaConsumer
7
8
Main consumer class for consuming records from Kafka topics. Provides automatic group coordination, offset management, and flexible consumption patterns.
9
10
```python { .api }
11
class KafkaConsumer:
12
def __init__(self, *topics, **configs):
13
"""
14
Create a KafkaConsumer instance.
15
16
Args:
17
*topics: Topics to subscribe to initially
18
**configs: Consumer configuration options including:
19
bootstrap_servers (str or list): 'host[:port]' string, or list of such strings, used for the initial connection to the cluster
20
group_id (str): Consumer group identifier
21
key_deserializer (callable): Function to deserialize keys
22
value_deserializer (callable): Function to deserialize values
23
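auto_offset_reset (str): Policy for resetting the position when there is no valid offset (none committed, or out of range): 'earliest' or 'latest'; any other value (e.g. 'none') raises an error instead of resetting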
24
enable_auto_commit (bool): Whether to auto-commit offsets
25
auto_commit_interval_ms (int): Auto-commit interval
26
session_timeout_ms (int): Group session timeout
27
heartbeat_interval_ms (int): Heartbeat interval
28
max_poll_records (int): Maximum records per poll
29
max_poll_interval_ms (int): Maximum time between polls
30
client_id (str): Client identifier
31
security_protocol (str): Security protocol
32
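ssl_context (ssl.SSLContext): Pre-configured SSL context for wrapping socket connections; when provided, the other ssl_* options are ignored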
33
sasl_mechanism (str): SASL mechanism
34
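consumer_timeout_ms (int): Milliseconds to block during message iteration before raising StopIteration (i.e. ending the iterator); blocks forever by default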
35
"""
36
37
def subscribe(self, topics=None, pattern=None, listener=None):
38
"""
39
Subscribe to topics or topic pattern.
40
41
Args:
42
topics (list): List of topic names to subscribe to
43
pattern (str): Regex pattern for topic matching (provide either topics or pattern, not both)
44
listener (ConsumerRebalanceListener): Rebalance event listener
45
"""
46
47
def unsubscribe(self):
48
"""Unsubscribe from all topics."""
49
50
def assign(self, partitions):
51
"""
52
Manually assign partitions to consumer.
53
54
Args:
55
partitions (list): List of TopicPartition objects
56
"""
57
58
def assignment(self):
59
"""
60
Get current partition assignment.
61
62
Returns:
63
set: Set of TopicPartition objects currently assigned
64
"""
65
66
def subscription(self):
67
"""
68
Get current topic subscription.
69
70
Returns:
71
set: Set of subscribed topic names
72
"""
73
74
def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
75
"""
76
Fetch records from Kafka.
77
78
Args:
79
timeout_ms (int): Milliseconds to wait if no records are buffered; 0 returns immediately with whatever is currently available
80
max_records (int): Maximum number of records to return
81
update_offsets (bool): Whether to update fetch positions
82
83
Returns:
84
dict: Dictionary mapping TopicPartition to list of ConsumerRecord
85
"""
86
87
def commit(self, offsets=None):
88
"""
89
Commit offsets synchronously.
90
91
Args:
92
offsets (dict): Dictionary mapping TopicPartition to OffsetAndMetadata; if None, commits the current consumed offsets for all subscribed partitions
93
"""
94
95
def commit_async(self, offsets=None, callback=None):
96
"""
97
Commit offsets asynchronously.
98
99
Args:
100
offsets (dict): Dictionary mapping TopicPartition to OffsetAndMetadata; if None, commits the current consumed offsets for all subscribed partitions
101
callback (callable): Called as callback(offsets, response) when the commit completes; response is either an Exception or the commit response
102
"""
103
104
def committed(self, partition, metadata=False):
105
"""
106
Get committed offset for partition.
107
108
Args:
109
partition (TopicPartition): Partition to check
110
metadata (bool): Whether to return metadata with offset
111
112
Returns:
113
int or OffsetAndMetadata: Last committed offset, or None if there was no prior commit
114
"""
115
116
def position(self, partition):
117
"""
118
Get current position (next fetch offset) for partition.
119
120
Args:
121
partition (TopicPartition): Partition to check
122
123
Returns:
124
int: Current fetch position
125
"""
126
127
def seek(self, partition, offset):
128
"""
129
Seek to specific offset for partition.
130
131
Args:
132
partition (TopicPartition): Partition to seek
133
offset (int): Offset to seek to
134
"""
135
136
def seek_to_beginning(self, *partitions):
137
"""
138
Seek to beginning of partitions.
139
140
Args:
141
*partitions: TopicPartition objects (empty = all assigned)
142
"""
143
144
def seek_to_end(self, *partitions):
145
"""
146
Seek to end of partitions.
147
148
Args:
149
*partitions: TopicPartition objects (empty = all assigned)
150
"""
151
152
def pause(self, *partitions):
153
"""
154
Pause consumption from partitions.
155
156
Args:
157
*partitions: TopicPartition objects to pause
158
"""
159
160
def resume(self, *partitions):
161
"""
162
Resume consumption from previously paused partitions.
163
164
Args:
165
*partitions: TopicPartition objects to resume
166
"""
167
168
def paused(self):
169
"""
170
Get currently paused partitions.
171
172
Returns:
173
set: Set of paused TopicPartition objects
174
"""
175
176
def topics(self):
177
"""
178
Get available topics from cluster.
179
180
Returns:
181
set: Set of available topic names
182
"""
183
184
def partitions_for_topic(self, topic):
185
"""
186
Get available partitions for topic.
187
188
Args:
189
topic (str): Topic name
190
191
Returns:
192
set: Set of partition numbers for topic
193
"""
194
195
def beginning_offsets(self, partitions):
196
"""
197
Get earliest available offsets for partitions.
198
199
Args:
200
partitions (list): List of TopicPartition objects
201
202
Returns:
203
dict: Dictionary mapping TopicPartition to offset
204
"""
205
206
def end_offsets(self, partitions):
207
"""
208
Get latest available offsets for partitions.
209
210
Args:
211
partitions (list): List of TopicPartition objects
212
213
Returns:
214
dict: Dictionary mapping TopicPartition to offset
215
"""
216
217
def close(self):
218
"""Close the consumer and release resources."""
219
220
def highwater(self, partition):
221
"""
222
Get high watermark offset for partition.
223
224
Args:
225
partition (TopicPartition): Partition to check
226
227
Returns:
228
int or None: High watermark offset, or None if it is not yet available
229
"""
230
231
def bootstrap_connected(self):
232
"""
233
Check if consumer has established bootstrap connection.
234
235
Returns:
236
bool: True if connected to at least one bootstrap server
237
"""
238
239
def offsets_for_times(self, timestamps):
240
"""
241
Get offsets for given timestamps.
242
243
Args:
244
timestamps (dict): Dictionary mapping TopicPartition to the timestamp to look up, in milliseconds since the epoch (UTC)
245
246
Returns:
247
dict: Dictionary mapping TopicPartition to OffsetAndTimestamp
248
"""
249
250
def metrics(self, raw=False):
251
"""
252
Get consumer performance metrics.
253
254
Args:
255
raw (bool): If True, return raw metrics dict
256
257
Returns:
258
dict: Consumer performance metrics including fetch rates, lag, and timing
259
"""
260
```
261
262
### Consumer Records and Metadata
263
264
Data structures returned by the consumer for representing consumed records.
265
266
```python { .api }
267
class ConsumerRecord:
268
topic: str # Topic name
269
partition: int # Partition number
270
offset: int # Record offset
271
timestamp: int # Record timestamp
272
timestamp_type: int # Timestamp type
273
key: bytes # Record key (deserialized if deserializer provided)
274
value: bytes # Record value (deserialized if deserializer provided)
275
headers: list # List of (key, value) header tuples
276
checksum: int # Record checksum
277
serialized_key_size: int # Key size in bytes
278
serialized_value_size: int # Value size in bytes
279
280
class ConsumerRebalanceListener:
281
def on_partitions_revoked(self, revoked):
282
"""
283
Called when partitions are revoked from consumer.
284
285
Args:
286
revoked (list): List of TopicPartition objects being revoked
287
"""
288
289
def on_partitions_assigned(self, assigned):
290
"""
291
Called when new partitions are assigned to consumer.
292
293
Args:
294
assigned (list): List of TopicPartition objects being assigned
295
"""
296
```
297
298
## Usage Examples
299
300
### Basic Consumer Usage
301
302
```python
303
from kafka import KafkaConsumer
304
import json
305
306
# Create consumer with JSON deserialization
307
consumer = KafkaConsumer(
308
'my-topic',
309
bootstrap_servers=['localhost:9092'],
310
group_id='my-consumer-group',
311
auto_offset_reset='earliest',
312
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
313
)
314
315
# Consume messages
316
for message in consumer:
317
print(f"Topic: {message.topic}")
318
print(f"Partition: {message.partition}")
319
print(f"Offset: {message.offset}")
320
print(f"Value: {message.value}")
321
```
322
323
### Manual Offset Management
324
325
```python
326
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
327
328
consumer = KafkaConsumer(
329
bootstrap_servers=['localhost:9092'],
330
group_id='manual-commit-group',
331
enable_auto_commit=False # Disable auto-commit
332
)
333
334
consumer.subscribe(['my-topic'])
335
336
try:
337
while True:
338
# Poll for messages
339
message_batch = consumer.poll(timeout_ms=1000)
340
341
for topic_partition, messages in message_batch.items():
342
for message in messages:
343
# Process message
344
print(f"Processing: {message.value}")
345
346
# Manually commit after processing
347
consumer.commit({
348
topic_partition: OffsetAndMetadata(message.offset + 1, "processed")
349
})
350
351
except KeyboardInterrupt:
352
consumer.close()
353
```
354
355
### Manual Partition Assignment
356
357
```python
358
from kafka import KafkaConsumer, TopicPartition
359
360
consumer = KafkaConsumer(
361
bootstrap_servers=['localhost:9092'],
362
group_id=None # No group coordination
363
)
364
365
# Manually assign specific partitions
366
partitions = [
367
TopicPartition('topic-1', 0),
368
TopicPartition('topic-1', 1),
369
TopicPartition('topic-2', 0)
370
]
371
consumer.assign(partitions)
372
373
# Seek to specific offsets
374
consumer.seek(TopicPartition('topic-1', 0), 100)
375
consumer.seek_to_beginning(TopicPartition('topic-1', 1))
376
377
for message in consumer:
378
print(f"Manual assignment: {message.topic}:{message.partition}:{message.offset}")
379
```
380
381
### Consumer with Rebalance Listener
382
383
```python
384
from kafka import KafkaConsumer
385
from kafka.consumer.subscription_state import ConsumerRebalanceListener
386
387
class RebalanceListener(ConsumerRebalanceListener):
388
def __init__(self, consumer):
389
self.consumer = consumer
390
391
def on_partitions_revoked(self, revoked):
392
print(f"Partitions revoked: {revoked}")
393
# Commit current offsets before rebalance
394
self.consumer.commit()
395
396
def on_partitions_assigned(self, assigned):
397
print(f"Partitions assigned: {assigned}")
398
# Reset to beginning for new partitions
399
for partition in assigned:
400
self.consumer.seek_to_beginning(partition)
401
402
consumer = KafkaConsumer(
403
bootstrap_servers=['localhost:9092'],
404
group_id='rebalance-group'
405
)
406
407
listener = RebalanceListener(consumer)
408
consumer.subscribe(['my-topic'], listener=listener)
409
410
for message in consumer:
411
print(f"Consumed: {message.value}")
412
```
413
414
### Batch Processing
415
416
```python
417
from kafka import KafkaConsumer
418
419
consumer = KafkaConsumer(
420
'batch-topic',
421
bootstrap_servers=['localhost:9092'],
422
group_id='batch-processor',
423
max_poll_records=100, # Get up to 100 records per poll
424
enable_auto_commit=False
425
)
426
427
while True:
428
# Get batch of messages
429
message_batch = consumer.poll(timeout_ms=5000, max_records=100)
430
431
if not message_batch:
432
continue
433
434
# Process batch
435
batch_count = 0
436
for topic_partition, messages in message_batch.items():
437
for message in messages:
438
# Process message
439
batch_count += 1
440
441
print(f"Processed batch of {batch_count} messages")
442
443
# Commit batch
444
consumer.commit()
445
```
446
447
### Secure Consumer (SSL + SASL)
448
449
```python
450
from kafka import KafkaConsumer
451
452
consumer = KafkaConsumer(
453
'secure-topic',
454
bootstrap_servers=['secure-broker:9093'],
455
group_id='secure-group',
456
security_protocol='SASL_SSL',
457
sasl_mechanism='SCRAM-SHA-256',
458
sasl_plain_username='myuser',
459
sasl_plain_password='mypassword',
460
ssl_check_hostname=True,
461
ssl_cafile='ca-cert.pem'
462
)
463
464
for message in consumer:
465
print(f"Secure message: {message.value}")
466
```