# dbt Cloud Integration (Legacy)

The original dbt Cloud integration provides job execution, asset loading, and operations. This module supports the legacy dbt Cloud API integration patterns and is maintained for backward compatibility.

## Capabilities

### Cloud Resources

#### dbt_cloud_resource

Factory function for creating a dbt Cloud resource with API authentication.

```python { .api }
def dbt_cloud_resource(
    api_token: str,
    account_id: int,
    disable_schedule_on_trigger: bool = True
) -> ResourceDefinition:
    """
    Create a dbt Cloud resource.

    Parameters:
    - api_token: dbt Cloud API token for authentication
    - account_id: dbt Cloud account ID
    - disable_schedule_on_trigger: Whether to disable dbt Cloud schedules when triggering jobs

    Returns:
    ResourceDefinition for dbt Cloud integration
    """
```

#### DbtCloudResource

Legacy dbt Cloud resource class for job execution and management.

```python { .api }
class DbtCloudResource:
    """
    Legacy dbt Cloud resource for job execution.

    Attributes:
    - api_token: dbt Cloud API token
    - account_id: dbt Cloud account ID
    - disable_schedule_on_trigger: Schedule disable behavior
    """

    def run_job_and_poll(
        self,
        job_id: int,
        cause: str = "Triggered by Dagster",
        steps_override: Optional[List[str]] = None,
        schema_override: Optional[str] = None,
        poll_interval: int = 10,
        poll_timeout: Optional[int] = None
    ) -> DbtCloudOutput:
        """
        Run a dbt Cloud job and poll for completion.

        Parameters:
        - job_id: dbt Cloud job ID to execute
        - cause: Description of why the job was triggered
        - steps_override: Custom dbt commands to run instead of job default
        - schema_override: Override the target schema
        - poll_interval: Seconds between polling attempts
        - poll_timeout: Maximum seconds to wait for completion

        Returns:
        DbtCloudOutput containing run results and metadata
        """

    def get_job(self, job_id: int) -> dict:
        """
        Get dbt Cloud job details.

        Parameters:
        - job_id: dbt Cloud job ID

        Returns:
        Job details dictionary from dbt Cloud API
        """

    def get_run(self, run_id: int) -> dict:
        """
        Get dbt Cloud run details.

        Parameters:
        - run_id: dbt Cloud run ID

        Returns:
        Run details dictionary from dbt Cloud API
        """
```
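The way `poll_interval` and `poll_timeout` interact can be illustrated with a small sketch that does not depend on Dagster. The `poll_until_done` helper and the `fetch_status` callable are hypothetical names introduced here, not part of `dagster_dbt`; the status codes are the ones listed for `DbtCloudRunStatus` in this document, and the sketch assumes that a timeout of `None` means waiting indefinitely.

```python
import time
from typing import Callable, Optional

# Terminal status codes, as listed for DbtCloudRunStatus in this document.
TERMINAL_STATUSES = {10, 20, 30}  # SUCCESS, ERROR, CANCELLED


def poll_until_done(
    fetch_status: Callable[[], int],
    poll_interval: float = 10,
    poll_timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call fetch_status until it returns a terminal code or the timeout elapses.

    Hypothetical helper: it mimics the polling contract described above,
    not the actual dagster_dbt implementation.
    """
    start = time.monotonic()
    while True:
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        # Give up once more than poll_timeout seconds have passed (if set).
        if poll_timeout is not None and time.monotonic() - start > poll_timeout:
            raise TimeoutError(f"Run did not finish within {poll_timeout} seconds")
        sleep(poll_interval)


# Simulated run: queued, running, then success. No real API calls are made.
_statuses = iter([1, 3, 10])
final_status = poll_until_done(lambda: next(_statuses), poll_interval=0)
```

A longer `poll_interval` reduces the number of status requests at the cost of noticing completion later; setting `poll_timeout` bounds how long a stuck run can block the caller.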

#### DbtCloudClientResource

Configurable resource implementation for dbt Cloud API client.

```python { .api }
class DbtCloudClientResource(ConfigurableResource):
    """
    Configurable dbt Cloud client resource.

    Attributes:
    - api_token: dbt Cloud API token for authentication
    - account_id: dbt Cloud account ID
    - disable_schedule_on_trigger: Whether to disable schedules on trigger
    """

    api_token: str
    account_id: int
    disable_schedule_on_trigger: bool = True

    def run_job_and_poll(
        self,
        job_id: int,
        **kwargs
    ) -> DbtCloudOutput:
        """
        Run and poll dbt Cloud job.

        Parameters:
        - job_id: dbt Cloud job ID
        - **kwargs: Additional job execution parameters

        Returns:
        DbtCloudOutput with execution results
        """
```

### Asset Loading

#### load_assets_from_dbt_cloud_job

Creates Dagster assets from a dbt Cloud job configuration.

```python { .api }
def load_assets_from_dbt_cloud_job(
    dbt_cloud: ResourceDefinition,
    job_id: int,
    node_info_to_asset_key: Callable[[dict], AssetKey] = default_node_info_to_asset_key,
    node_info_to_group_fn: Callable[[dict], Optional[str]] = lambda _: None,
    node_info_to_freshness_policy_fn: Callable[[dict], Optional[FreshnessPolicy]] = lambda _: None,
    partitions_def: Optional[PartitionsDefinition] = None,
    partition_key_to_vars_fn: Optional[Callable[[str], dict]] = None,
    op_tags: Optional[dict] = None
) -> Sequence[AssetsDefinition]:
    """
    Load assets from dbt Cloud job.

    Parameters:
    - dbt_cloud: dbt Cloud resource definition
    - job_id: dbt Cloud job ID to load assets from
    - node_info_to_asset_key: Function to generate asset keys from dbt nodes
    - node_info_to_group_fn: Function to determine asset groups
    - node_info_to_freshness_policy_fn: Function to set freshness policies
    - partitions_def: Partitioning definition for assets
    - partition_key_to_vars_fn: Function to map partition keys to dbt vars
    - op_tags: Tags to apply to generated ops

    Returns:
    Sequence of AssetsDefinition objects
    """
```
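As a hedged illustration of the `partition_key_to_vars_fn` parameter, the sketch below shows one possible function matching the `Callable[[str], dict]` shape from the signature above. The function name, the `run_date` var name, and the ISO-date key format are assumptions made for this example; they are not defined by the integration.

```python
from datetime import date
from typing import Dict


def partition_key_to_vars(partition_key: str) -> Dict[str, str]:
    """Map a daily partition key such as "2024-01-15" to dbt vars.

    The var name `run_date` is an assumption; use whatever names the
    dbt project actually reads via var().
    """
    # Validate that the key is an ISO date; raises ValueError otherwise.
    parsed = date.fromisoformat(partition_key)
    return {"run_date": parsed.isoformat()}


example_vars = partition_key_to_vars("2024-01-15")
```

Given the signature above, a function like this would be passed as `partition_key_to_vars_fn=partition_key_to_vars`, presumably alongside a daily `partitions_def`.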

### Operations

#### dbt_cloud_run_op

Operation for executing dbt Cloud jobs within Dagster pipelines.

```python { .api }
def dbt_cloud_run_op(
    context: OpExecutionContext,
    dbt_cloud: DbtCloudResource
) -> DbtCloudOutput:
    """
    Execute dbt Cloud job operation.

    Parameters:
    - context: Dagster operation execution context
    - dbt_cloud: dbt Cloud resource instance

    Returns:
    DbtCloudOutput containing job execution results
    """
```

### Output Types

#### DbtCloudOutput

Output type for dbt Cloud operations containing run results and metadata.

```python { .api }
class DbtCloudOutput:
    """
    Output from dbt Cloud job execution.

    Attributes:
    - run_details: Complete run details from dbt Cloud API
    - is_successful: Whether the run completed successfully
    - run_id: dbt Cloud run ID
    - job_id: dbt Cloud job ID
    """

    run_details: dict
    is_successful: bool

    @property
    def run_id(self) -> int:
        """Get the dbt Cloud run ID."""

    @property
    def job_id(self) -> int:
        """Get the dbt Cloud job ID."""

    @property
    def run_status(self) -> DbtCloudRunStatus:
        """Get the run status enum."""

    def get_artifact(self, artifact_name: str) -> Optional[dict]:
        """
        Get a specific artifact from the run.

        Parameters:
        - artifact_name: Name of the artifact to retrieve

        Returns:
        Artifact dictionary or None if not found
        """
```

#### DbtCloudRunStatus

Enumeration of possible dbt Cloud run statuses.

```python { .api }
class DbtCloudRunStatus(Enum):
    """
    dbt Cloud run status enumeration.
    """
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELLED = 30
```
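A common use of these codes is deciding whether a run has finished. The sketch below redefines the enum locally so that it runs without Dagster installed; the `TERMINAL` set and the `is_terminal` helper are hypothetical conveniences for this example, not members of the integration.

```python
from enum import Enum


class DbtCloudRunStatus(Enum):
    """Local copy of the enum above so the sketch runs without Dagster."""
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELLED = 30


# Statuses after which a run no longer changes state (assumed grouping).
TERMINAL = frozenset(
    {DbtCloudRunStatus.SUCCESS, DbtCloudRunStatus.ERROR, DbtCloudRunStatus.CANCELLED}
)


def is_terminal(status: DbtCloudRunStatus) -> bool:
    """Return True if the run has finished, whether or not it succeeded."""
    return status in TERMINAL


# A numeric code can be converted to the enum by value lookup.
status_from_code = DbtCloudRunStatus(20)
```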

## Usage Examples

### Basic dbt Cloud Job Execution

```python
from dagster import job, op, Definitions
from dagster_dbt.cloud import dbt_cloud_resource, dbt_cloud_run_op

# Create resource
dbt_cloud = dbt_cloud_resource(
    api_token="dbt_api_token_here",
    account_id=12345
)

@op(required_resource_keys={"dbt_cloud"})
def run_dbt_cloud_job(context):
    dbt_cloud = context.resources.dbt_cloud
    return dbt_cloud.run_job_and_poll(
        job_id=67890,
        cause="Triggered by Dagster pipeline"
    )

@job(resource_defs={"dbt_cloud": dbt_cloud})
def dbt_cloud_job():
    run_dbt_cloud_job()

defs = Definitions(jobs=[dbt_cloud_job])
```
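The example above hardcodes the token for brevity. A minimal sketch of reading the same settings from environment variables might look like the following; the helper name `load_dbt_cloud_settings` and the variable names `DBT_CLOUD_API_TOKEN` and `DBT_CLOUD_ACCOUNT_ID` are assumptions chosen for illustration, not conventions defined by the integration.

```python
import os
from typing import Dict, Mapping, Union

# Hypothetical environment variable names used only in this sketch.
REQUIRED_VARS = ("DBT_CLOUD_API_TOKEN", "DBT_CLOUD_ACCOUNT_ID")


def load_dbt_cloud_settings(
    env: Mapping[str, str] = os.environ,
) -> Dict[str, Union[str, int]]:
    """Build resource keyword arguments from environment-style settings."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing settings: {', '.join(missing)}")
    return {
        "api_token": env["DBT_CLOUD_API_TOKEN"],
        # The account ID is documented as an int, so convert it explicitly.
        "account_id": int(env["DBT_CLOUD_ACCOUNT_ID"]),
    }


# Demonstration with an explicit mapping instead of the real environment.
settings = load_dbt_cloud_settings(
    {"DBT_CLOUD_API_TOKEN": "example-token", "DBT_CLOUD_ACCOUNT_ID": "12345"}
)
```

Given the factory signature documented above, the resulting dictionary could then be unpacked as `dbt_cloud_resource(**settings)`.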

### Loading Assets from dbt Cloud Job

```python
from dagster import Definitions
from dagster_dbt.cloud import dbt_cloud_resource, load_assets_from_dbt_cloud_job

dbt_cloud = dbt_cloud_resource(
    api_token="dbt_api_token_here",
    account_id=12345
)

# Load assets from dbt Cloud job
dbt_cloud_assets = load_assets_from_dbt_cloud_job(
    dbt_cloud=dbt_cloud,
    job_id=67890
)

defs = Definitions(
    assets=dbt_cloud_assets,
    resources={"dbt_cloud": dbt_cloud}
)
```

### Custom Asset Key Mapping

```python
from dagster import AssetKey
from dagster_dbt.cloud import load_assets_from_dbt_cloud_job, dbt_cloud_resource

def custom_asset_key_fn(node_info: dict) -> AssetKey:
    """Custom function to generate asset keys from dbt nodes."""
    return AssetKey([
        "dbt_cloud",
        node_info["database"],
        node_info["schema"],
        node_info["name"]
    ])

def custom_group_fn(node_info: dict) -> str:
    """Custom function to determine asset groups."""
    return node_info.get("config", {}).get("materialized", "default")

dbt_cloud = dbt_cloud_resource(
    api_token="dbt_api_token_here",
    account_id=12345
)

assets = load_assets_from_dbt_cloud_job(
    dbt_cloud=dbt_cloud,
    job_id=67890,
    node_info_to_asset_key=custom_asset_key_fn,
    node_info_to_group_fn=custom_group_fn
)
```
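Mapping functions like these are easy to check in isolation before wiring them into Dagster. The sketch below mirrors the logic of the two functions above using plain lists and strings so it runs without Dagster installed; the `sample_node` dictionary is made-up illustrative data, and real node info from dbt Cloud may contain different or additional fields.

```python
from typing import Any, Dict, List

# Made-up node info for illustration; real dbt nodes carry many more fields.
sample_node: Dict[str, Any] = {
    "database": "analytics",
    "schema": "marts",
    "name": "orders",
    "config": {"materialized": "table"},
}


def asset_key_path(node_info: Dict[str, Any]) -> List[str]:
    """Same path components that custom_asset_key_fn wraps in an AssetKey."""
    return [
        "dbt_cloud",
        node_info["database"],
        node_info["schema"],
        node_info["name"],
    ]


def group_name(node_info: Dict[str, Any]) -> str:
    """Same lookup as custom_group_fn: materialization, else 'default'."""
    return node_info.get("config", {}).get("materialized", "default")


path = asset_key_path(sample_node)
group = group_name(sample_node)
# A node without a config block falls back to the default group.
fallback_group = group_name({"name": "orders"})
```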

## Migration to dbt Cloud v2

The legacy dbt Cloud integration is maintained for backward compatibility, but new projects should use the dbt Cloud v2 integration, which offers:

- Better resource management with `DbtCloudCredentials` and `DbtCloudWorkspace`
- Asset specifications with `load_dbt_cloud_asset_specs`
- Modern `@dbt_cloud_assets` decorator
- Polling sensors with `build_dbt_cloud_polling_sensor`

See [dbt Cloud v2](./dbt-cloud-v2.md) for the recommended modern integration.