A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation
npx @tessl/cli install tessl/pypi-dagster-dbt@0.27.00
# Dagster dbt Integration

dagster-dbt is an integration library for orchestrating dbt models as Dagster assets, with lineage tracking, metadata propagation, and observability features. It connects dbt's SQL-based transformations to Dagster's orchestration framework, so data engineers can use both tools together in one pipeline.

## Package Information

- **Package Name**: dagster-dbt
- **Language**: Python
- **Installation**: `pip install dagster-dbt`

## Core Imports

```python
from dagster_dbt import dbt_assets, DbtCliResource
```

For dbt Cloud integration:

```python
from dagster_dbt.cloud import dbt_cloud_resource, load_assets_from_dbt_cloud_job
```

For dbt Cloud v2 (recommended):

```python
from dagster_dbt.cloud_v2 import DbtCloudCredentials, DbtCloudWorkspace, dbt_cloud_assets
```

## Basic Usage

### Local dbt Project with Assets

```python
from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, dbt_assets

# Define dbt CLI resource
dbt_cli_resource = DbtCliResource(project_dir="./my_dbt_project")

# Create assets from dbt models
@dbt_assets(manifest="./my_dbt_project/target/manifest.json")
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

# Define the Dagster code location
defs = Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": dbt_cli_resource}
)
```

### dbt Cloud Integration

```python
from dagster import Definitions
from dagster_dbt.cloud_v2 import (
    DbtCloudCredentials,
    DbtCloudWorkspace,
    dbt_cloud_assets
)

# Configure dbt Cloud credentials
credentials = DbtCloudCredentials(
    account_id=12345,
    token="your_token",
    access_url="https://cloud.getdbt.com"
)
workspace = DbtCloudWorkspace(
    credentials=credentials,
    project_id=67890,
    environment_id=11111  # placeholder value; environment_id is a DbtCloudWorkspace field
)

# Create assets from dbt Cloud job
@dbt_cloud_assets(
    job_id=123,
    workspace=workspace
)
def my_dbt_cloud_assets(context):
    yield from workspace.run_job(job_id=123, context=context)

defs = Definitions(assets=[my_dbt_cloud_assets])
```

## Architecture

The dagster-dbt integration follows a layered architecture:

- **Asset Layer**: `@dbt_assets` decorators convert dbt models into Dagster assets
- **Resource Layer**: `DbtCliResource` and Cloud resources handle execution and communication
- **Translation Layer**: `DagsterDbtTranslator` maps dbt metadata to Dagster concepts
- **Project Layer**: `DbtProject` manages dbt project structure and manifest parsing
- **Event Layer**: CLI event handlers provide execution observability

This layering keeps dbt's transformation logic separate from Dagster's orchestration concerns, so each layer can be configured or replaced on its own.

## Capabilities

### Asset Creation and Management

Core functionality for creating Dagster assets from dbt models, including decorators, asset specifications, and selection utilities.

```python { .api }
def dbt_assets(
    manifest: DbtManifestParam,
    select: str = "fqn:*",
    exclude: Optional[str] = "",
    selector: Optional[str] = "",
    name: Optional[str] = None,
    io_manager_key: Optional[str] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    backfill_policy: Optional[BackfillPolicy] = None,
    op_tags: Optional[Mapping[str, Any]] = None,
    required_resource_keys: Optional[set[str]] = None,
    project: Optional[DbtProject] = None,
    retry_policy: Optional[RetryPolicy] = None,
    pool: Optional[str] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]: ...

def build_dbt_asset_specs(
    manifest: DbtManifestParam,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    select: str = "fqn:*",
    exclude: Optional[str] = "",
    selector: Optional[str] = "",
    project: Optional[DbtProject] = None,
) -> Sequence[AssetSpec]: ...

def build_dbt_asset_selection(
    dbt_assets: Sequence[AssetsDefinition],
    dbt_select: str = "fqn:*",
    dbt_exclude: Optional[str] = "",
    dbt_selector: Optional[str] = "",
) -> AssetSelection: ...
```

[Asset Creation](./asset-creation.md)

### CLI Resource and Execution

Local dbt execution through the CLI resource, including command invocation, event streaming, and artifact management.

```python { .api }
class DbtCliResource(ConfigurableResource):
    project_dir: str
    global_config_flags: list[str] = []
    profiles_dir: Optional[str] = None
    profile: Optional[str] = None
    target: Optional[str] = None
    dbt_executable: str = "dbt"
    state_path: Optional[str] = None

    def cli(
        self,
        args: Sequence[str],
        raise_on_error: bool = True,
        manifest: Optional[DbtManifestParam] = None,
        dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
        context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None,
        target_path: Optional[Path] = None,
    ) -> DbtCliInvocation: ...
```

[CLI Resource](./cli-resource.md)

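
To make the event-streaming idea concrete, the following is a minimal, stdlib-only sketch of reading newline-delimited JSON events from a child process. It is not the library's implementation: `stream_json_events` is a hypothetical helper, and a small Python one-liner stands in for a dbt process (assumed here to behave like dbt run with its `--log-format json` flag, printing one JSON object per line).

```python
import json
import subprocess
import sys
from collections.abc import Iterator, Sequence
from typing import Any


def stream_json_events(args: Sequence[str]) -> Iterator[dict[str, Any]]:
    """Hypothetical helper: run a command and yield each JSON object it prints."""
    process = subprocess.Popen(args, stdout=subprocess.PIPE, text=True)
    assert process.stdout is not None
    with process.stdout:
        for line in process.stdout:
            line = line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                # Non-JSON output (for example a plain-text warning) is skipped.
                continue
            if isinstance(event, dict):
                yield event
    process.wait()


# Stand-in for a dbt process: prints two JSON events and one plain-text line.
FAKE_DBT = [
    sys.executable,
    "-c",
    "import json\n"
    "print(json.dumps({'info': {'name': 'MainReportVersion'}}))\n"
    "print('not json')\n"
    "print(json.dumps({'info': {'name': 'LogModelResult'}}))\n",
]

event_names = [event["info"]["name"] for event in stream_json_events(FAKE_DBT)]
```

Because the helper is a generator, events are handed to the caller as the child process emits them rather than after it exits, which is the property that makes streaming useful for long-running dbt commands.
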
### dbt Cloud Integration (Legacy)

Original dbt Cloud integration providing job execution, asset loading, and operations.

```python { .api }
def dbt_cloud_resource(api_token: str, account_id: int): ...
def load_assets_from_dbt_cloud_job(dbt_cloud: ResourceDefinition, job_id: int): ...
def dbt_cloud_run_op(context: OpExecutionContext, dbt_cloud: DbtCloudResource): ...
```

[dbt Cloud Legacy](./dbt-cloud-legacy.md)

### dbt Cloud v2 Integration

Modern dbt Cloud integration with improved resource management, asset specifications, and polling sensors.

```python { .api }
@dataclass
class DbtCloudCredentials(Resolvable):
    account_id: int
    token: str
    access_url: str

@dataclass
class DbtCloudWorkspace(ConfigurableResource):
    credentials: DbtCloudCredentials
    project_id: int
    environment_id: int

def dbt_cloud_assets(
    job_id: int,
    workspace: DbtCloudWorkspace,
    name: Optional[str] = None,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
) -> Callable[..., AssetsDefinition]: ...
```

[dbt Cloud v2](./dbt-cloud-v2.md)

### Translation System

Customizable mapping between dbt resources and Dagster assets, including metadata, groups, and asset keys.

```python { .api }
class DagsterDbtTranslator:
    def __init__(self, settings: Optional[DagsterDbtTranslatorSettings] = None): ...

    def get_asset_spec(
        self,
        manifest: Mapping[str, Any],
        unique_id: str,
        project: Optional[DbtProject]
    ) -> AssetSpec: ...

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey: ...
    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]: ...
```

[Translation System](./translation-system.md)

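
As an illustration of the kind of logic an overridden `get_asset_key` might contain, here is a small sketch that maps `dbt_resource_props` to a list of asset key path components. It returns a plain list rather than a Dagster `AssetKey` so that it runs without Dagster installed. The function name `asset_key_path`, the `"warehouse"` prefix, and the naming rule itself are assumptions for illustration, not the library's default behavior; the `resource_type`, `name`, and `source_name` fields are the ones dbt writes for nodes and sources in its manifest.

```python
from collections.abc import Mapping
from typing import Any


def asset_key_path(dbt_resource_props: Mapping[str, Any], prefix: str = "warehouse") -> list[str]:
    """Hypothetical naming rule: prefix every key and nest sources under their source name."""
    if dbt_resource_props["resource_type"] == "source":
        return [prefix, dbt_resource_props["source_name"], dbt_resource_props["name"]]
    return [prefix, dbt_resource_props["name"]]


model_props = {"resource_type": "model", "name": "orders"}
source_props = {"resource_type": "source", "source_name": "stripe", "name": "payments"}

model_path = asset_key_path(model_props)
source_path = asset_key_path(source_props)
```

In a translator subclass, the same rule would sit inside `get_asset_key` and wrap the resulting path in an `AssetKey`.
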
### Project Management

dbt project handling, manifest parsing, and project preparation for integration with Dagster.

```python { .api }
@record_custom
class DbtProject(IHaveNew):
    name: str
    project_dir: Path
    target_path: Path
    profiles_dir: Path
    profile: Optional[str]
    target: Optional[str]
    manifest_path: Path
    packaged_project_dir: Optional[Path]
    state_path: Optional[Path]
    has_uninstalled_deps: bool
    preparer: DbtProjectPreparer

    def prepare_if_dev(self) -> None: ...

class DbtProjectPreparer:
    def prepare(self) -> None: ...

class DagsterDbtProjectPreparer(DbtProjectPreparer):
    pass
```

[Project Management](./project-management.md)

### Component System

Integration with Dagster's component system for declarative dbt project configuration.

```python { .api }
class DbtProjectComponent(Component):
    dbt_project_path: str
    dbt_profiles_path: Optional[str] = None
```

[Component System](./component-system.md)

### Utilities and Helpers

Utility functions for asset selection, naming conventions, metadata handling, and manifest operations.

```python { .api }
def get_asset_key_for_model(dbt_assets: Sequence[AssetsDefinition], model_name: str) -> AssetKey: ...
def get_asset_key_for_source(dbt_assets: Sequence[AssetsDefinition], source_name: str) -> AssetKey: ...
def get_asset_keys_by_output_name_for_source(
    dbt_assets: Sequence[AssetsDefinition],
    source_name: str
) -> Mapping[str, AssetKey]: ...

def default_metadata_from_dbt_resource_props(dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]: ...
def default_group_from_dbt_resource_props(dbt_resource_props: Mapping[str, Any]) -> Optional[str]: ...
def group_from_dbt_resource_props_fallback_to_directory(dbt_resource_props: Mapping[str, Any]) -> Optional[str]: ...
```

[Utilities](./utilities.md)

### Error Handling

Comprehensive exception hierarchy for dbt integration error handling and debugging.

```python { .api }
class DagsterDbtError(Failure, ABC): ...
class DagsterDbtCliRuntimeError(DagsterDbtError, ABC): ...
class DagsterDbtCloudJobInvariantViolationError(DagsterDbtError, DagsterInvariantViolationError): ...
class DagsterDbtProjectNotFoundError(DagsterDbtError): ...
class DagsterDbtProfilesDirectoryNotFoundError(DagsterDbtError): ...
class DagsterDbtManifestNotFoundError(DagsterDbtError): ...
class DagsterDbtProjectYmlFileNotFoundError(DagsterDbtError): ...
```

[Error Handling](./error-handling.md)

### Freshness Checks

Build asset checks for dbt source freshness validation.

```python { .api }
def build_freshness_checks_from_dbt_assets(
    dbt_assets: Sequence[AssetsDefinition]
) -> Sequence[AssetChecksDefinition]: ...
```

[Freshness Checks](./freshness-checks.md)

### Asset Selection

Advanced asset selection capabilities using dbt manifest information.

```python { .api }
class DbtManifestAssetSelection(AssetSelection):
    manifest: Mapping[str, Any]
    select: str
    exclude: str
    selector: str
    dagster_dbt_translator: DagsterDbtTranslator
    project: Optional[DbtProject]
```

## Types

```python { .api }
# Type aliases for common dbt-related types
DbtManifestParam = Union[Mapping[str, Any], str, Path]

# CLI Event Message classes
class DbtCliEventMessage(ABC):
    raw_event: dict[str, Any]

    def to_default_asset_events(
        self,
        manifest: DbtManifestParam,
        dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(),
        context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None,
        target_path: Optional[Path] = None,
        project: Optional[DbtProject] = None,
    ) -> Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult, AssetCheckEvaluation]]: ...

class DbtCliInvocation:
    process: subprocess.Popen
    manifest: Mapping[str, Any]
    dagster_dbt_translator: DagsterDbtTranslator
    project_dir: Path
    target_path: Path
    raise_on_error: bool

    def wait(self) -> "DbtCliInvocation": ...
    def is_successful(self) -> bool: ...
    def stream(self) -> Iterator[DbtCliEventMessage]: ...
    def get_artifact(self, artifact: Literal["manifest.json", "catalog.json", "run_results.json", "sources.json"]) -> dict[str, Any]: ...
```

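
Because `DbtManifestParam` accepts an in-memory mapping, a string path, or a `Path`, code that consumes it has to normalize the three forms. The sketch below shows one way that could look using only the standard library. `ManifestParam` mirrors the alias above, and `load_manifest` is a hypothetical helper, not a function exported by dagster-dbt.

```python
import json
import tempfile
from collections.abc import Mapping
from pathlib import Path
from typing import Any, Union

# Mirrors the DbtManifestParam alias so the sketch runs without dagster-dbt.
ManifestParam = Union[Mapping[str, Any], str, Path]


def load_manifest(manifest: ManifestParam) -> Mapping[str, Any]:
    """Hypothetical helper: return a mapping, reading JSON from disk when given a path."""
    if isinstance(manifest, Mapping):
        return manifest
    return json.loads(Path(manifest).read_text(encoding="utf-8"))


with tempfile.TemporaryDirectory() as tmp:
    # Write a tiny stand-in manifest so both path forms can be exercised.
    manifest_file = Path(tmp) / "manifest.json"
    manifest_file.write_text(json.dumps({"nodes": {}}), encoding="utf-8")

    from_path = load_manifest(manifest_file)
    from_str = load_manifest(str(manifest_file))
    from_mapping = load_manifest({"nodes": {}})
```
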
## Constants

### Metadata Keys
- `DAGSTER_DBT_MANIFEST_METADATA_KEY` = "dagster_dbt/manifest"
- `DAGSTER_DBT_TRANSLATOR_METADATA_KEY` = "dagster_dbt/dagster_dbt_translator"
- `DAGSTER_DBT_PROJECT_METADATA_KEY` = "dagster_dbt/project"
- `DAGSTER_DBT_SELECT_METADATA_KEY` = "dagster_dbt/select"
- `DAGSTER_DBT_EXCLUDE_METADATA_KEY` = "dagster_dbt/exclude"
- `DAGSTER_DBT_SELECTOR_METADATA_KEY` = "dagster_dbt/selector"
- `DAGSTER_DBT_UNIQUE_ID_METADATA_KEY` = "dagster_dbt/unique_id"

### Selection Defaults
- `DBT_DEFAULT_SELECT` = "fqn:*"
- `DBT_DEFAULT_EXCLUDE` = ""
- `DBT_DEFAULT_SELECTOR` = ""

### Environment Variables
- `DBT_INDIRECT_SELECTION_ENV` = "DBT_INDIRECT_SELECTION"
- `DBT_EMPTY_INDIRECT_SELECTION` = "empty"

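
The two constants above pair an environment variable name with one of its values; dbt reads `DBT_INDIRECT_SELECTION` to decide how tests are indirectly selected. The following sketch shows how such a pair could be applied to a child process without changing the parent's environment. When and whether dagster-dbt sets this value is not covered here, and the child command is a Python stand-in for a dbt invocation.

```python
import os
import subprocess
import sys

# Values copied from the constants listed above.
DBT_INDIRECT_SELECTION_ENV = "DBT_INDIRECT_SELECTION"
DBT_EMPTY_INDIRECT_SELECTION = "empty"

# Copy the current environment and add the override for the child process only.
child_env = {**os.environ, DBT_INDIRECT_SELECTION_ENV: DBT_EMPTY_INDIRECT_SELECTION}

# Stand-in for a dbt invocation: the child echoes the variable it received.
result = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['DBT_INDIRECT_SELECTION'])"],
    env=child_env,
    capture_output=True,
    text=True,
    check=True,
)
seen_by_child = result.stdout.strip()
```
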
### Asset Types
- `ASSET_RESOURCE_TYPES` = ["model", "seed", "snapshot"]

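
To show what a list like `ASSET_RESOURCE_TYPES` is useful for, the sketch below filters the `nodes` section of a manifest down to the entries whose `resource_type` is in that list. `asset_unique_ids` is a hypothetical helper and the manifest is a toy example; this is an illustration of the filtering idea, not the library's selection logic.

```python
from collections.abc import Mapping
from typing import Any

# Value copied from the constant listed above.
ASSET_RESOURCE_TYPES = ["model", "seed", "snapshot"]


def asset_unique_ids(manifest: Mapping[str, Any]) -> list[str]:
    """Hypothetical helper: sorted unique IDs of nodes whose resource type is asset-like."""
    return sorted(
        unique_id
        for unique_id, node in manifest.get("nodes", {}).items()
        if node.get("resource_type") in ASSET_RESOURCE_TYPES
    )


# Toy manifest: two asset-like nodes and one test node that should be filtered out.
toy_manifest = {
    "nodes": {
        "model.shop.orders": {"resource_type": "model"},
        "seed.shop.countries": {"resource_type": "seed"},
        "test.shop.not_null_orders_id": {"resource_type": "test"},
    }
}

selected = asset_unique_ids(toy_manifest)
```
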
### Version Information
- `__version__` = "0.27.9"
- `DBT_CORE_VERSION_UPPER_BOUND` = "1.11"