# SQL Operators and Data Quality

Task operators for executing SQL statements, performing data quality checks, and managing database operations with built-in validation and monitoring. These operators provide the core task execution layer for Snowflake operations in Airflow DAGs.

## Capabilities

### SQL API Operator

Primary operator for executing multiple SQL statements using Snowflake's SQL API, with support for asynchronous execution, deferrable mode, and comprehensive parameter binding.

```python { .api }
class SnowflakeSqlApiOperator(SQLExecuteQueryOperator):
    """
    Execute multiple SQL statements using Snowflake's SQL API.
    Supports asynchronous execution and deferrable mode for efficient resource utilization.
    """

    LIFETIME = timedelta(minutes=59)  # JWT token lifetime
    RENEWAL_DELTA = timedelta(minutes=54)  # JWT token renewal time
    template_fields: Sequence[str]  # Includes snowflake_conn_id and SQL fields
    conn_id_field = "snowflake_conn_id"

    def __init__(
        self,
        *,
        snowflake_conn_id: str = "snowflake_default",
        warehouse: str | None = None,
        database: str | None = None,
        role: str | None = None,
        schema: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict[str, Any] | None = None,
        poll_interval: int = 5,
        statement_count: int = 0,
        token_life_time: timedelta = LIFETIME,
        token_renewal_delta: timedelta = RENEWAL_DELTA,
        bindings: dict[str, Any] | None = None,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        snowflake_api_retry_args: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the SQL API operator.

        Parameters:
        - snowflake_conn_id: Snowflake connection ID
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - role: Snowflake role name
        - schema: Snowflake schema name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        - poll_interval: Polling interval for async execution (seconds)
        - statement_count: Number of SQL statements (0 for auto-detect)
        - token_life_time: JWT token lifetime
        - token_renewal_delta: JWT token renewal interval
        - bindings: Parameter bindings for the SQL statements
        - deferrable: Enable deferrable execution mode
        - snowflake_api_retry_args: API retry configuration
        """
```

#### Execution Methods

```python { .api }
def execute(self, context: Context) -> None:
    """
    Execute the SQL statements.

    Parameters:
    - context: Airflow task execution context
    """

def poll_on_queries(self):
    """
    Poll the submitted queries for completion status.
    Used in synchronous execution mode.
    """

def execute_complete(
    self,
    context: Context,
    event: dict[str, str | list[str]] | None = None,
) -> None:
    """
    Callback invoked when the trigger fires in deferrable mode.

    Parameters:
    - context: Airflow task execution context
    - event: Event data from the trigger
    """
```
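
In synchronous (non-deferrable) mode, polling amounts to a sleep-and-check loop over the submitted statement handles. The sketch below illustrates that pattern only; `get_status` is a hypothetical stand-in for the SQL API's statement-status endpoint, not a provider function:

```python
import time

def poll_until_done(query_ids, get_status, poll_interval=5, timeout=3600):
    """Poll each query handle until it leaves the 'running' state."""
    deadline = time.monotonic() + timeout
    pending = set(query_ids)
    failed = []
    while pending:
        if time.monotonic() > deadline:
            raise TimeoutError(f"queries still running: {sorted(pending)}")
        for qid in list(pending):
            status = get_status(qid)  # e.g. 'running', 'success', 'error'
            if status == "success":
                pending.discard(qid)
            elif status == "error":
                pending.discard(qid)
                failed.append(qid)
        if pending:
            time.sleep(poll_interval)
    if failed:
        raise RuntimeError(f"queries failed: {failed}")
```

Deferrable mode replaces this worker-side loop with a trigger, so the worker slot is freed while Snowflake runs the statements.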

### Data Quality Check Operators

Specialized operators for performing data quality validations and monitoring database state with configurable thresholds.

#### Basic Check Operator

```python { .api }
class SnowflakeCheckOperator(SQLCheckOperator):
    """
    Perform a check against a Snowflake database.
    Expects a SQL query that returns a single row for boolean evaluation.
    """

    template_fields: Sequence[str] = ["sql", "snowflake_conn_id"]
    template_ext: Sequence[str] = (".sql",)
    ui_color = "#ededed"
    conn_id_field = "snowflake_conn_id"

    def __init__(
        self,
        *,
        sql: str,
        snowflake_conn_id: str = "snowflake_default",
        parameters: Iterable | Mapping[str, Any] | None = None,
        warehouse: str | None = None,
        database: str | None = None,
        role: str | None = None,
        schema: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize the check operator.

        Parameters:
        - sql: SQL query returning a single row evaluated as booleans
        - snowflake_conn_id: Snowflake connection ID
        - parameters: Query parameters for parameterized SQL
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - role: Snowflake role name
        - schema: Snowflake schema name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        """
```

#### Value Check Operator

```python { .api }
class SnowflakeValueCheckOperator(SQLValueCheckOperator):
    """
    Perform a simple check comparing a SQL query's result against a specified value.
    Supports tolerance levels for numeric comparisons.
    """

    template_fields: Sequence[str] = ["sql", "pass_value", "snowflake_conn_id"]
    conn_id_field = "snowflake_conn_id"

    def __init__(
        self,
        *,
        sql: str,
        pass_value: Any,
        tolerance: Any = None,
        snowflake_conn_id: str = "snowflake_default",
        parameters: Iterable | Mapping[str, Any] | None = None,
        warehouse: str | None = None,
        database: str | None = None,
        role: str | None = None,
        schema: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize the value check operator.

        Parameters:
        - sql: SQL query returning a single value for comparison
        - pass_value: Expected value for comparison
        - tolerance: Tolerance for numeric comparisons, as a fraction of pass_value
        - snowflake_conn_id: Snowflake connection ID
        - parameters: Query parameters for parameterized SQL
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - role: Snowflake role name
        - schema: Snowflake schema name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        """
```
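
For numeric results, the tolerance is interpreted as a relative band around `pass_value`: the check passes when the returned value falls within `pass_value * (1 ± tolerance)`. A minimal sketch of that comparison (mirroring, not importing, the base `SQLValueCheckOperator` logic):

```python
def value_check_passes(result, pass_value, tolerance=None):
    """Numeric value check: exact match, or within pass_value * (1 ± tolerance)."""
    if tolerance is None:
        return result == pass_value
    lo = pass_value * (1 - tolerance)
    hi = pass_value * (1 + tolerance)
    return lo <= result <= hi

# 52,000 lies within 10% of the expected 50,000; 44,000 does not
assert value_check_passes(52_000, 50_000, tolerance=0.1)
assert not value_check_passes(44_000, 50_000, tolerance=0.1)
```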

#### Interval Check Operator

```python { .api }
class SnowflakeIntervalCheckOperator(SQLIntervalCheckOperator):
    """
    Check that metrics are within tolerance of their values from days_back before.
    Useful for detecting anomalies in time series data.
    """

    template_fields: Sequence[str] = ["table", "metrics_thresholds", "snowflake_conn_id"]
    conn_id_field = "snowflake_conn_id"

    def __init__(
        self,
        *,
        table: str,
        metrics_thresholds: dict,
        date_filter_column: str = "ds",
        days_back: SupportsAbs[int] = -7,
        snowflake_conn_id: str = "snowflake_default",
        warehouse: str | None = None,
        database: str | None = None,
        role: str | None = None,
        schema: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize the interval check operator.

        Parameters:
        - table: Table name to check
        - metrics_thresholds: Dictionary mapping each metric (a SQL expression)
          to the maximum tolerated ratio between the current and historical values
        - date_filter_column: Column name for date filtering
        - days_back: Offset in days to the comparison point (negative integer)
        - snowflake_conn_id: Snowflake connection ID
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - role: Snowflake role name
        - schema: Snowflake schema name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        """
```
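
The comparison is delegated to the base `SQLIntervalCheckOperator`, whose default `ratio_formula` (`max_over_min`) divides the larger of the two values by the smaller, so both spikes and drops trip the threshold. A small sketch of that check, assuming the default formula and non-zero metric values:

```python
def interval_check_passes(current, reference, max_ratio):
    """Default 'max_over_min' formula: ratio of the larger value to the smaller."""
    ratio = max(current, reference) / min(current, reference)
    return ratio <= max_ratio

# Sales grew from 100 to 115 week-over-week: ratio 1.15 <= 1.2, so the check passes
assert interval_check_passes(115, 100, 1.2)
# Sales dropped from 100 to 60: ratio ~1.67 > 1.2, so the check fails
assert not interval_check_passes(60, 100, 1.2)
```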

## Usage Examples

### Basic SQL Execution

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

with DAG(
    'snowflake_sql_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    # Execute multiple SQL statements in one task
    create_and_load = SnowflakeSqlApiOperator(
        task_id='create_and_load_data',
        snowflake_conn_id='snowflake_prod',
        sql='''
            -- Create staging table
            CREATE OR REPLACE TABLE staging.daily_sales AS
            SELECT
                date_trunc('day', transaction_date) AS sale_date,
                region,
                SUM(amount) AS total_sales,
                COUNT(*) AS transaction_count
            FROM raw.transactions
            WHERE transaction_date >= '{{ ds }}'
              AND transaction_date < '{{ next_ds }}'
            GROUP BY 1, 2;

            -- Update summary table
            MERGE INTO analytics.sales_summary s
            USING staging.daily_sales ds
              ON s.sale_date = ds.sale_date AND s.region = ds.region
            WHEN MATCHED THEN UPDATE SET
                total_sales = ds.total_sales,
                transaction_count = ds.transaction_count
            WHEN NOT MATCHED THEN INSERT (sale_date, region, total_sales, transaction_count)
                VALUES (ds.sale_date, ds.region, ds.total_sales, ds.transaction_count);
        ''',
        statement_count=2,
        warehouse='ANALYTICS_WH',
        database='ANALYTICS_DB',
    )
```

### Asynchronous Execution

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Large data processing with deferrable execution
process_large_dataset = SnowflakeSqlApiOperator(
    task_id='process_large_dataset',
    snowflake_conn_id='snowflake_prod',
    sql='''
        CREATE OR REPLACE TABLE analytics.customer_360 AS
        SELECT
            c.customer_id,
            c.customer_name,
            COUNT(DISTINCT o.order_id) AS total_orders,
            SUM(o.order_amount) AS lifetime_value,
            MAX(o.order_date) AS last_order_date,
            AVG(o.order_amount) AS avg_order_value
        FROM customers c
        LEFT JOIN orders o
          ON c.customer_id = o.customer_id
         AND o.order_date >= '2020-01-01'  -- filter in the ON clause keeps customers with no orders
        GROUP BY c.customer_id, c.customer_name;
    ''',
    statement_count=1,
    deferrable=True,   # Hand the wait off to the triggerer instead of blocking a worker slot
    poll_interval=30,  # Check status every 30 seconds
    warehouse='HEAVY_COMPUTE_WH',
)
```

### Data Quality Checks

```python
from airflow.providers.snowflake.operators.snowflake import (
    SnowflakeCheckOperator,
    SnowflakeValueCheckOperator,
    SnowflakeIntervalCheckOperator,
)

# Basic data quality check: fails the task if no rows landed for the run date
data_freshness_check = SnowflakeCheckOperator(
    task_id='check_data_freshness',
    snowflake_conn_id='snowflake_prod',
    sql='''
        SELECT COUNT(*) > 0
        FROM raw.transactions
        WHERE date_trunc('day', transaction_date) = '{{ ds }}'
    ''',
    warehouse='ANALYTICS_WH',
)

# Value validation with tolerance
revenue_check = SnowflakeValueCheckOperator(
    task_id='validate_daily_revenue',
    snowflake_conn_id='snowflake_prod',
    sql='''
        SELECT SUM(amount)
        FROM raw.transactions
        WHERE date_trunc('day', transaction_date) = '{{ ds }}'
    ''',
    pass_value=50000,  # Expected daily revenue
    tolerance=0.1,     # Pass if within ±10% of pass_value
    warehouse='ANALYTICS_WH',
)

# Time series anomaly detection: today's metrics vs. the same day last week
anomaly_check = SnowflakeIntervalCheckOperator(
    task_id='detect_sales_anomalies',
    snowflake_conn_id='snowflake_prod',
    table='analytics.daily_sales_summary',
    metrics_thresholds={
        'SUM(total_sales)': 1.2,  # Fail if the week-over-week ratio exceeds 1.2
        'SUM(order_count)': 1.3,  # Fail if the ratio exceeds 1.3
    },
    date_filter_column='sale_date',
    days_back=-7,  # Compare with the same day last week
    warehouse='ANALYTICS_WH',
)
```

### Parameterized Queries

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Using SQL API parameter bindings: each '?' placeholder is bound by position,
# and each binding supplies a Snowflake type plus a string value
parameterized_query = SnowflakeSqlApiOperator(
    task_id='parameterized_analysis',
    snowflake_conn_id='snowflake_prod',
    sql='''
        SELECT
            region,
            COUNT(*) AS customer_count,
            AVG(lifetime_value) AS avg_ltv
        FROM analytics.customer_360
        WHERE lifetime_value >= ?
          AND last_order_date >= ?
        GROUP BY region
        ORDER BY avg_ltv DESC;
    ''',
    statement_count=1,
    bindings={
        '1': {'type': 'FIXED', 'value': '1000'},       # Minimum lifetime value
        '2': {'type': 'TEXT', 'value': '2024-01-01'},  # Minimum last order date
    },
    warehouse='ANALYTICS_WH',
)
```

### Error Handling and Retry Configuration

```python
from datetime import timedelta

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Custom retry configuration for the underlying SQL API calls
robust_operation = SnowflakeSqlApiOperator(
    task_id='robust_data_processing',
    snowflake_conn_id='snowflake_prod',
    sql='''
        CREATE OR REPLACE TABLE staging.processed_data AS
        SELECT * FROM raw.data_stream
        WHERE processed_at IS NULL;

        UPDATE raw.data_stream
        SET processed_at = CURRENT_TIMESTAMP()
        WHERE id IN (SELECT id FROM staging.processed_data);
    ''',
    statement_count=2,
    snowflake_api_retry_args={
        'retries': 3,
        'backoff_factor': 2,
        'status_forcelist': [500, 502, 503, 504],
    },
    # Airflow task-level retry configuration
    retries=2,
    retry_delay=timedelta(minutes=5),
    warehouse='PROCESSING_WH',
)
```

## Template Variables

All operators support Airflow's template variables and macros:

- `{{ ds }}`: Execution date as YYYY-MM-DD
- `{{ ts }}`: Execution timestamp
- `{{ next_ds }}`: Next execution date as YYYY-MM-DD
- `{{ params }}`: User-defined parameters
- `{{ var.value.variable_name }}`: Airflow Variables
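
Airflow renders these placeholders with Jinja2 before the operator runs; the crude string substitution below is an illustration only, showing what the operator receives after rendering:

```python
# Stand-in for Jinja2 rendering: substitute each {{ var }} placeholder
context = {"ds": "2024-06-01", "next_ds": "2024-06-02"}

sql = ("SELECT * FROM raw.transactions "
       "WHERE transaction_date >= '{{ ds }}' AND transaction_date < '{{ next_ds }}'")
for key, value in context.items():
    sql = sql.replace("{{ " + key + " }}", value)

assert "'2024-06-01'" in sql and "{{" not in sql
```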

## Performance Optimization

### Warehouse Management

```python
# Match warehouse size to the workload
large_compute_task = SnowflakeSqlApiOperator(
    task_id='heavy_processing',
    warehouse='X_LARGE_WH',  # Scale up for heavy workloads
    sql='SELECT * FROM huge_table_join_operation',
    statement_count=1,
)

# Explicitly suspend the warehouse once the heavy work is done
cleanup_task = SnowflakeSqlApiOperator(
    task_id='suspend_warehouse',
    sql='ALTER WAREHOUSE X_LARGE_WH SUSPEND',
    statement_count=1,
)

large_compute_task >> cleanup_task
```

### Connection Pooling

Use connection pooling for high-frequency operations by reusing connections across tasks in the same worker process.
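
One simple way to reuse a connection within a worker process is to memoize a connection factory. This is a hypothetical sketch, not provider API: `get_cached_conn` is an illustrative name, and a plain `object()` stands in for what would be a `SnowflakeHook(...).get_conn()` call so the example stays self-contained:

```python
from functools import lru_cache

@lru_cache(maxsize=8)
def get_cached_conn(conn_id, warehouse):
    # In a real task this would open a Snowflake connection; repeated calls
    # with the same key return the cached object instead of reconnecting.
    return object()

a = get_cached_conn("snowflake_prod", "ANALYTICS_WH")
b = get_cached_conn("snowflake_prod", "ANALYTICS_WH")
assert a is b  # same connection object reused within the process
```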

## Error Handling

All operators provide comprehensive error handling with detailed exception information:

- **SQL Execution Errors**: Syntax errors, constraint violations, permission issues
- **Connection Errors**: Authentication failures, network timeouts, warehouse suspension
- **Resource Errors**: Warehouse capacity limits, query complexity limits
- **API Errors**: Rate limiting, malformed requests, service unavailable

Error messages include Snowflake-specific error codes and suggestions for resolution.