# Utilities and Helpers

Utility functions for asset selection, naming conventions, metadata handling, and manifest operations. These utilities provide common functionality needed for dbt-Dagster integration.

## Capabilities

### Asset Selection Utilities

#### select_unique_ids

Selects dbt resources using dbt's selection syntax and returns their unique IDs.

```python { .api }
def select_unique_ids(
    manifest: Mapping[str, Any],
    select: str,
    exclude: str = ""
) -> Set[str]:
    """
    Select dbt resources by unique IDs using dbt selection syntax.

    Parameters:
    - manifest: Parsed dbt manifest dictionary
    - select: dbt selection string (e.g., "tag:daily", "model:my_model+")
    - exclude: dbt exclusion string

    Returns:
    Set of unique IDs for selected dbt resources
    """
```
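
To make the inputs and output concrete, the following minimal sketch mimics only the simplest case, a single tag selector with a single tag exclusion, over a hand-written manifest fragment. The helper `select_ids_by_tag` and the sample data are hypothetical and are not part of dagster-dbt; the sketch does not reproduce the full dbt selection syntax (graph operators, paths, and so on).

```python
from typing import Any, Mapping, Set


def select_ids_by_tag(
    manifest: Mapping[str, Any], select_tag: str, exclude_tag: str = ""
) -> Set[str]:
    """Hypothetical stand-in: keep nodes tagged select_tag, drop those tagged exclude_tag."""
    selected = set()
    for unique_id, node in manifest.get("nodes", {}).items():
        tags = set(node.get("tags", []))
        if select_tag in tags and (not exclude_tag or exclude_tag not in tags):
            selected.add(unique_id)
    return selected


# Hand-written manifest fragment (illustrative data only)
sample_manifest = {
    "nodes": {
        "model.shop.orders": {"tags": ["daily"]},
        "model.shop.customers": {"tags": ["daily", "slow"]},
        "model.shop.audit": {"tags": ["weekly"]},
    }
}

daily_ids = select_ids_by_tag(sample_manifest, "daily", exclude_tag="slow")
print(sorted(daily_ids))
```

The result is a set of unique ID strings, which is the same shape the `select_unique_ids` signature above declares as its return type.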

### Naming Utilities

#### dagster_name_fn

Generates Dagster-compatible names from dbt resource properties.

```python { .api }
def dagster_name_fn(dbt_resource_props: Mapping[str, Any]) -> str:
    """
    Generate Dagster name from dbt resource properties.

    Creates a valid Dagster asset name from dbt resource information,
    handling special characters and ensuring uniqueness.

    Parameters:
    - dbt_resource_props: dbt resource properties from manifest

    Returns:
    Valid Dagster asset name string
    """
```
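
As a rough illustration of what "handling special characters" can mean, the sketch below derives a name from a resource's `unique_id` by replacing every character outside `[A-Za-z0-9_]` with an underscore. The function `sketch_name_from_unique_id` and its replacement rule are assumptions made for this example; they are not taken from the dagster-dbt source, and `dagster_name_fn` may behave differently.

```python
import re
from typing import Any, Mapping


def sketch_name_from_unique_id(dbt_resource_props: Mapping[str, Any]) -> str:
    """Hypothetical: replace every character outside [A-Za-z0-9_] with an underscore."""
    return re.sub(r"[^A-Za-z0-9_]", "_", dbt_resource_props["unique_id"])


# Illustrative resource properties; only the unique_id field is used here
example_name = sketch_name_from_unique_id(
    {"unique_id": "model.my-project.daily_orders"}
)
print(example_name)  # model_my_project_daily_orders
```

Starting from the unique ID keeps names distinct as long as two IDs do not collapse to the same string after replacement.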

### Asset Key Utilities

#### default_node_info_to_asset_key

Default function for converting dbt node information to Dagster asset keys.

```python { .api }
def default_node_info_to_asset_key(node_info: Mapping[str, Any]) -> AssetKey:
    """
    Convert dbt node info to asset key using default logic.

    Parameters:
    - node_info: dbt node information dictionary

    Returns:
    AssetKey for the dbt node
    """
```

### Metadata Utilities

#### DbtMetadataSet

Metadata entries specifically for dbt objects and their integration with Dagster.

```python { .api }
class DbtMetadataSet:
    """
    Collection of metadata entries for dbt objects.

    Provides standardized metadata that can be attached to Dagster
    assets representing dbt models, sources, and tests.
    """

    @property
    def materialization_type(self) -> MetadataEntry:
        """
        Metadata entry for dbt materialization type.

        Returns:
        MetadataEntry indicating the dbt materialization (table, view, etc.)
        """
```

### Manifest Utilities

#### validate_manifest

Validates and loads a dbt manifest from various input formats.

```python { .api }
def validate_manifest(manifest: DbtManifestParam) -> Mapping[str, Any]:
    """
    Validate and load dbt manifest from various formats.

    Parameters:
    - manifest: Manifest as dict, file path, or JSON string

    Returns:
    Validated manifest dictionary

    Raises:
    DagsterDbtError: If manifest is invalid
    """
```

#### read_manifest_path

Reads manifest content from a file path and caches the result.

```python { .api }
def read_manifest_path(manifest_path: str) -> Mapping[str, Any]:
    """
    Read manifest from file path with caching.

    Uses internal caching to avoid repeated file I/O for the same
    manifest file within a single process.

    Parameters:
    - manifest_path: Path to manifest.json file

    Returns:
    Parsed manifest dictionary

    Raises:
    DagsterDbtManifestNotFoundError: If file doesn't exist
    """
```
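
The per-process caching described in the docstring can be sketched with `functools.lru_cache`. This is one plausible way to get that behavior, not a statement of how `read_manifest_path` is implemented; the name `cached_read_manifest` and the temporary manifest file are hypothetical.

```python
import json
import tempfile
from functools import lru_cache
from pathlib import Path
from typing import Any, Mapping


@lru_cache(maxsize=None)
def cached_read_manifest(manifest_path: str) -> Mapping[str, Any]:
    """Hypothetical cached reader: parses each distinct path once per process."""
    with open(manifest_path, encoding="utf-8") as f:
        return json.load(f)


# Write a tiny manifest to a temporary file so the example is self-contained
with tempfile.TemporaryDirectory() as tmp_dir:
    path = str(Path(tmp_dir) / "manifest.json")
    Path(path).write_text(json.dumps({"nodes": {}, "sources": {}}), encoding="utf-8")

    first = cached_read_manifest(path)
    second = cached_read_manifest(path)  # served from the cache, no file I/O

same_object = first is second
cache_hits = cached_read_manifest.cache_info().hits
print(same_object, cache_hits)
```

A cache keyed on the path string returns stale data if the file is rewritten while the process is running; in this sketch, `cached_read_manifest.cache_clear()` resets it.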

## Usage Examples

### Custom Asset Selection

```python
from dagster import AssetKey, AssetSelection
from dagster_dbt.utils import select_unique_ids
from dagster_dbt.dbt_manifest import validate_manifest

def create_custom_asset_selection(manifest_path: str, criteria: str) -> AssetSelection:
    """Create asset selection from custom criteria."""
    manifest = validate_manifest(manifest_path)

    # Get unique IDs for selected models
    selected_ids = select_unique_ids(
        manifest=manifest,
        select=criteria,
        exclude="tag:deprecated"
    )

    # Convert to asset keys (assuming standard naming)
    asset_keys = []
    for unique_id in selected_ids:
        # Sources live under manifest["sources"], so skip IDs that are not nodes
        node = manifest["nodes"].get(unique_id)
        if node is None:
            continue
        asset_key = AssetKey([node["schema"], node["name"]])
        asset_keys.append(asset_key)

    return AssetSelection.keys(*asset_keys)

# Use custom selection
selection = create_custom_asset_selection(
    "./target/manifest.json",
    "tag:daily +models/marts/"
)
```

### Asset Key Generation

```python
import os

from dagster import AssetKey
from dagster_dbt.utils import dagster_name_fn, default_node_info_to_asset_key

def custom_asset_key_generator(dbt_resource_props: dict) -> AssetKey:
    """Generate custom asset keys with environment prefix."""
    # Use utility to get base asset key
    base_key = default_node_info_to_asset_key(dbt_resource_props)

    # Add environment prefix
    environment = os.getenv("DBT_TARGET", "dev")
    return AssetKey([environment] + list(base_key.path))

def custom_naming_function(dbt_resource_props: dict) -> str:
    """Generate custom Dagster asset names."""
    # Use utility for base name
    base_name = dagster_name_fn(dbt_resource_props)

    # Add materialization suffix
    materialization = dbt_resource_props.get("config", {}).get("materialized", "view")
    return f"{base_name}_{materialization}"
```

### Metadata Enhancement

```python
from dagster import MetadataEntry
from dagster_dbt.metadata_set import DbtMetadataSet

def enhanced_dbt_metadata(dbt_resource_props: dict) -> dict:
    """Generate enhanced metadata for dbt assets."""
    metadata = {}

    # Use DbtMetadataSet for standard entries
    dbt_metadata = DbtMetadataSet()
    if "materialized" in dbt_resource_props.get("config", {}):
        metadata["materialization"] = dbt_metadata.materialization_type

    # Add custom metadata
    if "description" in dbt_resource_props:
        metadata["description"] = MetadataEntry.text(dbt_resource_props["description"])

    # Add column count
    columns = dbt_resource_props.get("columns", {})
    if columns:
        metadata["column_count"] = MetadataEntry.int(len(columns))

    # Document column coverage
    documented_columns = sum(
        1 for col in columns.values()
        if col.get("description")
    )
    metadata["documentation_coverage"] = MetadataEntry.float(
        documented_columns / len(columns) if columns else 0.0
    )

    # Add test information
    if "test" in dbt_resource_props.get("resource_type", ""):
        test_config = dbt_resource_props.get("test_metadata", {})
        metadata["test_type"] = MetadataEntry.text(
            test_config.get("name", "unknown")
        )

    return metadata
```

### Manifest Processing

```python
import json

from dagster_dbt.dbt_manifest import read_manifest_path, validate_manifest
from dagster_dbt.errors import DagsterDbtManifestNotFoundError

def analyze_dbt_project(manifest_input) -> dict:
    """Analyze dbt project from manifest."""
    try:
        # Validate and load manifest
        manifest = validate_manifest(manifest_input)

        # Analyze nodes
        nodes = manifest.get("nodes", {})
        sources = manifest.get("sources", {})

        analysis = {
            "total_nodes": len(nodes),
            "total_sources": len(sources),
            "models": len([n for n in nodes.values() if n.get("resource_type") == "model"]),
            "tests": len([n for n in nodes.values() if n.get("resource_type") == "test"]),
            "snapshots": len([n for n in nodes.values() if n.get("resource_type") == "snapshot"]),
            "seeds": len([n for n in nodes.values() if n.get("resource_type") == "seed"]),
        }

        # Analyze materializations
        materializations = {}
        for node in nodes.values():
            if node.get("resource_type") == "model":
                mat_type = node.get("config", {}).get("materialized", "view")
                materializations[mat_type] = materializations.get(mat_type, 0) + 1

        analysis["materializations"] = materializations

        # Analyze tags
        all_tags = set()
        for node in nodes.values():
            all_tags.update(node.get("tags", []))

        analysis["unique_tags"] = sorted(all_tags)
        analysis["tag_count"] = len(all_tags)

        return analysis

    except DagsterDbtManifestNotFoundError as e:
        return {"error": f"Manifest not found: {e}"}
    except Exception as e:
        return {"error": f"Analysis failed: {e}"}

# Analyze from file path
analysis = analyze_dbt_project("./target/manifest.json")
if "error" in analysis:
    # The failure paths above return only an "error" key
    print(analysis["error"])
else:
    print(f"Found {analysis['models']} models and {analysis['tests']} tests")

# Analyze from manifest dict
with open("./target/manifest.json") as f:
    manifest_dict = json.load(f)
analysis = analyze_dbt_project(manifest_dict)
```

### Selection Validation

```python
from dagster_dbt.utils import select_unique_ids

def validate_dbt_selection(manifest: dict, select: str, exclude: str = "") -> dict:
    """Validate dbt selection criteria and return statistics."""
    try:
        selected_ids = select_unique_ids(
            manifest=manifest,
            select=select,
            exclude=exclude
        )

        # Analyze selection results
        selected_nodes = {
            unique_id: manifest["nodes"][unique_id]
            for unique_id in selected_ids
            if unique_id in manifest["nodes"]
        }

        # Group by resource type
        by_type = {}
        for node in selected_nodes.values():
            resource_type = node.get("resource_type", "unknown")
            by_type[resource_type] = by_type.get(resource_type, 0) + 1

        return {
            "valid": True,
            "selected_count": len(selected_ids),
            "by_type": by_type,
            "selected_models": [
                node["name"] for node in selected_nodes.values()
                if node.get("resource_type") == "model"
            ]
        }

    except Exception as e:
        return {
            "valid": False,
            "error": str(e)
        }

# Validate selection
# `manifest` is a parsed manifest dictionary, e.g. the result of validate_manifest(...)
result = validate_dbt_selection(
    manifest=manifest,
    select="tag:daily",
    exclude="tag:slow"
)

if result["valid"]:
    print(f"Selection is valid, found {result['selected_count']} resources")
else:
    print(f"Selection is invalid: {result['error']}")
```

## Constants

```python { .api }
# dbt resource types that are treated as assets
ASSET_RESOURCE_TYPES = ["model", "seed", "snapshot"]
```
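
A constant like this is typically used to filter manifest nodes down to the ones that become assets. The sketch below shows that filtering on a hand-written manifest fragment; it redefines the constant locally so it runs without dagster-dbt installed, and the helper `asset_like_nodes` and the sample data are hypothetical.

```python
from typing import Any, Dict, Mapping

# Local copy of the constant so the sketch runs without dagster-dbt installed
ASSET_RESOURCE_TYPES = ["model", "seed", "snapshot"]


def asset_like_nodes(manifest: Mapping[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: keep only nodes whose resource_type is asset-like."""
    return {
        unique_id: node
        for unique_id, node in manifest.get("nodes", {}).items()
        if node.get("resource_type") in ASSET_RESOURCE_TYPES
    }


# Hand-written manifest fragment (illustrative data only)
sample_manifest = {
    "nodes": {
        "model.shop.orders": {"resource_type": "model"},
        "seed.shop.countries": {"resource_type": "seed"},
        "test.shop.not_null_orders_id": {"resource_type": "test"},
    }
}

asset_ids = sorted(asset_like_nodes(sample_manifest))
print(asset_ids)  # ['model.shop.orders', 'seed.shop.countries']
```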

## Type Definitions

```python { .api }
from dagster import AssetKey, MetadataEntry
from typing import Mapping, Any, Set, Union
from pathlib import Path

# Type alias for manifest parameter
DbtManifestParam = Union[Mapping[str, Any], str, Path]
```
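
A union type like `DbtManifestParam` usually implies a normalization step that turns each accepted form into a dictionary. The sketch below shows one way to do that under a stated assumption: a `str` is treated as a file path, not as raw JSON text. The function `load_manifest_param` is hypothetical and does not claim to match the behavior of `validate_manifest`.

```python
import json
import tempfile
from collections.abc import Mapping as AbcMapping
from pathlib import Path
from typing import Any, Mapping, Union

# Same shape as the alias above, redefined so the sketch is self-contained
DbtManifestParam = Union[Mapping[str, Any], str, Path]


def load_manifest_param(manifest: DbtManifestParam) -> Mapping[str, Any]:
    """Hypothetical normalizer: mappings pass through, str/Path are read as JSON files."""
    if isinstance(manifest, AbcMapping):
        return manifest
    # Assumption: a str is a file path, not a JSON document
    return json.loads(Path(manifest).read_text(encoding="utf-8"))


# Exercise all three accepted forms against the same content
content = {"nodes": {}, "sources": {}}
with tempfile.TemporaryDirectory() as tmp_dir:
    manifest_file = Path(tmp_dir) / "manifest.json"
    manifest_file.write_text(json.dumps(content), encoding="utf-8")

    from_dict = load_manifest_param(content)
    from_path = load_manifest_param(manifest_file)
    from_str = load_manifest_param(str(manifest_file))

print(from_dict == from_path == from_str)
```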