# SQL Triggers

SQL triggers enable asynchronous execution of SQL operations without tying up Airflow worker slots. Deferred tasks hand long-running database operations to Airflow's triggerer component, which runs them efficiently on an asyncio event loop.

## Capabilities

### SQL Execute Query Trigger

Executes SQL statements asynchronously using Airflow's trigger mechanism.

```python { .api }
class SQLExecuteQueryTrigger:
    """
    Executes SQL code asynchronously.

    Args:
        sql (str or list): SQL statement(s) to execute
        conn_id (str): Database connection ID
        hook_params (dict, optional): Additional hook parameters
        **kwargs: Additional trigger arguments
    """

    def __init__(self, sql, conn_id, hook_params=None, **kwargs):
        pass

    def serialize(self):
        """
        Serialize trigger configuration for storage.

        Returns:
            tuple: (class_path, kwargs) for trigger reconstruction
        """
        pass

    def get_hook(self):
        """
        Get database hook for the connection.

        Returns:
            Database hook instance
        """
        pass

    async def run(self):
        """
        Execute the SQL asynchronously.

        Yields:
            TriggerEvent: Results or status updates
        """
        pass
```
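
For quick local experimentation outside a DAG, a trigger's `run()` async generator can be driven directly with `asyncio`. This is a minimal sketch, assuming a working connection named `my_database` is configured; it is not how production DAGs consume triggers:

```python
import asyncio

from airflow.providers.common.sql.triggers.sql import SQLExecuteQueryTrigger


async def drive_trigger():
    # Consume the async generator the same way the triggerer does
    trigger = SQLExecuteQueryTrigger(sql='SELECT 1', conn_id='my_database')
    async for event in trigger.run():
        print(event)  # e.g. a TriggerEvent with a 'status' payload


asyncio.run(drive_trigger())
```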

## Usage Examples

### Basic Asynchronous SQL Execution

```python
from airflow.models import BaseOperator
from airflow.providers.common.sql.triggers.sql import SQLExecuteQueryTrigger


class AsyncSQLOperator(BaseOperator):
    # Render template variables such as {{ ds }} in the SQL
    template_fields = ('sql',)

    def __init__(self, sql, conn_id, trigger_cls=SQLExecuteQueryTrigger,
                 trigger_kwargs=None, **kwargs):
        super().__init__(**kwargs)
        self.sql = sql
        self.conn_id = conn_id
        # trigger_cls and trigger_kwargs let the later examples plug in
        # the custom triggers defined below
        self.trigger_cls = trigger_cls
        self.trigger_kwargs = trigger_kwargs or {}

    def execute(self, context):
        # Defer to the trigger for async execution; this frees the worker slot
        self.defer(
            trigger=self.trigger_cls(
                sql=self.sql,
                conn_id=self.conn_id,
                **self.trigger_kwargs,
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event):
        # Handle trigger completion (some triggers below omit 'results')
        if event['status'] == 'success':
            self.log.info("SQL executed successfully: %s", event.get('results'))
            return event.get('results')
        raise Exception(f"SQL execution failed: {event.get('error')}")


# Use the async operator
async_sql = AsyncSQLOperator(
    task_id='async_sql_execution',
    sql="SELECT * FROM large_table WHERE date = '{{ ds }}'",
    conn_id='my_database',
)
```

### Long-Running Query with Progress Updates

```python
import asyncio

from airflow.triggers.base import TriggerEvent
from airflow.providers.common.sql.triggers.sql import SQLExecuteQueryTrigger


class LongRunningQueryTrigger(SQLExecuteQueryTrigger):
    """Custom trigger with progress reporting."""

    async def run(self):
        hook = self.get_hook()

        try:
            # Report that the query has started. Caveat: a deferred task
            # resumes on the first event it receives, so intermediate events
            # like this one are mainly visible in the triggerer logs.
            self.log.info("Starting long-running query...")
            yield TriggerEvent({'status': 'started', 'message': 'Query started'})

            # hook.run() is blocking, so run it in a thread to avoid
            # stalling the triggerer's event loop
            results = await asyncio.to_thread(hook.run, self.sql)

            # Report completion
            yield TriggerEvent({
                'status': 'success',
                'results': results,
                'message': f'Query completed with {len(results) if results else 0} results',
            })

        except Exception as e:
            yield TriggerEvent({
                'status': 'error',
                'error': str(e),
                'message': f'Query failed: {e}',
            })


# Plug the custom trigger into the operator defined above
custom_trigger_task = AsyncSQLOperator(
    task_id='long_query',
    sql='SELECT * FROM very_large_table ORDER BY created_date',
    conn_id='my_database',
    trigger_cls=LongRunningQueryTrigger,
)
```

### Batch Processing with Triggers

```python
class BatchSQLTrigger(SQLExecuteQueryTrigger):
    """Trigger for batch SQL processing."""

    def __init__(self, sql, conn_id, batch_size=100, **kwargs):
        super().__init__(sql=sql, conn_id=conn_id, **kwargs)
        self.batch_size = batch_size

    async def run(self):
        hook = self.get_hook()
        processed = 0

        try:
            total_statements = len(self.sql)

            # Process in batches
            for i in range(0, total_statements, self.batch_size):
                batch = self.sql[i:i + self.batch_size]

                # Execute the batch off the event loop; hook.run() is blocking
                for stmt in batch:
                    await asyncio.to_thread(hook.run, stmt, autocommit=True)
                    processed += 1

                # Report progress (visible in the triggerer logs; the deferred
                # task itself resumes on the first event it receives)
                yield TriggerEvent({
                    'status': 'progress',
                    'processed': processed,
                    'total': total_statements,
                    'message': f'Processed {processed}/{total_statements} statements',
                })

            # Report completion
            yield TriggerEvent({
                'status': 'success',
                'processed': processed,
                'message': f'All {processed} statements processed successfully',
            })

        except Exception as e:
            yield TriggerEvent({
                'status': 'error',
                'processed': processed,
                'error': str(e),
                'message': f'Batch processing failed at statement {processed}: {e}',
            })


# Use the batch trigger through the operator defined earlier
batch_statements = [
    "INSERT INTO logs (message) VALUES ('Batch item 1')",
    "INSERT INTO logs (message) VALUES ('Batch item 2')",
    # ... many more statements
]

batch_task = AsyncSQLOperator(
    task_id='batch_processing',
    sql=batch_statements,
    conn_id='my_database',
    trigger_cls=BatchSQLTrigger,
)
```

### Conditional Async Execution

```python
class ConditionalSQLTrigger(SQLExecuteQueryTrigger):
    """Conditionally execute SQL based on the result of a check query."""

    def __init__(self, sql, check_sql, conn_id, condition_func=None, **kwargs):
        super().__init__(sql=sql, conn_id=conn_id, **kwargs)
        self.check_sql = check_sql
        self.condition_func = condition_func or (lambda x: bool(x))

    async def run(self):
        hook = self.get_hook()

        try:
            # First, evaluate the condition (off the event loop)
            check_results = await asyncio.to_thread(hook.get_records, self.check_sql)

            if self.condition_func(check_results):
                # Condition met, execute the main SQL (intermediate event;
                # see the caveat in LongRunningQueryTrigger)
                yield TriggerEvent({
                    'status': 'executing',
                    'message': 'Condition met, executing SQL',
                })

                results = await asyncio.to_thread(hook.run, self.sql)

                yield TriggerEvent({
                    'status': 'success',
                    'results': results,
                    'message': 'SQL executed successfully',
                })
            else:
                # Condition not met, skip execution
                yield TriggerEvent({
                    'status': 'skipped',
                    'message': 'Condition not met, skipping SQL execution',
                })

        except Exception as e:
            yield TriggerEvent({
                'status': 'error',
                'error': str(e),
                'message': f'Conditional execution failed: {e}',
            })


# Use the conditional trigger through the operator defined earlier
conditional_task = AsyncSQLOperator(
    task_id='conditional_sql',
    sql="UPDATE inventory SET processed = true WHERE date = '{{ ds }}'",
    conn_id='my_database',
    trigger_cls=ConditionalSQLTrigger,
    trigger_kwargs={'check_sql': 'SELECT 1 FROM inventory WHERE processed = false LIMIT 1'},
)
```

### Monitoring Long-Running Operations

```python
from airflow.providers.common.sql.sensors.sql import SqlSensor


class AsyncSQLSensor(SqlSensor):
    """SQL sensor that defers its checks to the triggerer."""

    def _defer_check(self):
        # Hand the check query to the triggerer
        self.defer(
            trigger=SQLExecuteQueryTrigger(
                sql=self.sql,
                conn_id=self.conn_id,
                hook_params=self.hook_params,
            ),
            method_name='execute_complete',
        )

    def execute(self, context):
        self._defer_check()

    def execute_complete(self, context, event):
        if event['status'] != 'success':
            raise Exception(f"Sensor failed: {event.get('error')}")

        results = event['results']

        # Apply the success criteria
        if callable(self.success):
            if self.success(results):
                self.log.info("Sensor condition met")
                return True
            # Condition not met: defer again (a production sensor would
            # wait for poke_interval inside the trigger before rechecking)
            self._defer_check()

        return results


# Use the async sensor
async_sensor = AsyncSQLSensor(
    task_id='async_wait_for_data',
    conn_id='my_database',
    sql="SELECT COUNT(*) FROM processing_queue WHERE status = 'pending'",
    success=lambda x: x[0][0] == 0,  # Wait for the queue to be empty
    poke_interval=60,
)
```

## Trigger Configuration

### Hook Parameters

```python
# Customize hook behavior through hook_params
trigger = SQLExecuteQueryTrigger(
    sql='SELECT * FROM data',
    conn_id='my_conn',
    hook_params={
        'schema': 'custom_schema',
        'autocommit': True,
        'isolation_level': 'READ_COMMITTED'
    }
)
```
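
How these parameters reach the database is an implementation detail, but it is roughly equivalent to resolving the connection and forwarding `hook_params` when the hook is built. A sketch using the public `Connection.get_hook` API (not the provider's exact code):

```python
from airflow.hooks.base import BaseHook

# Resolve the Airflow connection, then build its hook with hook_params applied
connection = BaseHook.get_connection('my_conn')
hook = connection.get_hook(hook_params={'schema': 'custom_schema'})
```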

### Serialization

Triggers must be serializable to be stored and reconstructed by the triggerer:

```python
# serialize() returns the trigger's class path and constructor kwargs
class_path, kwargs = trigger.serialize()

# This lets the triggerer reconstruct an equivalent trigger:
from airflow.utils.module_loading import import_string

trigger_class = import_string(class_path)
reconstructed_trigger = trigger_class(**kwargs)
```

## Benefits of Using Triggers

1. **Non-blocking**: Triggers don't occupy worker slots while waiting
2. **Scalable**: A single triggerer can run many concurrent triggers
3. **Resource efficient**: Reduces worker resource consumption
4. **Progress reporting**: Triggers can yield intermediate status updates
5. **Error handling**: Structured error reporting through TriggerEvent payloads