0
# Utilities and Helpers
1
2
Utility functions for asset selection, naming conventions, metadata handling, and manifest operations. These utilities provide common functionality needed for dbt-Dagster integration.
3
4
## Capabilities
5
6
### Asset Selection Utilities
7
8
#### select_unique_ids
9
10
Resolves a dbt selection string (and optional exclusion string) against a manifest and returns the unique IDs of the matching dbt resources.
11
12
```python { .api }
13
def select_unique_ids(
14
manifest: Mapping[str, Any],
15
select: str,
16
exclude: str = ""
17
) -> Set[str]:
18
"""
19
Select dbt resources by unique IDs using dbt selection syntax.
20
21
Parameters:
22
- manifest: Parsed dbt manifest dictionary
23
- select: dbt selection string (e.g., "tag:daily", "my_model+")
24
- exclude: dbt exclusion string
25
26
Returns:
27
Set of unique IDs for selected dbt resources
28
"""
29
```
30
31
### Naming Utilities
32
33
#### dagster_name_fn
34
35
Generates Dagster-compatible names from dbt resource properties.
36
37
```python { .api }
38
def dagster_name_fn(dbt_resource_props: Mapping[str, Any]) -> str:
39
"""
40
Generate Dagster name from dbt resource properties.
41
42
Creates a valid Dagster asset name from dbt resource information,
43
handling special characters and ensuring uniqueness.
44
45
Parameters:
46
- dbt_resource_props: dbt resource properties from manifest
47
48
Returns:
49
Valid Dagster asset name string
50
"""
51
```
52
53
### Asset Key Utilities
54
55
#### default_node_info_to_asset_key
56
57
Default function for converting dbt node information to Dagster asset keys.
58
59
```python { .api }
60
def default_node_info_to_asset_key(node_info: Mapping[str, Any]) -> AssetKey:
61
"""
62
Convert dbt node info to asset key using default logic.
63
64
Parameters:
65
- node_info: dbt node information dictionary
66
67
Returns:
68
AssetKey for the dbt node
69
"""
70
```
71
72
### Metadata Utilities
73
74
#### DbtMetadataSet
75
76
Metadata entries specifically for dbt objects and their integration with Dagster.
77
78
```python { .api }
79
class DbtMetadataSet:
80
"""
81
Collection of metadata entries for dbt objects.
82
83
Provides standardized metadata that can be attached to Dagster
84
assets representing dbt models, sources, and tests.
85
"""
86
87
@property
88
def materialization_type(self) -> MetadataEntry:
89
"""
90
Metadata entry for dbt materialization type.
91
92
Returns:
93
MetadataEntry indicating the dbt materialization (table, view, etc.)
94
"""
95
```
96
97
### Manifest Utilities
98
99
#### validate_manifest
100
101
Validates a dbt manifest supplied in any of the supported input formats and returns it as a dictionary.
102
103
```python { .api }
104
def validate_manifest(manifest: DbtManifestParam) -> Mapping[str, Any]:
105
"""
106
Validate and load dbt manifest from various formats.
107
108
Parameters:
109
- manifest: Manifest as dict, file path, or JSON string
110
111
Returns:
112
Validated manifest dictionary
113
114
Raises:
115
DagsterDbtError: If manifest is invalid
116
"""
117
```
118
119
#### read_manifest_path
120
121
Reads manifest content from a file path and caches the parsed result.
122
123
```python { .api }
124
def read_manifest_path(manifest_path: str) -> Mapping[str, Any]:
125
"""
126
Read manifest from file path with caching.
127
128
Uses internal caching to avoid repeated file I/O for the same
129
manifest file within a single process.
130
131
Parameters:
132
- manifest_path: Path to manifest.json file
133
134
Returns:
135
Parsed manifest dictionary
136
137
Raises:
138
DagsterDbtManifestNotFoundError: If file doesn't exist
139
"""
140
```
141
142
## Usage Examples
143
144
### Custom Asset Selection
145
146
```python
147
from dagster import AssetKey, AssetSelection
148
from dagster_dbt.utils import select_unique_ids
149
from dagster_dbt.dbt_manifest import validate_manifest
150
151
def create_custom_asset_selection(manifest_path: str, criteria: str) -> AssetSelection:
152
"""Create asset selection from custom criteria."""
153
manifest = validate_manifest(manifest_path)
154
155
# Get unique IDs for selected models
156
selected_ids = select_unique_ids(
157
manifest=manifest,
158
select=criteria,
159
exclude="tag:deprecated"
160
)
161
162
# Convert to asset keys (assumes standard naming and that every selected ID is present in manifest["nodes"])
163
asset_keys = []
164
for unique_id in selected_ids:
165
node = manifest["nodes"][unique_id]
166
asset_key = AssetKey([node["schema"], node["name"]])
167
asset_keys.append(asset_key)
168
169
return AssetSelection.keys(*asset_keys)
170
171
# Use custom selection
172
selection = create_custom_asset_selection(
173
"./target/manifest.json",
174
"tag:daily +models/marts/"
175
)
176
```
177
178
### Asset Key Generation
179
180
```python
181
import os

from dagster import AssetKey
182
from dagster_dbt.utils import dagster_name_fn, default_node_info_to_asset_key
183
184
def custom_asset_key_generator(dbt_resource_props: dict) -> AssetKey:
185
"""Generate custom asset keys with environment prefix."""
186
# Use utility to get base asset key
187
base_key = default_node_info_to_asset_key(dbt_resource_props)
188
189
# Add environment prefix
190
environment = os.getenv("DBT_TARGET", "dev")
191
return AssetKey([environment] + list(base_key.path))
192
193
def custom_naming_function(dbt_resource_props: dict) -> str:
194
"""Generate custom Dagster asset names."""
195
# Use utility for base name
196
base_name = dagster_name_fn(dbt_resource_props)
197
198
# Add materialization suffix
199
materialization = dbt_resource_props.get("config", {}).get("materialized", "view")
200
return f"{base_name}_{materialization}"
201
```
202
203
### Metadata Enhancement
204
205
```python
206
from dagster import MetadataEntry
207
from dagster_dbt.metadata_set import DbtMetadataSet
208
209
def enhanced_dbt_metadata(dbt_resource_props: dict) -> dict:
210
"""Generate enhanced metadata for dbt assets."""
211
metadata = {}
212
213
# Use DbtMetadataSet for standard entries
214
dbt_metadata = DbtMetadataSet()
215
if "materialized" in dbt_resource_props.get("config", {}):
216
metadata["materialization"] = dbt_metadata.materialization_type
217
218
# Add custom metadata
219
if "description" in dbt_resource_props:
220
metadata["description"] = MetadataEntry.text(dbt_resource_props["description"])
221
222
# Add column count
223
columns = dbt_resource_props.get("columns", {})
224
if columns:
225
metadata["column_count"] = MetadataEntry.int(len(columns))
226
227
# Document column coverage
228
documented_columns = sum(
229
1 for col in columns.values()
230
if col.get("description")
231
)
232
metadata["documentation_coverage"] = MetadataEntry.float(
233
documented_columns / len(columns) if columns else 0.0
234
)
235
236
# Add test information
237
if "test" in dbt_resource_props.get("resource_type", ""):
238
test_config = dbt_resource_props.get("test_metadata", {})
239
metadata["test_type"] = MetadataEntry.text(
240
test_config.get("name", "unknown")
241
)
242
243
return metadata
244
```
245
246
### Manifest Processing
247
248
```python
249
import json

from dagster_dbt.dbt_manifest import validate_manifest, read_manifest_path
250
from dagster_dbt.errors import DagsterDbtManifestNotFoundError
251
252
def analyze_dbt_project(manifest_input) -> dict:
253
"""Analyze dbt project from manifest."""
254
try:
255
# Validate and load manifest
256
manifest = validate_manifest(manifest_input)
257
258
# Analyze nodes
259
nodes = manifest.get("nodes", {})
260
sources = manifest.get("sources", {})
261
262
analysis = {
263
"total_nodes": len(nodes),
264
"total_sources": len(sources),
265
"models": len([n for n in nodes.values() if n.get("resource_type") == "model"]),
266
"tests": len([n for n in nodes.values() if n.get("resource_type") == "test"]),
267
"snapshots": len([n for n in nodes.values() if n.get("resource_type") == "snapshot"]),
268
"seeds": len([n for n in nodes.values() if n.get("resource_type") == "seed"]),
269
}
270
271
# Analyze materializations
272
materializations = {}
273
for node in nodes.values():
274
if node.get("resource_type") == "model":
275
mat_type = node.get("config", {}).get("materialized", "view")
276
materializations[mat_type] = materializations.get(mat_type, 0) + 1
277
278
analysis["materializations"] = materializations
279
280
# Analyze tags
281
all_tags = set()
282
for node in nodes.values():
283
all_tags.update(node.get("tags", []))
284
285
analysis["unique_tags"] = sorted(list(all_tags))
286
analysis["tag_count"] = len(all_tags)
287
288
return analysis
289
290
except DagsterDbtManifestNotFoundError as e:
291
return {"error": f"Manifest not found: {e}"}
292
except Exception as e:
293
return {"error": f"Analysis failed: {e}"}
294
295
# Analyze from file path
296
analysis = analyze_dbt_project("./target/manifest.json")
297
print(f"Found {analysis['models']} models and {analysis['tests']} tests")
298
299
# Analyze from manifest dict
300
with open("./target/manifest.json") as f:
301
manifest_dict = json.load(f)
302
analysis = analyze_dbt_project(manifest_dict)
303
```
304
305
### Selection Validation
306
307
```python
308
from dagster_dbt.utils import select_unique_ids
309
310
def validate_dbt_selection(manifest: dict, select: str, exclude: str = "") -> dict:
311
"""Validate dbt selection criteria and return statistics."""
312
try:
313
selected_ids = select_unique_ids(
314
manifest=manifest,
315
select=select,
316
exclude=exclude
317
)
318
319
# Analyze selection results
320
selected_nodes = {
321
unique_id: manifest["nodes"][unique_id]
322
for unique_id in selected_ids
323
if unique_id in manifest["nodes"]
324
}
325
326
# Group by resource type
327
by_type = {}
328
for node in selected_nodes.values():
329
resource_type = node.get("resource_type", "unknown")
330
by_type[resource_type] = by_type.get(resource_type, 0) + 1
331
332
return {
333
"valid": True,
334
"selected_count": len(selected_ids),
335
"by_type": by_type,
336
"selected_models": [
337
node["name"] for node in selected_nodes.values()
338
if node.get("resource_type") == "model"
339
]
340
}
341
342
except Exception as e:
343
return {
344
"valid": False,
345
"error": str(e)
346
}
347
348
# Validate selection
349
result = validate_dbt_selection(
350
manifest=manifest,
351
select="tag:daily",
352
exclude="tag:slow"
353
)
354
355
if result["valid"]:
356
print(f"Selection is valid, found {result['selected_count']} resources")
357
else:
358
print(f"Selection is invalid: {result['error']}")
359
```
360
361
## Constants
362
363
```python { .api }
364
# dbt resource types that are treated as Dagster assets
365
ASSET_RESOURCE_TYPES = ["model", "seed", "snapshot"]
366
```
367
368
## Type Definitions
369
370
```python { .api }
371
from dagster import AssetKey, MetadataEntry
372
from typing import Mapping, Any, Set, Union
373
from pathlib import Path
374
375
# Type alias for manifest parameter
376
DbtManifestParam = Union[Mapping[str, Any], str, Path]
377
```