# Job Monitoring Sensor

The `DbtCloudJobRunSensor` monitors dbt Cloud job runs within Airflow workflows. It can operate in traditional polling mode or in deferrable mode, making it suitable for a range of monitoring scenarios and resource constraints.

## Capabilities

### Job Run Status Monitoring

The sensor monitors the status of a dbt Cloud job run and waits for it to complete before allowing downstream tasks to proceed.
```python { .api }
class DbtCloudJobRunSensor:
    def __init__(
        self,
        run_id: int,
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        account_id: int | None = None,
        deferrable: bool = False,
        **kwargs,
    ):
        """
        Monitor the status of a dbt Cloud job run.

        Args:
            run_id: dbt Cloud job run ID to monitor
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            account_id: dbt Cloud account ID (defaults to the connection's account)
            deferrable: Use async execution mode for resource efficiency
            **kwargs: Additional sensor parameters (timeout, poke_interval, etc.)
        """

    def poke(self, context: Context) -> bool:
        """
        Check whether the job run has reached a terminal status.

        Args:
            context: Airflow task execution context

        Returns:
            bool: True if the job run completed successfully, False if it is still running

        Raises:
            DbtCloudJobRunException: If the job run fails or is cancelled
        """

    def execute(self, context: Context) -> None:
        """
        Execute the sensor (defers to a trigger when deferrable mode is enabled).

        Args:
            context: Airflow task execution context
        """

    def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
        """
        Complete execution for deferrable sensors.

        Args:
            context: Airflow task execution context
            event: Trigger event containing the job status

        Returns:
            int: dbt Cloud job run ID
        """

    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
        """
        Generate OpenLineage metadata facets for data lineage tracking.

        Args:
            task_instance: Airflow task instance

        Returns:
            OperatorLineage: OpenLineage facets for lineage tracking
        """
```
## Usage Examples

### Basic Job Run Monitoring

```python
from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
from datetime import datetime

dag = DAG(
    'dbt_workflow_with_monitoring',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)

# Execute dbt job without blocking on its completion
run_dbt_job = DbtCloudRunJobOperator(
    task_id='run_dbt_job',
    job_id=12345,
    wait_for_termination=False,  # Don't wait in the operator
    dag=dag,
)

# Monitor job completion with the sensor
wait_for_completion = DbtCloudJobRunSensor(
    task_id='wait_for_dbt_completion',
    dbt_cloud_conn_id='dbt_cloud_default',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_job') }}",
    timeout=3600,      # 1 hour timeout
    poke_interval=60,  # Check every minute
    dag=dag,
)

run_dbt_job >> wait_for_completion
```
### Deferrable Sensor for Resource Efficiency

```python
# Use deferrable mode for long-running jobs
wait_for_long_job = DbtCloudJobRunSensor(
    task_id='wait_for_long_dbt_job',
    run_id="{{ task_instance.xcom_pull(task_ids='run_long_job') }}",
    deferrable=True,    # Enable async monitoring
    timeout=14400,      # 4 hour timeout
    poke_interval=300,  # Check every 5 minutes
    dag=dag,
)
```
### Multiple Job Monitoring

```python
from airflow.operators.dummy import DummyOperator

# Monitor multiple parallel dbt jobs
start = DummyOperator(task_id='start', dag=dag)

# Start multiple jobs
job_runs = []
for i, job_id in enumerate([111, 222, 333]):
    run_job = DbtCloudRunJobOperator(
        task_id=f'run_job_{i+1}',
        job_id=job_id,
        wait_for_termination=False,
        dag=dag,
    )

    wait_job = DbtCloudJobRunSensor(
        task_id=f'wait_job_{i+1}',
        run_id=f"{{{{ task_instance.xcom_pull(task_ids='run_job_{i+1}') }}}}",
        deferrable=True,
        dag=dag,
    )

    start >> run_job >> wait_job
    job_runs.append(wait_job)

# Continue after all jobs complete
all_complete = DummyOperator(task_id='all_jobs_complete', dag=dag)
job_runs >> all_complete
```
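The quadruple braces in the loop above are f-string escapes: each `{{{{`/`}}}}` pair collapses to a literal `{{`/`}}`, leaving a plain Jinja expression for Airflow to render at runtime. A standalone check (the helper name is illustrative, not part of the provider):

```python
def build_run_id_template(task_id: str) -> str:
    """Build the templated run_id string the same way the loop does.

    {{{{ and }}}} are f-string escapes that produce literal {{ and }}.
    """
    return f"{{{{ task_instance.xcom_pull(task_ids='{task_id}') }}}}"

template = build_run_id_template("run_job_1")
print(template)  # {{ task_instance.xcom_pull(task_ids='run_job_1') }}
```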
### Sensor with Custom Configuration

```python
# Sensor with extended timeout and custom polling
monitor_critical_job = DbtCloudJobRunSensor(
    task_id='monitor_critical_job',
    run_id="{{ task_instance.xcom_pull(task_ids='run_critical_models') }}",
    account_id=12345,
    timeout=28800,      # 8 hours for a critical job
    poke_interval=120,  # Check every 2 minutes
    mode='poke',        # Traditional polling mode
    dag=dag,
)
```
### Error Handling and Alerting

```python
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

def check_job_failure(context):
    """Custom logic for handling dbt job failures.

    Airflow passes the task context to failure callbacks as a single
    positional argument.
    """
    run_id = context['task_instance'].xcom_pull(task_ids='run_dbt_job')
    # Add custom failure analysis logic
    return f"dbt job run {run_id} monitoring failed"

# Sensor with failure handling
monitor_with_alerts = DbtCloudJobRunSensor(
    task_id='monitor_with_alerts',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_job') }}",
    timeout=3600,
    on_failure_callback=check_job_failure,
    dag=dag,
)

# Email alert on sensor failure
send_failure_alert = EmailOperator(
    task_id='send_failure_alert',
    to=['data-team@company.com'],
    subject='dbt Cloud Job Monitoring Failed',
    html_content='dbt Cloud job monitoring failed. Please check the logs.',
    trigger_rule='one_failed',  # Run only when an upstream task failed
    dag=dag,
)

monitor_with_alerts >> send_failure_alert
```
### Conditional Logic Based on Job Status

```python
from airflow.operators.python import BranchPythonOperator

def decide_next_task(**context):
    """Branch based on the monitoring sensor's final state."""
    sensor_ti = context['dag_run'].get_task_instance('wait_for_dbt_completion')
    if sensor_ti.state == 'success':
        return 'process_results'
    # Sensor failed or timed out; take the failure-handling path
    return 'handle_failure'

# trigger_rule='all_done' lets the branch run even when the sensor failed,
# so both paths stay reachable
branch_on_result = BranchPythonOperator(
    task_id='branch_on_result',
    python_callable=decide_next_task,
    trigger_rule='all_done',
    dag=dag,
)

process_success = DummyOperator(task_id='process_results', dag=dag)
handle_failure = DummyOperator(task_id='handle_failure', dag=dag)

wait_for_completion >> branch_on_result
branch_on_result >> [process_success, handle_failure]
```
### Integration with Data Quality Checks

```python
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudGetJobRunArtifactOperator

# Monitor the job, then validate its results
wait_for_models = DbtCloudJobRunSensor(
    task_id='wait_for_models',
    run_id="{{ task_instance.xcom_pull(task_ids='run_models') }}",
    dag=dag,
)

# Download test results for validation
get_test_results = DbtCloudGetJobRunArtifactOperator(
    task_id='get_test_results',
    run_id="{{ task_instance.xcom_pull(task_ids='run_models') }}",
    path='run_results.json',
    dag=dag,
)

def validate_test_results(**context):
    """Validate dbt test results from the downloaded artifact."""
    # Read and validate test results
    pass

validate_tests = PythonOperator(
    task_id='validate_tests',
    python_callable=validate_test_results,
    dag=dag,
)

wait_for_models >> get_test_results >> validate_tests
```
### Cross-DAG Dependencies

```python
from airflow.sensors.external_task import ExternalTaskSensor

# Wait for a dbt job in another DAG
wait_external_dbt = ExternalTaskSensor(
    task_id='wait_external_dbt',
    external_dag_id='upstream_dbt_dag',
    external_task_id='wait_for_dbt_completion',
    dag=dag,
)

# Then run the local dbt job
run_downstream_job = DbtCloudRunJobOperator(
    task_id='run_downstream_job',
    job_id=67890,
    dag=dag,
)

wait_external_dbt >> run_downstream_job
```
## Configuration Options

### Sensor Parameters

The sensor inherits from `BaseSensorOperator` and supports all standard sensor configuration:

- **timeout**: Maximum time to wait for job completion (seconds)
- **poke_interval**: Time between status checks in polling mode (seconds)
- **mode**: Execution mode (`'poke'` holds a worker slot between checks; `'reschedule'` releases it)
- **soft_fail**: Mark the task as skipped instead of failed on timeout
- **deferrable**: Enable async execution for resource efficiency
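The `timeout`/`poke_interval` pair bounds how many status calls the sensor can issue against the dbt Cloud API. A small standalone helper (an estimation aid, not part of the provider) makes the trade-off concrete:

```python
def max_status_checks(timeout: int, poke_interval: int) -> int:
    """Upper bound on API status calls a polling sensor can issue
    before hitting its timeout (both values in seconds)."""
    if poke_interval <= 0:
        raise ValueError("poke_interval must be positive")
    return timeout // poke_interval

# 1-hour timeout, checking every minute -> at most 60 API calls
print(max_status_checks(3600, 60))    # 60
# 4-hour timeout, checking every 5 minutes -> at most 48 API calls
print(max_status_checks(14400, 300))  # 48
```

Raising `poke_interval` for long jobs cuts API load at the cost of slower completion detection.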
### Template Fields

The sensor supports Airflow templating for dynamic values:

- `dbt_cloud_conn_id`
- `run_id`
- `account_id`
## Best Practices

### Resource Efficiency
- Use `deferrable=True` for long-running jobs to free up worker slots
- Set an appropriate `poke_interval` to balance responsiveness and API load
- Use `mode='reschedule'` for very long jobs to avoid blocking workers

### Error Handling
- Set reasonable `timeout` values based on expected job duration
- Use `soft_fail=True` for non-critical monitoring tasks
- Implement custom failure callbacks for alerting and recovery logic

### Monitoring Patterns
- Combine sensors with operators that don't wait (`wait_for_termination=False`)
- Use sensors for complex conditional logic based on job outcomes
- Monitor multiple parallel jobs with individual sensors for better error isolation
## Types

```python { .api }
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context
from airflow.providers.openlineage.extractors import OperatorLineage

# DbtCloudJobRunSensor inherits from BaseSensorOperator,
# so all standard sensor configuration options are available
```