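# SQL Sensors

SQL sensors wait on database state. On every poke they run a SQL query and check its result. They succeed when a success criterion is met, fail the task when a failure criterion is met, and otherwise try again until the sensor times out. Downstream tasks can therefore start as soon as the data they depend on is present in the database.

## Capabilities

### SQL Sensor

Runs a SQL query on each poke, passes the first returned row through `selector`, and evaluates the result against the optional `failure` and `success` callables, in that order. Without a `success` callable, the sensor succeeds once the selected value is truthy.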

```python { .api }
from operator import itemgetter


class SqlSensor:
    """
    Run a SQL statement repeatedly until the criteria are met.

    A BaseSensorOperator subclass; the base class is omitted from this stub.

    Args:
        conn_id (str): Database connection ID
        sql (str): SQL statement to execute (templated)
        parameters (Mapping, optional): Bind parameters passed to the database
            driver with the query (templated)
        success (callable, optional): Called with the selector output; returns
            True when the sensor should succeed
        failure (callable, optional): Called with the selector output before
            ``success``; returning True fails the task
        selector (callable): Transforms the first result row before it reaches
            ``success``/``failure`` (default: ``itemgetter(0)``, the first cell)
        fail_on_empty (bool): Fail if the query returns no rows (default: False)
        hook_params (Mapping, optional): Extra keyword arguments for the
            underlying database hook's constructor (templated)
        **kwargs: Additional sensor arguments (poke_interval, timeout, etc.)
    """

    def __init__(self, *, conn_id, sql, parameters=None, success=None, failure=None,
                 selector=itemgetter(0), fail_on_empty=False, hook_params=None, **kwargs):
        pass

    def poke(self, context):
        """
        Check whether the sensor condition is met.

        Args:
            context (dict): Airflow task context

        Returns:
            bool: True if the condition is met; False to poke again, including
                when no rows are returned and fail_on_empty is False

        Raises:
            AirflowException: If the failure criteria are met, or if no rows
                are returned and fail_on_empty is True
        """
        pass
```
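The decision a single poke makes can be modeled in plain Python so it runs without Airflow. This is a simplified, hypothetical sketch, not Airflow's code. It assumes the order used by the provider's `poke`: an empty result returns False (or fails when `fail_on_empty` is set), only the first row goes through `selector`, `failure` is checked before `success`, and without `success` the selected value's truthiness decides. `evaluate_poke` and `SensorFailure` are illustrative names; `SensorFailure` stands in for `AirflowException`.

```python
from operator import itemgetter
from typing import Any, Callable, Optional, Sequence


class SensorFailure(Exception):
    """Hypothetical stand-in for the AirflowException the real sensor raises."""


def evaluate_poke(
    records: Sequence[tuple],
    success: Optional[Callable[[Any], bool]] = None,
    failure: Optional[Callable[[Any], bool]] = None,
    selector: Callable[[tuple], Any] = itemgetter(0),
    fail_on_empty: bool = False,
) -> bool:
    """Simplified model of one SqlSensor poke, given the rows a query returned."""
    if not records:
        if fail_on_empty:
            raise SensorFailure("No rows returned")
        return False  # keep waiting

    # Only the first row is inspected, after passing through the selector.
    value = selector(records[0])

    # Failure is checked first, so it wins when both criteria would match.
    if failure is not None and failure(value):
        raise SensorFailure(f"Failure criteria met for {value!r}")

    if success is not None:
        return bool(success(value))

    # Without a success callable, the selected value's truthiness decides.
    return bool(value)
```

Because `failure` runs first, a row such as `(1500, 12)` fails the task even when a success check like `completed >= 1000` would also pass.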

## Usage Examples

### Basic Record Monitoring

```python
from airflow.providers.common.sql.sensors.sql import SqlSensor

# Wait for completed uploads for the run's logical date to appear
record_sensor = SqlSensor(
    task_id='wait_for_new_data',
    conn_id='my_database',
    # Quote the rendered date: unquoted, 2024-01-01 is parsed as arithmetic.
    # Single quotes delimit strings; double quotes denote identifiers in standard SQL.
    sql="SELECT COUNT(*) FROM uploads WHERE date = '{{ ds }}' AND status = 'completed'",
    success=lambda count: count > 0,  # Receives the first cell (the count) by default
    poke_interval=60,  # Check every minute
    timeout=3600,  # Time out after 1 hour
)
```
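Quoting matters because `{{ ds }}` renders as bare text such as `2024-01-01`. The sketch below uses Python's built-in `sqlite3` and a hypothetical `uploads` table to compare the unquoted and quoted forms of the rendered query. The f-strings only stand in for Jinja rendering of a fixed date; they are not a safe way to build SQL from untrusted input.

```python
import sqlite3

# Hypothetical table mirroring the example above
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE uploads (date TEXT, status TEXT)")
conn.execute("INSERT INTO uploads VALUES ('2024-01-01', 'completed')")

ds = "2024-01-01"  # what {{ ds }} renders to for this logical date

# Unquoted, the value is evaluated as 2024 - 01 - 01 = 2022 and matches nothing.
unquoted = f"SELECT COUNT(*) FROM uploads WHERE date = {ds} AND status = 'completed'"
# Quoted, it is compared as the string '2024-01-01'.
quoted = f"SELECT COUNT(*) FROM uploads WHERE date = '{ds}' AND status = 'completed'"

unquoted_count = conn.execute(unquoted).fetchone()[0]
quoted_count = conn.execute(quoted).fetchone()[0]
print(unquoted_count, quoted_count)  # 0 1
```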

### Data Quality Monitoring

```python
# Wait for data quality checks to pass
quality_sensor = SqlSensor(
    task_id='wait_for_quality',
    conn_id='my_database',
    sql='''
        SELECT
            SUM(CASE WHEN email IS NULL THEN 1 ELSE 0 END) AS null_emails,
            SUM(CASE WHEN age < 0 OR age > 150 THEN 1 ELSE 0 END) AS invalid_ages
        FROM users
        WHERE created_date = '{{ ds }}'
    ''',
    selector=lambda row: row,  # Pass the whole first row, not just its first cell
    # No null emails or invalid ages. SUM returns NULL (None) when no users match
    # the date, so the sensor keeps waiting until that day's rows exist.
    success=lambda row: row[0] == 0 and row[1] == 0,
    poke_interval=300,  # Check every 5 minutes
    timeout=7200,  # Time out after 2 hours
)
```
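To see what the success check receives in the data quality case, the following sketch runs the same query against an in-memory SQLite database with a hypothetical `users` table, using a `?` placeholder instead of Jinja. It passes the first row through an identity selector (equivalent to `selector=lambda row: row`) and shows the NULL result when no rows match the date. `select_row`, `quality_ok`, and `check` are illustrative helpers.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (email TEXT, age INTEGER, created_date TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [("a@example.com", 34, "2024-01-01"), (None, 200, "2024-01-01")],
)

QUALITY_SQL = """
    SELECT
        SUM(CASE WHEN email IS NULL THEN 1 ELSE 0 END) AS null_emails,
        SUM(CASE WHEN age < 0 OR age > 150 THEN 1 ELSE 0 END) AS invalid_ages
    FROM users
    WHERE created_date = ?
"""


def select_row(row):
    """Identity selector: hand the whole first row to the success check."""
    return row


def quality_ok(row):
    """Success when there are no null emails and no invalid ages."""
    return row[0] == 0 and row[1] == 0


def check(ds):
    row = select_row(conn.execute(QUALITY_SQL, (ds,)).fetchone())
    return row, quality_ok(row)


print(check("2024-01-01"))  # ((1, 1), False): one bad row
print(check("2024-01-02"))  # ((None, None), False): no rows, so SUM is NULL
conn.execute("DELETE FROM users WHERE email IS NULL")
print(check("2024-01-01"))  # ((0, 0), True)
```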

### Complex Condition Monitoring

```python
# Wait for processing to complete with custom success criteria
def check_processing_complete(row):
    """Return True when no items are pending and none have errored."""
    pending_count, error_count = row
    return pending_count == 0 and error_count == 0


processing_sensor = SqlSensor(
    task_id='wait_for_processing',
    conn_id='my_database',
    # FILTER (WHERE ...) is standard SQL but not supported by every database;
    # %(batch_id)s is the psycopg2 placeholder style.
    sql='''
        SELECT
            COUNT(*) FILTER (WHERE status = 'pending') AS pending,
            COUNT(*) FILTER (WHERE status = 'error') AS errors
        FROM job_queue
        WHERE batch_id = %(batch_id)s
    ''',
    # `parameters` are bound by the database driver; they are not exposed to the
    # SQL template as `params`.
    parameters={'batch_id': '{{ dag_run.conf.batch_id }}'},
    selector=lambda row: row,  # The check needs both columns
    success=check_processing_complete,
    poke_interval=120,  # Check every 2 minutes
    timeout=10800,  # Time out after 3 hours
)
```
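The pending/error check can be exercised the same way. The sketch below assumes an in-memory SQLite `job_queue` table and rewrites `COUNT(*) FILTER (WHERE ...)` as `COUNT(CASE WHEN ... THEN 1 END)`, an equivalent form for databases without `FILTER`. Both forms count only matching rows and return 0, not NULL, when nothing matches. `batch_is_done` is a hypothetical helper that applies the same pending-and-errors condition to the row tuple.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE job_queue (batch_id INTEGER, status TEXT)")
conn.executemany(
    "INSERT INTO job_queue VALUES (?, ?)",
    [(7, "done"), (7, "pending"), (8, "done"), (8, "done")],
)

# COUNT ignores NULLs, and CASE without ELSE yields NULL for non-matching rows.
PROGRESS_SQL = """
    SELECT
        COUNT(CASE WHEN status = 'pending' THEN 1 END) AS pending,
        COUNT(CASE WHEN status = 'error' THEN 1 END) AS errors
    FROM job_queue
    WHERE batch_id = :batch_id
"""


def batch_is_done(row):
    """Nothing pending and no errors."""
    pending_count, error_count = row
    return pending_count == 0 and error_count == 0


for batch_id in (7, 8, 9):
    row = conn.execute(PROGRESS_SQL, {"batch_id": batch_id}).fetchone()
    print(batch_id, row, batch_is_done(row))
# 7 (1, 0) False
# 8 (0, 0) True
# 9 (0, 0) True  <- a batch with no rows at all also looks complete
```

The last case deserves attention: if a batch ID has no rows yet, both counts are 0 and the check passes. If that matters, guard against it, for example by also selecting `COUNT(*)` and requiring it to be positive.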

### Monitoring with Failure Conditions

```python
# Monitor with both success and failure criteria.
# With selector=lambda row: row, each callable receives the first row as a tuple.
def check_success(row):
    completed = row[0] or 0  # SUM is NULL (None) when no rows match
    return completed >= 1000  # At least 1000 processed


def check_failure(row):
    failed = row[1] or 0
    return failed > 10  # More than 10 errors


monitor_sensor = SqlSensor(
    task_id='monitor_batch_job',
    conn_id='my_database',
    # Assumes `job_id` is defined as a DAG or task param (e.g. params={'job_id': ...})
    sql='''
        SELECT
            SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) AS completed,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failed
        FROM batch_processing
        WHERE job_id = {{ params.job_id }}
    ''',
    selector=lambda row: row,
    success=check_success,
    failure=check_failure,  # Checked before success; fails the task if True
    poke_interval=180,  # Check every 3 minutes
    timeout=14400,  # Time out after 4 hours
)
```

### Parameterized Monitoring

```python
# Use templated bind parameters for dynamic monitoring.
# %(name)s is the psycopg2 placeholder style; use your driver's paramstyle.
dynamic_sensor = SqlSensor(
    task_id='wait_for_threshold',
    conn_id='my_database',
    sql='''
        SELECT COUNT(*)
        FROM events
        WHERE event_type = %(event_type)s
          AND timestamp >= %(start_time)s
          AND timestamp < %(end_time)s
    ''',
    parameters={
        'event_type': '{{ dag_run.conf.event_type }}',
        'start_time': '{{ ds }} 00:00:00',
        # Exclusive upper bound at the next midnight leaves no gap after 23:59:59
        'end_time': '{{ macros.ds_add(ds, 1) }} 00:00:00',
    },
    success=lambda count: count >= 100,  # At least 100 events
    poke_interval=300,  # Check every 5 minutes
    # timeout is not set, so the BaseSensorOperator default of 7 days applies
)
```
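Bind parameters are sent to the database driver separately from the SQL text, and the placeholder syntax follows the driver's `paramstyle`: `%(name)s` for psycopg2, `:name` for Python's `sqlite3`. The sketch below assumes an in-memory SQLite `events` table whose text timestamps all use one `YYYY-MM-DD HH:MM:SS` format, so they compare correctly as strings. It compares a closed `<= ... 23:59:59` upper bound with a half-open `< next midnight` bound. `count_clicks` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_type TEXT, timestamp TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [
        ("click", "2024-01-01 08:00:00"),
        ("click", "2024-01-01 23:59:59.500"),  # after 23:59:59, before midnight
        ("click", "2024-01-02 00:00:00"),  # belongs to the next day
        ("view", "2024-01-01 12:00:00"),
    ],
)

CLOSED_SQL = """
    SELECT COUNT(*) FROM events
    WHERE event_type = :event_type
      AND timestamp >= :start_time
      AND timestamp <= :end_time
"""
HALF_OPEN_SQL = """
    SELECT COUNT(*) FROM events
    WHERE event_type = :event_type
      AND timestamp >= :start_time
      AND timestamp < :end_time
"""


def count_clicks(sql, end_time):
    # Values are passed as a mapping; the driver binds them to the :name placeholders.
    params = {"event_type": "click", "start_time": "2024-01-01 00:00:00", "end_time": end_time}
    return conn.execute(sql, params).fetchone()[0]


closed_count = count_clicks(CLOSED_SQL, "2024-01-01 23:59:59")
half_open_count = count_clicks(HALF_OPEN_SQL, "2024-01-02 00:00:00")
print(closed_count, half_open_count)  # 1 2
```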

### Database Connection Monitoring

```python
# Check database connectivity and basic health
health_sensor = SqlSensor(
    task_id='check_db_health',
    conn_id='my_database',
    sql='SELECT 1',  # Simple connectivity check
    # No success callable needed: the first cell (1) is truthy, so the first
    # successful query completes the sensor.
    # By default a connection error raises an exception and fails the task
    # rather than triggering another poke; use task retries for short outages.
    poke_interval=30,
    timeout=300,
)
```
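A simplified model of the poke loop shows why a connection error ends the wait instead of being retried on the next poke. This is a hypothetical sketch of poke-mode behavior, not Airflow's implementation. `run_sensor` and `SensorTimeout` are illustrative names, and only `poke_interval` and `timeout` are modeled; reschedule mode, `exponential_backoff`, and `soft_fail` are left out.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Hypothetical stand-in for Airflow's sensor timeout exception."""


def run_sensor(poke: Callable[[], bool], poke_interval: float, timeout: float) -> int:
    """Call `poke` until it returns True; return how many pokes were made.

    Exceptions raised by `poke` (for example a refused connection) are not
    caught, so they end the loop immediately instead of triggering another poke.
    """
    started = time.monotonic()
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return pokes
        if time.monotonic() - started > timeout:
            raise SensorTimeout(f"Gave up after {pokes} pokes")
        time.sleep(poke_interval)
```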

## Success and Failure Criteria Functions

Success and failure criteria are callables that take a single argument and return a boolean. That argument is the output of `selector` applied to the first row the query returns. With the default `selector=itemgetter(0)` it is the first cell; pass a selector such as `lambda row: row` to receive the whole row as a tuple. The callables are never invoked for an empty result set: the sensor either pokes again or, with `fail_on_empty=True`, fails. `failure` is evaluated before `success`.

```python
THRESHOLD = 100  # Example value
ERROR_THRESHOLD = 10  # Example value


def success_criteria(row):
    """
    Define the success condition.

    Args:
        row (tuple): First result row, as returned by selector=lambda row: row

    Returns:
        bool: True if the success condition is met
    """
    return row[0] is not None and row[0] > THRESHOLD


def failure_criteria(row):
    """
    Define the failure condition.

    Args:
        row (tuple): First result row, as returned by selector=lambda row: row

    Returns:
        bool: True if the failure condition is met (the task should fail)
    """
    return row[1] is not None and row[1] > ERROR_THRESHOLD
```
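What the criteria callables receive depends only on `selector`. This short sketch applies selectors to a hypothetical first row `(1200, 3)`: the default `itemgetter(0)`, `itemgetter(1)` for a different column, and an identity function equivalent to `lambda row: row`.

```python
from operator import itemgetter

first_row = (1200, 3)  # Hypothetical (completed, failed) row


def whole_row(row):
    """Identity selector, equivalent to selector=lambda row: row."""
    return row


print(itemgetter(0)(first_row))  # 1200: the default selector keeps the first cell
print(itemgetter(1)(first_row))  # 3: select a different column instead
print(whole_row(first_row))  # (1200, 3): the callables get the whole tuple
```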

## Common Patterns

### Count-based Monitoring

```python
# Wait for a specific record count (COUNT(*) is never NULL)
success=lambda count: count >= expected_count
```

### Threshold Monitoring

```python
# Wait for a metric to exceed a threshold; aggregates such as MAX or SUM may be NULL
success=lambda value: value is not None and value > threshold_value
```

### Status-based Monitoring

```python
# Wait for a specific status
success=lambda status: status == 'COMPLETED'
```

### Empty Result Handling

```python
# An empty result set never reaches `success`: the sensor simply pokes again.
# Set fail_on_empty=True to fail the task instead.
fail_on_empty=True,
# A NULL first cell (e.g. SUM over no matching rows) arrives as None, so guard for it
success=lambda value: value is not None and value > 0
```
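The `value is not None` guard matters because Python 3 refuses to order `None` against a number. Inside a sensor, a `TypeError` raised by `success` would propagate out of `poke` instead of returning False. A minimal sketch with two hypothetical callables:

```python
def unguarded(value):
    return value > 0


def guarded(value):
    # Short-circuits before the comparison when the cell is NULL (None)
    return value is not None and value > 0


print(guarded(None), guarded(5))  # False True
try:
    unguarded(None)
except TypeError as exc:
    print("TypeError:", exc)
```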