or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asset-creation.md · cli-resource.md · component-system.md · dbt-cloud-legacy.md · dbt-cloud-v2.md · error-handling.md · freshness-checks.md · index.md · project-management.md · translation-system.md · utilities.md

translation-system.mddocs/

0

# Translation System

1

2

Customizable mapping between dbt resources and Dagster assets, including metadata, groups, and asset keys. The translation system provides a flexible interface for controlling how dbt models, tests, and sources are represented as Dagster assets.

3

4

## Capabilities

5

6

### Core Translator

7

8

#### DagsterDbtTranslator

9

10

Main translator class that maps dbt resources to Dagster asset specifications.

11

12

```python { .api }

13

class DagsterDbtTranslator:
    """Translate dbt resources into Dagster assets.

    The methods on this class control how dbt models, sources, and tests are
    converted into Dagster assets: their asset keys, metadata, groups, and
    other asset properties. Override a method to customize that aspect.
    """

    def __init__(self, settings: Optional[DagsterDbtTranslatorSettings] = None): ...

    def get_asset_spec(
        self,
        manifest: Mapping[str, Any],
        unique_id: str,
        project: Optional["DbtProject"],
    ) -> AssetSpec:
        """Return the AssetSpec for a dbt resource.

        Args:
            manifest: The complete dbt manifest dictionary.
            unique_id: The unique ID of the dbt resource.
            project: The DbtProject instance, or None.

        Returns:
            The AssetSpec object for the dbt resource.
        """

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """Return the asset key for a dbt resource.

        Args:
            dbt_resource_props: The resource's properties from the manifest.

        Returns:
            The AssetKey for the dbt resource.
        """

    def get_group_name(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[str]:
        """Return the group name for a dbt resource.

        Args:
            dbt_resource_props: The resource's properties from the manifest.

        Returns:
            The group name string, or None.
        """

    def get_metadata(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Mapping[str, Any]:
        """Return the metadata for a dbt resource.

        Args:
            dbt_resource_props: The resource's properties from the manifest.

        Returns:
            A dictionary of Dagster metadata.
        """

    def get_tags(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Mapping[str, str]:
        """Return the tags for a dbt resource.

        Args:
            dbt_resource_props: The resource's properties from the manifest.

        Returns:
            A dictionary of asset tags.
        """

    def get_freshness_policy(
        self,
        dbt_resource_props: Mapping[str, Any],
    ) -> Optional[FreshnessPolicy]:
        """Return the freshness policy for a dbt resource.

        Args:
            dbt_resource_props: The resource's properties from the manifest.

        Returns:
            A FreshnessPolicy object, or None.
        """

    def get_auto_materialize_policy(
        self,
        dbt_resource_props: Mapping[str, Any],
    ) -> Optional[AutoMaterializePolicy]:
        """Return the auto-materialize policy for a dbt resource.

        Args:
            dbt_resource_props: The resource's properties from the manifest.

        Returns:
            An AutoMaterializePolicy object, or None.
        """

    def get_deps_asset_keys(
        self,
        dbt_resource_props: Mapping[str, Any],
        manifest: Mapping[str, Any],
    ) -> Iterable[AssetKey]:
        """Return the dependency asset keys for a dbt resource.

        Args:
            dbt_resource_props: The resource's properties from the manifest.
            manifest: The complete dbt manifest dictionary.

        Returns:
            An iterable of AssetKey objects representing dependencies.
        """

    def get_asset_check_spec(
        self,
        asset_spec: AssetSpec,
        manifest: Mapping[str, Any],
        unique_id: str,
        project: Optional["DbtProject"],
    ) -> Optional[AssetCheckSpec]:
        """Return the AssetCheckSpec for a dbt test.

        Args:
            asset_spec: The AssetSpec of the asset the test targets.
            manifest: The complete dbt manifest dictionary.
            unique_id: The unique ID of the dbt test.
            project: The DbtProject instance, or None.

        Returns:
            The AssetCheckSpec object for the dbt test, or None.
        """

    def get_partition_mapping(
        self,
        dbt_resource_props: Mapping[str, Any],
        dbt_parent_resource_props: Mapping[str, Any],
    ) -> Optional[PartitionMapping]:
        """Return the partition mapping between two assets.

        Args:
            dbt_resource_props: Properties of the child dbt resource.
            dbt_parent_resource_props: Properties of the parent dbt resource.

        Returns:
            A PartitionMapping, or None.
        """

    def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str:
        """Return the description for a dbt resource.

        Args:
            dbt_resource_props: The resource's properties from the manifest.

        Returns:
            The description string.
        """

    def get_code_version(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[str]:
        """Return the code version for a dbt resource.

        Args:
            dbt_resource_props: The resource's properties from the manifest.

        Returns:
            The code version string, or None.
        """

    def get_owners(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[Sequence[str]]:
        """Return the owners of a dbt resource.

        Args:
            dbt_resource_props: The resource's properties from the manifest.

        Returns:
            A list of owner strings, or None.
        """

    def get_automation_condition(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[AutomationCondition]:
        """Return the automation condition for a dbt resource.

        Args:
            dbt_resource_props: The resource's properties from the manifest.

        Returns:
            An AutomationCondition, or None.
        """

    def get_partitions_def(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[PartitionsDefinition]:
        """Return the partitions definition for a dbt resource.

        Args:
            dbt_resource_props: The resource's properties from the manifest.

        Returns:
            A PartitionsDefinition, or None.
        """

220

```

221

222

### Translator Settings

223

224

#### DagsterDbtTranslatorSettings

225

226

Configuration settings that control translator behavior and feature enablement.

227

228

```python { .api }

229

@dataclass(frozen=True)
class DagsterDbtTranslatorSettings(Resolvable):
    """Settings that control DagsterDbtTranslator behavior."""

    # Whether to create asset checks from dbt tests.
    enable_asset_checks: bool = True

    # Allow duplicate asset keys for sources.
    enable_duplicate_source_asset_keys: bool = False

    # Whether to include code references in metadata.
    enable_code_references: bool = False

    # Enable selection by name instead of unique_id.
    enable_dbt_selection_by_name: bool = False

    # Create checks for source tests.
    enable_source_tests_as_checks: bool = False

247

```

248

249

## Usage Examples

250

251

### Custom Asset Key Generation

252

253

```python

254

from typing import Any, Mapping, Optional

from dagster import AssetExecutionContext, AssetKey
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets


class CustomAssetKeyTranslator(DagsterDbtTranslator):
    """Translator that customizes asset keys and group names."""

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """Generate custom asset keys with a database and schema prefix.

        Args:
            dbt_resource_props: dbt resource properties from the manifest.

        Returns:
            An AssetKey of the form ``[database, schema, name]``.
        """
        # `or "default"` also covers a key that is present but null, which
        # `.get(key, "default")` would pass through as None.
        database = dbt_resource_props.get("database") or "default"
        schema = dbt_resource_props.get("schema") or "default"
        name = dbt_resource_props["name"]

        return AssetKey([database, schema, name])

    def get_group_name(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[str]:
        """Group assets by dbt package, or by directory for the main project.

        Args:
            dbt_resource_props: dbt resource properties from the manifest.

        Returns:
            ``dbt_package_<name>`` for resources from another package, the
            second ``fqn`` element for main-project resources, otherwise
            ``"default"``.
        """
        # "my_project" stands for the name of the main dbt project.
        package_name = dbt_resource_props.get("package_name")
        if package_name and package_name != "my_project":
            return f"dbt_package_{package_name}"

        # Use directory structure for main project
        fqn = dbt_resource_props.get("fqn", [])
        if len(fqn) > 1:
            return fqn[1]  # First subdirectory

        return "default"


# Use custom translator
@dbt_assets(
    manifest="./target/manifest.json",
    dagster_dbt_translator=CustomAssetKeyTranslator(),
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """Run ``dbt build`` and stream its events back to Dagster."""
    yield from dbt.cli(["build"], context=context).stream()

288

```

289

290

### Environment-Specific Translation

291

292

```python

293

import os
from typing import Any, Mapping

from dagster import AssetExecutionContext, AssetKey
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets


class EnvironmentDbtTranslator(DagsterDbtTranslator):
    """Translator that stamps every asset with a deployment environment."""

    def __init__(self, environment: str = "dev"):
        """Create the translator.

        Args:
            environment: Environment name used as key prefix, metadata value
                and tag value.
        """
        # Let the base class run its own initialization before adding state.
        super().__init__()
        self.environment = environment

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """Prefix asset keys with environment."""
        base_key = super().get_asset_key(dbt_resource_props)
        return AssetKey([self.environment, *base_key.path])

    def get_metadata(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Mapping[str, Any]:
        """Add environment metadata."""
        # Build a new dict: the inherited result is typed as a read-only
        # Mapping and must not be mutated in place.
        return {
            **super().get_metadata(dbt_resource_props),
            "environment": self.environment,
            "target_database": dbt_resource_props.get("database"),
        }

    def get_tags(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Mapping[str, str]:
        """Add environment and materialization tags."""
        tags = dict(super().get_tags(dbt_resource_props))
        tags["environment"] = self.environment

        config = dbt_resource_props.get("config", {})
        if "materialized" in config:
            tags["materialization"] = config["materialized"]

        return tags


# Use environment-specific translator
environment = os.getenv("DAGSTER_ENV", "dev")
translator = EnvironmentDbtTranslator(environment=environment)


@dbt_assets(
    manifest="./target/manifest.json",
    dagster_dbt_translator=translator,
)
def environment_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """Run ``dbt build`` and stream its events back to Dagster."""
    yield from dbt.cli(["build"], context=context).stream()

333

```

334

335

### Freshness Policy Translation

336

337

```python

338

from datetime import timedelta
from typing import Any, Mapping, Optional

from dagster import AssetExecutionContext, AutoMaterializePolicy, FreshnessPolicy
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets


class FreshnessPolicyTranslator(DagsterDbtTranslator):
    """Translator that derives freshness and auto-materialize policies."""

    def get_freshness_policy(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[FreshnessPolicy]:
        """Set freshness policies based on dbt configuration.

        Args:
            dbt_resource_props: dbt resource properties from the manifest.

        Returns:
            A policy built from the ``dagster_freshness_policy`` config entry
            when present; otherwise a default chosen by materialization
            (30 minutes for incremental, 24 hours for snapshot), or None.
        """
        config = dbt_resource_props.get("config", {})

        # Check for custom freshness configuration
        if "dagster_freshness_policy" in config:
            policy_config = config["dagster_freshness_policy"]
            return FreshnessPolicy(
                maximum_lag_minutes=policy_config.get("maximum_lag_minutes", 60),
                cron_schedule=policy_config.get("cron_schedule"),
            )

        # Set default policies based on materialization
        materialized = config.get("materialized")
        if materialized == "incremental":
            return FreshnessPolicy(maximum_lag_minutes=30)
        if materialized == "snapshot":
            return FreshnessPolicy(maximum_lag_minutes=60 * 24)  # 24 hours

        return None

    def get_auto_materialize_policy(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[AutoMaterializePolicy]:
        """Set auto-materialize policies for specific models.

        Args:
            dbt_resource_props: dbt resource properties from the manifest.

        Returns:
            An eager policy for resources tagged ``auto_materialize``, a lazy
            policy for those tagged ``lazy_materialize``, otherwise None.
        """
        tags = dbt_resource_props.get("tags", [])

        if "auto_materialize" in tags:
            return AutoMaterializePolicy.eager()
        if "lazy_materialize" in tags:
            return AutoMaterializePolicy.lazy()

        return None


@dbt_assets(
    manifest="./target/manifest.json",
    dagster_dbt_translator=FreshnessPolicyTranslator(),
)
def fresh_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """Run ``dbt build`` and stream its events back to Dagster."""
    yield from dbt.cli(["build"], context=context).stream()

381

```

382

383

### Multi-Tenant Translation

384

385

```python

386

from typing import Any, Mapping, Optional

from dagster import AssetExecutionContext, AssetKey
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets


class MultiTenantTranslator(DagsterDbtTranslator):
    """Translator that namespaces keys, metadata and groups by tenant."""

    def __init__(self, tenant_id: str):
        """Create the translator.

        Args:
            tenant_id: Identifier prepended to asset keys and group names and
                recorded in metadata.
        """
        # Let the base class run its own initialization before adding state.
        super().__init__()
        self.tenant_id = tenant_id

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """Create tenant-specific asset keys."""
        base_key = super().get_asset_key(dbt_resource_props)
        return AssetKey([self.tenant_id, *base_key.path])

    def get_metadata(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Mapping[str, Any]:
        """Add tenant metadata."""
        database = dbt_resource_props.get("database", "default")
        schema = dbt_resource_props.get("schema", "default")
        # Build a new dict: the inherited result is typed as a read-only
        # Mapping and must not be mutated in place.
        return {
            **super().get_metadata(dbt_resource_props),
            "tenant_id": self.tenant_id,
            "tenant_database": f"{self.tenant_id}_{database}",
            "tenant_schema": f"{self.tenant_id}_{schema}",
        }

    def get_group_name(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[str]:
        """Group by tenant and model type."""
        base_group = super().get_group_name(dbt_resource_props)
        return f"{self.tenant_id}_{base_group}" if base_group else self.tenant_id


# Create tenant-specific assets
def create_tenant_assets(tenant_id: str):
    """Build the dbt assets definition for one tenant.

    Args:
        tenant_id: Tenant identifier; also used as the dbt target name and in
            the manifest path.

    Returns:
        The ``@dbt_assets``-decorated definition for that tenant.
    """
    translator = MultiTenantTranslator(tenant_id=tenant_id)

    @dbt_assets(
        name=f"{tenant_id}_dbt_assets",
        manifest=f"./tenants/{tenant_id}/target/manifest.json",
        dagster_dbt_translator=translator,
    )
    def tenant_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
        yield from dbt.cli(
            ["build", "--target", tenant_id],
            context=context,
        ).stream()

    return tenant_dbt_assets


# Create assets for multiple tenants
tenant_assets = [
    create_tenant_assets("client_a"),
    create_tenant_assets("client_b"),
    create_tenant_assets("client_c"),
]

436

```

437

438

## Advanced Translation Patterns

439

440

### Dependency Override

441

442

```python

443

from typing import Any, Iterable, Mapping

from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator


class DependencyOverrideTranslator(DagsterDbtTranslator):
    """Translator that injects extra upstream dependencies for chosen models."""

    def get_deps_asset_keys(
        self,
        dbt_resource_props: Mapping[str, Any],
        manifest: Mapping[str, Any],
    ) -> Iterable[AssetKey]:
        """Override dependencies for specific models.

        Args:
            dbt_resource_props: dbt resource properties from the manifest.
            manifest: Complete dbt manifest dictionary.

        Yields:
            For the ``critical_summary`` model, the external ``data_feed``
            key first; then every dependency key from the base translator.
        """
        model_name = dbt_resource_props["name"]

        # Custom dependency logic for specific models
        if model_name == "critical_summary":
            # Force dependency on external data source
            yield AssetKey(["external", "data_feed"])

        # Include normal dbt dependencies
        yield from super().get_deps_asset_keys(dbt_resource_props, manifest)

457

```

458

459

### Code Reference Enhancement

460

461

```python

462

from typing import Any, Mapping

from dagster_dbt import DagsterDbtTranslator


class CodeReferenceTranslator(DagsterDbtTranslator):
    """Translator that enriches metadata with source links and column stats."""

    def get_metadata(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Mapping[str, Any]:
        """Enhanced code references and documentation.

        Args:
            dbt_resource_props: dbt resource properties from the manifest.

        Returns:
            The base metadata plus, when available, the model path and a
            GitHub link, the model description, and column counts.
        """
        # Copy first: the inherited result is typed as a read-only Mapping
        # and must not be mutated in place.
        metadata = dict(super().get_metadata(dbt_resource_props))

        # Add enhanced code references
        original_file_path = dbt_resource_props.get("original_file_path")
        if original_file_path:
            metadata["dbt_model_path"] = original_file_path
            metadata["github_link"] = f"https://github.com/myorg/dbt-project/blob/main/{original_file_path}"

        # Add documentation metadata
        description = dbt_resource_props.get("description")
        if description:
            metadata["model_description"] = description

        # Add column information
        columns = dbt_resource_props.get("columns", {})
        if columns:
            metadata["column_count"] = len(columns)
            # Count documented columns without building a throwaway list.
            metadata["documented_columns"] = sum(
                1 for col in columns.values() if col.get("description")
            )

        return metadata

490

```

491

492

## Type Definitions

493

494

```python { .api }

495

# Types referenced by the API signatures documented on this page.
from typing import Any, Iterable, Mapping, Optional, Sequence

from dagster import (
    AssetCheckSpec,
    AssetKey,
    AssetSpec,
    AutoMaterializePolicy,
    AutomationCondition,
    FreshnessPolicy,
    PartitionMapping,
    PartitionsDefinition,
)

497

```