0
# dbt Cloud v2 Integration
1
2
The dbt Cloud v2 integration is the modern way to connect Dagster to dbt Cloud, with improved resource management, asset specifications, and polling sensors. It is the recommended approach for new dbt Cloud integrations and provides better composability and more flexible configuration options.
3
4
## Capabilities
5
6
### Credentials and Workspace Management
7
8
#### DbtCloudCredentials
9
10
Dataclass for dbt Cloud API authentication credentials.
11
12
```python { .api }
13
@dataclass
class DbtCloudCredentials(Resolvable):
    """
    dbt Cloud API credentials.

    Attributes:
    - account_id: dbt Cloud account ID
    - token: dbt Cloud API token for authentication
    - access_url: Base URL of the dbt Cloud instance, e.g. "https://cloud.getdbt.com"
    """

    account_id: int
    token: str
    access_url: str
25
```
26
27
#### DbtCloudWorkspace
28
29
Configurable resource representing a dbt Cloud workspace with project and environment context.
30
31
```python { .api }
32
class DbtCloudWorkspace(ConfigurableResource):
34
"""
35
dbt Cloud workspace resource for project and environment management.
36
37
Attributes:
38
- credentials: DbtCloudCredentials instance
39
- project_id: dbt Cloud project ID
40
- environment_id: dbt Cloud environment ID
41
"""
42
43
credentials: DbtCloudCredentials
44
project_id: int
45
environment_id: int
46
47
def run_job(
48
self,
49
job_id: int,
50
cause: str = "Triggered by Dagster",
51
steps_override: Optional[List[str]] = None,
52
schema_override: Optional[str] = None,
53
git_sha: Optional[str] = None,
54
**kwargs
55
) -> DbtCloudRun:
56
"""
57
Run a dbt Cloud job.
58
59
Parameters:
60
- job_id: dbt Cloud job ID to execute
61
- cause: Description of why the job was triggered
62
- steps_override: Custom dbt commands to run
63
- schema_override: Override target schema
64
- git_sha: Specific git SHA to run against
65
- **kwargs: Additional job trigger parameters
66
67
Returns:
68
DbtCloudRun object representing the triggered run
69
"""
70
71
def get_job(self, job_id: int) -> DbtCloudJob:
72
"""
73
Get dbt Cloud job details.
74
75
Parameters:
76
- job_id: dbt Cloud job ID
77
78
Returns:
79
DbtCloudJob object with job details
80
"""
81
82
def get_run(self, run_id: int) -> DbtCloudRun:
83
"""
84
Get dbt Cloud run details.
85
86
Parameters:
87
- run_id: dbt Cloud run ID
88
89
Returns:
90
DbtCloudRun object with run details
91
"""
92
93
def get_manifest(self, job_id: int) -> dict:
94
"""
95
Get dbt manifest for a job.
96
97
Parameters:
98
- job_id: dbt Cloud job ID
99
100
Returns:
101
Parsed manifest.json dictionary
102
"""
103
```
104
105
### Asset Loading and Specifications
106
107
#### load_dbt_cloud_asset_specs
108
109
Loads asset specifications from a dbt Cloud job without creating executable assets.
110
111
```python { .api }
112
def load_dbt_cloud_asset_specs(
113
workspace: DbtCloudWorkspace,
114
job_id: int,
115
dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
116
**kwargs
117
) -> Sequence[AssetSpec]:
118
"""
119
Load asset specs from dbt Cloud job.
120
121
Parameters:
122
- workspace: DbtCloudWorkspace resource
123
- job_id: dbt Cloud job ID
124
- dagster_dbt_translator: Custom translator for asset mapping
125
- **kwargs: Additional parameters for spec generation
126
127
Returns:
128
Sequence of AssetSpec objects representing dbt models
129
"""
130
```
131
132
#### load_dbt_cloud_check_specs
133
134
Loads data quality check specifications from a dbt Cloud job.
135
136
```python { .api }
137
def load_dbt_cloud_check_specs(
138
workspace: DbtCloudWorkspace,
139
job_id: int,
140
dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
141
**kwargs
142
) -> Sequence[AssetCheckSpec]:
143
"""
144
Load check specs from dbt Cloud job.
145
146
Parameters:
147
- workspace: DbtCloudWorkspace resource
148
- job_id: dbt Cloud job ID
149
- dagster_dbt_translator: Custom translator for check mapping
150
- **kwargs: Additional parameters for spec generation
151
152
Returns:
153
Sequence of AssetCheckSpec objects for dbt tests
154
"""
155
```
156
157
### Asset Decorators
158
159
#### dbt_cloud_assets
160
161
Modern decorator for creating assets from dbt Cloud jobs.
162
163
```python { .api }
164
def dbt_cloud_assets(
165
job_id: int,
166
workspace: DbtCloudWorkspace,
167
name: Optional[str] = None,
168
dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
169
partitions_def: Optional[PartitionsDefinition] = None,
170
backfill_policy: Optional[BackfillPolicy] = None,
171
op_tags: Optional[Mapping[str, Any]] = None,
172
**kwargs
173
) -> Callable[..., AssetsDefinition]:
174
"""
175
Create Dagster assets from dbt Cloud job.
176
177
Parameters:
- job_id: dbt Cloud job ID
- workspace: DbtCloudWorkspace resource
- name: Optional name for the resulting assets definition
- dagster_dbt_translator: Custom translator for asset mapping
- partitions_def: Partitions definition applied to the assets
- backfill_policy: Backfill policy for assets
- op_tags: Tags to apply to the underlying op
- **kwargs: Additional decorator parameters
184
185
Returns:
186
Decorated function that materializes dbt Cloud assets
187
"""
188
```
189
190
### Sensor Integration
191
192
#### build_dbt_cloud_polling_sensor
193
194
Creates a sensor that polls dbt Cloud for job completion and triggers downstream assets.
195
196
```python { .api }
197
def build_dbt_cloud_polling_sensor(
198
workspace: DbtCloudWorkspace,
199
job_id: int,
200
sensor_name: str,
201
dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
202
minimum_interval_seconds: Optional[int] = None,
203
run_tags: Optional[Mapping[str, str]] = None,
204
) -> SensorDefinition:
205
"""
206
Build polling sensor for dbt Cloud job.
207
208
Parameters:
- workspace: DbtCloudWorkspace resource
- job_id: dbt Cloud job ID to monitor
- sensor_name: Name of the sensor to create
- dagster_dbt_translator: Custom translator for asset mapping
- minimum_interval_seconds: Minimum time between sensor evaluations
- run_tags: Optional tags applied to runs requested by the sensor
214
215
Returns:
216
SensorDefinition that monitors dbt Cloud job completion
217
"""
218
```
219
220
### Data Types
221
222
#### DbtCloudAccount
223
224
Represents a dbt Cloud account with metadata and configuration.
225
226
```python { .api }
227
class DbtCloudAccount:
228
"""
229
dbt Cloud account representation.
230
231
Attributes:
232
- id: Account ID
233
- name: Account name
234
- plan: Account plan type
235
- state: Account state
236
"""
237
238
id: int
239
name: str
240
plan: str
241
state: int
242
```
243
244
#### DbtCloudProject
245
246
Represents a dbt Cloud project within an account.
247
248
```python { .api }
249
class DbtCloudProject:
250
"""
251
dbt Cloud project representation.
252
253
Attributes:
254
- id: Project ID
255
- name: Project name
256
- account_id: Parent account ID
257
- repository_id: Associated repository ID
258
- state: Project state
259
"""
260
261
id: int
262
name: str
263
account_id: int
264
repository_id: Optional[int]
265
state: int
266
```
267
268
#### DbtCloudEnvironment
269
270
Represents a dbt Cloud environment configuration.
271
272
```python { .api }
273
class DbtCloudEnvironment:
274
"""
275
dbt Cloud environment representation.
276
277
Attributes:
278
- id: Environment ID
279
- name: Environment name
280
- project_id: Parent project ID
281
- type: Environment type (development/deployment)
282
- state: Environment state
283
"""
284
285
id: int
286
name: str
287
project_id: int
288
type: str
289
state: int
290
```
291
292
#### DbtCloudJob
293
294
Represents a dbt Cloud job configuration.
295
296
```python { .api }
297
class DbtCloudJob:
298
"""
299
dbt Cloud job representation.
300
301
Attributes:
302
- id: Job ID
303
- name: Job name
304
- project_id: Parent project ID
305
- environment_id: Target environment ID
306
- execute_steps: List of dbt commands to execute
307
- triggers: Job trigger configuration
308
- state: Job state
309
"""
310
311
id: int
312
name: str
313
project_id: int
314
environment_id: int
315
execute_steps: List[str]
316
triggers: dict
317
state: int
318
```
319
320
#### DbtCloudRun
321
322
Represents a dbt Cloud job run instance.
323
324
```python { .api }
325
class DbtCloudRun:
326
"""
327
dbt Cloud run representation.
328
329
Attributes:
330
- id: Run ID
331
- job_id: Parent job ID
332
- trigger: Run trigger information
333
- status: Current run status
334
- started_at: Run start timestamp
335
- finished_at: Run completion timestamp
336
- duration: Run duration in seconds
337
"""
338
339
id: int
340
job_id: int
341
trigger: dict
342
status: int
343
started_at: Optional[str]
344
finished_at: Optional[str]
345
duration: Optional[int]
346
347
@property
348
def status_humanized(self) -> str:
349
"""Get human-readable status string."""
350
351
@property
352
def is_success(self) -> bool:
353
"""Check if run completed successfully."""
354
355
@property
356
def is_complete(self) -> bool:
357
"""Check if run has completed (success or failure)."""
358
```
359
360
#### DbtCloudJobRunStatusType
361
362
Enumeration of dbt Cloud job run statuses.
363
364
```python { .api }
365
class DbtCloudJobRunStatusType(Enum):
366
"""
367
dbt Cloud job run status enumeration.
368
"""
369
QUEUED = 1
370
STARTING = 2
371
RUNNING = 3
372
SUCCESS = 10
373
ERROR = 20
374
CANCELLED = 30
375
```
376
377
## Usage Examples
378
379
### Basic dbt Cloud v2 Asset Creation
380
381
```python
382
from dagster import Definitions, AssetExecutionContext
383
from dagster_dbt.cloud_v2 import (
384
DbtCloudCredentials,
385
DbtCloudWorkspace,
386
dbt_cloud_assets
387
)
388
389
# Configure credentials and workspace
390
credentials = DbtCloudCredentials(
391
account_id=12345,
392
token="dbt_api_token_here",
393
access_url="https://cloud.getdbt.com"
394
)
395
workspace = DbtCloudWorkspace(
    credentials=credentials,
    project_id=67890,
    environment_id=54321
)
399
400
# Create assets from dbt Cloud job
401
@dbt_cloud_assets(
402
job_id=123,
403
workspace=workspace
404
)
405
def my_dbt_cloud_assets(context: AssetExecutionContext):
406
# Trigger and monitor dbt Cloud job
407
run = workspace.run_job(
408
job_id=123,
409
cause="Triggered by Dagster asset materialization"
410
)
411
412
# Poll for completion and yield events
413
yield from run.stream_events(context=context)
414
415
defs = Definitions(
416
assets=[my_dbt_cloud_assets],
417
resources={"workspace": workspace}
418
)
419
```
420
421
### Using Asset Specifications
422
423
```python
424
from dagster import Definitions
425
from dagster_dbt.cloud_v2 import (
426
DbtCloudCredentials,
427
DbtCloudWorkspace,
428
load_dbt_cloud_asset_specs
429
)
430
431
credentials = DbtCloudCredentials(
432
account_id=12345,
433
token="dbt_api_token_here",
434
access_url="https://cloud.getdbt.com"
435
)
436
workspace = DbtCloudWorkspace(
    credentials=credentials,
    project_id=67890,
    environment_id=54321
)
440
441
# Load asset specs without creating executable assets
442
asset_specs = load_dbt_cloud_asset_specs(
443
workspace=workspace,
444
job_id=123
445
)
446
447
# Use specs to create custom assets or for analysis
448
defs = Definitions(assets=asset_specs)
449
```
450
451
### Polling Sensor Integration
452
453
```python
454
from dagster import Definitions
455
from dagster_dbt.cloud_v2 import (
456
DbtCloudCredentials,
457
DbtCloudWorkspace,
458
build_dbt_cloud_polling_sensor
459
)
460
461
credentials = DbtCloudCredentials(
462
account_id=12345,
463
token="dbt_api_token_here",
464
access_url="https://cloud.getdbt.com"
465
)
466
workspace = DbtCloudWorkspace(
    credentials=credentials,
    project_id=67890,
    environment_id=54321
)
470
471
# Create sensor to monitor dbt Cloud job
472
dbt_cloud_sensor = build_dbt_cloud_polling_sensor(
    workspace=workspace,
    job_id=123,
    sensor_name="dbt_cloud_job_123_sensor",
    minimum_interval_seconds=120
)
477
478
defs = Definitions(sensors=[dbt_cloud_sensor])
479
```
480
481
### Custom Translation
482
483
```python
484
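from typing import Optional

from dagster import AssetKey
from dagster_dbt.cloud_v2 import DbtCloudWorkspace, dbt_cloud_assets
from dagster_dbt import DagsterDbtTranslator

# `workspace` is assumed to be a DbtCloudWorkspace configured as in the examples above.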
487
488
class CustomDbtCloudTranslator(DagsterDbtTranslator):
489
def get_asset_key(self, dbt_resource_props: dict) -> AssetKey:
490
return AssetKey([
491
"dbt_cloud",
492
dbt_resource_props["database"],
493
dbt_resource_props["schema"],
494
dbt_resource_props["name"]
495
])
496
497
def get_group_name(self, dbt_resource_props: dict) -> Optional[str]:
498
return dbt_resource_props.get("config", {}).get("group", "default")
499
500
@dbt_cloud_assets(
501
job_id=123,
502
workspace=workspace,
503
dagster_dbt_translator=CustomDbtCloudTranslator()
504
)
505
def my_custom_assets(context):
506
yield from workspace.run_job(job_id=123).stream_events(context=context)
507
```