# Message Consumption

Message consumption capabilities with batch and individual processing, configurable commit strategies, and message filtering and transformation through user-supplied apply functions.

## Capabilities
### Consume From Topic Operator

Airflow operator for consuming messages from Kafka topics with flexible processing functions and configurable commit strategies.

```python { .api }
class ConsumeFromTopicOperator(BaseOperator):
    """
    Operator for consuming messages from Kafka topics.

    Attributes:
        template_fields: tuple = ("topics", "apply_function_args", "apply_function_kwargs", "kafka_config_id")
    """

    def __init__(
        self,
        topics: str | Sequence[str],
        kafka_config_id: str = "kafka_default",
        apply_function: Callable[..., Any] | str | None = None,
        apply_function_batch: Callable[..., Any] | str | None = None,
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        commit_cadence: str = "end_of_operator",
        max_messages: int | None = None,
        max_batch_size: int = 1000,
        poll_timeout: float = 60,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the consumer operator.

        Args:
            topics: Kafka topic name(s) to consume from
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function: Function (or dotted import path) to apply to each message
            apply_function_batch: Function (or dotted import path) to apply to batches of messages
            apply_function_args: Positional arguments to pass to the apply function
            apply_function_kwargs: Keyword arguments to pass to the apply function
            commit_cadence: When to commit offsets ("never", "end_of_batch", "end_of_operator")
            max_messages: Maximum number of messages to consume
            max_batch_size: Maximum messages per batch
            poll_timeout: Timeout for polling messages (seconds)
            **kwargs: Additional operator arguments
        """

    def execute(self, context) -> Any:
        """
        Execute the consumer operation.

        Args:
            context: Airflow task context

        Returns:
            Any: Result of apply function(s)
        """
```
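Because the signature above also accepts strings for `apply_function` and `apply_function_batch`, the processing callable does not have to be defined in the DAG file. A minimal sketch, assuming the string is resolved as a dotted import path and that `my_pkg.kafka_callbacks` is importable on every Airflow worker (the module path is illustrative):

```python
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

# The dotted path is an assumption for illustration; point it at a function
# that is importable wherever the task runs.
consume_by_path = ConsumeFromTopicOperator(
    task_id="consume_by_path",
    topics=["my-topic"],
    apply_function="my_pkg.kafka_callbacks.process_message",
    kafka_config_id="kafka_default",
)
```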
### Kafka Consumer Hook

Lower-level hook providing direct access to the Kafka Consumer client for advanced use cases (see "Using Consumer Hook Directly" below).

```python { .api }
class KafkaConsumerHook(KafkaBaseHook):
    """
    A hook for consuming messages from Kafka topics.

    Inherits from KafkaBaseHook and provides consumer client access.
    """

    def __init__(self, topics: Sequence[str], kafka_config_id: str = "kafka_default") -> None:
        """
        Initialize the Kafka consumer hook.

        Args:
            topics: List of topic names to subscribe to
            kafka_config_id: Airflow connection ID for Kafka configuration
        """

    def get_consumer(self) -> Consumer:
        """
        Get a Kafka Consumer client subscribed to the configured topics.

        Returns:
            Consumer: Configured confluent-kafka Consumer instance
        """

    def _get_client(self, config) -> Consumer:
        """
        Get a Kafka Consumer client with the given configuration.

        Args:
            config: Kafka client configuration dictionary

        Returns:
            Consumer: Configured confluent-kafka Consumer instance
        """
```
### Commit Cadence Options

```python { .api }
VALID_COMMIT_CADENCE = ["never", "end_of_batch", "end_of_operator"]
```
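The cadence controls when the operator commits consumed offsets. Read in line with the examples later on this page: `"never"` leaves offset commits to the client configuration or to your own code, `"end_of_batch"` commits after each processed batch, and `"end_of_operator"` (the default) commits once when the operator completes successfully. A minimal sketch (topic and task names are illustrative):

```python
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

def handle(message):
    """Illustrative processing callback."""
    print(message.value())

# Committing per batch limits reprocessing if the task fails midway;
# "never" suits dry runs, "end_of_operator" suits all-or-nothing jobs.
replay_limited_task = ConsumeFromTopicOperator(
    task_id="replay_limited_consume",
    topics=["events"],          # illustrative topic
    apply_function=handle,
    commit_cadence="end_of_batch",
    kafka_config_id="kafka_default",
)
```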
### Error Handling

```python { .api }
class KafkaAuthenticationError(Exception):
    """
    Custom exception for Kafka authentication failures.

    Raised when consumer authentication fails during connection
    or topic subscription attempts.
    """

def error_callback(err):
    """
    Default error handling callback for consumer operations.

    Args:
        err: Error object from confluent-kafka consumer
    """
```
### Usage Examples

#### Basic Message Consumption

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

def process_message(message):
    """Process individual messages."""
    key = message.key().decode('utf-8') if message.key() else None
    value = message.value().decode('utf-8')

    print(f"Processing message - Key: {key}, Value: {value}")

    # Return True to indicate successful processing
    return True

dag = DAG(
    "kafka_consumer_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
)

consume_task = ConsumeFromTopicOperator(
    task_id="consume_messages",
    topics=["my-topic"],
    apply_function=process_message,
    max_messages=100,
    kafka_config_id="kafka_default",
    dag=dag,
)
```
#### Batch Processing

```python
import json
from datetime import datetime

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

def process_batch(messages):
    """Process messages in batches."""
    processed_records = []

    for message in messages:
        try:
            # Parse JSON message
            data = json.loads(message.value().decode('utf-8'))

            # Transform data
            processed_record = {
                "user_id": data["user_id"],
                "action": data["action"],
                "processed_at": datetime.now().isoformat(),
            }
            processed_records.append(processed_record)

        except (json.JSONDecodeError, KeyError):
            print(f"Failed to parse message: {message.value()}")
            continue

    # Bulk insert or process
    print(f"Processed batch of {len(processed_records)} records")
    return processed_records

consume_batch_task = ConsumeFromTopicOperator(
    task_id="consume_batch",
    topics=["user-events"],
    apply_function_batch=process_batch,
    max_batch_size=500,
    commit_cadence="end_of_batch",
    kafka_config_id="kafka_default",
)
```
#### Multiple Topics Consumption

```python
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

def multi_topic_processor(message):
    """Process messages from multiple topics."""
    topic = message.topic()
    value = message.value().decode('utf-8')

    if topic == "orders":
        print(f"Processing order: {value}")
        # Handle order logic
    elif topic == "payments":
        print(f"Processing payment: {value}")
        # Handle payment logic
    elif topic == "shipments":
        print(f"Processing shipment: {value}")
        # Handle shipment logic

    return True

multi_topic_task = ConsumeFromTopicOperator(
    task_id="consume_multi_topics",
    topics=["orders", "payments", "shipments"],
    apply_function=multi_topic_processor,
    max_messages=1000,
    poll_timeout=30,
    kafka_config_id="kafka_default",
)
```
#### Parameterized Processing

```python
import json

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

def parameterized_processor(message, filter_user_id=None, output_format="json"):
    """Process messages with parameters."""
    try:
        data = json.loads(message.value().decode('utf-8'))

        # Filter by user ID if specified
        if filter_user_id and data.get("user_id") != filter_user_id:
            return None  # Skip this message

        # Format output
        if output_format == "csv":
            result = f"{data['user_id']},{data['action']},{data['timestamp']}"
        else:
            result = json.dumps(data)

        print(f"Processed: {result}")
        return result

    except Exception as e:
        print(f"Error processing message: {e}")
        return None

filtered_consume_task = ConsumeFromTopicOperator(
    task_id="filtered_consume",
    topics=["user-events"],
    apply_function=parameterized_processor,
    # Pass parameters as keyword arguments so the message stays the only
    # positional argument handed to the function.
    apply_function_kwargs={"filter_user_id": 123, "output_format": "json"},
    max_messages=500,
    kafka_config_id="kafka_default",
)
```
#### Advanced Commit Strategy

```python
import json

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

def critical_processor(message):
    """Process critical messages with strict commit control."""
    try:
        # Critical processing logic
        data = json.loads(message.value().decode('utf-8'))

        # Validate data
        if not all(k in data for k in ["id", "amount", "currency"]):
            raise ValueError("Missing required fields")

        # Process financial transaction (process_transaction is application-specific)
        process_transaction(data)

        print(f"Successfully processed transaction {data['id']}")
        return True

    except Exception as e:
        print(f"Failed to process transaction: {e}")
        # Re-raising fails the task; with "end_of_operator" cadence no offsets
        # are committed, so the messages are re-read when the task retries.
        raise

critical_consume_task = ConsumeFromTopicOperator(
    task_id="consume_critical",
    topics=["financial-transactions"],
    apply_function=critical_processor,
    commit_cadence="end_of_operator",  # Only commit after all messages are processed
    max_messages=100,
    poll_timeout=60,
    kafka_config_id="kafka_prod",
)
```
### Using Consumer Hook Directly

```python
from airflow.operators.python import PythonOperator
from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook

def custom_consumer_logic():
    """Use the consumer hook for advanced scenarios."""
    hook = KafkaConsumerHook(
        topics=["custom-topic"],
        kafka_config_id="kafka_default",
    )

    consumer = hook.get_consumer()
    messages_processed = 0

    try:
        while messages_processed < 100:
            msg = consumer.poll(timeout=1.0)

            if msg is None:
                continue

            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue

            # Custom processing
            print(f"Received: {msg.value().decode('utf-8')}")
            messages_processed += 1

            # Manual commit after processing
            consumer.commit(message=msg)

    except Exception as e:
        print(f"Consumer error: {e}")
        raise
    finally:
        consumer.close()

hook_consumer_task = PythonOperator(
    task_id="custom_consumer",
    python_callable=custom_consumer_logic,
)
```
### Advanced Consumer Configuration

#### Consumer Group Configuration

```python
# Connection extra configuration for consumer groups
# (confluent-kafka / librdkafka property names)
{
    "bootstrap.servers": "kafka:9092",
    "group.id": "airflow-consumer-group",
    "auto.offset.reset": "earliest",      # or "latest"
    "enable.auto.commit": "false",        # Manual commit control
    "max.poll.interval.ms": "300000",     # Max delay between polls before leaving the group
    "session.timeout.ms": "30000",        # Consumer group session timeout
    "heartbeat.interval.ms": "3000",      # Heartbeat interval
    "fetch.min.bytes": "1024",            # Minimum bytes to fetch
    "fetch.wait.max.ms": "500"            # Maximum wait time for a fetch
}
```
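The provider reads this client configuration from the Kafka connection's `extra` field; how the connection is created (UI, secrets backend, CLI, or environment variable) is up to your deployment. A minimal sketch using a JSON connection in an environment variable (connection ID and values are illustrative):

```python
import json
import os

# Illustrative only: any mechanism that puts this JSON into the connection's
# "extra" field works equally well.
os.environ["AIRFLOW_CONN_KAFKA_DEFAULT"] = json.dumps(
    {
        "conn_type": "kafka",
        "extra": json.dumps(
            {
                "bootstrap.servers": "kafka:9092",
                "group.id": "airflow-consumer-group",
                "auto.offset.reset": "earliest",
                "enable.auto.commit": "false",
            }
        ),
    }
)
```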
#### Message Filtering and Transformation

```python
import json

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

def filter_and_transform(message):
    """Filter and transform messages based on content."""
    try:
        data = json.loads(message.value().decode('utf-8'))

        # Filter based on message content
        if data.get("event_type") not in ["purchase", "login"]:
            return None  # Skip message

        # Transform message
        transformed = {
            "event_id": data["id"],
            "user": data["user_id"],
            "type": data["event_type"],
            "timestamp": data["timestamp"],
            "metadata": {
                "topic": message.topic(),
                "partition": message.partition(),
                "offset": message.offset(),
            },
        }

        return transformed

    except Exception as e:
        print(f"Error filtering message: {e}")
        return None

filter_task = ConsumeFromTopicOperator(
    task_id="filter_messages",
    topics=["raw-events"],
    apply_function=filter_and_transform,
    commit_cadence="end_of_batch",
    max_batch_size=100,
    kafka_config_id="kafka_default",
)
```
### Error Handling and Retry Logic

```python
import json
import time

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

def resilient_processor(message):
    """Process messages with retry logic."""
    max_retries = 3
    retry_count = 0

    while retry_count < max_retries:
        try:
            # Processing logic that might fail
            data = json.loads(message.value().decode('utf-8'))

            # call_external_api is a placeholder for your own integration
            result = call_external_api(data)

            print(f"Successfully processed message: {result}")
            return result

        except Exception as e:
            retry_count += 1
            print(f"Attempt {retry_count} failed: {e}")

            if retry_count >= max_retries:
                # Send to a dead-letter topic or other error handling
                print(f"Message failed after {max_retries} attempts")
                return None

            # Wait before retry
            time.sleep(2 ** retry_count)  # Exponential backoff

resilient_task = ConsumeFromTopicOperator(
    task_id="resilient_consume",
    topics=["api-requests"],
    apply_function=resilient_processor,
    commit_cadence="end_of_operator",
    kafka_config_id="kafka_default",
)
```
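The dead-letter idea referenced in the comment above can be implemented with the provider's producer hook. A minimal sketch, assuming `KafkaProducerHook` from the same provider is available and that an `api-requests-dlq` topic exists (topic and header names are illustrative):

```python
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook

def send_to_dead_letter(message, reason: str) -> None:
    """Forward an unprocessable message to a dead-letter topic."""
    producer = KafkaProducerHook(kafka_config_id="kafka_default").get_producer()
    producer.produce(
        "api-requests-dlq",                            # illustrative dead-letter topic
        key=message.key(),
        value=message.value(),
        headers=[("error", reason.encode("utf-8"))],   # record why it failed
    )
    producer.flush()
```

Inside `resilient_processor`, a call like this could replace the final `print` once retries are exhausted.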
### Performance Optimization

#### High-Throughput Consumption

```python
import json

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

def high_throughput_batch_processor(messages):
    """Optimized batch processing for high throughput."""
    # Process in chunks for memory efficiency
    chunk_size = 100
    results = []

    for i in range(0, len(messages), chunk_size):
        chunk = messages[i:i + chunk_size]

        # Process the chunk (transform_data is application-specific)
        chunk_results = []
        for message in chunk:
            try:
                data = json.loads(message.value().decode('utf-8'))
                # Fast processing logic
                chunk_results.append(transform_data(data))
            except Exception as e:
                print(f"Error in chunk processing: {e}")
                continue

        results.extend(chunk_results)

        # Optional: intermediate storage (bulk_store is application-specific)
        if len(results) >= 1000:
            bulk_store(results)
            results = []

    # Final storage
    if results:
        bulk_store(results)

    return len(messages)

high_throughput_task = ConsumeFromTopicOperator(
    task_id="high_throughput_consume",
    topics=["high-volume-topic"],
    apply_function_batch=high_throughput_batch_processor,
    max_batch_size=2000,
    poll_timeout=10,
    commit_cadence="end_of_batch",
    kafka_config_id="kafka_default",
)
```
### Best Practices

1. **Consumer Groups**: Use meaningful group IDs for consumer coordination
2. **Offset Management**: Choose appropriate `auto.offset.reset` and commit strategies
3. **Error Handling**: Implement robust error handling with retry logic
4. **Memory Management**: Process messages in batches for better memory utilization
5. **Monitoring**: Log processing metrics and errors for operational visibility
6. **Backpressure**: Use `max_messages` and `poll_timeout` to control consumption rate (a consolidated sketch follows this list)
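
A minimal sketch that pulls these practices together (topic, thresholds, and the group ID configured in the connection extra are illustrative; tune them to your workload):

```python
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

def process_batch_with_metrics(messages):
    """Process a batch and log simple metrics for operational visibility."""
    succeeded, failed = 0, 0
    for message in messages:
        try:
            _ = message.value()        # replace with real processing
            succeeded += 1
        except Exception as exc:       # robust per-message error handling
            failed += 1
            print(f"Failed at offset {message.offset()}: {exc}")
    print(f"Batch done: {succeeded} succeeded, {failed} failed")
    return succeeded

tuned_consume_task = ConsumeFromTopicOperator(
    task_id="tuned_consume",
    topics=["events"],                                # illustrative topic
    apply_function_batch=process_batch_with_metrics,  # batch processing for memory efficiency
    commit_cadence="end_of_batch",                    # commit as each batch completes
    max_messages=5000,                                # backpressure: cap messages per run
    max_batch_size=500,
    poll_timeout=30,                                  # backpressure: bound idle polling
    kafka_config_id="kafka_default",                  # group.id / auto.offset.reset live in the connection extra
)
```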