0
# Asset Creation and Management
1
2
Core functionality for creating Dagster assets from dbt models, including decorators, asset specifications, and selection utilities. This module provides the primary interface for integrating dbt models into Dagster pipelines as first-class assets.
3
4
## Capabilities
5
6
### Asset Decorators
7
8
#### dbt_assets
9
10
Creates a set of Dagster assets from dbt models defined in a manifest.json file. This is the primary decorator for local dbt integration.
11
12
```python { .api }
13
def dbt_assets(
14
manifest: DbtManifestParam,
15
select: str = DBT_DEFAULT_SELECT,
16
exclude: str = DBT_DEFAULT_EXCLUDE,
17
dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
18
backfill_policy: Optional[BackfillPolicy] = None,
19
op_tags: Optional[dict] = None,
20
required_resource_keys: Optional[set] = None,
21
**kwargs
22
) -> Callable:
23
"""
24
Create Dagster assets from dbt models.
25
26
Parameters:
27
- manifest: Path to dbt manifest.json or parsed manifest dict
28
- select: dbt selection string for included models (default: "fqn:*")
29
- exclude: dbt selection string for excluded models (default: "")
30
- dagster_dbt_translator: Custom translator for dbt->Dagster mapping
31
- backfill_policy: Backfill policy for the assets
32
- op_tags: Tags to apply to the underlying op
33
- required_resource_keys: Additional required resource keys
34
35
Returns:
36
Decorated function that yields asset materializations
37
"""
38
```
39
40
#### Usage Example
41
42
```python
43
from dagster import AssetExecutionContext, Definitions
44
from dagster_dbt import dbt_assets, DbtCliResource
45
46
dbt_resource = DbtCliResource(project_dir="./my_dbt_project")
47
48
@dbt_assets(
49
manifest="./my_dbt_project/target/manifest.json",
50
select="tag:daily",
51
exclude="tag:deprecated"
52
)
53
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
54
yield from dbt.cli(["build"], context=context).stream()
55
```
56
57
### Asset Specifications
58
59
#### build_dbt_asset_specs
60
61
Builds a sequence of AssetSpec objects from a dbt manifest without creating executable assets.
62
63
```python { .api }
64
def build_dbt_asset_specs(
65
manifest: DbtManifestParam,
66
select: str = DBT_DEFAULT_SELECT,
67
exclude: str = DBT_DEFAULT_EXCLUDE,
68
dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
69
**kwargs
70
) -> Sequence[AssetSpec]:
71
"""
72
Build asset specifications from dbt manifest.
73
74
Parameters:
75
- manifest: Path to dbt manifest.json or parsed manifest dict
76
- select: dbt selection string for included models
77
- exclude: dbt selection string for excluded models
78
- dagster_dbt_translator: Custom translator for dbt->Dagster mapping
79
- **kwargs: Additional arguments passed to AssetSpec creation
80
81
Returns:
82
Sequence of AssetSpec objects representing dbt models
83
"""
84
```
85
86
### Asset Selection
87
88
#### build_dbt_asset_selection
89
90
Creates a Dagster AssetSelection from dbt assets and selection criteria.
91
92
```python { .api }
93
def build_dbt_asset_selection(
94
dbt_assets: Sequence[AssetsDefinition],
95
dbt_select: str = DBT_DEFAULT_SELECT,
96
dbt_exclude: Optional[str] = DBT_DEFAULT_EXCLUDE,
97
dbt_selector: Optional[str] = DBT_DEFAULT_SELECTOR
98
) -> AssetSelection:
99
"""
100
Build asset selection from dbt assets and selection criteria.
101
102
Parameters:
103
- dbt_assets: Sequence of dbt AssetsDefinition objects
104
- dbt_select: dbt selection string (default: "fqn:*")
105
- dbt_exclude: dbt exclusion string (default: "")
106
- dbt_selector: dbt selector string (default: "")
107
108
Returns:
109
AssetSelection object for use in Dagster job/schedule definitions
110
"""
111
```
112
113
#### build_schedule_from_dbt_selection
114
115
Creates a schedule definition from a dbt selection.
116
117
```python { .api }
118
def build_schedule_from_dbt_selection(
119
dbt_assets: Sequence[AssetsDefinition],
120
job_name: str,
121
cron_schedule: str,
122
dbt_select: str = DBT_DEFAULT_SELECT,
123
dbt_exclude: Optional[str] = DBT_DEFAULT_EXCLUDE,
124
dbt_selector: str = DBT_DEFAULT_SELECTOR,
125
schedule_name: Optional[str] = None,
126
tags: Optional[Mapping[str, str]] = None,
127
config: Optional[RunConfig] = None,
128
execution_timezone: Optional[str] = None,
129
default_status: DefaultScheduleStatus = DefaultScheduleStatus.STOPPED
130
) -> ScheduleDefinition:
131
"""
132
Build schedule from dbt selection criteria.
133
134
Parameters:
135
- dbt_assets: Sequence of dbt assets definitions
136
- job_name: Name for the generated job
137
- cron_schedule: Cron expression for schedule timing
138
- dbt_select: dbt selection string (default: "fqn:*")
139
- dbt_exclude: dbt exclusion string (default: "")
140
- dbt_selector: dbt selector string (default: "")
141
- schedule_name: Name for the schedule (optional)
142
- tags: Tags to apply to the job (optional)
143
- config: Run configuration (optional)
144
- execution_timezone: Timezone for schedule execution (optional)
145
- default_status: Default schedule status (default: STOPPED)
146
147
Returns:
148
ScheduleDefinition for the selected dbt models
149
"""
150
```
151
152
### Asset Key Utilities
153
154
#### get_asset_key_for_model
155
156
Generates the Dagster asset key for a dbt model.
157
158
```python { .api }
159
def get_asset_key_for_model(dbt_resource_props: Mapping[str, Any]) -> AssetKey:
160
"""
161
Get asset key for dbt model.
162
163
Parameters:
164
- dbt_resource_props: dbt model properties from manifest
165
166
Returns:
167
AssetKey for the dbt model
168
"""
169
```
170
171
#### get_asset_key_for_source
172
173
Generates the Dagster asset key for a dbt source.
174
175
```python { .api }
176
def get_asset_key_for_source(dbt_resource_props: Mapping[str, Any]) -> AssetKey:
177
"""
178
Get asset key for dbt source.
179
180
Parameters:
181
- dbt_resource_props: dbt source properties from manifest
182
183
Returns:
184
AssetKey for the dbt source
185
"""
186
```
187
188
#### get_asset_keys_by_output_name_for_source
189
190
Gets all asset keys associated with a dbt source, organized by output name.
191
192
```python { .api }
193
def get_asset_keys_by_output_name_for_source(
194
dbt_resource_props: Mapping[str, Any],
195
dagster_dbt_translator: Optional[DagsterDbtTranslator] = None
196
) -> Mapping[str, AssetKey]:
197
"""
198
Get asset keys by output name for dbt source.
199
200
Parameters:
201
- dbt_resource_props: dbt source properties from manifest
202
- dagster_dbt_translator: Custom translator for asset key mapping
203
204
Returns:
205
Mapping from output names to AssetKey objects
206
"""
207
```
208
209
### Metadata Utilities
210
211
#### default_metadata_from_dbt_resource_props
212
213
Generates default Dagster metadata from dbt resource properties.
214
215
```python { .api }
216
def default_metadata_from_dbt_resource_props(
217
dbt_resource_props: Mapping[str, Any]
218
) -> Mapping[str, Any]:
219
"""
220
Generate default metadata from dbt resource properties.
221
222
Parameters:
223
- dbt_resource_props: dbt resource properties from manifest
224
225
Returns:
226
Dictionary of Dagster metadata
227
"""
228
```
229
230
#### default_group_from_dbt_resource_props
231
232
Determines the default group name for a dbt resource.
233
234
```python { .api }
235
def default_group_from_dbt_resource_props(
236
dbt_resource_props: Mapping[str, Any]
237
) -> Optional[str]:
238
"""
239
Get default group name from dbt resource properties.
240
241
Parameters:
242
- dbt_resource_props: dbt resource properties from manifest
243
244
Returns:
245
Group name string or None
246
"""
247
```
248
249
#### group_from_dbt_resource_props_fallback_to_directory
250
251
Gets group name with fallback to directory-based naming.
252
253
```python { .api }
254
def group_from_dbt_resource_props_fallback_to_directory(
255
dbt_resource_props: Mapping[str, Any]
256
) -> Optional[str]:
257
"""
258
Get group name with directory fallback.
259
260
Parameters:
261
- dbt_resource_props: dbt resource properties from manifest
262
263
Returns:
264
Group name string or None
265
"""
266
```
267
268
## Type Definitions
269
270
```python { .api }
271
# Type alias for manifest parameter
272
DbtManifestParam = Union[Mapping[str, Any], str, Path]
273
```
274
275
## Constants
276
277
- `DBT_DEFAULT_SELECT` = "fqn:*" - Default selection includes all models
278
- `DBT_DEFAULT_EXCLUDE` = "" - Default exclusion is empty
279
- `ASSET_RESOURCE_TYPES` = ["model", "seed", "snapshot"] - dbt resource types treated as assets