# Async Job Trigger

The `DbtCloudRunJobTrigger` provides asynchronous monitoring of dbt Cloud job runs for deferrable Airflow tasks. Because it polls job status from the triggerer rather than from a worker, it frees the worker slot for the duration of the run, making it well suited to long-running dbt Cloud jobs.

## Capabilities

### Async Job Status Monitoring

The trigger runs asynchronously, monitoring dbt Cloud job status and emitting events when the job state changes.
```python { .api }
class DbtCloudRunJobTrigger:
    def __init__(
        self,
        conn_id: str,
        run_id: int,
        end_time: float,
        poll_interval: float,
        account_id: int | None,
    ):
        """
        Async trigger for monitoring dbt Cloud job status.

        Args:
            conn_id: Airflow connection ID for dbt Cloud
            run_id: dbt Cloud job run ID to monitor
            end_time: Unix timestamp after which monitoring times out
            poll_interval: Seconds between status checks
            account_id: dbt Cloud account ID (optional)
        """

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize the trigger for persistence across Airflow restarts.

        Returns:
            tuple[str, dict]: (class_path, kwargs) used to reconstruct the trigger
        """

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Main async execution loop that monitors job status.

        Yields:
            TriggerEvent: Events containing job status updates and completion
        """

    async def is_still_running(self, hook: DbtCloudHook) -> bool:
        """
        Check whether the job run is still in progress.

        Args:
            hook: DbtCloudHook instance for API communication

        Returns:
            bool: True if the job is still running, False once a terminal state is reached
        """
```
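
To see where the trigger fits, here is a minimal sketch of the standard Airflow deferral hand-off, assuming the job run has already been started; the operator class and its `execute_complete` method name are illustrative, not the provider's exact implementation.

```python
import time

from airflow.models.baseoperator import BaseOperator
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger


class IllustrativeDbtRunOperator(BaseOperator):
    """Hypothetical operator showing how execution defers to the trigger."""

    def execute(self, context):
        run_id = 98765  # assume the dbt Cloud run was started earlier in execute()
        # Hand monitoring over to the triggerer; this releases the worker slot.
        self.defer(
            trigger=DbtCloudRunJobTrigger(
                conn_id="dbt_cloud_default",
                run_id=run_id,
                end_time=time.time() + 3600,
                poll_interval=60,
                account_id=None,
            ),
            method_name="execute_complete",  # called on a worker once the trigger fires
        )

    def execute_complete(self, context, event):
        # `event` is the trigger's payload (see "Trigger Event Structure" below).
        if event["status"] != "success":
            raise RuntimeError(event["message"])
        return event["run_id"]
```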

## Usage Examples

### Basic Deferrable Operator Usage

The trigger is typically used internally by deferrable operators, but understanding its behavior helps with debugging and advanced configurations.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

dag = DAG(
    'deferrable_dbt_workflow',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)

# The trigger is automatically used when deferrable=True
run_dbt_job = DbtCloudRunJobOperator(
    task_id='run_dbt_job',
    job_id=12345,
    deferrable=True,     # This enables the trigger
    timeout=7200,        # 2 hours
    check_interval=300,  # Check every 5 minutes
    dag=dag,
)
```

### Sensor with Trigger

```python
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# Sensor using the trigger for async monitoring
monitor_job = DbtCloudJobRunSensor(
    task_id='monitor_dbt_job',
    run_id="{{ task_instance.xcom_pull(task_ids='start_job') }}",
    deferrable=True,    # Enables trigger usage
    timeout=14400,      # 4 hours
    poke_interval=180,  # Check every 3 minutes
    dag=dag,
)
```

### Custom Trigger Configuration

While the trigger is typically managed automatically, understanding its configuration helps with troubleshooting:

```python
import time

from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger

# Example of trigger parameters (normally handled automatically)
trigger = DbtCloudRunJobTrigger(
    conn_id='dbt_cloud_default',
    run_id=98765,
    end_time=time.time() + 3600,  # 1 hour from now
    poll_interval=60,             # Check every minute
    account_id=12345,
)
```
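
Calling `serialize()` on such an instance is what lets the triggerer persist the trigger and rebuild it later. A sketch of that round-trip, assuming the conventional `(class_path, kwargs)` return shape documented above; the exact class path shown in the comment is an assumption based on the import:

```python
# Serialization round-trip sketch: the triggerer stores (class_path, kwargs)
# and reconstructs an equivalent trigger from them after a restart.
class_path, kwargs = trigger.serialize()
print(class_path)        # expected to be the trigger's import path, e.g.
                         # "airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger"
print(kwargs["run_id"])  # 98765

rebuilt = DbtCloudRunJobTrigger(**kwargs)  # equivalent trigger instance
```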

### Monitoring Trigger Events

```python
from airflow.operators.python import PythonOperator


def handle_dbt_completion(**context):
    """Handle completion event from dbt trigger."""
    event = context.get('event', {})
    run_id = event.get('run_id')
    status = event.get('status')

    print(f"dbt job run {run_id} completed with status: {status}")

    if status == 'success':
        return "Job completed successfully"
    else:
        raise Exception(f"Job failed with status: {status}")


# Process trigger completion
process_completion = PythonOperator(
    task_id='process_completion',
    python_callable=handle_dbt_completion,
    dag=dag,
)

run_dbt_job >> process_completion
```

## Trigger Event Structure

The trigger emits events with the following structure:

```python
{
    "status": "success" | "error" | "cancelled" | "timeout",
    "run_id": int,
    "message": str,
    "job_run_url": str  # Optional URL to the dbt Cloud UI
}
```
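
If you consume these payloads in your own callbacks, a small guard like the following keeps handling uniform across the event types listed next; this is a sketch based on the structure above, not part of the provider.

```python
from airflow.exceptions import AirflowException

# Statuses that should fail the consuming task (per the structure above).
FAILURE_STATUSES = {"error", "cancelled", "timeout"}


def raise_for_failed_run(event: dict) -> int:
    """Raise if the trigger reported a non-success status; otherwise return the run id."""
    if event.get("status") in FAILURE_STATUSES:
        raise AirflowException(f"Run {event.get('run_id')}: {event.get('message')}")
    return event["run_id"]
```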

### Event Types

#### Success Event
```python
{
    "status": "success",
    "run_id": 12345,
    "message": "dbt Cloud job run completed successfully",
    "job_run_url": "https://cloud.getdbt.com/deploy/1234/projects/5678/runs/12345"
}
```

#### Error Event
```python
{
    "status": "error",
    "run_id": 12345,
    "message": "dbt Cloud job run failed",
    "job_run_url": "https://cloud.getdbt.com/deploy/1234/projects/5678/runs/12345"
}
```

#### Timeout Event
```python
{
    "status": "timeout",
    "run_id": 12345,
    "message": "dbt Cloud job run monitoring timed out after 3600 seconds"
}
```

## Advanced Usage Patterns

### Multiple Job Monitoring

```python
# Monitor multiple jobs with separate triggers (automatic)
jobs = [111, 222, 333]

for i, job_id in enumerate(jobs):
    run_job = DbtCloudRunJobOperator(
        task_id=f'run_job_{i}',
        job_id=job_id,
        deferrable=True,  # Each gets its own trigger
        wait_for_termination=True,
        dag=dag,
    )
```

### Trigger with Custom Polling Intervals

```python
# Fast polling for critical jobs
critical_job = DbtCloudRunJobOperator(
    task_id='critical_job',
    job_id=99999,
    deferrable=True,
    check_interval=30,  # Check every 30 seconds
    timeout=1800,       # 30 minutes max
    dag=dag,
)

# Slow polling for batch jobs
batch_job = DbtCloudRunJobOperator(
    task_id='batch_job',
    job_id=11111,
    deferrable=True,
    check_interval=600,  # Check every 10 minutes
    timeout=28800,       # 8 hours max
    dag=dag,
)
```

### Error Handling with Triggers

```python
def handle_trigger_failure(**context):
    """Custom error handling for trigger failures."""
    task_instance = context['task_instance']
    exception = context.get('exception')

    if 'timeout' in str(exception).lower():
        print("Job monitoring timed out - job may still be running")
        # Implement custom timeout handling
    else:
        print(f"Job monitoring failed: {exception}")
        # Implement custom error handling


long_running_job = DbtCloudRunJobOperator(
    task_id='long_running_job',
    job_id=55555,
    deferrable=True,
    timeout=43200,  # 12 hours
    on_failure_callback=handle_trigger_failure,
    dag=dag,
)
```

### Resource-Efficient Workflows

```python
from airflow.operators.dummy import DummyOperator

# Start multiple long-running jobs without blocking workers
start = DummyOperator(task_id='start', dag=dag)

# All these jobs run concurrently without occupying worker slots
deferrable_jobs = []
for i in range(10):  # 10 parallel jobs
    job = DbtCloudRunJobOperator(
        task_id=f'dbt_job_{i}',
        job_id=10000 + i,
        deferrable=True,  # Uses trigger - no worker blocking
        dag=dag,
    )
    start >> job
    deferrable_jobs.append(job)

# Proceed after all complete
finish = DummyOperator(task_id='finish', dag=dag)
deferrable_jobs >> finish
```

## Performance Considerations

### Trigger Efficiency
- **Resource Usage**: Triggers run in the Airflow triggerer process, not in worker processes
- **Scale**: Hundreds of triggers can run concurrently with minimal resource impact
- **Persistence**: Triggers survive Airflow restarts and maintain their state

### Optimal Configuration
- **Poll Interval**: Balance responsiveness against API load
  - Critical jobs: 30-60 seconds
  - Regular jobs: 60-300 seconds
  - Batch jobs: 300-600 seconds
- **Timeout**: Set based on expected job duration plus a buffer (see the sketch below)
  - Add a 50-100% buffer to the expected runtime
  - Consider downstream task dependencies
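
A quick sketch of the buffer rule above; the helper name and default ratio are illustrative, not an Airflow or provider API.

```python
# Hypothetical helper: derive an operator timeout from the expected runtime
# plus a 50-100% buffer, as recommended above.
def timeout_with_buffer(expected_runtime_s: int, buffer_ratio: float = 0.75) -> int:
    return int(expected_runtime_s * (1 + buffer_ratio))


# A job expected to take 2 hours gets a 3.5-hour monitoring window,
# which can then be passed as DbtCloudRunJobOperator(timeout=dbt_timeout, ...).
dbt_timeout = timeout_with_buffer(7200)  # 12600 seconds
```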

### Best Practices
- Use deferrable mode for jobs longer than 5 minutes
- Set reasonable poll intervals to avoid API rate limits
- Monitor trigger performance in the Airflow UI's "Triggers" view
- Use triggers for I/O-bound operations (API polling, file watching)

## Debugging Triggers

### Trigger Logs
```python
# Trigger log lines are written by the triggerer service (the process started
# with `airflow triggerer`); check that service's output or log files.
# Recent Airflow versions also surface trigger log messages in the task's
# log view in the UI while the task is deferred.
# To confirm a triggerer is running and healthy:
#   airflow jobs check --job-type TriggererJob
```

### Common Issues
1. **Connection Errors**: Check the dbt Cloud connection configuration
2. **Timeout Issues**: Verify timeout values against expected job duration
3. **API Rate Limits**: Adjust poll intervals if hitting rate limits
4. **Trigger Restarts**: Triggers resume automatically after Airflow restarts

## Types

```python { .api }
from typing import Any, AsyncIterator, Dict, Tuple

from airflow.triggers.base import BaseTrigger, TriggerEvent


class TriggerEvent:
    """Event emitted by triggers to notify task completion."""
    payload: Dict[str, Any]

# The trigger inherits from BaseTrigger.
# Standard trigger serialization and async patterns apply.
```
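
To tie the pieces together, here is a simplified sketch of the async polling pattern that `run()` follows; it is not the provider's exact implementation, and the `fetch_is_running` callable stands in for the hook-based `is_still_running` check.

```python
import asyncio
import time
from typing import AsyncIterator, Awaitable, Callable

from airflow.triggers.base import TriggerEvent


async def poll_run(
    fetch_is_running: Callable[[], Awaitable[bool]],  # stand-in for is_still_running(hook)
    run_id: int,
    end_time: float,
    poll_interval: float,
) -> AsyncIterator[TriggerEvent]:
    """Poll until the run reaches a terminal state or the deadline passes."""
    while await fetch_is_running():
        if time.time() > end_time:
            yield TriggerEvent({
                "status": "timeout",
                "run_id": run_id,
                "message": f"dbt Cloud job run {run_id} monitoring timed out",
            })
            return
        await asyncio.sleep(poll_interval)
    yield TriggerEvent({
        "status": "success",
        "run_id": run_id,
        "message": "dbt Cloud job run completed successfully",
    })
```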