Provider package for integrating Apache Airflow with dbt Cloud for data transformation workflow orchestration
npx @tessl/cli install tessl/pypi-apache-airflow-providers-dbt-cloud@4.4.00
# Apache Airflow Providers dbt Cloud

An Apache Airflow provider package that enables seamless integration with dbt Cloud for orchestrating data transformation workflows. This provider offers comprehensive connectivity to dbt Cloud's analytics engineering platform, allowing teams to trigger dbt Cloud jobs, monitor their status, retrieve artifacts, and integrate with data lineage systems within Airflow DAGs.

## Package Information

- **Package Name**: apache-airflow-providers-dbt-cloud
- **Package Type**: pip
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-dbt-cloud`
- **Minimum Airflow Version**: 2.10.0

## Core Imports

```python
from airflow.providers.dbt.cloud.hooks.dbt import (
    DbtCloudHook,
    DbtCloudJobRunStatus,
    DbtCloudJobRunException,
    DbtCloudResourceLookupError,
    JobRunInfo
)
from airflow.providers.dbt.cloud.operators.dbt import (
    DbtCloudRunJobOperator,
    DbtCloudGetJobRunArtifactOperator,
    DbtCloudListJobsOperator
)
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
```
## Basic Usage
```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# Define default arguments
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create DAG
dag = DAG(
    'dbt_cloud_workflow',
    default_args=default_args,
    description='Run dbt Cloud transformation job',
    schedule=timedelta(hours=6),
    catchup=False,
)

# Trigger a dbt Cloud job; monitoring is handed off to the sensor below,
# so the operator does not wait for the run to finish itself
run_dbt_job = DbtCloudRunJobOperator(
    task_id='run_dbt_models',
    dbt_cloud_conn_id='dbt_cloud_default',
    job_id=12345,
    wait_for_termination=False,
    dag=dag,
)

# Monitor job completion with sensor; the operator pushes the run id to XCom
wait_for_completion = DbtCloudJobRunSensor(
    task_id='wait_for_dbt_completion',
    dbt_cloud_conn_id='dbt_cloud_default',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    timeout=3600,
    poke_interval=60,
    dag=dag,
)

run_dbt_job >> wait_for_completion
```
## Architecture
The dbt Cloud provider follows Airflow's standard provider pattern with four main component types:

- **Hooks**: Low-level interface for direct dbt Cloud API communication
- **Operators**: High-level task implementations for common dbt Cloud operations
- **Sensors**: Monitoring tasks that wait for specific dbt Cloud job states
- **Triggers**: Async components for deferrable operations and efficient resource usage

The provider integrates with Airflow's connection system for secure credential management and supports both synchronous and asynchronous (deferrable) task execution patterns.
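For example, deferrable execution only requires flipping a flag on the operator; a minimal sketch, where the connection id and job id are placeholders:

```python
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

# Deferrable mode: the task defers to DbtCloudRunJobTrigger on the triggerer,
# releasing the worker slot while the dbt Cloud job runs.
run_dbt_job_deferred = DbtCloudRunJobOperator(
    task_id="run_dbt_models_deferrable",
    dbt_cloud_conn_id="dbt_cloud_default",  # placeholder connection id
    job_id=12345,                           # placeholder dbt Cloud job id
    deferrable=True,
)
```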
## Capabilities

### dbt Cloud Hook

Low-level interface for comprehensive dbt Cloud API interaction, providing methods for account management, project operations, job execution, and artifact retrieval.
```python { .api }
class DbtCloudHook:
    def __init__(self, dbt_cloud_conn_id: str = "dbt_cloud_default"): ...
    def get_conn(self) -> Session: ...
    def test_connection(self) -> tuple[bool, str]: ...
    def trigger_job_run(self, job_id: int, cause: str, **kwargs) -> Response: ...
    def get_job_run_status(self, run_id: int, **kwargs) -> int: ...
    def wait_for_job_run_status(self, run_id: int, **kwargs) -> bool: ...
```
[dbt Cloud Hook](./hooks.md)
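A minimal sketch of using the hook directly inside a TaskFlow task to trigger a run and block until it succeeds; the job id is a placeholder, and the `expected_statuses` keyword and the `data.id` response field are assumptions based on the dbt Cloud v2 API:

```python
from airflow.decorators import task
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunStatus

@task
def trigger_dbt_job(job_id: int) -> int:
    hook = DbtCloudHook(dbt_cloud_conn_id="dbt_cloud_default")
    # Trigger the job; the API response payload carries the new run's id
    response = hook.trigger_job_run(job_id=job_id, cause="Triggered from Airflow")
    run_id = response.json()["data"]["id"]
    # Poll until the run reaches SUCCESS (or the timeout elapses)
    hook.wait_for_job_run_status(
        run_id=run_id,
        expected_statuses=DbtCloudJobRunStatus.SUCCESS.value,
        check_interval=30,
        timeout=3600,
    )
    return run_id
```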
### Job Execution Operators

Operators for triggering and managing dbt Cloud job executions with comprehensive configuration options and integration capabilities.
```python { .api }
class DbtCloudRunJobOperator:
    def __init__(
        self,
        job_id: int | None = None,
        project_name: str | None = None,
        environment_name: str | None = None,
        job_name: str | None = None,
        **kwargs
    ): ...

class DbtCloudGetJobRunArtifactOperator:
    def __init__(self, run_id: int, path: str, **kwargs): ...

class DbtCloudListJobsOperator:
    def __init__(self, account_id: int | None = None, **kwargs): ...
```
[Job Execution Operators](./operators.md)
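For example, `DbtCloudGetJobRunArtifactOperator` can fetch a run artifact once the job has finished; a minimal sketch, where the upstream task id is a placeholder and the templated `run_id` relies on the run id pushed to XCom by the run-job task:

```python
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudGetJobRunArtifactOperator

# Download run_results.json from the finished run; the run id is pulled from
# the XCom value pushed by the upstream run-job task.
get_run_results = DbtCloudGetJobRunArtifactOperator(
    task_id="get_run_results",
    dbt_cloud_conn_id="dbt_cloud_default",
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    path="run_results.json",
)
```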
### Job Monitoring Sensor

Sensor for monitoring dbt Cloud job run status with support for both polling and deferrable execution modes.
```python { .api }
class DbtCloudJobRunSensor:
    def __init__(
        self,
        run_id: int,
        account_id: int | None = None,
        deferrable: bool = False,
        **kwargs
    ): ...
```
[Job Monitoring Sensor](./sensors.md)
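The deferrable variant of the sensor differs only by one flag; a minimal sketch, with the run id template and timings as placeholders:

```python
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# Deferrable sensor: defers to the async trigger instead of occupying a worker slot
wait_for_run = DbtCloudJobRunSensor(
    task_id="wait_for_dbt_completion_deferrable",
    dbt_cloud_conn_id="dbt_cloud_default",
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    deferrable=True,
    timeout=3600,
)
```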
### Async Job Trigger

Async trigger for efficient monitoring of dbt Cloud job status in deferrable tasks.
```python { .api }
class DbtCloudRunJobTrigger:
    def __init__(
        self,
        conn_id: str,
        run_id: int,
        end_time: float,
        poll_interval: float,
        account_id: int | None
    ): ...
```
[Async Job Trigger](./triggers.md)
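The trigger is normally created for you by the deferrable operator and sensor, but a custom deferrable task could use it directly. A hypothetical sketch; the `execute_complete` callback name and the event payload keys are assumptions:

```python
import time
from airflow.exceptions import AirflowException
from airflow.models.baseoperator import BaseOperator
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger

class WaitForDbtCloudRun(BaseOperator):
    """Hypothetical deferrable operator that waits on an existing dbt Cloud run."""

    def __init__(self, run_id: int, conn_id: str = "dbt_cloud_default", **kwargs):
        super().__init__(**kwargs)
        self.run_id = run_id
        self.conn_id = conn_id

    def execute(self, context):
        # Defer to the trigger; the triggerer polls dbt Cloud until the run ends
        self.defer(
            trigger=DbtCloudRunJobTrigger(
                conn_id=self.conn_id,
                run_id=self.run_id,
                end_time=time.time() + 3600,  # give the run up to an hour
                poll_interval=60.0,
                account_id=None,
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event):
        # The trigger fires a single event describing the terminal state
        # (the payload shape here is an assumption)
        if event.get("status") != "success":
            raise AirflowException(f"dbt Cloud run did not succeed: {event}")
        return event.get("run_id")
```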
### OpenLineage Integration

Automatic data lineage tracking integration with OpenLineage for comprehensive visibility across dbt transformations and Airflow workflows.
```python { .api }
def generate_openlineage_events_from_dbt_cloud_run(
    operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor,
    task_instance: TaskInstance
) -> OperatorLineage: ...
```
[OpenLineage Integration](./openlineage.md)
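The bundled operator and sensor already call this utility from their `get_openlineage_facets_on_complete` methods when the OpenLineage provider is installed. A sketch of how that hookup looks, assuming the utility lives in `airflow.providers.dbt.cloud.utils.openlineage`:

```python
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.dbt.cloud.utils.openlineage import (
    generate_openlineage_events_from_dbt_cloud_run,
)

class MyDbtCloudRunJobOperator(DbtCloudRunJobOperator):
    """Hypothetical subclass illustrating where the lineage utility is invoked."""

    def get_openlineage_facets_on_complete(self, task_instance):
        # Reads run metadata and dbt artifacts to build lineage facets
        # for the finished dbt Cloud run.
        return generate_openlineage_events_from_dbt_cloud_run(
            operator=self, task_instance=task_instance
        )
```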
## Connection Configuration

Create a dbt Cloud connection in Airflow with the following parameters:
- **Connection Id**: `dbt_cloud_default` (or a custom name)
- **Connection Type**: `dbt Cloud`
- **Host**: `cloud.getdbt.com` (default) or the URL of your single-tenant dbt Cloud instance
- **Login**: Account ID (optional; used as a fallback when `account_id` is not passed to hook methods or operators)
- **Password**: Your dbt Cloud API token (required)
- **Extra**: JSON configuration for additional options:

```json
{
  "account_id": 12345,
  "proxies": {
    "http": "http://proxy.company.com:8080",
    "https": "https://proxy.company.com:8080"
  }
}
```
### Connection Form UI Fields

In the Airflow UI connection form, the standard connection fields are relabeled as follows:

- **Account ID** (the `Login` field): Default account ID
- **API Token** (the `Password` field): dbt Cloud API token
- **Tenant** (the `Host` field): dbt Cloud instance URL
- **Extra**: Optional JSON-formatted configuration
## Types
```python { .api }
from enum import Enum
from typing import TypedDict, Sequence
from airflow.exceptions import AirflowException
from requests import PreparedRequest

class DbtCloudJobRunStatus(Enum):
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELLED = 30
    NON_TERMINAL_STATUSES = (QUEUED, STARTING, RUNNING)
    TERMINAL_STATUSES = (SUCCESS, ERROR, CANCELLED)

    @classmethod
    def check_is_valid(cls, statuses: int | Sequence[int] | set[int]) -> None: ...

    @classmethod
    def is_terminal(cls, status: int) -> bool: ...

class JobRunInfo(TypedDict):
    account_id: int | None
    run_id: int

class DbtCloudJobRunException(AirflowException):
    """Exception raised when a dbt Cloud job run fails."""

class DbtCloudResourceLookupError(AirflowException):
    """Exception raised when a dbt Cloud resource cannot be found."""

class TokenAuth:
    """Helper class for Auth when executing requests."""
    def __init__(self, token: str): ...
    def __call__(self, request: PreparedRequest) -> PreparedRequest: ...
```
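A small sketch of how these types fit together when checking a run outside the operator and sensor layer; the run id is a placeholder:

```python
from airflow.providers.dbt.cloud.hooks.dbt import (
    DbtCloudHook,
    DbtCloudJobRunException,
    DbtCloudJobRunStatus,
)

hook = DbtCloudHook(dbt_cloud_conn_id="dbt_cloud_default")
run_id = 67890  # placeholder run id

# Fetch the numeric status and classify it using the enum helpers
status = hook.get_job_run_status(run_id=run_id)
if DbtCloudJobRunStatus.is_terminal(status) and status != DbtCloudJobRunStatus.SUCCESS.value:
    raise DbtCloudJobRunException(f"Run {run_id} finished with status {status}.")
```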