# Dagster dbt Integration

An integration library for orchestrating dbt models as Dagster assets, with lineage tracking, metadata propagation, and observability. It connects dbt's SQL-based transformations to Dagster's orchestration framework, so data engineers can use both tools together in one data stack.

## Package Information

- **Package Name**: dagster-dbt
- **Language**: Python
- **Installation**: `pip install dagster-dbt`

## Core Imports

```python
from dagster_dbt import dbt_assets, DbtCliResource
```

For the legacy dbt Cloud integration:

```python
from dagster_dbt.cloud import dbt_cloud_resource, load_assets_from_dbt_cloud_job
```

For dbt Cloud v2 (recommended):

```python
from dagster_dbt.cloud_v2 import DbtCloudCredentials, DbtCloudWorkspace, dbt_cloud_assets
```

## Basic Usage

### Local dbt Project with Assets

```python
from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, dbt_assets

# Define dbt CLI resource
dbt_cli_resource = DbtCliResource(project_dir="./my_dbt_project")

# Create assets from dbt models
@dbt_assets(manifest="./my_dbt_project/target/manifest.json")
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

# Define the Dagster code location
defs = Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": dbt_cli_resource}
)
```

### dbt Cloud Integration

```python
from dagster import Definitions
from dagster_dbt.cloud_v2 import (
    DbtCloudCredentials,
    DbtCloudWorkspace,
    dbt_cloud_assets
)

# Configure dbt Cloud credentials
credentials = DbtCloudCredentials(
    account_id=12345,
    token="your_token",
    access_url="https://cloud.getdbt.com"
)
workspace = DbtCloudWorkspace(
    credentials=credentials,
    project_id=67890,
    environment_id=11111  # placeholder value, like the IDs above
)

# Create assets from dbt Cloud job
@dbt_cloud_assets(
    job_id=123,
    workspace=workspace
)
def my_dbt_cloud_assets(context):
    yield from workspace.run_job(job_id=123, context=context)

defs = Definitions(assets=[my_dbt_cloud_assets])
```

## Architecture

The dagster-dbt integration follows a layered architecture:

- **Asset Layer**: `@dbt_assets` decorators convert dbt models into Dagster assets
- **Resource Layer**: `DbtCliResource` and Cloud resources handle execution and communication
- **Translation Layer**: `DagsterDbtTranslator` maps dbt metadata to Dagster concepts
- **Project Layer**: `DbtProject` manages dbt project structure and manifest parsing
- **Event Layer**: CLI event handlers provide execution observability

This design keeps dbt's transformation logic separate from Dagster's orchestration concerns while letting the two work together.

## Capabilities

### Asset Creation and Management

Core functionality for creating Dagster assets from dbt models, including decorators, asset specifications, and selection utilities.

```python { .api }
def dbt_assets(
    manifest: DbtManifestParam,
    select: str = "fqn:*",
    exclude: Optional[str] = "",
    selector: Optional[str] = "",
    name: Optional[str] = None,
    io_manager_key: Optional[str] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    backfill_policy: Optional[BackfillPolicy] = None,
    op_tags: Optional[Mapping[str, Any]] = None,
    required_resource_keys: Optional[set[str]] = None,
    project: Optional[DbtProject] = None,
    retry_policy: Optional[RetryPolicy] = None,
    pool: Optional[str] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]: ...

def build_dbt_asset_specs(
    manifest: DbtManifestParam,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    select: str = "fqn:*",
    exclude: Optional[str] = "",
    selector: Optional[str] = "",
    project: Optional[DbtProject] = None,
) -> Sequence[AssetSpec]: ...

def build_dbt_asset_selection(
    dbt_assets: Sequence[AssetsDefinition],
    dbt_select: str = "fqn:*",
    dbt_exclude: Optional[str] = "",
    dbt_selector: Optional[str] = "",
) -> AssetSelection: ...
```

[Asset Creation](./asset-creation.md)

### CLI Resource and Execution

Local dbt execution through the CLI resource, including command invocation, event streaming, and artifact management.

```python { .api }
class DbtCliResource(ConfigurableResource):
    project_dir: str
    global_config_flags: list[str] = []
    profiles_dir: Optional[str] = None
    profile: Optional[str] = None
    target: Optional[str] = None
    dbt_executable: str = "dbt"
    state_path: Optional[str] = None

    def cli(
        self,
        args: Sequence[str],
        raise_on_error: bool = True,
        manifest: Optional[DbtManifestParam] = None,
        dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
        context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None,
        target_path: Optional[Path] = None,
    ) -> DbtCliInvocation: ...
```

[CLI Resource](./cli-resource.md)

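As an illustration of the artifact management mentioned above: once an invocation has finished, `get_artifact("run_results.json")` (listed under Types) returns the artifact as a plain dictionary. The sketch below shows one way to tally node statuses from such a dictionary. It uses only the standard library. The helper name `summarize_run_results` and the sample data are hypothetical, and the code assumes the usual dbt layout of a top-level `results` list whose entries carry `unique_id` and `status`.

```python
from collections import Counter
from typing import Any, Mapping


def summarize_run_results(run_results: Mapping[str, Any]) -> dict[str, int]:
    """Count dbt nodes per status in a parsed run_results.json dictionary."""
    # Assumption: each entry in "results" has a "status" string such as
    # "success", "error", "fail", or "skipped".
    statuses = (
        result.get("status", "unknown")
        for result in run_results.get("results", [])
    )
    return dict(Counter(statuses))


# Hypothetical sample standing in for invocation.get_artifact("run_results.json").
sample = {
    "results": [
        {"unique_id": "model.my_project.customers", "status": "success"},
        {"unique_id": "model.my_project.orders", "status": "success"},
        {"unique_id": "test.my_project.not_null_orders_id", "status": "fail"},
    ]
}

summary = summarize_run_results(sample)
print(summary)
```

Working on the parsed dictionary keeps the helper independent of how the dbt command was launched, so it can be exercised without a dbt project.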

### dbt Cloud Integration (Legacy)

Original dbt Cloud integration providing job execution, asset loading, and operations.

```python { .api }
def dbt_cloud_resource(api_token: str, account_id: int): ...
def load_assets_from_dbt_cloud_job(dbt_cloud: ResourceDefinition, job_id: int): ...
def dbt_cloud_run_op(context: OpExecutionContext, dbt_cloud: DbtCloudResource): ...
```

[dbt Cloud Legacy](./dbt-cloud-legacy.md)

### dbt Cloud v2 Integration

Modern dbt Cloud integration with improved resource management, asset specifications, and polling sensors.

```python { .api }
@dataclass
class DbtCloudCredentials(Resolvable):
    account_id: int
    access_url: str
    token: str

@dataclass
class DbtCloudWorkspace(ConfigurableResource):
    credentials: DbtCloudCredentials
    project_id: int
    environment_id: int

def dbt_cloud_assets(
    job_id: int,
    workspace: DbtCloudWorkspace,
    name: Optional[str] = None,
    dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
) -> Callable[..., AssetsDefinition]: ...
```

[dbt Cloud v2](./dbt-cloud-v2.md)

### Translation System

Customizable mapping between dbt resources and Dagster assets, including metadata, groups, and asset keys.

```python { .api }
class DagsterDbtTranslator:
    def __init__(self, settings: Optional[DagsterDbtTranslatorSettings] = None): ...

    def get_asset_spec(
        self,
        manifest: Mapping[str, Any],
        unique_id: str,
        project: Optional[DbtProject]
    ) -> AssetSpec: ...

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey: ...
    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]: ...
```

[Translation System](./translation-system.md)

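As a rough picture of what a translator's `get_asset_key` does, the sketch below maps a `dbt_resource_props` dictionary to a list of key components. It is a plain-Python stand-in that does not import dagster-dbt. The function `asset_key_path` and its key scheme (source name plus table name for sources, node name otherwise) are assumptions made for illustration, not the library's documented default.

```python
from typing import Any, Mapping


def asset_key_path(dbt_resource_props: Mapping[str, Any]) -> list[str]:
    """Build asset key components from a dbt node's properties (illustrative scheme)."""
    if dbt_resource_props.get("resource_type") == "source":
        # Assumed scheme: qualify source tables by their source name.
        return [dbt_resource_props["source_name"], dbt_resource_props["name"]]
    # Assumed scheme: every other node is keyed by its name alone.
    return [dbt_resource_props["name"]]


model_key = asset_key_path({"resource_type": "model", "name": "orders"})
source_key = asset_key_path(
    {"resource_type": "source", "source_name": "raw", "name": "orders"}
)
print(model_key, source_key)
```

In a real project the same decision would live in a `DagsterDbtTranslator` subclass that overrides `get_asset_key` and wraps the components in an `AssetKey`.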

### Project Management

dbt project handling, manifest parsing, and project preparation for integration with Dagster.

```python { .api }
@record_custom
class DbtProject(IHaveNew):
    name: str
    project_dir: Path
    target_path: Path
    profiles_dir: Path
    profile: Optional[str]
    target: Optional[str]
    manifest_path: Path
    packaged_project_dir: Optional[Path]
    state_path: Optional[Path]
    has_uninstalled_deps: bool
    preparer: DbtProjectPreparer

    def prepare_if_dev(self) -> None: ...

class DbtProjectPreparer:
    def prepare(self) -> None: ...

class DagsterDbtProjectPreparer(DbtProjectPreparer):
    pass
```

[Project Management](./project-management.md)

### Component System

Integration with Dagster's component system for declarative dbt project configuration.

```python { .api }
class DbtProjectComponent(Component):
    dbt_project_path: str
    dbt_profiles_path: Optional[str] = None
```

[Component System](./component-system.md)

### Utilities and Helpers

Utility functions for asset selection, naming conventions, metadata handling, and manifest operations.

```python { .api }
def get_asset_key_for_model(dbt_assets: Sequence[AssetsDefinition], model_name: str) -> AssetKey: ...
def get_asset_key_for_source(dbt_assets: Sequence[AssetsDefinition], source_name: str) -> AssetKey: ...
def get_asset_keys_by_output_name_for_source(
    dbt_assets: Sequence[AssetsDefinition],
    source_name: str
) -> Mapping[str, AssetKey]: ...

def default_metadata_from_dbt_resource_props(dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]: ...
def default_group_from_dbt_resource_props(dbt_resource_props: Mapping[str, Any]) -> Optional[str]: ...
def group_from_dbt_resource_props_fallback_to_directory(dbt_resource_props: Mapping[str, Any]) -> Optional[str]: ...
```

[Utilities](./utilities.md)

### Error Handling

Comprehensive exception hierarchy for dbt integration error handling and debugging.

```python { .api }
class DagsterDbtError(Failure, ABC): ...
class DagsterDbtCliRuntimeError(DagsterDbtError, ABC): ...
class DagsterDbtCloudJobInvariantViolationError(DagsterDbtError, DagsterInvariantViolationError): ...
class DagsterDbtProjectNotFoundError(DagsterDbtError): ...
class DagsterDbtProfilesDirectoryNotFoundError(DagsterDbtError): ...
class DagsterDbtManifestNotFoundError(DagsterDbtError): ...
class DagsterDbtProjectYmlFileNotFoundError(DagsterDbtError): ...
```

[Error Handling](./error-handling.md)

### Freshness Checks

Build asset checks for dbt source freshness validation.

```python { .api }
def build_freshness_checks_from_dbt_assets(
    dbt_assets: Sequence[AssetsDefinition]
) -> Sequence[AssetChecksDefinition]: ...
```

[Freshness Checks](./freshness-checks.md)

### Asset Selection

Advanced asset selection capabilities using dbt manifest information.

```python { .api }
class DbtManifestAssetSelection(AssetSelection):
    manifest: Mapping[str, Any]
    select: str
    exclude: str
    selector: str
    dagster_dbt_translator: DagsterDbtTranslator
    project: Optional[DbtProject]
```

## Types

```python { .api }
# Type aliases for common dbt-related types
DbtManifestParam = Union[Mapping[str, Any], str, Path]

# CLI Event Message classes
class DbtCliEventMessage(ABC):
    raw_event: dict[str, Any]

    def to_default_asset_events(
        self,
        manifest: DbtManifestParam,
        dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(),
        context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None,
        target_path: Optional[Path] = None,
        project: Optional[DbtProject] = None,
    ) -> Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult, AssetCheckEvaluation]]: ...

class DbtCliInvocation:
    process: subprocess.Popen
    manifest: Mapping[str, Any]
    dagster_dbt_translator: DagsterDbtTranslator
    project_dir: Path
    target_path: Path
    raise_on_error: bool

    def wait(self) -> "DbtCliInvocation": ...
    def is_successful(self) -> bool: ...
    def stream(self) -> Iterator[DbtCliEventMessage]: ...
    def get_artifact(self, artifact: Literal["manifest.json", "catalog.json", "run_results.json", "sources.json"]) -> dict[str, Any]: ...
```

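`DbtManifestParam` accepts an already-parsed manifest, a string path, or a `Path`. The sketch below shows how such a value could be normalized into a dictionary. `load_manifest` is a hypothetical helper written for illustration, not dagster-dbt's own validation code, and the one-key manifest is a stand-in for a real `manifest.json`.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Mapping, Union

DbtManifestParam = Union[Mapping[str, Any], str, Path]


def load_manifest(manifest: DbtManifestParam) -> Mapping[str, Any]:
    """Return the manifest as a mapping, reading it from disk if a path was given."""
    if isinstance(manifest, (str, Path)):
        return json.loads(Path(manifest).read_text(encoding="utf-8"))
    # Already a parsed manifest: hand it back unchanged.
    return manifest


# Write a tiny stand-in manifest to a temporary directory, then load it
# through each of the three accepted forms.
with tempfile.TemporaryDirectory() as tmp_dir:
    manifest_path = Path(tmp_dir) / "manifest.json"
    manifest_path.write_text(json.dumps({"nodes": {}}), encoding="utf-8")

    from_path = load_manifest(manifest_path)
    from_str = load_manifest(str(manifest_path))
    from_dict = load_manifest({"nodes": {}})
```

All three calls yield the same dictionary, which is why the functions in this package can take whichever form is most convenient at the call site.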

## Constants

### Metadata Keys

- `DAGSTER_DBT_MANIFEST_METADATA_KEY` = "dagster_dbt/manifest"
- `DAGSTER_DBT_TRANSLATOR_METADATA_KEY` = "dagster_dbt/dagster_dbt_translator"
- `DAGSTER_DBT_PROJECT_METADATA_KEY` = "dagster_dbt/project"
- `DAGSTER_DBT_SELECT_METADATA_KEY` = "dagster_dbt/select"
- `DAGSTER_DBT_EXCLUDE_METADATA_KEY` = "dagster_dbt/exclude"
- `DAGSTER_DBT_SELECTOR_METADATA_KEY` = "dagster_dbt/selector"
- `DAGSTER_DBT_UNIQUE_ID_METADATA_KEY` = "dagster_dbt/unique_id"

### Selection Defaults

- `DBT_DEFAULT_SELECT` = "fqn:*"
- `DBT_DEFAULT_EXCLUDE` = ""
- `DBT_DEFAULT_SELECTOR` = ""

### Environment Variables

- `DBT_INDIRECT_SELECTION_ENV` = "DBT_INDIRECT_SELECTION"
- `DBT_EMPTY_INDIRECT_SELECTION` = "empty"

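The two constants above name an environment variable and one of its values. As a sketch of how they could be combined, the snippet below builds an environment mapping for a dbt subprocess with indirect selection set to `empty`. `build_dbt_env` is a hypothetical helper, and whether dagster-dbt sets the variable in exactly this way is an assumption.

```python
import os
from typing import Mapping, Optional

DBT_INDIRECT_SELECTION_ENV = "DBT_INDIRECT_SELECTION"
DBT_EMPTY_INDIRECT_SELECTION = "empty"


def build_dbt_env(base_env: Optional[Mapping[str, str]] = None) -> dict[str, str]:
    """Copy an environment and set dbt's indirect selection mode to "empty"."""
    # Fall back to the current process environment when none is supplied.
    env = dict(os.environ if base_env is None else base_env)
    env[DBT_INDIRECT_SELECTION_ENV] = DBT_EMPTY_INDIRECT_SELECTION
    return env


env = build_dbt_env({"PATH": "/usr/bin"})
print(env[DBT_INDIRECT_SELECTION_ENV])
```

The resulting mapping can be passed as the `env` argument of `subprocess.run` or `subprocess.Popen`, which leaves the parent process environment untouched.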

### Asset Types

- `ASSET_RESOURCE_TYPES` = ["model", "seed", "snapshot"]

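The constant suggests that only models, seeds, and snapshots are treated as assets. To illustrate that reading, the sketch below filters a hand-written manifest fragment by resource type. The helper `asset_node_ids` and the sample nodes are hypothetical. The only assumption about the manifest is dbt's standard layout, where `nodes` maps each unique ID to a dictionary with a `resource_type` field.

```python
from typing import Any, Mapping

ASSET_RESOURCE_TYPES = ["model", "seed", "snapshot"]


def asset_node_ids(manifest: Mapping[str, Any]) -> list[str]:
    """Return the sorted unique IDs of nodes whose resource type is asset-like."""
    return sorted(
        unique_id
        for unique_id, node in manifest.get("nodes", {}).items()
        if node.get("resource_type") in ASSET_RESOURCE_TYPES
    )


# Hypothetical manifest fragment: two asset-producing nodes and one test.
manifest = {
    "nodes": {
        "model.my_project.orders": {"resource_type": "model"},
        "seed.my_project.country_codes": {"resource_type": "seed"},
        "test.my_project.not_null_orders_id": {"resource_type": "test"},
    }
}

asset_ids = asset_node_ids(manifest)
print(asset_ids)
```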

### Version Information

- `__version__` = "0.27.9"
- `DBT_CORE_VERSION_UPPER_BOUND` = "1.11"