0
# Translation System
1
2
Customizable mapping between dbt resources and Dagster assets, including metadata, groups, and asset keys. The translation system provides a flexible interface for controlling how dbt models, tests, and sources are represented as Dagster assets.
3
4
## Capabilities
5
6
### Core Translator
7
8
#### DagsterDbtTranslator
9
10
Main translator class that maps dbt resources to Dagster asset specifications.
11
12
```python { .api }
13
class DagsterDbtTranslator:
14
"""
15
Maps dbt resources to Dagster assets with customizable translation logic.
16
17
This class provides methods to control how dbt models, sources, and tests
18
are converted into Dagster assets, including asset keys, metadata, groups,
19
and other asset properties.
20
"""
21
22
def __init__(self, settings: Optional[DagsterDbtTranslatorSettings] = None): ...
23
24
def get_asset_spec(
25
self,
26
manifest: Mapping[str, Any],
27
unique_id: str,
28
project: Optional["DbtProject"]
29
) -> AssetSpec:
30
"""
31
Get AssetSpec for a dbt resource.
32
33
Parameters:
34
- manifest: Complete dbt manifest dictionary
35
- unique_id: Unique ID of the dbt resource
36
- project: DbtProject instance, or None when no project is available (the argument itself is required)
37
38
Returns:
39
AssetSpec object for the dbt resource
40
"""
41
42
def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
43
"""
44
Get asset key for a dbt resource.
45
46
Parameters:
47
- dbt_resource_props: dbt resource properties from manifest
48
49
Returns:
50
AssetKey for the dbt resource
51
"""
52
53
def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
54
"""
55
Get group name for a dbt resource.
56
57
Parameters:
58
- dbt_resource_props: dbt resource properties from manifest
59
60
Returns:
61
Group name string or None
62
"""
63
64
def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
65
"""
66
Get metadata for a dbt resource.
67
68
Parameters:
69
- dbt_resource_props: dbt resource properties from manifest
70
71
Returns:
72
Dictionary of Dagster metadata
73
"""
74
75
def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
76
"""
77
Get tags for a dbt resource.
78
79
Parameters:
80
- dbt_resource_props: dbt resource properties from manifest
81
82
Returns:
83
Dictionary of asset tags
84
"""
85
86
def get_freshness_policy(
87
self,
88
dbt_resource_props: Mapping[str, Any]
89
) -> Optional[FreshnessPolicy]:
90
"""
91
Get freshness policy for a dbt resource.
92
93
Parameters:
94
- dbt_resource_props: dbt resource properties from manifest
95
96
Returns:
97
FreshnessPolicy object or None
98
"""
99
100
def get_auto_materialize_policy(
101
self,
102
dbt_resource_props: Mapping[str, Any]
103
) -> Optional[AutoMaterializePolicy]:
104
"""
105
Get auto-materialize policy for a dbt resource.
106
107
Parameters:
108
- dbt_resource_props: dbt resource properties from manifest
109
110
Returns:
111
AutoMaterializePolicy object or None
112
"""
113
114
def get_deps_asset_keys(
115
self,
116
dbt_resource_props: Mapping[str, Any],
117
manifest: Mapping[str, Any]
118
) -> Iterable[AssetKey]:
119
"""
120
Get dependency asset keys for a dbt resource.
121
122
Parameters:
123
- dbt_resource_props: dbt resource properties from manifest
124
- manifest: Complete dbt manifest dictionary
125
126
Returns:
127
Iterable of AssetKey objects representing dependencies
128
"""
129
130
def get_asset_check_spec(
131
self,
132
asset_spec: AssetSpec,
133
manifest: Mapping[str, Any],
134
unique_id: str,
135
project: Optional["DbtProject"]
136
) -> Optional[AssetCheckSpec]:
137
"""
138
Get AssetCheckSpec for a dbt test.
139
140
Parameters:
141
- asset_spec: AssetSpec for the target asset
142
- manifest: Complete dbt manifest dictionary
143
- unique_id: Unique ID of the dbt test
144
- project: DbtProject instance, or None when no project is available (the argument itself is required)
145
146
Returns:
147
AssetCheckSpec object for the dbt test or None
148
"""
149
150
def get_partition_mapping(
151
self,
152
dbt_resource_props: Mapping[str, Any],
153
dbt_parent_resource_props: Mapping[str, Any]
154
) -> Optional[PartitionMapping]:
155
"""
156
Get partition mapping between assets.
157
158
Parameters:
159
- dbt_resource_props: Child dbt resource properties
160
- dbt_parent_resource_props: Parent dbt resource properties
161
162
Returns:
163
PartitionMapping or None
164
"""
165
166
def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str:
167
"""
168
Get description for a dbt resource.
169
170
Parameters:
171
- dbt_resource_props: dbt resource properties from manifest
172
173
Returns:
174
Description string
175
"""
176
177
def get_code_version(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
178
"""
179
Get code version for a dbt resource.
180
181
Parameters:
182
- dbt_resource_props: dbt resource properties from manifest
183
184
Returns:
185
Code version string or None
186
"""
187
188
def get_owners(self, dbt_resource_props: Mapping[str, Any]) -> Optional[Sequence[str]]:
189
"""
190
Get owners for a dbt resource.
191
192
Parameters:
193
- dbt_resource_props: dbt resource properties from manifest
194
195
Returns:
196
List of owner strings or None
197
"""
198
199
def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutomationCondition]:
200
"""
201
Get automation condition for a dbt resource.
202
203
Parameters:
204
- dbt_resource_props: dbt resource properties from manifest
205
206
Returns:
207
AutomationCondition or None
208
"""
209
210
def get_partitions_def(self, dbt_resource_props: Mapping[str, Any]) -> Optional[PartitionsDefinition]:
211
"""
212
Get partitions definition for a dbt resource.
213
214
Parameters:
215
- dbt_resource_props: dbt resource properties from manifest
216
217
Returns:
218
PartitionsDefinition or None
219
"""
220
```
221
222
### Translator Settings
223
224
#### DagsterDbtTranslatorSettings
225
226
Configuration settings that control translator behavior and feature enablement.
227
228
```python { .api }
229
@dataclass(frozen=True)
230
class DagsterDbtTranslatorSettings(Resolvable):
231
"""
232
Settings for controlling DagsterDbtTranslator behavior.
233
234
Attributes:
235
- enable_asset_checks: Whether to create asset checks from dbt tests
236
- enable_duplicate_source_asset_keys: Allow duplicate asset keys for sources
237
- enable_code_references: Whether to include code references in metadata
238
- enable_dbt_selection_by_name: Select dbt resources by name rather than by fully qualified name (fqn)
239
- enable_source_tests_as_checks: Create checks for source tests
240
"""
241
242
enable_asset_checks: bool = True
243
enable_duplicate_source_asset_keys: bool = False
244
enable_code_references: bool = False
245
enable_dbt_selection_by_name: bool = False
246
enable_source_tests_as_checks: bool = False
247
```
248
249
## Usage Examples
250
251
### Custom Asset Key Generation
252
253
```python
254
from typing import Optional
from dagster import AssetKey
255
from dagster_dbt import DagsterDbtTranslator
256
257
class CustomAssetKeyTranslator(DagsterDbtTranslator):
    """Translator that keys assets by database/schema/name and groups them by package or folder."""

    def get_asset_key(self, dbt_resource_props: dict) -> AssetKey:
        """Generate custom asset keys with database prefix.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        AssetKey of the form [database, schema, name]
        """
        # Use `or` instead of a .get() default: the key can be present with a
        # null value, in which case the .get() default would not be applied
        # and None would end up inside the asset key.
        database = dbt_resource_props.get("database") or "default"
        schema = dbt_resource_props.get("schema") or "default"
        name = dbt_resource_props["name"]

        return AssetKey([database, schema, name])

    def get_group_name(self, dbt_resource_props: dict) -> Optional[str]:
        """Group assets by dbt package, or by first subdirectory for the main project.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        "dbt_package_<package>" for resources from other packages, the first
        subdirectory for nested resources of the main project, otherwise "default"
        """
        package_name = dbt_resource_props.get("package_name")
        if package_name and package_name != "my_project":
            return f"dbt_package_{package_name}"

        # Use directory structure for main project.
        # fqn is [package, *subdirectories, resource_name], so a subdirectory
        # only exists when there are more than two entries; with exactly two,
        # fqn[1] would be the resource's own name.
        fqn = dbt_resource_props.get("fqn") or []
        if len(fqn) > 2:
            return fqn[1]  # First subdirectory

        return "default"
278
279
# Use custom translator
280
from dagster_dbt import dbt_assets
281
282
@dbt_assets(
283
manifest="./target/manifest.json",
284
dagster_dbt_translator=CustomAssetKeyTranslator()
285
)
286
def my_dbt_assets(context, dbt):
287
yield from dbt.cli(["build"], context=context).stream()
288
```
289
290
### Environment-Specific Translation
291
292
```python
293
import os
294
from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator, dbt_assets
295
296
class EnvironmentDbtTranslator(DagsterDbtTranslator):
    """Translator that stamps every asset with the deployment environment."""

    def __init__(self, environment: str = "dev"):
        """Create the translator.

        Parameters:
        - environment: Environment name used as key prefix, metadata value, and tag
        """
        # Initialize the base translator so its own state is set up before
        # any inherited method is called.
        super().__init__()
        self.environment = environment

    def get_asset_key(self, dbt_resource_props: dict) -> AssetKey:
        """Prefix asset keys with environment."""
        base_key = super().get_asset_key(dbt_resource_props)
        return AssetKey([self.environment, *base_key.path])

    def get_metadata(self, dbt_resource_props: dict) -> dict:
        """Add environment metadata."""
        # Build a new dict instead of mutating the inherited result, which is
        # only typed as a read-only Mapping.
        return {
            **super().get_metadata(dbt_resource_props),
            "environment": self.environment,
            "target_database": dbt_resource_props.get("database"),
        }

    def get_tags(self, dbt_resource_props: dict) -> dict:
        """Add environment and materialization tags."""
        # Copy for the same reason as in get_metadata: never write into the
        # mapping handed back by the base class.
        tags = {
            **super().get_tags(dbt_resource_props),
            "environment": self.environment,
        }

        config = dbt_resource_props.get("config", {})
        if "materialized" in config:
            tags["materialization"] = config["materialized"]

        return tags
322
323
# Use environment-specific translator
324
environment = os.getenv("DAGSTER_ENV", "dev")
325
translator = EnvironmentDbtTranslator(environment=environment)
326
327
@dbt_assets(
328
manifest="./target/manifest.json",
329
dagster_dbt_translator=translator
330
)
331
def environment_dbt_assets(context, dbt):
332
yield from dbt.cli(["build"], context=context).stream()
333
```
334
335
### Freshness Policy Translation
336
337
```python
338
from typing import Optional
from dagster import AutoMaterializePolicy, FreshnessPolicy
339
from dagster_dbt import DagsterDbtTranslator, dbt_assets
340
from datetime import timedelta
341
342
class FreshnessPolicyTranslator(DagsterDbtTranslator):
    """Translator that derives freshness and auto-materialize policies from dbt config and tags."""

    def get_freshness_policy(self, dbt_resource_props: dict) -> Optional[FreshnessPolicy]:
        """Set freshness policies based on dbt configuration.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        FreshnessPolicy built from the "dagster_freshness_policy" config entry
        when present, otherwise a default chosen by materialization, or None
        """
        model_config = dbt_resource_props.get("config", {})

        # An explicit per-resource configuration wins over the defaults below.
        if "dagster_freshness_policy" in model_config:
            override = model_config["dagster_freshness_policy"]
            lag_minutes = override.get("maximum_lag_minutes", 60)
            schedule = override.get("cron_schedule")
            return FreshnessPolicy(
                maximum_lag_minutes=lag_minutes,
                cron_schedule=schedule,
            )

        # Otherwise fall back to a default keyed on the materialization type.
        materialization = model_config.get("materialized")
        if materialization == "incremental":
            return FreshnessPolicy(maximum_lag_minutes=30)
        if materialization == "snapshot":
            # 24 hours, expressed in minutes
            return FreshnessPolicy(maximum_lag_minutes=60 * 24)
        return None

    def get_auto_materialize_policy(self, dbt_resource_props: dict) -> Optional[AutoMaterializePolicy]:
        """Set auto-materialize policies for specific models.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        Eager policy for the "auto_materialize" tag, lazy policy for the
        "lazy_materialize" tag, otherwise None
        """
        dbt_tags = dbt_resource_props.get("tags", [])

        # "auto_materialize" is checked first, so it wins when both tags are set.
        if "auto_materialize" in dbt_tags:
            return AutoMaterializePolicy.eager()
        if "lazy_materialize" in dbt_tags:
            return AutoMaterializePolicy.lazy()
        return None
374
375
@dbt_assets(
376
manifest="./target/manifest.json",
377
dagster_dbt_translator=FreshnessPolicyTranslator()
378
)
379
def fresh_dbt_assets(context, dbt):
380
yield from dbt.cli(["build"], context=context).stream()
381
```
382
383
### Multi-Tenant Translation
384
385
```python
386
from typing import Optional
from dagster import AssetKey
387
from dagster_dbt import DagsterDbtTranslator, dbt_assets
388
389
class MultiTenantTranslator(DagsterDbtTranslator):
    """Translator that namespaces asset keys, metadata, and groups by tenant."""

    def __init__(self, tenant_id: str):
        """Create the translator.

        Parameters:
        - tenant_id: Tenant identifier used as key prefix and in metadata/group names
        """
        # Initialize the base translator so its own state is set up before
        # any inherited method is called.
        super().__init__()
        self.tenant_id = tenant_id

    def get_asset_key(self, dbt_resource_props: dict) -> AssetKey:
        """Create tenant-specific asset keys."""
        base_key = super().get_asset_key(dbt_resource_props)
        return AssetKey([self.tenant_id, *base_key.path])

    def get_metadata(self, dbt_resource_props: dict) -> dict:
        """Add tenant metadata."""
        # Build a new dict instead of calling .update() on the inherited
        # result, which is only typed as a read-only Mapping.
        return {
            **super().get_metadata(dbt_resource_props),
            "tenant_id": self.tenant_id,
            "tenant_database": f"{self.tenant_id}_{dbt_resource_props.get('database', 'default')}",
            "tenant_schema": f"{self.tenant_id}_{dbt_resource_props.get('schema', 'default')}",
        }

    def get_group_name(self, dbt_resource_props: dict) -> Optional[str]:
        """Group by tenant and model type."""
        base_group = super().get_group_name(dbt_resource_props)
        # Fall back to the bare tenant id when the base translator assigns no group.
        return f"{self.tenant_id}_{base_group}" if base_group else self.tenant_id
412
413
# Create tenant-specific assets
414
def create_tenant_assets(tenant_id: str):
    """Build the dbt assets definition for a single tenant.

    Parameters:
    - tenant_id: Tenant identifier; selects the manifest path, the asset
      definition name, and the dbt target

    Returns:
    The decorated dbt assets function for that tenant
    """
    manifest_path = f"./tenants/{tenant_id}/target/manifest.json"

    @dbt_assets(
        name=f"{tenant_id}_dbt_assets",
        manifest=manifest_path,
        dagster_dbt_translator=MultiTenantTranslator(tenant_id=tenant_id),
    )
    def tenant_dbt_assets(context, dbt):
        # Run the build against the tenant's own dbt target.
        invocation = dbt.cli(["build", "--target", tenant_id], context=context)
        yield from invocation.stream()

    return tenant_dbt_assets
429
430
# Create assets for multiple tenants
431
tenant_assets = [
432
create_tenant_assets("client_a"),
433
create_tenant_assets("client_b"),
434
create_tenant_assets("client_c")
435
]
436
```
437
438
## Advanced Translation Patterns
439
440
### Dependency Override
441
442
```python
443
from typing import Iterable
from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator
444
445
class DependencyOverrideTranslator(DagsterDbtTranslator):
    """Translator that injects extra upstream dependencies for selected models."""

    def get_deps_asset_keys(self, dbt_resource_props: dict, manifest: dict) -> Iterable[AssetKey]:
        """Override dependencies for specific models.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest
        - manifest: Complete dbt manifest dictionary

        Returns:
        Iterable of AssetKey objects: any forced extra dependency first,
        followed by the dependencies produced by the base translator
        """
        is_critical_summary = dbt_resource_props["name"] == "critical_summary"

        if is_critical_summary:
            # Force dependency on external data source
            yield AssetKey(["external", "data_feed"])

        # Include normal dbt dependencies
        inherited_deps = super().get_deps_asset_keys(dbt_resource_props, manifest)
        yield from inherited_deps
457
```
458
459
### Code Reference Enhancement
460
461
```python
462
from dagster_dbt import DagsterDbtTranslator
463
464
class CodeReferenceTranslator(DagsterDbtTranslator):
    """Translator that enriches metadata with source links and documentation stats."""

    def get_metadata(self, dbt_resource_props: dict) -> dict:
        """Enhanced code references and documentation.

        Parameters:
        - dbt_resource_props: dbt resource properties from manifest

        Returns:
        Dictionary with the base metadata plus, when available, the model
        path and GitHub link, the model description, and column counts
        """
        # Copy into a fresh dict: the inherited result is only typed as a
        # read-only Mapping, so it must not be written to in place.
        metadata = dict(super().get_metadata(dbt_resource_props))

        # Add enhanced code references
        original_file_path = dbt_resource_props.get("original_file_path")
        if original_file_path:
            metadata["dbt_model_path"] = original_file_path
            metadata["github_link"] = f"https://github.com/myorg/dbt-project/blob/main/{original_file_path}"

        # Add documentation metadata
        description = dbt_resource_props.get("description")
        if description:
            metadata["model_description"] = description

        # Add column information
        columns = dbt_resource_props.get("columns", {})
        if columns:
            metadata["column_count"] = len(columns)
            # Count columns with a non-empty description without building a list.
            metadata["documented_columns"] = sum(
                1 for col in columns.values() if col.get("description")
            )

        return metadata
490
```
491
492
## Type Definitions
493
494
```python { .api }
495
from dagster import AssetCheckSpec, AssetKey, AssetSpec, AutoMaterializePolicy, AutomationCondition, FreshnessPolicy, PartitionMapping, PartitionsDefinition
496
from typing import Mapping, Any, Optional, Iterable, Sequence
497
```