# Asynchronous Operations

Deferrable sensors and triggers for event-driven processing, enabling non-blocking message monitoring of Kafka topics.

## Capabilities

### Await Message Sensor

A deferrable sensor that waits for specific messages in Kafka topics without occupying a worker slot while idle. An apply function, supplied as a callable or an import string, is evaluated against each consumed message to decide when the condition is met.
```python { .api }
class AwaitMessageSensor(BaseOperator):
    """
    Sensor that waits for messages in Kafka topics using deferrable execution.

    Attributes:
        template_fields: tuple = ("topics", "apply_function", "kafka_config_id")
        ui_color: str = "#e8f4fd"
    """

    def __init__(
        self,
        topics: Sequence[str],
        apply_function: str | Callable,
        kafka_config_id: str = "kafka_default",
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        poll_timeout: float = 1.0,
        poll_interval: float = 5.0,
        commit_cadence: str = "end_of_batch",
        max_messages: int | None = None,
        max_batch_size: int = 1000,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the message sensor.

        Args:
            topics: List of topics to monitor
            apply_function: Function to test message conditions (callable or import string)
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function_args: Arguments to pass to the apply function
            apply_function_kwargs: Keyword arguments to pass to the apply function
            poll_timeout: Timeout for individual polls (seconds)
            poll_interval: Interval between polls (seconds)
            commit_cadence: When to commit messages ("end_of_batch", "end_of_operator", "never")
            max_messages: Maximum number of messages to consume
            max_batch_size: Maximum batch size for message processing
            **kwargs: Additional operator arguments
        """

    def execute(self, context) -> Any:
        """
        Execute the sensor operation.

        Args:
            context: Airflow task context

        Returns:
            Any: Result when the condition is met
        """

    def execute_complete(self, context, event=None):
        """
        Handle trigger completion.

        Args:
            context: Airflow task context
            event: Event data from the trigger
        """
```

### Await Message Trigger Function Sensor

Enhanced sensor that accepts a custom trigger function for complex event handling, providing event-processing capabilities beyond basic condition checking.

```python { .api }
class AwaitMessageTriggerFunctionSensor(BaseOperator):
    """
    Sensor with a custom trigger function for advanced message handling.

    Attributes:
        template_fields: tuple = ("topics", "apply_function", "kafka_config_id")
        ui_color: str = "#e8f4fd"
    """

    def __init__(
        self,
        topics: Sequence[str],
        apply_function: str | Callable,
        event_triggered_function: Callable,
        kafka_config_id: str = "kafka_default",
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        poll_timeout: float = 1.0,
        poll_interval: float = 5.0,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the trigger function sensor.

        Args:
            topics: List of topics to monitor
            apply_function: Function to test message conditions (callable or import string)
            event_triggered_function: Function to call when an event fires
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function_args: Arguments to pass to the apply function
            apply_function_kwargs: Keyword arguments to pass to the apply function
            poll_timeout: Timeout for individual polls (seconds)
            poll_interval: Interval between polls (seconds)
            **kwargs: Additional operator arguments
        """

    def execute(self, context, event=None) -> Any:
        """
        Execute the sensor operation.

        Args:
            context: Airflow task context
            event: Event data

        Returns:
            Any: Result when the condition is met
        """
```

### Await Message Trigger

Low-level trigger for asynchronous message monitoring in Kafka topics. Provides the core deferrable execution mechanism used by the sensors above.

```python { .api }
class AwaitMessageTrigger(BaseTrigger):
    """
    Trigger for asynchronous message monitoring in Kafka topics.
    """

    def __init__(
        self,
        topics: Sequence[str],
        apply_function: str,
        kafka_config_id: str = "kafka_default",
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        poll_timeout: float = 1.0,
        poll_interval: float = 5.0,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the message trigger.

        Args:
            topics: List of topics to monitor
            apply_function: Import string for the function that tests message conditions
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function_args: Arguments to pass to the apply function
            apply_function_kwargs: Keyword arguments to pass to the apply function
            poll_timeout: Timeout for individual polls (seconds)
            poll_interval: Interval between polls (seconds)
            **kwargs: Additional trigger arguments
        """

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize trigger state.

        Returns:
            tuple: (class_path, serialized_kwargs)
        """

    async def run(self):
        """
        Run the trigger asynchronously.

        Yields:
            TriggerEvent: Event when the condition is met
        """
```
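
The `serialize` contract can be illustrated with a self-contained stand-in: the triggerer process persists the `(class_path, kwargs)` pair and rebuilds the trigger from it. The class and module names below are illustrative, not the provider's internals.

```python
from typing import Any


class SketchTrigger:
    """Stand-in for AwaitMessageTrigger to illustrate the serialize contract."""

    def __init__(self, topics, apply_function, kafka_config_id="kafka_default"):
        self.topics = topics
        self.apply_function = apply_function
        self.kafka_config_id = kafka_config_id

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # The class path plus kwargs must be sufficient to rebuild the trigger
        return (
            "my_module.SketchTrigger",  # hypothetical import path
            {
                "topics": self.topics,
                "apply_function": self.apply_function,
                "kafka_config_id": self.kafka_config_id,
            },
        )


trigger = SketchTrigger(["orders"], "my_functions.check_order")
path, kwargs = trigger.serialize()
# The triggerer does the equivalent of: import path, then re-instantiate
rebuilt = SketchTrigger(**kwargs)
print(path, rebuilt.topics)
```

This is why the trigger accepts only an import string for `apply_function`: a bare callable cannot survive the serialize/rebuild round trip.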

### Usage Examples

#### Basic Message Waiting

```python
from datetime import datetime, timedelta
import json

from airflow import DAG
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor


def check_order_completion(message):
    """Check whether a message indicates completion of order 12345."""
    try:
        data = json.loads(message.value().decode("utf-8"))
        return data.get("status") == "completed" and data.get("order_id") == "12345"
    except Exception:
        return False


dag = DAG(
    "kafka_sensor_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
)

wait_for_order = AwaitMessageSensor(
    task_id="wait_for_order_completion",
    topics=["order-events"],
    apply_function=check_order_completion,
    poll_timeout=1.0,
    poll_interval=5.0,
    execution_timeout=timedelta(minutes=5),  # fail the task after 5 minutes
    kafka_config_id="kafka_default",
    dag=dag,
)
```

#### Using an Apply Function as an Import String

```python
# Define the function in a separate module (e.g. my_functions.py)
import json


def detect_critical_event(message):
    """Detect critical events in the message stream."""
    try:
        data = json.loads(message.value().decode("utf-8"))
        return data.get("priority") == "critical"
    except Exception:
        return False


# Reference it by import string in the sensor
critical_sensor = AwaitMessageSensor(
    task_id="detect_critical",
    topics=["system-events"],
    apply_function="my_functions.detect_critical_event",  # import string
    poll_timeout=1.0,
    poll_interval=2.0,
    kafka_config_id="kafka_default",
)
```

#### Commit Cadence Configuration

```python
# Different commit strategies
sensor_end_of_batch = AwaitMessageSensor(
    task_id="sensor_batch_commit",
    topics=["events"],
    apply_function="my_functions.process_event",
    commit_cadence="end_of_batch",  # commit after each batch
    max_batch_size=100,
    kafka_config_id="kafka_default",
)

sensor_end_of_operator = AwaitMessageSensor(
    task_id="sensor_operator_commit",
    topics=["events"],
    apply_function="my_functions.process_event",
    commit_cadence="end_of_operator",  # commit at the end of execution
    kafka_config_id="kafka_default",
)

sensor_never_commit = AwaitMessageSensor(
    task_id="sensor_no_commit",
    topics=["events"],
    apply_function="my_functions.process_event",
    commit_cadence="never",  # never commit (external offset management)
    kafka_config_id="kafka_default",
)
```

#### Custom Trigger Function Sensor

```python
import json

from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor


def event_condition(message):
    """Check whether the message meets the alert condition."""
    try:
        data = json.loads(message.value().decode("utf-8"))
        return data.get("event_type") == "alert"
    except Exception:
        return False


def handle_alert(event_data):
    """Handle the alert when triggered."""
    print(f"Alert triggered: {event_data}")
    # Custom alert-handling logic
    return "Alert processed"


alert_sensor = AwaitMessageTriggerFunctionSensor(
    task_id="handle_alerts",
    topics=["alerts"],
    apply_function=event_condition,
    event_triggered_function=handle_alert,
    poll_timeout=0.5,
    poll_interval=1.0,
    kafka_config_id="kafka_default",
)
```

#### Using the Trigger Directly

```python
from airflow.models.baseoperator import BaseOperator
from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger


class CustomKafkaOperator(BaseOperator):
    def __init__(self, topics, apply_function, **kwargs):
        super().__init__(**kwargs)
        self.topics = topics
        self.apply_function = apply_function

    def execute(self, context):
        # Hand control to the triggerer; the worker slot is released while waiting
        self.defer(
            trigger=AwaitMessageTrigger(
                topics=self.topics,
                apply_function=self.apply_function,  # import string, so it serializes
                kafka_config_id="kafka_default",
                poll_timeout=1.0,
                poll_interval=2.0,
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # Invoked when the trigger fires; event carries the matched message data
        return event
```

### Error Handling

The provider includes an error callback hook for Kafka consumer errors:

```python { .api }
def error_callback(err):
    """
    Error callback function for Kafka consumer errors.

    Args:
        err: Kafka error object
    """
```
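
A minimal sketch of how such a callback might behave, assuming authentication failures should abort the consumer while other errors are merely logged. The string-based detection below is illustrative only; a real callback would inspect confluent-kafka error codes on the error object.

```python
class KafkaAuthenticationError(Exception):
    """Raised when the broker rejects the consumer's credentials (sketch)."""


def error_callback(err):
    # Illustrative: treat authentication errors as fatal, log everything else.
    # A production callback would check err.code() against confluent-kafka's
    # KafkaError constants rather than matching on the message text.
    if "authentication" in str(err).lower():
        raise KafkaAuthenticationError(f"Kafka auth failed: {err}")
    print(f"Non-fatal Kafka error: {err}")


error_callback("Broker: transport failure")  # logged, not raised
try:
    error_callback("SASL authentication failed")
except KafkaAuthenticationError as e:
    print(f"caught: {e}")
```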

Custom exception for authentication failures:

```python { .api }
class KafkaAuthenticationError(Exception):
    """Custom exception for Kafka authentication failures."""
```

### Configuration Constants

```python { .api }
VALID_COMMIT_CADENCE = ["end_of_batch", "end_of_operator", "never"]
```
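
A sensor might validate this setting before any polling begins, failing fast on a misspelled cadence. This is a sketch; the helper name is hypothetical.

```python
VALID_COMMIT_CADENCE = ["end_of_batch", "end_of_operator", "never"]


def check_commit_cadence(cadence: str) -> str:
    # Reject unknown cadences early, before the consumer starts polling
    if cadence not in VALID_COMMIT_CADENCE:
        raise ValueError(
            f"commit_cadence must be one of {VALID_COMMIT_CADENCE}, got {cadence!r}"
        )
    return cadence


print(check_commit_cadence("end_of_batch"))
```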

### Best Practices

1. **Deferrable Execution**: Use deferrable sensors for long-running waits to free up worker slots
2. **Apply Function Design**: Keep apply functions lightweight and handle exceptions gracefully
3. **Commit Strategy**: Choose a commit cadence appropriate to your use case
4. **Error Handling**: Implement proper error handling in apply functions
5. **Resource Management**: Set reasonable batch sizes and timeouts
6. **Import Strings**: Reference apply functions by import string so they serialize correctly for the triggerer
7. **Connection Management**: Use appropriate Kafka connection configurations
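
Practices 2 and 4 can be sketched as a lightweight, exception-safe apply function, testable locally with a fake message object (`FakeMessage` is a stand-in for a confluent-kafka `Message`, not part of the provider):

```python
import json


def robust_apply_function(message):
    """Lightweight, exception-safe condition check (illustrative sketch)."""
    try:
        payload = json.loads(message.value().decode("utf-8"))
    except (ValueError, AttributeError, UnicodeDecodeError):
        # Malformed messages should not crash the triggerer; just skip them
        return False
    # Guard against non-object JSON (e.g. a bare list or string)
    return isinstance(payload, dict) and payload.get("status") == "completed"


class FakeMessage:
    """Minimal stand-in for a confluent-kafka Message, for local testing."""

    def __init__(self, raw: bytes):
        self._raw = raw

    def value(self) -> bytes:
        return self._raw


print(robust_apply_function(FakeMessage(b'{"status": "completed"}')))  # True
print(robust_apply_function(FakeMessage(b"not json")))  # False
```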