# SQL Operators

SQL operators execute database operations as tasks within Airflow DAGs. This includes query execution, data validation, conditional workflows, and data transfers between databases.

## Capabilities

### Base SQL Operator

Foundation class providing database hook functionality for SQL operators.

```python { .api }
class BaseSQLOperator:
    """
    Base class providing DB hook functionality for SQL operators.
    """

    def get_hook(self):
        """
        Get hook for connection.

        Returns:
            Database hook instance
        """

    def get_db_hook(self):
        """
        Get database hook.

        Returns:
            Database hook instance
        """
```

### SQL Query Execution

Execute SQL queries and commands within Airflow tasks.

```python { .api }
class SQLExecuteQueryOperator:
    """
    Executes SQL code in a database.

    Args:
        sql (str or list): SQL query/queries to execute (templated)
        autocommit (bool): Auto-commit mode for queries (default: False)
        parameters (Mapping or Iterable, optional): Query parameters (templated)
        handler (callable, optional): Result handler function (default: fetch_all_handler)
        output_processor (callable, optional): Function to process results
        conn_id (str, optional): Database connection ID
        database (str, optional): Database name to override connection default
        split_statements (bool, optional): Split multiple statements
        return_last (bool): Return only last query result (default: True)
        show_return_value_in_logs (bool): Log returned values (default: False)
        requires_result_fetch (bool): Ensure results are fetched (default: False)
        **kwargs: Additional operator arguments
    """

    def __init__(self, *, sql, autocommit=False, parameters=None, handler=None,
                 output_processor=None, conn_id=None, database=None, split_statements=None,
                 return_last=True, show_return_value_in_logs=False,
                 requires_result_fetch=False, **kwargs):
        pass
```

### Data Quality Check Operators

Operators for validating data quality and consistency.

```python { .api }
class SQLCheckOperator:
    """
    Performs checks using SQL statements. The check fails if any value in
    the returned row is falsy (e.g. 0, None, or an empty string).

    Args:
        sql (str): SQL query that should return a single row (templated)
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, sql, conn_id, **kwargs):
        pass


class SQLValueCheckOperator:
    """
    Checks that a SQL query returns the expected value.

    Args:
        sql (str): SQL query to execute (templated)
        pass_value: Expected value for comparison
        tolerance (float, optional): Tolerance for numeric comparisons
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, sql, pass_value, tolerance=None, conn_id=None, **kwargs):
        pass


class SQLColumnCheckOperator:
    """
    Performs data quality checks on table columns.

    Args:
        table (str): Table name to check
        column_mapping (dict): Column checks mapping
        partition_clause (str, optional): SQL partition clause
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, table, column_mapping, partition_clause=None, conn_id=None, **kwargs):
        pass


class SQLTableCheckOperator:
    """
    Performs data quality checks on tables.

    Args:
        table (str): Table name to check
        checks (dict): Table-level checks mapping
        partition_clause (str, optional): SQL partition clause
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, table, checks, partition_clause=None, conn_id=None, **kwargs):
        pass


class SQLIntervalCheckOperator:
    """
    Checks current metrics against the same metrics from a prior interval.

    Args:
        table (str): Table name to check
        metrics_thresholds (dict): Metrics and threshold definitions
        date_filter_column (str): Column for date filtering
        days_back (int): Days between ds and the prior ds to compare against
            (negative; default: -7)
        ratio_formula (str): Formula for ratio calculations
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, table, metrics_thresholds, date_filter_column='ds',
                 days_back=-7, ratio_formula='max_over_min', conn_id=None, **kwargs):
        pass


class SQLThresholdCheckOperator:
    """
    Checks if metrics are within thresholds.

    Args:
        sql (str): SQL query returning metrics (templated)
        min_threshold (float): Minimum threshold value
        max_threshold (float): Maximum threshold value
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, sql, min_threshold, max_threshold, conn_id=None, **kwargs):
        pass
```
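
The interval check's default `ratio_formula='max_over_min'` compares each metric for the current date against the same metric `days_back` days earlier: the larger value is divided by the smaller, and the check fails when that ratio exceeds the configured threshold. A minimal plain-Python sketch of the comparison (illustrative only, not the operator's actual implementation):

```python
def max_over_min(current, reference):
    """The 'max_over_min' ratio: larger value over smaller, so it is >= 1
    regardless of whether the metric grew or shrank."""
    lo, hi = sorted([current, reference])
    return hi / lo


def within_threshold(current, reference, threshold):
    # The check passes when the ratio stays at or below the threshold.
    return max_over_min(current, reference) <= threshold


# A 20% week-over-week change passes a 1.5x threshold...
print(within_threshold(800, 1000, 1.5))   # True (ratio 1.25)
# ...but a 2.5x change does not.
print(within_threshold(400, 1000, 1.5))   # False (ratio 2.5)
```

Because the ratio is symmetric, the same threshold guards against both sudden growth and sudden shrinkage of a metric.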

### Conditional Workflow Operators

Operators for implementing conditional logic based on SQL query results.

```python { .api }
class BranchSQLOperator:
    """
    Branches the workflow based on a SQL query result.

    Args:
        sql (str): SQL query to execute (templated); should return a single
            boolean-like value (e.g. true/false or 1/0)
        follow_task_ids_if_true (list): Task IDs to follow if condition is true
        follow_task_ids_if_false (list): Task IDs to follow if condition is false
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, sql, follow_task_ids_if_true, follow_task_ids_if_false,
                 conn_id=None, **kwargs):
        pass
```

### Data Transfer Operators

Operators for transferring data between different database connections.

```python { .api }
class GenericTransfer:
    """
    Transfers data between different database connections.

    Args:
        sql (str): SQL query for source data (templated)
        destination_table (str): Target table name (templated)
        source_conn_id (str): Source connection ID (templated)
        destination_conn_id (str): Destination connection ID (templated)
        source_hook_params (dict, optional): Source hook parameters
        destination_hook_params (dict, optional): Destination hook parameters
        preoperator (str or list, optional): SQL to execute before transfer (templated)
        insert_args (dict, optional): Additional arguments for insert operation (templated)
        page_size (int, optional): Number of records for paginated mode
        **kwargs: Additional operator arguments
    """

    def __init__(self, *, sql, destination_table, source_conn_id, destination_conn_id,
                 source_hook_params=None, destination_hook_params=None, preoperator=None,
                 insert_args=None, page_size=None, **kwargs):
        pass
```

### Utility Functions

```python { .api }
def default_output_processor(results):
    """
    Default output processor for query results.

    Args:
        results: Raw query results

    Returns:
        Any: Processed results
    """
```

## Usage Examples

### Basic SQL Execution

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

sql_task = SQLExecuteQueryOperator(
    task_id='execute_sql',
    conn_id='my_postgres_conn',
    sql='INSERT INTO logs (message, timestamp) VALUES (%(msg)s, %(ts)s)',
    parameters={'msg': 'Task completed', 'ts': '2023-01-01 12:00:00'},
    autocommit=True
)
```

### Data Quality Checks

```python
from airflow.providers.common.sql.operators.sql import (
    SQLCheckOperator,
    SQLValueCheckOperator,
    SQLColumnCheckOperator,
)

# Fails if any value in the returned row is falsy (here: COUNT(*) = 0)
check_data = SQLCheckOperator(
    task_id='check_data',
    conn_id='my_database',
    sql="SELECT COUNT(*) FROM users WHERE created_date = '{{ ds }}'"
)

# Check a specific value, within 10% tolerance
value_check = SQLValueCheckOperator(
    task_id='check_total',
    conn_id='my_database',
    sql="SELECT SUM(amount) FROM orders WHERE date = '{{ ds }}'",
    pass_value=10000,
    tolerance=0.1
)

# Column quality checks
column_check = SQLColumnCheckOperator(
    task_id='check_columns',
    conn_id='my_database',
    table='users',
    column_mapping={
        'id': {'null_check': {'equal_to': 0}},
        'email': {'unique_check': {'equal_to': 0}},
        'age': {'min': {'greater_than': 0}, 'max': {'less_than': 150}}
    }
)
```
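
The table-level, interval, and threshold check operators follow the same pattern. The snippets below are illustrative sketches: the `check_statement` keys and metric-to-ratio threshold mapping follow the conventions described in the API section above, but verify the exact names against your provider version.

```python
from airflow.providers.common.sql.operators.sql import (
    SQLTableCheckOperator,
    SQLIntervalCheckOperator,
    SQLThresholdCheckOperator,
)

# Table-level checks: each named check is a boolean SQL expression over the table
table_check = SQLTableCheckOperator(
    task_id='check_table',
    conn_id='my_database',
    table='orders',
    checks={
        'row_count_check': {'check_statement': 'COUNT(*) > 0'},
        'amount_sum_check': {'check_statement': 'SUM(amount) >= 0'},
    }
)

# Compare today's metrics against the same metrics seven days earlier;
# the check fails if the max-over-min ratio exceeds the threshold (here 1.5x)
interval_check = SQLIntervalCheckOperator(
    task_id='check_interval',
    conn_id='my_database',
    table='orders',
    metrics_thresholds={'COUNT(*)': 1.5},
    date_filter_column='order_date',
    days_back=-7
)

# Verify a single metric falls within fixed bounds
threshold_check = SQLThresholdCheckOperator(
    task_id='check_threshold',
    conn_id='my_database',
    sql="SELECT AVG(amount) FROM orders WHERE date = '{{ ds }}'",
    min_threshold=10,
    max_threshold=1000
)
```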

### Conditional Workflows

```python
from airflow.providers.common.sql.operators.sql import BranchSQLOperator

branch_task = BranchSQLOperator(
    task_id='check_condition',
    conn_id='my_database',
    # Return a boolean so the branch decision is unambiguous
    sql="SELECT COUNT(*) > 0 FROM new_orders WHERE date = '{{ ds }}'",
    follow_task_ids_if_true=['process_orders'],
    follow_task_ids_if_false=['send_notification']
)
```

### Data Transfer

```python
from airflow.providers.common.sql.operators.generic_transfer import GenericTransfer

transfer_task = GenericTransfer(
    task_id='transfer_data',
    sql="SELECT * FROM source_table WHERE date = '{{ ds }}'",
    destination_table='target_table',
    source_conn_id='source_db',
    destination_conn_id='target_db',
    preoperator='TRUNCATE TABLE target_table'
)
```

### Multiple SQL Statements

```python
sql_multi = SQLExecuteQueryOperator(
    task_id='multi_sql',
    conn_id='my_database',
    sql=[
        'UPDATE inventory SET last_updated = NOW()',
        "INSERT INTO audit_log (action, timestamp) VALUES ('inventory_update', NOW())",
        'CALL update_stats_procedure()'
    ],
    split_statements=True,
    autocommit=True
)
```