# Message Production

Message publishing capabilities with support for synchronous and asynchronous operations, custom delivery callbacks, and templated producer functions.

## Capabilities

### Produce to Topic Operator

Airflow operator for producing messages to Kafka topics with flexible producer function support and delivery confirmation options.

```python { .api }
class ProduceToTopicOperator(BaseOperator):
    """
    Operator for producing messages to a Kafka topic.

    Attributes:
        template_fields: tuple = ("topic", "producer_function_args", "producer_function_kwargs", "kafka_config_id")
    """

    def __init__(
        self,
        topic: str,
        producer_function: str | Callable[..., Any],
        kafka_config_id: str = "kafka_default",
        producer_function_args: Sequence[Any] | None = None,
        producer_function_kwargs: dict[Any, Any] | None = None,
        delivery_callback: str | None = None,
        synchronous: bool = True,
        poll_timeout: float = 0,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the producer operator.

        Args:
            topic: Kafka topic name to produce to
            producer_function: Callable (or dotted string path to one) that generates
                the (key, value) pairs to produce as messages
            kafka_config_id: Airflow connection ID for Kafka configuration
            producer_function_args: Positional arguments to pass to the producer function
            producer_function_kwargs: Keyword arguments to pass to the producer function
            delivery_callback: Dotted string path to a callback function for delivery confirmations
            synchronous: Whether to flush after each message and wait for delivery
            poll_timeout: Timeout for the producer poll() call made after each message
            **kwargs: Additional operator arguments
        """

    def execute(self, context) -> None:
        """
        Execute the producer operation.

        Args:
            context: Airflow task context
        """
```
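
Since `producer_function` accepts either a callable or a dotted string path, the function can live in any importable module instead of the DAG file; a minimal sketch (the module path `my_package.producers` is hypothetical):

```python
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator

# The string form is resolved by import at runtime, so the producer
# function can be maintained outside the DAG file.
produce_by_path = ProduceToTopicOperator(
    task_id="produce_by_path",
    topic="my-topic",
    producer_function="my_package.producers.simple_producer",  # hypothetical module path
    kafka_config_id="kafka_default",
)
```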

### Kafka Producer Hook

Lower-level hook providing direct access to the Kafka Producer client for advanced use cases.

```python { .api }
class KafkaProducerHook(KafkaBaseHook):
    """
    A hook for producing messages to Kafka topics.

    Inherits from KafkaBaseHook and provides producer client access.
    """

    def __init__(self, kafka_config_id: str = "kafka_default") -> None:
        """
        Initialize the Kafka producer hook.

        Args:
            kafka_config_id: The Airflow connection ID to use
        """

    def get_producer(self) -> Producer:
        """
        Get the Kafka Producer client.

        Returns:
            Producer: Configured confluent-kafka Producer instance
        """

    def _get_client(self, config) -> Producer:
        """
        Get a Kafka Producer client with the given configuration.

        Args:
            config: Kafka client configuration dictionary

        Returns:
            Producer: Configured confluent-kafka Producer instance
        """
```

### Delivery Callback Functions

Default and custom callback functions for handling message delivery confirmations.

```python { .api }
def acked(err, msg):
    """
    Default delivery callback for message acknowledgments.

    This callback is automatically called by the producer when a message
    delivery attempt completes (either successfully or with failure).

    Args:
        err: Error object if delivery failed, None if successful
        msg: Message object with delivery details including topic, partition, offset
    """
```

### Usage Examples

#### Basic Message Production

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator

def simple_producer():
    """Yield (key, value) pairs; the operator produces one message per pair."""
    for message in ["Hello", "World", "from", "Airflow"]:
        yield (None, message)

dag = DAG(
    "kafka_producer_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
)

# With the default synchronous=True, the operator flushes after each
# message, so delivery is confirmed before the task succeeds
produce_task = ProduceToTopicOperator(
    task_id="produce_messages",
    topic="my-topic",
    producer_function=simple_producer,
    kafka_config_id="kafka_default",
    dag=dag,
)
```

#### JSON Message Production

```python
import json

from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator

def json_producer():
    """Yield keyed JSON messages."""
    data_records = [
        {"user_id": 1, "action": "login", "timestamp": "2023-01-01T10:00:00Z"},
        {"user_id": 2, "action": "purchase", "timestamp": "2023-01-01T10:01:00Z"},
        {"user_id": 1, "action": "logout", "timestamp": "2023-01-01T10:02:00Z"},
    ]

    for record in data_records:
        # Key by user_id so each user's events land on the same partition
        yield (str(record["user_id"]), json.dumps(record))

produce_json_task = ProduceToTopicOperator(
    task_id="produce_json_data",
    topic="user-events",
    producer_function=json_producer,
    kafka_config_id="kafka_default",
)
```

The operator produces keys and values only; to attach message headers, use the producer hook directly (see below).

#### Parameterized Production

```python
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator

def parameterized_producer(batch_size, message_prefix):
    """Yield a batch of messages built from the given parameters."""
    for i in range(batch_size):
        yield (None, f"{message_prefix}-{i:04d}")
    print(f"Produced {batch_size} messages with prefix '{message_prefix}'")

produce_batch_task = ProduceToTopicOperator(
    task_id="produce_batch",
    topic="batch-topic",
    producer_function=parameterized_producer,
    producer_function_args=[100, "BATCH"],  # batch_size=100, message_prefix="BATCH"
    kafka_config_id="kafka_default",
)
```

#### Custom Delivery Callback

```python
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator

def custom_delivery_callback(err, msg):
    """Custom callback for delivery confirmation."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()}[{msg.partition()}] at offset {msg.offset()}")

def producer_with_callback():
    """Yield messages; the operator attaches the delivery callback to each produce call."""
    for i in range(5):
        yield (None, f"Message {i}")

produce_with_callback = ProduceToTopicOperator(
    task_id="produce_with_callback",
    topic="callback-topic",
    producer_function=producer_with_callback,
    # Must be a dotted import path Airflow can resolve; the module name here is illustrative
    delivery_callback="my_dag_module.custom_delivery_callback",
    kafka_config_id="kafka_default",
)
```

#### Asynchronous Production

```python
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator

def async_producer():
    """Yield a high volume of keyed messages."""
    for i in range(1000):
        # Key by i % 10 to distribute messages across partitions
        yield (str(i % 10), f"Message {i}")

async_produce_task = ProduceToTopicOperator(
    task_id="async_produce",
    topic="high-volume-topic",
    producer_function=async_producer,
    synchronous=False,  # Don't flush and wait after each message
    poll_timeout=0,     # Serve delivery reports without blocking
    kafka_config_id="kafka_default",
)
```
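
With `synchronous=False` the operator does not flush after each message; `poll_timeout` only bounds the `poll()` call that serves delivery reports between sends, trading per-message delivery guarantees for throughput.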

### Using Producer Hook Directly

```python
from airflow.operators.python import PythonOperator
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook

def custom_producer_logic():
    """Use the producer hook for advanced scenarios (explicit partitions, headers)."""
    hook = KafkaProducerHook(kafka_config_id="kafka_default")
    producer = hook.get_producer()

    try:
        # produce() is asynchronous and returns immediately; delivery is
        # confirmed via callbacks and flush()
        for i in range(10):
            producer.produce(
                "custom-topic",
                value=f"Custom message {i}",
                partition=i % 3,  # Explicit partition assignment
            )
    except Exception as e:
        print(f"Production failed: {e}")
        raise
    finally:
        # confluent-kafka producers have no close(); flush pending messages instead
        producer.flush(timeout=10)

hook_task = PythonOperator(
    task_id="custom_producer",
    python_callable=custom_producer_logic,
)
```

### Advanced Configuration

#### Producer Configuration via Connection

```python
# Connection "extra" configuration for optimized production
{
    "bootstrap.servers": "kafka:9092",
    "acks": "all",  # Wait for all in-sync replicas
    "retries": "2147483647",  # Retry indefinitely
    "batch.size": "16384",  # Batch size in bytes
    "linger.ms": "5",  # Wait up to 5 ms for batching
    "compression.type": "snappy",  # Enable compression
    "max.in.flight.requests.per.connection": "5",
    "enable.idempotence": "true",  # Exactly-once producer semantics
}
```
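
These extras live on the Airflow connection referenced by `kafka_config_id`, and the provider's hooks pass them straight to the confluent-kafka client. A minimal sketch of expressing them as a `Connection` object (how you register it, via the UI, an `AIRFLOW_CONN_*` environment variable, or the metadata database, is deployment-specific):

```python
import json

from airflow.models import Connection

# Sketch: the extras above, attached to a Connection object
kafka_conn = Connection(
    conn_id="kafka_default",
    conn_type="kafka",
    extra=json.dumps(
        {
            "bootstrap.servers": "kafka:9092",
            "acks": "all",
            "enable.idempotence": "true",
        }
    ),
)
print(kafka_conn.get_uri())  # e.g. for use as an AIRFLOW_CONN_KAFKA_DEFAULT value
```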

#### Error Handling with the Producer Hook

```python
from airflow.operators.python import PythonOperator
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook

def robust_producer():
    """Produce with comprehensive error handling."""
    producer = KafkaProducerHook(kafka_config_id="kafka_default").get_producer()
    messages_sent = 0
    failed_messages = []

    def delivery_report(err, msg):
        nonlocal messages_sent
        if err is not None:
            failed_messages.append((msg.key(), msg.value(), str(err)))
        else:
            messages_sent += 1

    try:
        for i in range(100):
            producer.produce(
                "robust-topic",
                key=str(i),
                value=f"Message {i}",
                callback=delivery_report,
            )

            # Poll periodically to serve delivery reports
            if i % 10 == 0:
                producer.poll(0.1)

        # Final flush with timeout; returns the number of still-undelivered messages
        remaining = producer.flush(timeout=30)
        if remaining > 0:
            raise Exception(f"{remaining} messages failed to deliver")

        print(f"Successfully sent {messages_sent} messages")
        if failed_messages:
            print(f"Failed messages: {failed_messages}")
    except Exception as e:
        print(f"Producer error: {e}")
        raise

robust_task = PythonOperator(
    task_id="robust_producer",
    python_callable=robust_producer,
)
```

### Best Practices

1. **Batching**: Use `linger.ms` and `batch.size` for optimal throughput
2. **Error Handling**: Always implement delivery callbacks for critical data
3. **Flushing**: Call `producer.flush()` to ensure message delivery; if the local queue fills, poll and retry (see the sketch below)
4. **Partitioning**: Use message keys for consistent partition assignment
5. **Compression**: Enable compression for better network utilization
6. **Idempotence**: Enable idempotence for exactly-once delivery semantics
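
When a producer outpaces the broker, confluent-kafka's `produce()` raises `BufferError` once the local queue is full. A minimal sketch of the poll-and-retry pattern from practice 3 (topic name and batch size are illustrative):

```python
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook

def backpressure_aware_produce():
    """Produce a large batch, backing off whenever the local queue is full."""
    producer = KafkaProducerHook(kafka_config_id="kafka_default").get_producer()

    for i in range(100_000):
        payload = f"Message {i}"
        while True:
            try:
                producer.produce("high-volume-topic", value=payload)
                break
            except BufferError:
                # Local queue is full: serve delivery reports, then retry
                producer.poll(0.5)

    # Block until every queued message is delivered or fails
    producer.flush()
```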