# Core Producer and Consumer

The core producer and consumer classes provide fundamental Kafka functionality with high performance through the underlying librdkafka C library. These classes support all Kafka features including transactions, exactly-once semantics, and custom partitioning.

## Capabilities

### Producer

High-performance Kafka producer with support for asynchronous message delivery, custom partitioning, transactions, and delivery guarantees.

```python { .api }
class Producer:
12
def __init__(self, conf):
13
"""
14
Create a new Producer instance.
15
16
Args:
17
conf (dict): Configuration properties for the producer
18
"""
19
20
def produce(self, topic, value=None, key=None, partition=-1, on_delivery=None, timestamp=0, headers=None):
21
"""
22
Produce message to topic.
23
24
Args:
25
topic (str): Topic to produce to
26
value (bytes, str, optional): Message value
27
key (bytes, str, optional): Message key for partitioning
28
partition (int, optional): Specific partition (-1 for automatic)
29
on_delivery (callable, optional): Delivery report callback
30
timestamp (int, optional): Message timestamp (0 for current time)
31
headers (dict, optional): Message headers
32
33
Raises:
34
BufferError: If local producer queue is full
35
KafkaException: For other produce errors
36
"""
37
38
def poll(self, timeout=-1):
39
"""
40
Poll for events and call registered callbacks.
41
42
Args:
43
timeout (float): Maximum time to wait in seconds (-1 for infinite)
44
45
Returns:
46
int: Number of events processed
47
"""
48
49
def flush(self, timeout=-1):
50
"""
51
Wait for all messages to be delivered.
52
53
Args:
54
timeout (float): Maximum time to wait in seconds (-1 for infinite)
55
56
Returns:
57
int: Number of messages still in queue (0 on success)
58
"""
59
60
def purge(self, in_queue=True, in_flight=True, blocking=True):
61
"""
62
Purge messages from internal queues.
63
64
Args:
65
in_queue (bool): Purge messages in local queue
66
in_flight (bool): Purge messages in flight to broker
67
blocking (bool): Block until purge is complete
68
69
Returns:
70
int: Number of messages purged
71
"""
72
73
def abort_transaction(self, timeout=-1):
74
"""
75
Abort ongoing transaction.
76
77
Args:
78
timeout (float): Maximum time to wait in seconds
79
"""
80
81
def begin_transaction(self):
82
"""
83
Begin a new transaction.
84
"""
85
86
def commit_transaction(self, timeout=-1):
87
"""
88
Commit current transaction.
89
90
Args:
91
timeout (float): Maximum time to wait in seconds
92
"""
93
94
def init_transactions(self, timeout=-1):
95
"""
96
Initialize transactions for this producer.
97
98
Args:
99
timeout (float): Maximum time to wait in seconds
100
"""
101
102
def send_offsets_to_transaction(self, positions, group_metadata, timeout=-1):
103
"""
104
Send consumer offsets to transaction.
105
106
Args:
107
positions (list): List of TopicPartition objects with offsets
108
group_metadata (ConsumerGroupMetadata): Consumer group metadata
109
timeout (float): Maximum time to wait in seconds
110
"""
111
112
def list_topics(self, topic=None, timeout=-1):
113
"""
114
Get metadata for topics.
115
116
Args:
117
topic (str, optional): Specific topic name to query
118
timeout (float): Maximum time to wait in seconds
119
120
Returns:
121
ClusterMetadata: Cluster and topic metadata
122
"""
123
```

### Consumer

High-performance Kafka consumer with support for consumer groups, manual/automatic offset management, and rebalancing.

```python { .api }
class Consumer:
    def __init__(self, conf):
        """
        Create a new Consumer instance.

        Args:
            conf (dict): Configuration properties for the consumer
        """

    def subscribe(self, topics, listener=None):
        """
        Subscribe to list of topics for automatic partition assignment.

        Args:
            topics (list): List of topic names to subscribe to
            listener (RebalanceCallback, optional): Rebalance callback
        """

    def unsubscribe(self):
        """
        Unsubscribe from current topic subscription.
        """

    def assign(self, partitions):
        """
        Manually assign partitions to consume from.

        Args:
            partitions (list): List of TopicPartition objects
        """

    def assignment(self):
        """
        Get current partition assignment.

        Returns:
            list: List of assigned TopicPartition objects
        """

    def unassign(self):
        """
        Remove current partition assignment.
        """

    def poll(self, timeout=-1):
        """
        Poll for messages.

        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)

        Returns:
            Message: Message object or None if timeout
        """

    def consume(self, num_messages=1, timeout=-1):
        """
        Consume multiple messages.

        Args:
            num_messages (int): Maximum number of messages to return
            timeout (float): Maximum time to wait in seconds

        Returns:
            list: List of Message objects
        """

    def commit(self, message=None, offsets=None, asynchronous=True):
        """
        Commit message offset or specified offsets.

        Args:
            message (Message, optional): Commit offset for this message
            offsets (list, optional): List of TopicPartition objects with offsets
            asynchronous (bool): Commit asynchronously if True

        Returns:
            list: Committed offsets if synchronous, None if asynchronous
        """

    def committed(self, partitions, timeout=-1):
        """
        Get committed offsets for partitions.

        Args:
            partitions (list): List of TopicPartition objects
            timeout (float): Maximum time to wait in seconds

        Returns:
            list: List of TopicPartition objects with committed offsets
        """

    def position(self, partitions):
        """
        Get current position (next fetch offset) for partitions.

        Args:
            partitions (list): List of TopicPartition objects

        Returns:
            list: List of TopicPartition objects with current positions
        """

    def seek(self, partition):
        """
        Seek to offset for partition.

        Args:
            partition (TopicPartition): Partition with offset to seek to
        """

    def pause(self, partitions):
        """
        Pause consumption for partitions.

        Args:
            partitions (list): List of TopicPartition objects to pause
        """

    def resume(self, partitions):
        """
        Resume consumption for partitions.

        Args:
            partitions (list): List of TopicPartition objects to resume
        """

    def get_watermark_offsets(self, partition, timeout=-1, cached=False):
        """
        Get low and high watermark offsets for partition.

        Args:
            partition (TopicPartition): Partition to query
            timeout (float): Maximum time to wait in seconds
            cached (bool): Use cached values if available

        Returns:
            tuple: (low_offset, high_offset)
        """

    def offsets_for_times(self, partitions, timeout=-1):
        """
        Get offsets for timestamps.

        Args:
            partitions (list): List of TopicPartition objects with timestamps
            timeout (float): Maximum time to wait in seconds

        Returns:
            list: List of TopicPartition objects with offsets for timestamps
        """

    def close(self):
        """
        Close the consumer and leave consumer group.
        """

    def store_offsets(self, message=None, offsets=None):
        """
        Store offset for message or specified offsets.

        Args:
            message (Message, optional): Store offset for this message
            offsets (list, optional): List of TopicPartition objects with offsets
        """

    def incremental_assign(self, partitions):
        """
        Incrementally add partitions to assignment.

        Args:
            partitions (list): List of TopicPartition objects to add
        """

    def incremental_unassign(self, partitions):
        """
        Incrementally remove partitions from assignment.

        Args:
            partitions (list): List of TopicPartition objects to remove
        """

    def list_topics(self, topic=None, timeout=-1):
        """
        Get metadata for topics.

        Args:
            topic (str, optional): Specific topic name to query
            timeout (float): Maximum time to wait in seconds

        Returns:
            ClusterMetadata: Cluster and topic metadata
        """

    def consumer_group_metadata(self):
        """
        Get consumer group metadata for transactional operations.

        Returns:
            ConsumerGroupMetadata: Consumer group metadata object
        """
```

### Message

Container for Kafka message data and metadata.

```python { .api }
class Message:
    def error(self):
        """
        Get message error.

        Returns:
            KafkaError: Error object or None if no error
        """

    def key(self):
        """
        Get message key.

        Returns:
            bytes: Message key or None
        """

    def value(self):
        """
        Get message value.

        Returns:
            bytes: Message value or None
        """

    def topic(self):
        """
        Get message topic.

        Returns:
            str: Topic name
        """

    def partition(self):
        """
        Get message partition.

        Returns:
            int: Partition number
        """

    def offset(self):
        """
        Get message offset.

        Returns:
            int: Message offset
        """

    def timestamp(self):
        """
        Get message timestamp.

        Returns:
            tuple: (timestamp_type, timestamp) where timestamp_type is one of:
                TIMESTAMP_NOT_AVAILABLE, TIMESTAMP_CREATE_TIME, TIMESTAMP_LOG_APPEND_TIME
        """

    def headers(self):
        """
        Get message headers.

        Returns:
            dict: Dictionary of header key-value pairs or None
        """

    def latency(self):
        """
        Get message latency (produce time to broker acknowledgement).

        Returns:
            float: Latency in seconds or None
        """

    def leader_epoch(self):
        """
        Get leader epoch for the message.

        Returns:
            int: Leader epoch or None
        """

    def set_key(self, key):
        """
        Set message key.

        Args:
            key (bytes, str): New message key
        """

    def set_value(self, value):
        """
        Set message value.

        Args:
            value (bytes, str): New message value
        """

    def set_headers(self, headers):
        """
        Set message headers.

        Args:
            headers (dict): Dictionary of header key-value pairs
        """
```

### TopicPartition

Represents a Kafka topic partition with optional offset.

```python { .api }
class TopicPartition:
    def __init__(self, topic, partition=None, offset=None):
        """
        Create TopicPartition object.

        Args:
            topic (str): Topic name
            partition (int, optional): Partition number
            offset (int, optional): Offset value
        """

    @property
    def topic(self):
        """
        Topic name.

        Returns:
            str: Topic name
        """

    @property
    def partition(self):
        """
        Partition number.

        Returns:
            int: Partition number
        """

    @property
    def offset(self):
        """
        Offset value.

        Returns:
            int: Offset value
        """

    @offset.setter
    def offset(self, value):
        """
        Set offset value.

        Args:
            value (int): New offset value
        """

    @property
    def metadata(self):
        """
        Partition metadata.

        Returns:
            str: Metadata string
        """

    @property
    def leader_epoch(self):
        """
        Leader epoch.

        Returns:
            int: Leader epoch or None
        """

    def __hash__(self):
        """Hash function for use in sets and dicts."""

    def __eq__(self, other):
        """Equality comparison."""

    def __lt__(self, other):
        """Less than comparison for sorting."""

    def __str__(self):
        """String representation."""
```

### Uuid

Represents a UUID (Universally Unique Identifier).

```python { .api }
class Uuid:
    def __init__(self, uuid_str=None):
        """
        Create Uuid object.

        Args:
            uuid_str (str, optional): UUID string representation
        """

    def __str__(self):
        """
        Get string representation of UUID.

        Returns:
            str: UUID string
        """

    def __eq__(self, other):
        """Equality comparison."""

    def __hash__(self):
        """Hash function for use in sets and dicts."""
```

### Usage Examples

#### Basic Producer Usage

```python
from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'my-producer'
}

producer = Producer(conf)

def delivery_report(err, msg):
    """Called once for each message produced."""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

# Produce messages asynchronously; delivery_report is invoked from poll()/flush().
# Keyword renamed from `callback` to `on_delivery` to match the documented
# produce() signature above.
for i in range(10):
    producer.produce('my-topic',
                     key=f'key-{i}',
                     value=f'value-{i}',
                     on_delivery=delivery_report)

# Wait for all outstanding messages to be delivered (serves delivery reports)
producer.flush()
```

#### Basic Consumer Usage

```python
from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True
}

consumer = Consumer(conf)
consumer.subscribe(['my-topic'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f'End of partition {msg.topic()} [{msg.partition()}]')
            else:
                print(f'Error: {msg.error()}')
        else:
            print(f'Received: key={msg.key()}, value={msg.value()}, '
                  f'partition={msg.partition()}, offset={msg.offset()}')

finally:
    # Ensure the consumer leaves the group cleanly and commits final offsets
    consumer.close()
```

#### Transaction Usage

```python
from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'my-transactional-id',
    'enable.idempotence': True
}

producer = Producer(conf)

# Initialize transactions (must be called once before begin_transaction)
producer.init_transactions()

try:
    # Begin transaction
    producer.begin_transaction()

    # Produce messages within transaction
    for i in range(5):
        producer.produce('my-topic', f'transactional-message-{i}')

    # Commit transaction
    producer.commit_transaction()
    print('Transaction committed successfully')

except Exception as e:
    print(f'Transaction failed: {e}')
    producer.abort_transaction()
```

#### Manual Partition Assignment

```python
from confluent_kafka import Consumer, TopicPartition

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'enable.auto.commit': False
}

consumer = Consumer(conf)

# Manually assign specific partitions (bypasses the group rebalance protocol)
partitions = [
    TopicPartition('my-topic', 0, offset=100),
    TopicPartition('my-topic', 1, offset=200)
]
consumer.assign(partitions)

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue

        if not msg.error():
            print(f'Message: {msg.value()}')
            # Manual offset commit (auto-commit is disabled above)
            consumer.commit(message=msg)

finally:
    consumer.close()
```