# Job Execution Operators

The dbt Cloud provider includes three main operators for job execution and management. These operators provide high-level abstractions for common dbt Cloud operations within Airflow DAGs, covering job triggering and monitoring, artifact retrieval, and job discovery.

## Capabilities

### Job Execution Operator

The `DbtCloudRunJobOperator` executes dbt Cloud jobs with comprehensive configuration options and monitoring capabilities.

```python { .api }
class DbtCloudRunJobOperator:
    def __init__(
        self,
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        job_id: int | None = None,
        project_name: str | None = None,
        environment_name: str | None = None,
        job_name: str | None = None,
        account_id: int | None = None,
        trigger_reason: str | None = None,
        steps_override: list[str] | None = None,
        schema_override: str | None = None,
        wait_for_termination: bool = True,
        timeout: int = 60 * 60 * 24 * 7,
        check_interval: int = 60,
        additional_run_config: dict[str, Any] | None = None,
        reuse_existing_run: bool = False,
        retry_from_failure: bool = False,
        deferrable: bool = False,
        **kwargs
    ):
        """
        Execute a dbt Cloud job.

        Args:
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            job_id: dbt Cloud job ID to execute (mutually exclusive with name-based lookup)
            project_name: Project name for job lookup (requires environment_name and job_name)
            environment_name: Environment name for job lookup (requires project_name and job_name)
            job_name: Job name for job lookup (requires project_name and environment_name)
            account_id: dbt Cloud account ID (defaults to connection default)
            trigger_reason: Reason for triggering the job (max 255 characters)
            steps_override: Custom list of dbt commands to run
            schema_override: Override default schema/dataset name
            wait_for_termination: Whether to wait for job completion
            timeout: Maximum time to wait for job completion (seconds)
            check_interval: Time between status checks (seconds)
            additional_run_config: Additional configuration parameters
            reuse_existing_run: Use existing run if job is already running
            retry_from_failure: Resume from last failure point if retrying
            deferrable: Use async execution mode
        """

    def execute(self, context: Context) -> int:
        """
        Execute the dbt Cloud job.

        Args:
            context: Airflow task execution context

        Returns:
            int: dbt Cloud job run ID

        Raises:
            DbtCloudJobRunException: If job execution fails
        """

    def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
        """
        Complete execution for deferrable tasks.

        Args:
            context: Airflow task execution context
            event: Trigger event containing job status

        Returns:
            int: dbt Cloud job run ID
        """

    def on_kill(self) -> None:
        """Cancel the running dbt Cloud job when task is killed."""

    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
        """
        Generate OpenLineage metadata facets for data lineage tracking.

        Args:
            task_instance: Airflow task instance

        Returns:
            OperatorLineage: OpenLineage facets for lineage tracking
        """
```

### Artifact Retrieval Operator

The `DbtCloudGetJobRunArtifactOperator` downloads artifacts from completed dbt Cloud job runs.

```python { .api }
class DbtCloudGetJobRunArtifactOperator:
    def __init__(
        self,
        *,
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        run_id: int,
        path: str,
        account_id: int | None = None,
        step: int | None = None,
        output_file_name: str | None = None,
        **kwargs
    ):
        """
        Download artifacts from a dbt Cloud job run.

        Args:
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            run_id: dbt Cloud job run ID to retrieve artifacts from
            path: Path to the artifact (e.g., 'manifest.json', 'run_results.json', 'catalog.json')
            account_id: dbt Cloud account ID (defaults to connection default)
            step: Specific step number to retrieve artifact from
            output_file_name: Local filename to save artifact (defaults to artifact path basename)
        """

    def execute(self, context: Context) -> str:
        """
        Download the specified artifact.

        Args:
            context: Airflow task execution context

        Returns:
            str: Local path to the downloaded artifact file
        """
```

### Job Listing Operator

The `DbtCloudListJobsOperator` retrieves information about jobs in a dbt Cloud account or project.

```python { .api }
class DbtCloudListJobsOperator:
    def __init__(
        self,
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        account_id: int | None = None,
        project_id: int | None = None,
        order_by: str | None = None,
        **kwargs
    ):
        """
        List jobs in a dbt Cloud account or project.

        Args:
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            account_id: dbt Cloud account ID (defaults to connection default)
            project_id: Filter jobs by project ID
            order_by: Field to order results by (e.g., 'name', 'created_at')
        """

    def execute(self, context: Context) -> list:
        """
        Retrieve the list of jobs.

        Args:
            context: Airflow task execution context

        Returns:
            list: List of dbt Cloud job IDs (integers)
        """
```

## Usage Examples

### Basic Job Execution

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

dag = DAG(
    'dbt_transform_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)

# Execute dbt job by ID
run_dbt_models = DbtCloudRunJobOperator(
    task_id='run_dbt_models',
    dbt_cloud_conn_id='dbt_cloud_prod',
    job_id=12345,
    trigger_reason='Daily scheduled transformation',
    timeout=3600,
    dag=dag,
)
```

### Job Execution with Name-based Lookup

```python
# Execute job by project/environment/job names
run_staging_models = DbtCloudRunJobOperator(
    task_id='run_staging_models',
    project_name='analytics_project',
    environment_name='production',
    job_name='staging_models_daily',
    trigger_reason='Staging data refresh',
    steps_override=['dbt run --models tag:staging', 'dbt test --models tag:staging'],
    dag=dag,
)
```

### Advanced Job Configuration

```python
# Execute with advanced configuration
run_full_pipeline = DbtCloudRunJobOperator(
    task_id='run_full_pipeline',
    job_id=54321,
    schema_override='analytics_{{ ds_nodash }}',  # Dynamic schema based on execution date
    additional_run_config={
        'threads_override': 8,
        'target_name_override': 'prod',
    },
    # dbt variables are not part of the run config; pass them through steps_override
    # instead, e.g. "dbt run --vars '{start_date: {{ ds }}, end_date: {{ next_ds }}}'"
    retry_from_failure=True,
    reuse_existing_run=False,
    dag=dag,
)
```

### Deferrable Job Execution

```python
# Use deferrable execution for resource efficiency
run_long_job = DbtCloudRunJobOperator(
    task_id='run_long_dbt_job',
    job_id=99999,
    deferrable=True,     # Enable async execution
    timeout=14400,       # 4 hours
    check_interval=300,  # Check every 5 minutes
    dag=dag,
)
```

### Artifact Retrieval

```python
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudGetJobRunArtifactOperator

# Download job artifacts after completion
download_manifest = DbtCloudGetJobRunArtifactOperator(
    task_id='download_manifest',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    path='manifest.json',
    output_file_name='dbt_manifest_{{ ds }}.json',
    dag=dag,
)

download_run_results = DbtCloudGetJobRunArtifactOperator(
    task_id='download_run_results',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    path='run_results.json',
    dag=dag,
)

# Set task dependencies
run_dbt_models >> [download_manifest, download_run_results]
```
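
Because the artifact operator returns the local path it wrote, a downstream task can open that file directly. The sketch below is illustrative (the task IDs reuse the example above, and the status check is a simplified reading of `run_results.json`): it fails the task if any dbt node errored.

```python
import json

from airflow.operators.python import PythonOperator


def check_run_results(ti):
    """Fail the task if any dbt node in run_results.json errored or failed."""
    # The artifact operator returns the local path of the downloaded file via XCom.
    results_path = ti.xcom_pull(task_ids='download_run_results')
    with open(results_path) as f:
        run_results = json.load(f)

    failed = [
        r['unique_id']
        for r in run_results.get('results', [])
        if r.get('status') in ('error', 'fail')
    ]
    if failed:
        raise ValueError(f"dbt nodes failed: {failed}")


check_results = PythonOperator(
    task_id='check_run_results',
    python_callable=check_run_results,
    dag=dag,
)

download_run_results >> check_results
```
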

### Job Discovery

```python
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudListJobsOperator

# List all jobs in an account
list_all_jobs = DbtCloudListJobsOperator(
    task_id='list_all_jobs',
    account_id=12345,
    order_by='name',
    dag=dag,
)

# List jobs for specific project
list_project_jobs = DbtCloudListJobsOperator(
    task_id='list_project_jobs',
    project_id=67890,
    dag=dag,
)
```
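
Since `execute` returns the list of job IDs, a downstream task can pull it from XCom. A minimal sketch, reusing the `list_all_jobs` task from above (the reporting task itself is illustrative):

```python
from airflow.operators.python import PythonOperator


def report_job_ids(ti):
    """Log the job IDs returned by the listing task."""
    # The returned list lands in the default return_value XCom.
    job_ids = ti.xcom_pull(task_ids='list_all_jobs')
    print(f"Found {len(job_ids)} dbt Cloud jobs: {job_ids}")


report_jobs = PythonOperator(
    task_id='report_jobs',
    python_callable=report_job_ids,
    dag=dag,
)

list_all_jobs >> report_jobs
```
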

### Error Handling and Monitoring

```python
def handle_dbt_failure(context):
    """Custom error handling for dbt job failures."""
    # on_failure_callback receives the task context as a single dict argument
    run_id = context['task_instance'].xcom_pull(task_ids='run_dbt_models')
    if run_id:
        print(f"dbt job run {run_id} failed - triggering cleanup")
        # Add custom failure handling logic here


# Add failure callback
run_dbt_models = DbtCloudRunJobOperator(
    task_id='run_dbt_models',
    job_id=12345,
    on_failure_callback=handle_dbt_failure,
    dag=dag,
)
```
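
Failure callbacks pair naturally with task retries: with `retry_from_failure=True`, an Airflow retry resumes the dbt Cloud job from its last failure point rather than rerunning it from the beginning. A minimal sketch (the job ID and retry settings are illustrative):

```python
from datetime import timedelta

# On task retry, resume the dbt Cloud job from the step that failed
run_with_retries = DbtCloudRunJobOperator(
    task_id='run_with_retries',
    job_id=12345,
    retry_from_failure=True,
    retries=2,
    retry_delay=timedelta(minutes=10),
    dag=dag,
)
```
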

### Data Lineage Integration

```python
# When used with the OpenLineage provider for data lineage,
# the operator automatically generates lineage metadata
run_with_lineage = DbtCloudRunJobOperator(
    task_id='run_with_lineage',
    job_id=12345,
    # OpenLineage integration happens automatically
    # when apache-airflow-providers-openlineage >= 2.3.0 is installed
    dag=dag,
)
```

## Template Fields

All operators support Airflow templating for dynamic values; a usage sketch follows the field lists below:

### DbtCloudRunJobOperator Template Fields
- `dbt_cloud_conn_id`
- `job_id`
- `project_name`
- `environment_name`
- `job_name`
- `account_id`
- `trigger_reason`
- `steps_override`
- `schema_override`
- `additional_run_config`

### DbtCloudGetJobRunArtifactOperator Template Fields
- `dbt_cloud_conn_id`
- `run_id`
- `path`
- `account_id`
- `output_file_name`

### DbtCloudListJobsOperator Template Fields
- `account_id`
- `project_id`
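
For instance, templated values let a single task definition adapt to each run's data interval. A brief sketch (the job ID and reason text are illustrative):

```python
templated_run = DbtCloudRunJobOperator(
    task_id='templated_run',
    job_id=12345,
    # Rendered by Airflow's templating engine at run time
    trigger_reason='Triggered by {{ dag.dag_id }} for {{ ds }}',
    schema_override='analytics_{{ ds_nodash }}',
    dag=dag,
)
```
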
## Types
```python { .api }
from typing import Any, Dict, List, Optional
from airflow.models import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.utils.context import Context

class DbtCloudRunJobOperatorLink(BaseOperatorLink):
    """Operator link for monitoring job runs in dbt Cloud UI."""
    name = "Monitor Job Run"

    def get_link(self, operator: BaseOperator, *, ti_key=None) -> str:
        """
        Generate link to dbt Cloud job run monitoring page.

        Args:
            operator: The operator instance
            ti_key: Task instance key

        Returns:
            str: URL to dbt Cloud job run page
        """
```
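
The link surfaces as a "Monitor Job Run" button on task instances of `DbtCloudRunJobOperator`. Operator links are attached through Airflow's standard `operator_extra_links` mechanism, roughly as sketched below (a simplified rendering, not the provider's exact source):

```python
class DbtCloudRunJobOperator(BaseOperator):
    # Exposes the "Monitor Job Run" button in the Airflow UI for each task instance
    operator_extra_links = (DbtCloudRunJobOperatorLink(),)
```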