# Asynchronous Execution

Deferrable task execution through triggers enables efficient resource utilization for long-running Snowflake operations without blocking worker slots. Airflow workers remain free to handle other tasks while Snowflake queries execute, improving overall system throughput and resource efficiency.

## Capabilities

### SQL API Trigger

A trigger that polls Snowflake SQL API query status in deferrable mode, providing asynchronous monitoring of long-running query execution with configurable polling intervals and comprehensive status reporting.

```python { .api }
class SnowflakeSqlApiTrigger(BaseTrigger):
    """
    Trigger for polling Snowflake SQL API query status in deferrable mode.
    Monitors query execution progress and triggers task completion when queries finish.
    """

    def __init__(
        self,
        poll_interval: float,
        query_ids: list[str],
        snowflake_conn_id: str,
        token_life_time: timedelta,
        token_renewal_delta: timedelta,
    ):
        """
        Initialize SQL API trigger.

        Parameters:
        - poll_interval: Polling interval in seconds for checking query status
        - query_ids: List of Snowflake query IDs to monitor
        - snowflake_conn_id: Snowflake connection ID for API access
        - token_life_time: JWT token lifetime for authentication
        - token_renewal_delta: JWT token renewal interval
        """
```

#### Core Methods

```python { .api }
def serialize(self) -> tuple[str, dict[str, Any]]:
    """
    Serialize trigger arguments and classpath for persistence.

    Returns:
        Tuple of (classpath, serialized_arguments) for trigger reconstruction
    """

async def run(self) -> AsyncIterator[TriggerEvent]:
    """
    Wait for Snowflake queries to complete and yield trigger events.
    Continuously polls query status until all queries complete or fail.

    Yields:
        TriggerEvent objects containing query status and completion information
    """

async def get_query_status(self, query_id: str) -> dict[str, Any]:
    """
    Get query status asynchronously from Snowflake SQL API.

    Parameters:
    - query_id: Snowflake query ID to check

    Returns:
        Dictionary containing query status, metadata, and execution details
    """

def _set_context(self, context):
    """
    Set trigger context (no-op implementation for compatibility).

    Parameters:
    - context: Trigger execution context
    """
```
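
The sketch below shows how an operator's `execute()` method typically hands off to this trigger. It is illustrative only, not the provider's actual operator code, and the import path and token values shown are assumptions that may vary by provider version.

```python
from datetime import timedelta

from airflow.models.baseoperator import BaseOperator
from airflow.providers.snowflake.triggers.snowflake_trigger import SnowflakeSqlApiTrigger


class ExampleDeferringOperator(BaseOperator):
    """Hypothetical operator that defers to SnowflakeSqlApiTrigger."""

    def execute(self, context):
        # Submit statements via the SQL API first and collect their query IDs
        # (placeholder ID shown here).
        query_ids = ["01a2b3c4-0000-0000-0000-000000000000"]

        # Hand control to the triggerer; the worker slot is released until the
        # trigger yields a TriggerEvent.
        self.defer(
            trigger=SnowflakeSqlApiTrigger(
                poll_interval=30,
                query_ids=query_ids,
                snowflake_conn_id="snowflake_default",
                token_life_time=timedelta(minutes=59),
                token_renewal_delta=timedelta(minutes=54),
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # Runs on a worker after the trigger fires; inspect the event payload here.
        self.log.info("Trigger event received: %s", event)
```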

## Usage Examples

### Basic Deferrable SQL Execution

```python
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
from datetime import datetime, timedelta

with DAG(
    'deferrable_snowflake_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    # Long-running data processing with deferrable execution
    heavy_processing = SnowflakeSqlApiOperator(
        task_id='heavy_data_processing',
        snowflake_conn_id='snowflake_prod',
        sql='''
            -- Large table creation and transformation
            CREATE OR REPLACE TABLE analytics.customer_360_view AS
            SELECT
                c.customer_id,
                c.customer_name,
                c.registration_date,
                COUNT(DISTINCT o.order_id) as total_orders,
                SUM(o.order_amount) as lifetime_value,
                AVG(o.order_amount) as avg_order_value,
                MAX(o.order_date) as last_order_date,
                MIN(o.order_date) as first_order_date,
                COUNT(DISTINCT DATE_TRUNC('month', o.order_date)) as active_months
            FROM customers c
            LEFT JOIN orders o ON c.customer_id = o.customer_id
            WHERE c.registration_date >= '2020-01-01'
            GROUP BY c.customer_id, c.customer_name, c.registration_date;

            -- Create summary statistics
            CREATE OR REPLACE TABLE analytics.customer_segments AS
            SELECT
                CASE
                    WHEN lifetime_value >= 10000 THEN 'High Value'
                    WHEN lifetime_value >= 1000 THEN 'Medium Value'
                    ELSE 'Low Value'
                END as segment,
                COUNT(*) as customer_count,
                AVG(lifetime_value) as avg_segment_value,
                AVG(total_orders) as avg_orders_per_customer
            FROM analytics.customer_360_view
            GROUP BY 1;
        ''',
        statement_count=2,
        deferrable=True,   # Enable deferrable execution
        poll_interval=30,  # Check status every 30 seconds
        warehouse='X_LARGE_WH',
        database='ANALYTICS'
    )
```

### Multiple Query Monitoring

```python
from datetime import datetime

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Multiple independent long-running operations
parallel_processing = [
    SnowflakeSqlApiOperator(
        task_id=f'process_region_{region}',
        snowflake_conn_id='snowflake_prod',
        sql=f'''
            CREATE OR REPLACE TABLE analytics.regional_summary_{region.lower()} AS
            SELECT
                DATE_TRUNC('month', order_date) as month,
                COUNT(*) as total_orders,
                SUM(order_amount) as total_revenue,
                COUNT(DISTINCT customer_id) as unique_customers
            FROM orders
            WHERE region = '{region}'
              AND order_date >= '2023-01-01'
            GROUP BY 1
            ORDER BY 1;
        ''',
        statement_count=1,
        deferrable=True,
        poll_interval=15,  # Faster polling for smaller queries
        warehouse='LARGE_WH',
        session_parameters={
            'QUERY_TAG': f'regional_processing_{region}_{datetime.now().isoformat()}'
        }
    )
    for region in ['NORTH', 'SOUTH', 'EAST', 'WEST']
]

# The regional tasks have no dependencies between them, so they run in parallel
# without blocking worker slots while their queries execute.
```

### Complex ETL Pipeline with Deferrable Tasks

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

with DAG(
    'deferrable_etl_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    max_active_runs=1
) as dag:

    # Stage 1: Data ingestion (synchronous - typically fast)
    ingest_data = CopyFromExternalStageToSnowflakeOperator(
        task_id='ingest_raw_data',
        table='raw.daily_transactions',
        stage='@s3_data_stage',
        prefix='transactions/{{ ds }}/',
        file_format='csv_transactions',
        warehouse='LOADING_WH'
    )

    # Stage 2: Heavy data processing (deferrable)
    process_transactions = SnowflakeSqlApiOperator(
        task_id='process_transactions',
        snowflake_conn_id='snowflake_prod',
        sql='''
            -- Clean and standardize transaction data
            CREATE OR REPLACE TABLE staging.clean_transactions AS
            SELECT
                transaction_id,
                customer_id,
                UPPER(TRIM(product_category)) as product_category,
                ROUND(transaction_amount, 2) as transaction_amount,
                transaction_date,
                CASE
                    WHEN payment_method IN ('CREDIT', 'DEBIT', 'CASH') THEN payment_method
                    ELSE 'OTHER'
                END as payment_method_clean
            FROM raw.daily_transactions
            WHERE transaction_amount > 0
              AND customer_id IS NOT NULL
              AND transaction_date = '{{ ds }}';

            -- Create enriched transaction view with customer data
            CREATE OR REPLACE TABLE staging.enriched_transactions AS
            SELECT
                t.*,
                c.customer_segment,
                c.customer_tier,
                c.registration_date,
                DATEDIFF('day', c.registration_date, t.transaction_date) as days_since_registration
            FROM staging.clean_transactions t
            JOIN dim.customers c ON t.customer_id = c.customer_id;

            -- Aggregate daily metrics by segment
            CREATE OR REPLACE TABLE analytics.daily_segment_metrics AS
            SELECT
                '{{ ds }}' as metric_date,
                customer_segment,
                product_category,
                payment_method_clean,
                COUNT(*) as transaction_count,
                SUM(transaction_amount) as total_revenue,
                AVG(transaction_amount) as avg_transaction_value,
                COUNT(DISTINCT customer_id) as unique_customers
            FROM staging.enriched_transactions
            GROUP BY customer_segment, product_category, payment_method_clean;
        ''',
        statement_count=3,
        deferrable=True,
        poll_interval=20,
        warehouse='HEAVY_PROCESSING_WH',
        token_life_time=timedelta(hours=2)  # Extended token lifetime for long operations
    )

    # Stage 3: ML feature generation (deferrable)
    generate_ml_features = SnowflakeSqlApiOperator(
        task_id='generate_ml_features',
        snowflake_conn_id='snowflake_prod',
        sql='''
            -- Generate rolling window features
            CREATE OR REPLACE TABLE ml.customer_features_{{ ds | replace('-', '_') }} AS
            SELECT
                customer_id,
                '{{ ds }}' as feature_date,
                -- 7-day rolling features
                COUNT(*) OVER (
                    PARTITION BY customer_id
                    ORDER BY transaction_date
                    RANGE BETWEEN INTERVAL '7 days' PRECEDING AND CURRENT ROW
                ) as transactions_7d,
                SUM(transaction_amount) OVER (
                    PARTITION BY customer_id
                    ORDER BY transaction_date
                    RANGE BETWEEN INTERVAL '7 days' PRECEDING AND CURRENT ROW
                ) as revenue_7d,
                -- 30-day rolling features
                COUNT(*) OVER (
                    PARTITION BY customer_id
                    ORDER BY transaction_date
                    RANGE BETWEEN INTERVAL '30 days' PRECEDING AND CURRENT ROW
                ) as transactions_30d,
                SUM(transaction_amount) OVER (
                    PARTITION BY customer_id
                    ORDER BY transaction_date
                    RANGE BETWEEN INTERVAL '30 days' PRECEDING AND CURRENT ROW
                ) as revenue_30d,
                -- Recency features
                DATEDIFF('day',
                    LAG(transaction_date) OVER (PARTITION BY customer_id ORDER BY transaction_date),
                    transaction_date
                ) as days_since_last_transaction
            FROM staging.enriched_transactions
            ORDER BY customer_id, transaction_date;

            -- Update master feature table
            MERGE INTO ml.customer_features_master m
            USING ml.customer_features_{{ ds | replace('-', '_') }} f
            ON m.customer_id = f.customer_id AND m.feature_date = f.feature_date
            WHEN MATCHED THEN UPDATE SET
                transactions_7d = f.transactions_7d,
                revenue_7d = f.revenue_7d,
                transactions_30d = f.transactions_30d,
                revenue_30d = f.revenue_30d,
                days_since_last_transaction = f.days_since_last_transaction
            WHEN NOT MATCHED THEN INSERT (
                customer_id, feature_date, transactions_7d, revenue_7d,
                transactions_30d, revenue_30d, days_since_last_transaction
            ) VALUES (
                f.customer_id, f.feature_date, f.transactions_7d, f.revenue_7d,
                f.transactions_30d, f.revenue_30d, f.days_since_last_transaction
            );
        ''',
        statement_count=2,
        deferrable=True,
        poll_interval=25,
        warehouse='ML_WH'
    )

    # Stage 4: Data quality validation (synchronous - fast)
    validate_results = SnowflakeSqlApiOperator(
        task_id='validate_processing_results',
        snowflake_conn_id='snowflake_prod',
        sql='''
            -- Validate record counts match expectations
            SELECT
                CASE
                    WHEN staging_count > 0 AND
                         staging_count = analytics_count AND
                         ml_count > 0
                    THEN 'PASSED'
                    ELSE 'FAILED'
                END as validation_result
            FROM (
                SELECT
                    (SELECT COUNT(*) FROM staging.clean_transactions) as staging_count,
                    (SELECT SUM(transaction_count) FROM analytics.daily_segment_metrics WHERE metric_date = '{{ ds }}') as analytics_count,
                    (SELECT COUNT(DISTINCT customer_id) FROM ml.customer_features_master WHERE feature_date = '{{ ds }}') as ml_count
            );
        ''',
        statement_count=1,
        warehouse='VALIDATION_WH'
    )

    # Define pipeline dependencies
    ingest_data >> process_transactions >> generate_ml_features >> validate_results
```

### Custom Trigger Event Handling

```python
from airflow.exceptions import AirflowException
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator


class CustomSnowflakeOperator(SnowflakeSqlApiOperator):
    """Custom operator with enhanced trigger event handling."""

    def execute_complete(self, context, event=None):
        """Custom completion handler with detailed logging."""

        # Extract query results from the trigger event
        if event and 'query_results' in event:
            query_results = event['query_results']

            # Log execution statistics
            for query_id, result in query_results.items():
                if result.get('status') == 'SUCCESS':
                    execution_time = result.get('execution_time_ms', 0) / 1000
                    rows_affected = result.get('rows_affected', 0)

                    self.log.info(
                        f"Query {query_id} completed successfully: "
                        f"{rows_affected} rows affected in {execution_time:.2f} seconds"
                    )

                    # Store metrics for monitoring
                    context['task_instance'].xcom_push(
                        key=f'query_{query_id}_metrics',
                        value={
                            'execution_time_seconds': execution_time,
                            'rows_affected': rows_affected,
                            'status': 'SUCCESS'
                        }
                    )
                else:
                    self.log.error(f"Query {query_id} failed: {result.get('error_message', 'Unknown error')}")
                    raise AirflowException(f"Query execution failed: {result.get('error_message')}")

        # Call parent completion handler
        super().execute_complete(context, event)


# Usage of the custom operator
custom_deferrable_task = CustomSnowflakeOperator(
    task_id='custom_processing_with_metrics',
    sql='SELECT COUNT(*) FROM large_table WHERE date >= CURRENT_DATE - 30',
    statement_count=1,
    deferrable=True,
    poll_interval=10
)
```

## Trigger Event Structure

The SnowflakeSqlApiTrigger yields TriggerEvent objects with the following structure:

```python
TriggerEvent({
    "status": "success" | "error",
    "query_results": {
        "query_id_1": {
            "status": "SUCCESS" | "FAILED" | "RUNNING",
            "execution_time_ms": 1234,
            "rows_affected": 567,
            "error_message": "...",  # Only present on failure
            "query_text": "...",
            "warehouse": "..."
        },
        # ... additional query results
    },
    "message": "All queries completed successfully" | "Error details"
})
```

## Performance Considerations

### Polling Intervals
- **Short intervals (5-15s)**: For queries expected to complete quickly
- **Medium intervals (30-60s)**: For typical data processing tasks
- **Long intervals (2-5 minutes)**: For very long-running operations (hours)

### Token Management
- Set an appropriate `token_life_time` for the expected query duration
- Use `token_renewal_delta` to ensure tokens don't expire during execution (see the sketch below)
- Monitor token usage for long-running pipelines
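
A minimal sketch of these two settings on a long-running task; the task, table names, and durations are illustrative assumptions, not values prescribed by the provider.

```python
from datetime import timedelta

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Hypothetical rebuild expected to run for a few hours: the JWT lifetime covers the
# full runtime and renewal happens well before expiry.
long_running_rebuild = SnowflakeSqlApiOperator(
    task_id='rebuild_large_table',
    snowflake_conn_id='snowflake_prod',
    sql='CREATE OR REPLACE TABLE analytics.big_rebuild AS SELECT * FROM raw.events',
    statement_count=1,
    deferrable=True,
    poll_interval=60,
    token_life_time=timedelta(hours=3),      # cover the expected runtime
    token_renewal_delta=timedelta(hours=2),  # renew well before expiry
)
```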

### Resource Optimization
- Deferrable tasks free up worker slots for other work
- Use appropriate warehouse sizes for the workload
- Consider warehouse auto-suspend settings for cost optimization (see the example below)
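
One way to apply the auto-suspend suggestion from Airflow itself is a small housekeeping task; the warehouse name and 60-second value are assumptions, and the connection's role needs the privilege to modify the warehouse.

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Hypothetical housekeeping task: suspend the heavy warehouse quickly once
# deferred queries finish, so idle compute is not billed.
tune_warehouse = SnowflakeSqlApiOperator(
    task_id='tune_warehouse_auto_suspend',
    snowflake_conn_id='snowflake_prod',
    sql='ALTER WAREHOUSE HEAVY_PROCESSING_WH SET AUTO_SUSPEND = 60',
    statement_count=1,
)
```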

## Monitoring and Troubleshooting

### Query Status Monitoring
```sql
-- Monitor long-running queries
SELECT
    query_id,
    query_text,
    user_name,
    warehouse_name,
    start_time,
    end_time,
    total_elapsed_time,
    execution_status
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
WHERE start_time >= DATEADD(hour, -24, CURRENT_TIMESTAMP())
  AND execution_status IN ('RUNNING', 'QUEUED')
ORDER BY start_time DESC;
```

### Trigger Event Logging
All trigger events are logged with detailed information about query execution, including:
- Query IDs and status
- Execution times and resource usage
- Error messages and troubleshooting information
- Warehouse and session details

## Error Handling

The deferrable execution system provides comprehensive error handling:

### Query Execution Errors
- SQL syntax errors, permission issues, resource constraints
- Detailed error messages with Snowflake error codes
- Automatic task failure with appropriate error context (a failure-callback sketch follows below)
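
To surface that error context beyond the task log, a standard Airflow `on_failure_callback` can be attached to the task; the callback, task, and SQL below are illustrative assumptions rather than provider-supplied code.

```python
import logging

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

log = logging.getLogger(__name__)


def notify_snowflake_failure(context):
    """Hypothetical failure callback: log the failing task and the raised exception."""
    task_id = context['task_instance'].task_id
    exc = context.get('exception')
    log.error("Snowflake task %s failed: %s", task_id, exc)
    # Forward to Slack, PagerDuty, etc. here if desired.


transform_with_alerting = SnowflakeSqlApiOperator(
    task_id='transform_with_failure_alerting',
    snowflake_conn_id='snowflake_prod',
    sql='CALL analytics.refresh_daily()',  # hypothetical stored procedure
    statement_count=1,
    deferrable=True,
    on_failure_callback=notify_snowflake_failure,
)
```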

### Trigger Infrastructure Errors
- Network connectivity issues during polling
- Authentication failures and token expiration
- Polling timeout and retry logic (see the retry sketch below)
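
Transient infrastructure problems can usually be absorbed with ordinary Airflow task-level retry and timeout settings; this is a minimal sketch with assumed values, not prescribed defaults.

```python
from datetime import timedelta

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

resilient_task = SnowflakeSqlApiOperator(
    task_id='resilient_processing',
    snowflake_conn_id='snowflake_prod',
    sql='CREATE OR REPLACE TABLE staging.retry_demo AS SELECT 1 AS ok',
    statement_count=1,
    deferrable=True,
    poll_interval=30,
    retries=3,                             # re-run the task if the trigger reports an error
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(hours=4),  # fail the task if it runs far past expectations
)
```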

### Resource Management Errors
- Warehouse suspension during query execution
- Query queue limits and concurrency restrictions
- Memory and compute resource exhaustion

All errors include detailed logging, Snowflake query IDs for investigation, and clear guidance for resolution.