# Producer Operations

High-level producer for publishing records to Kafka topics with automatic batching, compression, partitioning, and retry logic. The KafkaProducer is thread-safe and designed for high-throughput scenarios.

## Capabilities

### KafkaProducer

Main producer class for sending records to Kafka topics. Provides asynchronous sending with futures, automatic batching, and configurable retry logic.

```python { .api }
class KafkaProducer:
    def __init__(self, **configs):
        """
        Create a KafkaProducer instance.

        Args:
            **configs: Producer configuration options including:
                bootstrap_servers (list): List of Kafka brokers
                key_serializer (callable): Function to serialize keys
                value_serializer (callable): Function to serialize values
                acks (int|str): Acknowledgment requirements (0, 1, 'all')
                retries (int): Number of retry attempts
                batch_size (int): Batch size in bytes
                linger_ms (int): Time to wait for batching
                buffer_memory (int): Total memory for buffering
                compression_type (str): Compression algorithm ('gzip', 'snappy', 'lz4', 'zstd')
                max_in_flight_requests_per_connection (int): Max unacknowledged requests
                request_timeout_ms (int): Request timeout
                retry_backoff_ms (int): Retry backoff time
                client_id (str): Client identifier
                security_protocol (str): Security protocol ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
                ssl_context: SSL context for encrypted connections
                sasl_mechanism (str): SASL mechanism ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512', 'GSSAPI', 'OAUTHBEARER')
                sasl_plain_username (str): Username for PLAIN SASL
                sasl_plain_password (str): Password for PLAIN SASL
        """

    def send(self, topic: str, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
        """
        Send a record to the specified topic.

        Args:
            topic (str): The topic to send the record to
            value: The record value (serialized using value_serializer)
            key: The record key (serialized using key_serializer)
            headers (list): List of (key, value) header tuples
            partition (int): Specific partition to send to (optional)
            timestamp_ms (int): Record timestamp in milliseconds (optional)

        Returns:
            FutureRecordMetadata: Future that resolves to RecordMetadata when the send completes
        """

    def flush(self, timeout=None):
        """
        Flush all pending records, blocking until complete.

        Args:
            timeout (float): Maximum time to wait in seconds
        """

    def close(self, timeout=None):
        """
        Close the producer and release resources.

        Args:
            timeout (float): Maximum time to wait for pending sends
        """

    def partitions_for(self, topic: str):
        """
        Get available partitions for a topic.

        Args:
            topic (str): Topic name

        Returns:
            set: Set of available partition numbers
        """

    def bootstrap_connected(self):
        """
        Check whether the producer has established a bootstrap connection.

        Returns:
            bool: True if connected to at least one bootstrap server
        """

    def metrics(self, raw=False):
        """
        Get producer performance metrics.

        Args:
            raw (bool): If True, return the raw metrics dict; otherwise return formatted metrics.

        Returns:
            dict: Producer performance metrics including send rates, batch sizes,
                buffer usage, and request latencies
        """
```

### Future Objects

The send() method returns a future object that can be used to handle the asynchronous result.

```python { .api }
class FutureRecordMetadata:
    def get(self, timeout=None):
        """
        Get the RecordMetadata result, blocking if necessary.

        Args:
            timeout (float): Maximum time to wait in seconds

        Returns:
            RecordMetadata: Metadata about the sent record

        Raises:
            KafkaError: If the send failed
        """

    def add_callback(self, callback):
        """
        Add a callback function to be called when the send succeeds.

        Args:
            callback (callable): Function called with the RecordMetadata on success
        """

    def add_errback(self, errback):
        """
        Add an error callback for send failures.

        Args:
            errback (callable): Function called with the exception on failure
        """

    def is_done(self):
        """
        Check if the send operation is complete.

        Returns:
            bool: True if the send is complete (success or failure)
        """

class RecordMetadata:
    topic: str                   # Topic name
    partition: int               # Partition number
    offset: int                  # Record offset in partition
    timestamp: int               # Record timestamp
    checksum: int                # Record checksum
    serialized_key_size: int     # Serialized key size in bytes
    serialized_value_size: int   # Serialized value size in bytes
```
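
Result-handling code typically just reads these fields. A broker-free sketch using a namedtuple stand-in with the same fields as RecordMetadata above (the stand-in is purely illustrative, not the library class):

```python
from collections import namedtuple

# Stand-in with the same fields as RecordMetadata (illustration only)
FakeRecordMetadata = namedtuple(
    'FakeRecordMetadata',
    ['topic', 'partition', 'offset', 'timestamp', 'checksum',
     'serialized_key_size', 'serialized_value_size'])

def describe(md):
    """Format a one-line summary, as a success callback might."""
    return f"{md.topic}:{md.partition}@{md.offset} ({md.serialized_value_size} bytes)"

md = FakeRecordMetadata('my-topic', 2, 41, 1700000000000, 0, 7, 18)
print(describe(md))  # my-topic:2@41 (18 bytes)
```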

## Usage Examples

### Basic Producer Usage

```python
from kafka import KafkaProducer
import json

# Create producer with JSON serialization
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: str(k).encode('utf-8')
)

# Send a message
future = producer.send('my-topic', key='user123', value={'action': 'login'})

# Block until the send completes and get metadata
metadata = future.get(timeout=10)
print(f"Sent to partition {metadata.partition} at offset {metadata.offset}")

producer.close()
```
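
The serializers above are plain callables, so they can be sanity-checked without a broker. A quick round-trip of the serializers from the example (json.loads is assumed as the consumer-side deserializer):

```python
import json

# Same serializers as in the example above
value_serializer = lambda v: json.dumps(v).encode('utf-8')
key_serializer = lambda k: str(k).encode('utf-8')

raw_value = value_serializer({'action': 'login'})
raw_key = key_serializer('user123')

# Kafka stores bytes; a consumer-side json.loads recovers the dict
assert isinstance(raw_value, bytes) and isinstance(raw_key, bytes)
assert json.loads(raw_value.decode('utf-8')) == {'action': 'login'}
print(raw_key, raw_value)
```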

### Producer with Callbacks

```python
from kafka import KafkaProducer

def on_success(metadata):
    print(f"Message sent to {metadata.topic}:{metadata.partition}:{metadata.offset}")

def on_error(exception):
    print(f"Send failed: {exception}")

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Send with callbacks
future = producer.send('my-topic', b'message-value')
future.add_callback(on_success)
future.add_errback(on_error)

producer.flush()
producer.close()
```

### High-Throughput Configuration

```python
from kafka import KafkaProducer

# Configure for high throughput
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    batch_size=16384,          # 16KB batches
    linger_ms=10,              # Wait up to 10ms for batching
    compression_type='lz4',    # Compress batches
    buffer_memory=33554432,    # 32MB buffer
    max_in_flight_requests_per_connection=5
)

# Send many messages quickly
for i in range(1000):
    producer.send('high-volume-topic', f'message-{i}'.encode())

producer.flush()
producer.close()
```
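
compression_type pays off most when the records in a batch share structure. A rough stdlib sketch of the effect (the example above uses lz4, which needs a third-party package; gzip is used here only to illustrate the ratio, no broker required):

```python
import gzip
import json

# 500 similar JSON records, roughly like one producer batch
batch = b''.join(
    json.dumps({'topic': 'high-volume-topic', 'seq': i, 'payload': 'x' * 50}).encode()
    for i in range(500)
)
compressed = gzip.compress(batch)

# Repetitive data compresses well, so fewer bytes cross the wire per batch
ratio = len(compressed) / len(batch)
print(f"{len(batch)} -> {len(compressed)} bytes ({ratio:.0%})")
assert len(compressed) < len(batch) // 2
```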

### Secure Producer (SSL + SASL)

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['secure-broker:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='myuser',
    sasl_plain_password='mypassword',
    ssl_check_hostname=True,
    ssl_cafile='ca-cert.pem'
)

producer.send('secure-topic', b'encrypted message')
producer.close()
```
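
As an alternative to ssl_cafile, the configs also accept a prebuilt ssl_context. A minimal sketch with the stdlib ssl module ('ca-cert.pem' is the path from the example above and would need to exist, so it is shown only in a comment here):

```python
import ssl

# Build a client-side TLS context; in practice you would pass the CA file:
#   ctx = ssl.create_default_context(cafile='ca-cert.pem')
ctx = ssl.create_default_context()
ctx.check_hostname = True  # matches ssl_check_hostname=True above

# The context would then be passed as KafkaProducer(ssl_context=ctx, ...)
print(ctx.verify_mode)
```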

### Custom Partitioning

```python
from kafka import KafkaProducer
from kafka.partitioner import DefaultPartitioner

class CustomPartitioner:
    def __init__(self):
        self.default = DefaultPartitioner()

    # The producer invokes the partitioner as a callable with the
    # serialized key and the partition lists.
    def __call__(self, key, all_partitions, available_partitions):
        if key and key.startswith(b'priority-'):
            return 0  # Send priority messages to partition 0
        return self.default(key, all_partitions, available_partitions)

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    partitioner=CustomPartitioner()
)
```
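
The property a partitioner must preserve is determinism: the same key must always map to the same partition so related records stay ordered within it. A broker-free sketch using CRC32 (kafka-python's DefaultPartitioner actually uses murmur2 hashing; CRC32 is used here only to illustrate the idea):

```python
import zlib

def pick_partition(key: bytes, partitions: list) -> int:
    """Deterministically map a key to one of the given partitions."""
    return partitions[zlib.crc32(key) % len(partitions)]

partitions = [0, 1, 2, 3]
p1 = pick_partition(b'user123', partitions)
p2 = pick_partition(b'user123', partitions)
assert p1 == p2        # same key always lands on the same partition
assert p1 in partitions
print(p1)
```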