# Producer API

High-level producer for publishing records to Kafka topics, with support for batching, compression, pluggable partitioning, idempotent production, and transactions.

## Capabilities

### KafkaProducer

Main producer class, providing a high-level interface for sending records to Kafka topics. `send()` is always asynchronous; blocking on the returned future gives synchronous behavior. Batching, compression, and delivery guarantees are configurable.

```python { .api }
class KafkaProducer:
    def __init__(self, **configs):
        """
        Initialize Kafka producer.

        Configuration Parameters:
        - bootstrap_servers: List[str], broker addresses
        - key_serializer: Callable or Serializer, converts keys to bytes
        - value_serializer: Callable or Serializer, converts values to bytes
        - acks: int|str, acknowledgment policy: 0, 1, or 'all' (-1 is equivalent to 'all')
        - retries: int, number of retries for failed sends (default: 2147483647)
        - batch_size: int, maximum batch size in bytes per partition (default: 16384)
        - linger_ms: int, time to wait for more records before sending a batch (default: 0)
        - buffer_memory: int, total bytes available for buffering unsent records (default: 33554432)
        - compression_type: str, None (no compression), 'gzip', 'snappy', 'lz4', or 'zstd'
        - max_in_flight_requests_per_connection: int, max unacknowledged requests per connection (default: 5)
        - request_timeout_ms: int, request timeout (default: 30000)
        - delivery_timeout_ms: int, total time allowed to deliver a record, including retries (default: 120000)
        - max_request_size: int, max request size in bytes (default: 1048576)
        - send_buffer_bytes: int, TCP send buffer size (default: 131072)
        - receive_buffer_bytes: int, TCP receive buffer size (default: 32768)
        - security_protocol: str, 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', or 'SASL_SSL'
        - api_version: tuple, broker API version such as (2, 5, 0), or None to auto-detect
        - enable_idempotence: bool, enable idempotent producer (default: False)
        - transactional_id: str, transactional producer ID
        - partitioner: Callable, custom partitioner, called as partitioner(key_bytes, all_partitions, available_partitions)
        - max_block_ms: int, max time send() may block, e.g. on a full buffer or missing metadata (default: 60000)
        """

    def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None, headers=None):
        """
        Asynchronously send a record to a topic.

        Parameters:
        - topic: str, target topic name
        - value: any, message value (passed through value_serializer; must be bytes if none is configured)
        - key: any, message key (passed through key_serializer; must be bytes if none is configured)
        - partition: int, explicit partition (optional; otherwise the partitioner chooses)
        - timestamp_ms: int, message timestamp in milliseconds since the epoch (default: current time)
        - headers: List[Tuple[str, bytes]], message headers

        Returns:
        - FutureRecordMetadata: future representing the send result

        Raises:
        - KafkaTimeoutError: if metadata or buffer space is unavailable within max_block_ms
        """

    def flush(self, timeout=None):
        """
        Make all buffered records immediately available to send and block
        until their requests complete.

        Parameters:
        - timeout: float, max time to wait for completion (seconds)

        Raises:
        - KafkaTimeoutError: if timeout exceeded
        """

    def close(self, timeout=None):
        """
        Close the producer and release its resources, sending any buffered records first.

        Parameters:
        - timeout: float, max time to wait for completion (seconds)
        """

    def partitions_for(self, topic):
        """
        Get the known partitions for a topic.

        Parameters:
        - topic: str, topic name

        Returns:
        - Set[int]: partition numbers
        """

    def metrics(self):
        """
        Get producer metrics.

        Returns:
        - Dict[str, Dict[str, float]]: current metric values, grouped by metric group
        """

    def bootstrap_connected(self):
        """
        Check whether the producer is connected to at least one bootstrap server.

        Returns:
        - bool: True if connected
        """

    # Transactional methods (require brokers 0.11+)
    def init_transactions(self):
        """
        Initialize transactions for a transactional producer.
        Must be called once, before any other transactional method.

        Raises:
        - IllegalStateError: if transactional_id is not configured
        - ProducerFencedError: if another producer with the same ID is active
        """

    def begin_transaction(self):
        """
        Begin a new transaction.
        init_transactions() must have been called first.

        Raises:
        - IllegalStateError: if transactions were not initialized
        - ProducerFencedError: if the producer is fenced
        """

    def send_offsets_to_transaction(self, offsets, consumer_group_id):
        """
        Add consumer offsets to the current transaction.

        Parameters:
        - offsets: Dict[TopicPartition, OffsetAndMetadata], offsets to include
        - consumer_group_id: str, consumer group ID

        Raises:
        - IllegalStateError: if there is no active transaction
        - ProducerFencedError: if the producer is fenced
        """

    def commit_transaction(self):
        """
        Commit the current transaction.

        Raises:
        - IllegalStateError: if there is no active transaction
        - ProducerFencedError: if the producer is fenced
        """

    def abort_transaction(self):
        """
        Abort the current transaction.

        Raises:
        - IllegalStateError: if there is no active transaction
        - ProducerFencedError: if the producer is fenced
        """
```
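
Several of the parameters above interact. The sketch below is a hypothetical helper, not part of kafka-python: it applies the defaults listed above, reports how many full-size batches fit in `buffer_memory`, and flags a `delivery_timeout_ms` smaller than `linger_ms + request_timeout_ms`. That lower bound is the one the Java client enforces; treating it as a sensible rule for this producer is an assumption.

```python
# Hypothetical sanity check for producer settings; not a kafka-python API.
DEFAULTS = {
    "batch_size": 16384,          # bytes per partition batch
    "linger_ms": 0,
    "buffer_memory": 33554432,    # 32 MiB
    "request_timeout_ms": 30000,
    "delivery_timeout_ms": 120000,
}


def check_producer_config(overrides=None):
    """Return (full batches that fit in buffer_memory, list of warnings)."""
    cfg = {**DEFAULTS, **(overrides or {})}
    warnings = []

    # Assumed rule (mirrors the Java client): a record must be allowed to
    # linger and then wait for at least one full request before timing out.
    min_delivery = cfg["linger_ms"] + cfg["request_timeout_ms"]
    if cfg["delivery_timeout_ms"] < min_delivery:
        warnings.append(
            f"delivery_timeout_ms={cfg['delivery_timeout_ms']} is below "
            f"linger_ms + request_timeout_ms = {min_delivery}"
        )

    # Upper bound on how many full-size batches the buffer can hold at once.
    full_batches = cfg["buffer_memory"] // cfg["batch_size"]
    return full_batches, warnings


print(check_producer_config())  # (2048, [])
print(check_producer_config({"linger_ms": 100, "delivery_timeout_ms": 20000}))
```

With the defaults, 32 MiB of buffer holds 2048 full 16 KiB batches; raising `batch_size` without raising `buffer_memory` proportionally lowers that number.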

### Future Record Metadata

Future returned by `KafkaProducer.send()`, representing an asynchronous send operation.

```python { .api }
class FutureRecordMetadata:
    # Attribute (not a method): True once the send has completed,
    # whether it succeeded or failed.
    is_done: bool

    def get(self, timeout=None):
        """
        Block until the send completes and return its metadata.

        Parameters:
        - timeout: float, max time to wait (seconds)

        Returns:
        - RecordMetadata: send result metadata

        Raises:
        - KafkaError: if the send failed
        - KafkaTimeoutError: if the timeout is exceeded
        """

    def add_callback(self, callback):
        """
        Add a success callback. If the send has already succeeded, the
        callback runs immediately. Returns the future, so calls can be chained.

        Parameters:
        - callback: Callable[[RecordMetadata], None], success callback
        """

    def add_errback(self, errback):
        """
        Add an error callback. If the send has already failed, the errback
        runs immediately. Returns the future, so calls can be chained.

        Parameters:
        - errback: Callable[[Exception], None], error callback
        """

    def succeeded(self):
        """
        Check whether the send succeeded.

        Returns:
        - bool: True if completed without an exception
        """

    def failed(self):
        """
        Check whether the send failed.

        Returns:
        - bool: True if completed with an exception
        """
```

### Record Metadata

Metadata for a successfully sent record, including its partition and offset.

```python { .api }
class RecordMetadata:
    topic: str                  # Topic name
    partition: int              # Partition number
    offset: int                 # Offset assigned to the record
    timestamp: int              # Record timestamp (ms since the epoch)
    checksum: int               # Record checksum
    serialized_key_size: int    # Serialized key size in bytes
    serialized_value_size: int  # Serialized value size in bytes
    leader_epoch: int           # Leader epoch
```
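
### Partitioner Interface

Interface for custom partitioning strategies. For keyed messages, the default partitioner hashes the serialized key with murmur2, masks the hash to a non-negative value, and takes it modulo the partition count. For keyless messages, it picks a random available partition.
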
```python { .api }
class DefaultPartitioner:
    def __call__(self, key, all_partitions, available_partitions):
        """
        Select a partition for a message.

        Parameters:
        - key: bytes, serialized message key (may be None)
        - all_partitions: List[int], all partition numbers
        - available_partitions: List[int], partitions that currently have a leader

        Returns:
        - int: selected partition number
        """


def murmur2(data):
    """
    Pure Python murmur2 hash implementation, compatible with the Java client.

    Parameters:
    - data: bytes, data to hash

    Returns:
    - int: hash value
    """
```
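
To make the default rule concrete, here is a self-contained sketch of the 32-bit murmur2 variant used by Kafka's Java client, followed by the mask-and-modulo step. It is written from the Java algorithm rather than copied from kafka-python, and `default_partition` is a hypothetical name. Keyless records pick a random available partition, as kafka-python's default partitioner does, falling back to any partition when none is available.

```python
import random

M = 0x5BD1E995
MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """32-bit murmur2 as in Kafka's Java client; returns a signed int."""
    length = len(data)
    h = (0x9747B28C ^ length) & MASK32

    # Mix the body four bytes at a time (little-endian).
    for i in range(0, length - length % 4, 4):
        k = data[i] | (data[i + 1] << 8) | (data[i + 2] << 16) | (data[i + 3] << 24)
        k = (k * M) & MASK32
        k ^= k >> 24
        k = (k * M) & MASK32
        h = ((h * M) & MASK32) ^ k

    # Mix the remaining 1-3 bytes (the Java code uses switch fall-through).
    tail = length & ~3
    extra = length % 4
    if extra >= 3:
        h ^= data[tail + 2] << 16
    if extra >= 2:
        h ^= data[tail + 1] << 8
    if extra >= 1:
        h ^= data[tail]
        h = (h * M) & MASK32

    # Final avalanche.
    h ^= h >> 13
    h = (h * M) & MASK32
    h ^= h >> 15

    # Reinterpret as a signed 32-bit value, like a Java int.
    return h - (1 << 32) if h & 0x80000000 else h


def default_partition(key, all_partitions, available_partitions):
    """Hypothetical re-creation of the default partitioning rule."""
    if key is None:
        return random.choice(available_partitions or all_partitions)
    index = (murmur2(key) & 0x7FFFFFFF) % len(all_partitions)
    return all_partitions[index]


partitions = list(range(1000))
print(murmur2(b"abc"))                                    # 479470107
print(default_partition(b"abc", partitions, partitions))  # 107
```

Keyed placement depends only on the key bytes and the total partition count, so the same key stays on the same partition until partitions are added.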

### Serializer Interface

Abstract base class for custom key and value serializers. `key_serializer` and `value_serializer` also accept a plain callable that takes the value and returns bytes.

```python { .api }
class Serializer:
    def serialize(self, topic, value):
        """
        Serialize value to bytes.

        Parameters:
        - topic: str, topic name
        - value: any, value to serialize

        Returns:
        - bytes: serialized value
        """

    def close(self):
        """Clean up resources."""
```
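
Because a serializer can be either a `Serializer` object (which also receives the topic) or a plain callable (which receives only the value), the producer has to dispatch between the two forms. The sketch below shows one way that dispatch can work. `Serializer` here is a local stand-in for `kafka.serializer.Serializer` so the snippet runs without Kafka installed, and `apply_serializer` is a hypothetical name, not the library's internal function.

```python
import json
from abc import ABC, abstractmethod


class Serializer(ABC):
    """Local stand-in for kafka.serializer.Serializer (assumption)."""

    @abstractmethod
    def serialize(self, topic, value):
        """Return bytes for value (or None)."""

    def close(self):
        """Release resources; nothing to do by default."""


class Utf8Serializer(Serializer):
    def serialize(self, topic, value):
        return None if value is None else value.encode("utf-8")


def to_json(value):
    """Plain-callable serializer: value in, bytes out."""
    return json.dumps(value).encode("utf-8")


def apply_serializer(serializer, topic, data):
    """Hypothetical dispatch: no serializer, Serializer object, or callable."""
    if serializer is None:
        return data                                # caller must already supply bytes
    if isinstance(serializer, Serializer):
        return serializer.serialize(topic, data)   # object form also gets the topic
    return serializer(data)                        # callable form gets only the value


print(apply_serializer(Utf8Serializer(), "orders", "héllo"))  # b'h\xc3\xa9llo'
print(apply_serializer(to_json, "orders", {"id": 1}))         # b'{"id": 1}'
```

The object form is useful when serialization depends on the topic (for example, a per-topic schema); otherwise a callable is simpler.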

## Usage Examples

### Basic Producer

```python
from kafka import KafkaProducer
import json

# Create producer with JSON serialization
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send message asynchronously
future = producer.send('my-topic', value={'message': 'Hello World'}, key='key1')

# Block for acknowledgment
try:
    record_metadata = future.get(timeout=10)
    print(f"Message sent to topic {record_metadata.topic} "
          f"partition {record_metadata.partition} "
          f"offset {record_metadata.offset}")
except Exception as e:
    print(f"Send failed: {e}")

producer.close()
```

### Fire and Forget

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    acks=0  # Fire and forget: no broker acknowledgment, so lost records go unnoticed
)

# Send many messages without waiting for acknowledgment
for i in range(1000):
    producer.send('events', value=f'Event {i}')

# Push out everything still buffered before closing
producer.flush()
producer.close()
```

### Synchronous Sending

```python
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    # Keys must be bytes; encode str keys and pass None through unchanged
    key_serializer=lambda k: k.encode('utf-8') if k is not None else None,
    value_serializer=str.encode,
    acks='all',  # Wait for all in-sync replicas
    retries=3
)

def send_sync(topic, value, key=None):
    """Send a message and block until it is acknowledged."""
    try:
        future = producer.send(topic, value=value, key=key)
        record_metadata = future.get(timeout=30)
        return record_metadata
    except KafkaError as e:
        print(f"Failed to send message: {e}")
        return None

# Send messages synchronously
metadata = send_sync('orders', 'Order #12345', key='customer-123')
if metadata:
    print(f"Order sent successfully to offset {metadata.offset}")

producer.close()
```

### Asynchronous with Callbacks

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode
)

def on_success(record_metadata):
    print(f"Message sent successfully: topic={record_metadata.topic} "
          f"partition={record_metadata.partition} offset={record_metadata.offset}")

def on_error(exception):
    print(f"Message send failed: {exception}")

# Send with callbacks
for i in range(10):
    future = producer.send('async-topic', value=f'Message {i}')
    future.add_callback(on_success)
    future.add_errback(on_error)

# Block until all outstanding sends complete, rather than sleeping for a guessed duration
producer.flush()
producer.close()
```

### Batching and Compression

```python
from kafka import KafkaProducer

# Configure for high throughput with batching and compression
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,

    # Batching configuration
    batch_size=32768,  # 32 KB batches
    linger_ms=100,     # Wait up to 100 ms to fill a batch

    # Compression (lz4 requires the lz4 Python package)
    compression_type='lz4',

    # Memory and throughput
    buffer_memory=67108864,  # 64 MB buffer
    max_in_flight_requests_per_connection=10  # Values > 1 can reorder records on retry
)

# Send many messages; they are batched and compressed per partition
for i in range(10000):
    producer.send('high-throughput', value=f'Batch message {i}')

producer.flush()
producer.close()
```

### Custom Partitioner

```python
from kafka import KafkaProducer
import hashlib
import itertools

# Shared counter for round-robin assignment of keyless messages
_round_robin = itertools.count()

def custom_partitioner(key, all_partitions, available_partitions):
    """SHA-256 hashing for keyed messages, round-robin for keyless ones.

    The producer passes the already-serialized key (bytes or None).
    """
    if key is None:
        # Round-robin over partitions with a leader; fall back to all partitions
        candidates = available_partitions or all_partitions
        return candidates[next(_round_robin) % len(candidates)]
    # Hash-based partitioning: a key maps to the same partition
    # as long as the partition count is unchanged
    hash_value = int(hashlib.sha256(key).hexdigest(), 16)
    return all_partitions[hash_value % len(all_partitions)]

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    key_serializer=str.encode,
    partitioner=custom_partitioner
)

# Messages with the same key go to the same partition
producer.send('partitioned-topic', key='user-123', value='Event A')
producer.send('partitioned-topic', key='user-123', value='Event B')
producer.send('partitioned-topic', key='user-456', value='Event C')

producer.close()
```

### Idempotent Producer

```python
from kafka import KafkaProducer

# Enable idempotence so retries cannot write duplicate records
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    enable_idempotence=True,
    acks='all',
    retries=10,
    max_in_flight_requests_per_connection=1  # Required for idempotence
)

# Retried batches are deduplicated per partition by the broker
for i in range(100):
    producer.send('exactly-once-topic', value=f'Idempotent message {i}')

producer.close()
```
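
Idempotence relies on the producer tagging each batch with a producer ID and a per-partition sequence number, so the partition leader can refuse to append a sequence it has already written. The toy model below illustrates only that bookkeeping. It is a simplification written for this document: the real broker tracks recent batches rather than single records and reports errors over the wire. `PartitionLeaderModel` is a hypothetical name.

```python
class PartitionLeaderModel:
    """Toy per-partition dedup state; a simplification, not broker code."""

    def __init__(self):
        self.log = []         # appended values, in order
        self.last_seq = {}    # producer_id -> last sequence number appended

    def append(self, producer_id, seq, value):
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            # A retry of something already written: acknowledge, don't re-append.
            return "duplicate"
        if seq != last + 1:
            # A gap means an earlier record is missing; a real broker rejects this.
            return "out_of_order"
        self.log.append(value)
        self.last_seq[producer_id] = seq
        return "appended"


leader = PartitionLeaderModel()
print(leader.append(producer_id=7, seq=0, value="msg 0"))  # appended
print(leader.append(producer_id=7, seq=0, value="msg 0"))  # duplicate (retry after a lost ack)
print(leader.append(producer_id=7, seq=2, value="msg 2"))  # out_of_order
print(leader.log)                                          # ['msg 0']
```

The second call models the case idempotence exists for: the first write succeeded, the acknowledgment was lost, and the producer retried.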

### Transactional Producer

```python
from kafka import KafkaProducer, TopicPartition, OffsetAndMetadata

# Configure transactional producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    transactional_id='my-transactional-id',  # Required for transactions
    enable_idempotence=True,                 # Required for transactions
    acks='all'                               # Required for transactions
)

# Initialize transactions (call once per producer instance)
producer.init_transactions()

try:
    # Begin transaction
    producer.begin_transaction()

    # Send messages as part of the transaction
    producer.send('orders', value='Order #1001')
    producer.send('inventory', value='Item #5001 reserved')

    # Include consumer offsets in the transaction (consume-transform-produce pattern).
    # The committed offset is the next one to read: last processed offset + 1.
    consumer_offsets = {
        TopicPartition('input-topic', 0): OffsetAndMetadata(100, None)
    }
    producer.send_offsets_to_transaction(consumer_offsets, 'my-consumer-group')

    # Commit transaction: messages and offsets become visible atomically
    producer.commit_transaction()
    print("Transaction committed successfully")

except Exception as e:
    print(f"Transaction failed: {e}")
    # Abort: read_committed consumers never see this transaction's messages.
    # If the producer was fenced, aborting also fails; close the producer instead.
    producer.abort_transaction()

finally:
    producer.close()
```
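
In the consume-transform-produce pattern, the offset committed for each partition is the next offset to read, that is, the last processed offset plus one; `OffsetAndMetadata(100, None)` in the example above corresponds to having processed up to offset 99. The sketch below derives that map from a batch of processed records. `ConsumedRecord` and `next_offsets` are hypothetical, and plain `(topic, partition)` tuples stand in for `TopicPartition` so the snippet runs without Kafka installed.

```python
from collections import namedtuple

# Hypothetical stand-in for a consumed record; kafka-python's ConsumerRecord
# also exposes .topic, .partition and .offset.
ConsumedRecord = namedtuple("ConsumedRecord", "topic partition offset")


def next_offsets(records):
    """Map (topic, partition) -> last processed offset + 1."""
    positions = {}
    for rec in records:
        key = (rec.topic, rec.partition)
        # Records may arrive out of order across calls; keep the highest.
        positions[key] = max(positions.get(key, 0), rec.offset + 1)
    return positions


batch = [
    ConsumedRecord("input-topic", 0, 97),
    ConsumedRecord("input-topic", 0, 99),
    ConsumedRecord("input-topic", 1, 5),
    ConsumedRecord("input-topic", 0, 98),
]
print(next_offsets(batch))  # {('input-topic', 0): 100, ('input-topic', 1): 6}
```

Each entry would then be converted to a `TopicPartition` key and an `OffsetAndMetadata` value, as in the example above, before calling `send_offsets_to_transaction()`.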

### Custom Serializer

```python
from kafka import KafkaProducer
from kafka.serializer import Serializer
import pickle
import json

class PickleSerializer(Serializer):
    """Custom serializer using pickle (Python-only; never unpickle untrusted data)."""

    def serialize(self, topic, value):
        if value is None:
            return None
        return pickle.dumps(value)

class JSONSerializer(Serializer):
    """Custom JSON serializer with error handling."""

    def serialize(self, topic, value):
        if value is None:
            return None
        try:
            return json.dumps(value).encode('utf-8')
        except (TypeError, ValueError) as e:
            raise ValueError(f"Cannot serialize {value!r} to JSON: {e}") from e

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=str.encode,
    value_serializer=JSONSerializer()
)

# Send complex objects
producer.send('json-topic',
              key='order-123',
              value={'order_id': 123, 'items': ['item1', 'item2'], 'total': 99.99})

producer.close()
```

### Error Handling and Retries

```python
from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError, MessageSizeTooLargeError
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    retries=5,
    retry_backoff_ms=1000,
    request_timeout_ms=30000
)

def robust_send(topic, value, key=None, max_retries=3):
    """Send with application-level retries on top of the producer's own."""

    for attempt in range(max_retries + 1):
        try:
            future = producer.send(topic, value=value, key=key)
            record_metadata = future.get(timeout=30)
            return record_metadata

        except MessageSizeTooLargeError:
            # Not retriable: the record will never fit
            print(f"Message too large for topic {topic}")
            return None

        except KafkaTimeoutError:
            print(f"Timeout on attempt {attempt + 1}")
            if attempt == max_retries:
                print("Max retries reached, giving up")
                return None
            time.sleep(2 ** attempt)  # Exponential backoff: 1 s, 2 s, 4 s, ...

        except KafkaError as e:
            print(f"Kafka error on attempt {attempt + 1}: {e}")
            if not e.retriable or attempt == max_retries:
                return None
            time.sleep(1)

    return None

# Use robust sending
result = robust_send('critical-topic', 'Important message')
if result:
    print(f"Message sent successfully: offset {result.offset}")
else:
    print("Failed to send message after all retries")

producer.close()
```
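
Because `robust_send()` layers its own retries on top of the producer's, it helps to bound how long one call can block. The arithmetic below assumes every attempt hits the 30-second `future.get()` timeout and ignores time spent blocked inside `send()` itself, which can be up to `max_block_ms`. `backoff_schedule` and `worst_case_block_s` are hypothetical helpers.

```python
def backoff_schedule(max_retries, base_s=1.0, cap_s=None):
    """Sleep before retry n is base_s * 2**n, optionally capped."""
    delays = [base_s * 2 ** n for n in range(max_retries)]
    return delays if cap_s is None else [min(d, float(cap_s)) for d in delays]


def worst_case_block_s(max_retries, get_timeout_s, **backoff):
    """Every attempt times out, and every non-final attempt sleeps once."""
    attempts = max_retries + 1
    return attempts * get_timeout_s + sum(backoff_schedule(max_retries, **backoff))


print(backoff_schedule(3))            # [1.0, 2.0, 4.0]
print(worst_case_block_s(3, 30))      # 127.0
print(backoff_schedule(8, cap_s=30))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0, 30.0]
```

With the example's settings, a single call can therefore block for a little over two minutes (4 attempts x 30 s plus 7 s of sleeps). Lower `max_retries` or the `get()` timeout if callers cannot tolerate that, and add a cap once retries grow, since uncapped doubling quickly dominates.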

## Performance Considerations

### Throughput Optimization

```python
from kafka import KafkaProducer

# High-throughput producer configuration
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,

    # Larger batches amortize per-request overhead
    batch_size=65536,  # 64 KB batches
    linger_ms=50,      # Small delay to fill batches

    # Fast compression (requires the lz4 Python package)
    compression_type='lz4',

    # More buffer memory and concurrent requests
    buffer_memory=134217728,  # 128 MB buffer
    max_in_flight_requests_per_connection=10,

    # Weaker durability: only wait for the partition leader
    acks=1,

    # Larger socket buffers
    send_buffer_bytes=262144,   # 256 KB send buffer
    receive_buffer_bytes=65536  # 64 KB receive buffer
)
```

### Latency Optimization

```python
from kafka import KafkaProducer

# Low-latency producer configuration
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,

    # Send each batch as soon as possible. Keep the default batch_size:
    # linger_ms=0 already avoids waiting, and a tiny batch_size only adds overhead.
    linger_ms=0,

    # No compression, avoiding CPU time on the send path
    compression_type=None,

    # Acknowledgment from the leader only
    acks=1,

    # Shorter timeouts so failures surface sooner
    request_timeout_ms=10000,   # 10 second request timeout
    delivery_timeout_ms=30000   # 30 second delivery timeout
)
```