or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asset-creation.mdcli-resource.mdcomponent-system.mddbt-cloud-legacy.mddbt-cloud-v2.mderror-handling.mdfreshness-checks.mdindex.mdproject-management.mdtranslation-system.mdutilities.md

dbt-cloud-v2.mddocs/

0

# dbt Cloud v2 Integration

1

2

Modern dbt Cloud integration with improved resource management, asset specifications, and polling sensors. This is the recommended approach for new dbt Cloud integrations, providing better composability and more flexible configuration options.

3

4

## Capabilities

5

6

### Credentials and Workspace Management

7

8

#### DbtCloudCredentials

9

10

Dataclass for dbt Cloud API authentication credentials.

11

12

```python { .api }

13

@dataclass

14

class DbtCloudCredentials(Resolvable):

15

"""

16

dbt Cloud API credentials.

17

18

Attributes:

19

- api_token: dbt Cloud API token for authentication

20

- account_id: dbt Cloud account ID

21

"""

22

23

api_token: str

24

account_id: int

25

```

26

27

#### DbtCloudWorkspace

28

29

Configurable resource representing a dbt Cloud workspace with project and environment context.

30

31

```python { .api }

32

@dataclass

33

class DbtCloudWorkspace(ConfigurableResource):

34

"""

35

dbt Cloud workspace resource for project and environment management.

36

37

Attributes:

38

- credentials: DbtCloudCredentials instance

39

- project_id: dbt Cloud project ID

40

- environment_id: dbt Cloud environment ID

41

"""

42

43

credentials: DbtCloudCredentials

44

project_id: int

45

environment_id: int

46

47

def run_job(

48

self,

49

job_id: int,

50

cause: str = "Triggered by Dagster",

51

steps_override: Optional[List[str]] = None,

52

schema_override: Optional[str] = None,

53

git_sha: Optional[str] = None,

54

**kwargs

55

) -> DbtCloudRun:

56

"""

57

Run a dbt Cloud job.

58

59

Parameters:

60

- job_id: dbt Cloud job ID to execute

61

- cause: Description of why the job was triggered

62

- steps_override: Custom dbt commands to run

63

- schema_override: Override target schema

64

- git_sha: Specific git SHA to run against

65

- **kwargs: Additional job trigger parameters

66

67

Returns:

68

DbtCloudRun object representing the triggered run

69

"""

70

71

def get_job(self, job_id: int) -> DbtCloudJob:

72

"""

73

Get dbt Cloud job details.

74

75

Parameters:

76

- job_id: dbt Cloud job ID

77

78

Returns:

79

DbtCloudJob object with job details

80

"""

81

82

def get_run(self, run_id: int) -> DbtCloudRun:

83

"""

84

Get dbt Cloud run details.

85

86

Parameters:

87

- run_id: dbt Cloud run ID

88

89

Returns:

90

DbtCloudRun object with run details

91

"""

92

93

def get_manifest(self, job_id: int) -> dict:

94

"""

95

Get dbt manifest for a job.

96

97

Parameters:

98

- job_id: dbt Cloud job ID

99

100

Returns:

101

Parsed manifest.json dictionary

102

"""

103

```

104

105

### Asset Loading and Specifications

106

107

#### load_dbt_cloud_asset_specs

108

109

Loads asset specifications from a dbt Cloud job without creating executable assets.

110

111

```python { .api }

112

def load_dbt_cloud_asset_specs(

113

workspace: DbtCloudWorkspace,

114

job_id: int,

115

dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,

116

**kwargs

117

) -> Sequence[AssetSpec]:

118

"""

119

Load asset specs from dbt Cloud job.

120

121

Parameters:

122

- workspace: DbtCloudWorkspace resource

123

- job_id: dbt Cloud job ID

124

- dagster_dbt_translator: Custom translator for asset mapping

125

- **kwargs: Additional parameters for spec generation

126

127

Returns:

128

Sequence of AssetSpec objects representing dbt models

129

"""

130

```

131

132

#### load_dbt_cloud_check_specs

133

134

Loads data quality check specifications from a dbt Cloud job.

135

136

```python { .api }

137

def load_dbt_cloud_check_specs(

138

workspace: DbtCloudWorkspace,

139

job_id: int,

140

dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,

141

**kwargs

142

) -> Sequence[AssetCheckSpec]:

143

"""

144

Load check specs from dbt Cloud job.

145

146

Parameters:

147

- workspace: DbtCloudWorkspace resource

148

- job_id: dbt Cloud job ID

149

- dagster_dbt_translator: Custom translator for check mapping

150

- **kwargs: Additional parameters for spec generation

151

152

Returns:

153

Sequence of AssetCheckSpec objects for dbt tests

154

"""

155

```

156

157

### Asset Decorators

158

159

#### dbt_cloud_assets

160

161

Modern decorator for creating assets from dbt Cloud jobs.

162

163

```python { .api }

164

def dbt_cloud_assets(

165

job_id: int,

166

workspace: DbtCloudWorkspace,

167

name: Optional[str] = None,

168

dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,

169

partitions_def: Optional[PartitionsDefinition] = None,

170

backfill_policy: Optional[BackfillPolicy] = None,

171

op_tags: Optional[Mapping[str, Any]] = None,

172

**kwargs

173

) -> Callable[..., AssetsDefinition]:

174

"""

175

Create Dagster assets from dbt Cloud job.

176

177

Parameters:

178

- job_id: dbt Cloud job ID

179

- workspace: DbtCloudWorkspace resource

180

- dagster_dbt_translator: Custom translator for asset mapping

181

- backfill_policy: Backfill policy for assets

182

- op_tags: Tags to apply to the underlying op

183

- **kwargs: Additional decorator parameters

184

185

Returns:

186

Decorated function that materializes dbt Cloud assets

187

"""

188

```

189

190

### Sensor Integration

191

192

#### build_dbt_cloud_polling_sensor

193

194

Creates a sensor that polls dbt Cloud for job completion and triggers downstream assets.

195

196

```python { .api }

197

def build_dbt_cloud_polling_sensor(

198

workspace: DbtCloudWorkspace,

199

job_id: int,

200

sensor_name: str,

201

dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,

202

minimum_interval_seconds: Optional[int] = None,

203

run_tags: Optional[Mapping[str, str]] = None,

204

) -> SensorDefinition:

205

"""

206

Build polling sensor for dbt Cloud job.

207

208

Parameters:

209

- job_id: dbt Cloud job ID to monitor

210

- workspace: DbtCloudWorkspace resource

211

- dagster_dbt_translator: Custom translator for asset mapping

212

- minimum_interval_seconds: Minimum time between sensor evaluations

213

- **kwargs: Additional sensor configuration

214

215

Returns:

216

SensorDefinition that monitors dbt Cloud job completion

217

"""

218

```

219

220

### Data Types

221

222

#### DbtCloudAccount

223

224

Represents a dbt Cloud account with metadata and configuration.

225

226

```python { .api }

227

class DbtCloudAccount:

228

"""

229

dbt Cloud account representation.

230

231

Attributes:

232

- id: Account ID

233

- name: Account name

234

- plan: Account plan type

235

- state: Account state

236

"""

237

238

id: int

239

name: str

240

plan: str

241

state: int

242

```

243

244

#### DbtCloudProject

245

246

Represents a dbt Cloud project within an account.

247

248

```python { .api }

249

class DbtCloudProject:

250

"""

251

dbt Cloud project representation.

252

253

Attributes:

254

- id: Project ID

255

- name: Project name

256

- account_id: Parent account ID

257

- repository_id: Associated repository ID

258

- state: Project state

259

"""

260

261

id: int

262

name: str

263

account_id: int

264

repository_id: Optional[int]

265

state: int

266

```

267

268

#### DbtCloudEnvironment

269

270

Represents a dbt Cloud environment configuration.

271

272

```python { .api }

273

class DbtCloudEnvironment:

274

"""

275

dbt Cloud environment representation.

276

277

Attributes:

278

- id: Environment ID

279

- name: Environment name

280

- project_id: Parent project ID

281

- type: Environment type (development/deployment)

282

- state: Environment state

283

"""

284

285

id: int

286

name: str

287

project_id: int

288

type: str

289

state: int

290

```

291

292

#### DbtCloudJob

293

294

Represents a dbt Cloud job configuration.

295

296

```python { .api }

297

class DbtCloudJob:

298

"""

299

dbt Cloud job representation.

300

301

Attributes:

302

- id: Job ID

303

- name: Job name

304

- project_id: Parent project ID

305

- environment_id: Target environment ID

306

- execute_steps: List of dbt commands to execute

307

- triggers: Job trigger configuration

308

- state: Job state

309

"""

310

311

id: int

312

name: str

313

project_id: int

314

environment_id: int

315

execute_steps: List[str]

316

triggers: dict

317

state: int

318

```

319

320

#### DbtCloudRun

321

322

Represents a dbt Cloud job run instance.

323

324

```python { .api }

325

class DbtCloudRun:

326

"""

327

dbt Cloud run representation.

328

329

Attributes:

330

- id: Run ID

331

- job_id: Parent job ID

332

- trigger: Run trigger information

333

- status: Current run status

334

- started_at: Run start timestamp

335

- finished_at: Run completion timestamp

336

- duration: Run duration in seconds

337

"""

338

339

id: int

340

job_id: int

341

trigger: dict

342

status: int

343

started_at: Optional[str]

344

finished_at: Optional[str]

345

duration: Optional[int]

346

347

@property

348

def status_humanized(self) -> str:

349

"""Get human-readable status string."""

350

351

@property

352

def is_success(self) -> bool:

353

"""Check if run completed successfully."""

354

355

@property

356

def is_complete(self) -> bool:

357

"""Check if run has completed (success or failure)."""

358

```

359

360

#### DbtCloudJobRunStatusType

361

362

Enumeration of dbt Cloud job run statuses.

363

364

```python { .api }

365

class DbtCloudJobRunStatusType(Enum):

366

"""

367

dbt Cloud job run status enumeration.

368

"""

369

QUEUED = 1

370

STARTING = 2

371

RUNNING = 3

372

SUCCESS = 10

373

ERROR = 20

374

CANCELLED = 30

375

```

376

377

## Usage Examples

378

379

### Basic dbt Cloud v2 Asset Creation

380

381

```python

382

from dagster import Definitions, AssetExecutionContext

383

from dagster_dbt.cloud_v2 import (

384

DbtCloudCredentials,

385

DbtCloudWorkspace,

386

dbt_cloud_assets

387

)

388

389

# Configure credentials and workspace

390

credentials = DbtCloudCredentials(

391

account_id=12345,

392

token="dbt_api_token_here",

393

access_url="https://cloud.getdbt.com"

394

)

395

workspace = DbtCloudWorkspace(

396

credentials=credentials,

397

project_id=67890

398

)

399

400

# Create assets from dbt Cloud job

401

@dbt_cloud_assets(

402

job_id=123,

403

workspace=workspace

404

)

405

def my_dbt_cloud_assets(context: AssetExecutionContext):

406

# Trigger and monitor dbt Cloud job

407

run = workspace.run_job(

408

job_id=123,

409

cause="Triggered by Dagster asset materialization"

410

)

411

412

# Poll for completion and yield events

413

yield from run.stream_events(context=context)

414

415

defs = Definitions(

416

assets=[my_dbt_cloud_assets],

417

resources={"workspace": workspace}

418

)

419

```

420

421

### Using Asset Specifications

422

423

```python

424

from dagster import Definitions

425

from dagster_dbt.cloud_v2 import (

426

DbtCloudCredentials,

427

DbtCloudWorkspace,

428

load_dbt_cloud_asset_specs

429

)

430

431

credentials = DbtCloudCredentials(

432

account_id=12345,

433

token="dbt_api_token_here",

434

access_url="https://cloud.getdbt.com"

435

)

436

workspace = DbtCloudWorkspace(

437

credentials=credentials,

438

project_id=67890

439

)

440

441

# Load asset specs without creating executable assets

442

asset_specs = load_dbt_cloud_asset_specs(

443

workspace=workspace,

444

job_id=123

445

)

446

447

# Use specs to create custom assets or for analysis

448

defs = Definitions(assets=asset_specs)

449

```

450

451

### Polling Sensor Integration

452

453

```python

454

from dagster import Definitions

455

from dagster_dbt.cloud_v2 import (

456

DbtCloudCredentials,

457

DbtCloudWorkspace,

458

build_dbt_cloud_polling_sensor

459

)

460

461

credentials = DbtCloudCredentials(

462

account_id=12345,

463

token="dbt_api_token_here",

464

access_url="https://cloud.getdbt.com"

465

)

466

workspace = DbtCloudWorkspace(

467

credentials=credentials,

468

project_id=67890

469

)

470

471

# Create sensor to monitor dbt Cloud job

472

dbt_cloud_sensor = build_dbt_cloud_polling_sensor(

473

job_id=123,

474

workspace=workspace,

475

minimum_interval_seconds=120

476

)

477

478

defs = Definitions(sensors=[dbt_cloud_sensor])

479

```

480

481

### Custom Translation

482

483

```python

484

from dagster import AssetKey

485

from dagster_dbt.cloud_v2 import DbtCloudWorkspace, dbt_cloud_assets

486

from dagster_dbt import DagsterDbtTranslator

487

488

class CustomDbtCloudTranslator(DagsterDbtTranslator):

489

def get_asset_key(self, dbt_resource_props: dict) -> AssetKey:

490

return AssetKey([

491

"dbt_cloud",

492

dbt_resource_props["database"],

493

dbt_resource_props["schema"],

494

dbt_resource_props["name"]

495

])

496

497

def get_group_name(self, dbt_resource_props: dict) -> Optional[str]:

498

return dbt_resource_props.get("config", {}).get("group", "default")

499

500

@dbt_cloud_assets(

501

job_id=123,

502

workspace=workspace,

503

dagster_dbt_translator=CustomDbtCloudTranslator()

504

)

505

def my_custom_assets(context):

506

yield from workspace.run_job(job_id=123).stream_events(context=context)

507

```