or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asset-creation.mdcli-resource.mdcomponent-system.mddbt-cloud-legacy.mddbt-cloud-v2.mderror-handling.mdfreshness-checks.mdindex.mdproject-management.mdtranslation-system.mdutilities.md

dbt-cloud-legacy.mddocs/

0

# dbt Cloud Integration (Legacy)

1

2

Original dbt Cloud integration providing job execution, asset loading, and operations. This module supports the legacy dbt Cloud API integration patterns and is maintained for backward compatibility.

3

4

## Capabilities

5

6

### Cloud Resources

7

8

#### dbt_cloud_resource

9

10

Factory function for creating a dbt Cloud resource with API authentication.

11

12

```python { .api }

13

def dbt_cloud_resource(

14

api_token: str,

15

account_id: int,

16

disable_schedule_on_trigger: bool = True

17

) -> ResourceDefinition:

18

"""

19

Create a dbt Cloud resource.

20

21

Parameters:

22

- api_token: dbt Cloud API token for authentication

23

- account_id: dbt Cloud account ID

24

- disable_schedule_on_trigger: Whether to disable dbt Cloud schedules when triggering jobs

25

26

Returns:

27

ResourceDefinition for dbt Cloud integration

28

"""

29

```

30

31

#### DbtCloudResource

32

33

Legacy dbt Cloud resource class for job execution and management.

34

35

```python { .api }

36

class DbtCloudResource:

37

"""

38

Legacy dbt Cloud resource for job execution.

39

40

Attributes:

41

- api_token: dbt Cloud API token

42

- account_id: dbt Cloud account ID

43

- disable_schedule_on_trigger: Schedule disable behavior

44

"""

45

46

def run_job_and_poll(

47

self,

48

job_id: int,

49

cause: str = "Triggered by Dagster",

50

steps_override: Optional[List[str]] = None,

51

schema_override: Optional[str] = None,

52

poll_interval: int = 10,

53

poll_timeout: Optional[int] = None

54

) -> DbtCloudOutput:

55

"""

56

Run a dbt Cloud job and poll for completion.

57

58

Parameters:

59

- job_id: dbt Cloud job ID to execute

60

- cause: Description of why the job was triggered

61

- steps_override: Custom dbt commands to run instead of job default

62

- schema_override: Override the target schema

63

- poll_interval: Seconds between polling attempts

64

- poll_timeout: Maximum seconds to wait for completion

65

66

Returns:

67

DbtCloudOutput containing run results and metadata

68

"""

69

70

def get_job(self, job_id: int) -> dict:

71

"""

72

Get dbt Cloud job details.

73

74

Parameters:

75

- job_id: dbt Cloud job ID

76

77

Returns:

78

Job details dictionary from dbt Cloud API

79

"""

80

81

def get_run(self, run_id: int) -> dict:

82

"""

83

Get dbt Cloud run details.

84

85

Parameters:

86

- run_id: dbt Cloud run ID

87

88

Returns:

89

Run details dictionary from dbt Cloud API

90

"""

91

```

92

93

#### DbtCloudClientResource

94

95

Configurable resource implementation for dbt Cloud API client.

96

97

```python { .api }

98

class DbtCloudClientResource(ConfigurableResource):

99

"""

100

Configurable dbt Cloud client resource.

101

102

Attributes:

103

- api_token: dbt Cloud API token for authentication

104

- account_id: dbt Cloud account ID

105

- disable_schedule_on_trigger: Whether to disable schedules on trigger

106

"""

107

108

api_token: str

109

account_id: int

110

disable_schedule_on_trigger: bool = True

111

112

def run_job_and_poll(

113

self,

114

job_id: int,

115

**kwargs

116

) -> DbtCloudOutput:

117

"""

118

Run and poll dbt Cloud job.

119

120

Parameters:

121

- job_id: dbt Cloud job ID

122

- **kwargs: Additional job execution parameters

123

124

Returns:

125

DbtCloudOutput with execution results

126

"""

127

```

128

129

### Asset Loading

130

131

#### load_assets_from_dbt_cloud_job

132

133

Creates Dagster assets from a dbt Cloud job configuration.

134

135

```python { .api }

136

def load_assets_from_dbt_cloud_job(

137

dbt_cloud: ResourceDefinition,

138

job_id: int,

139

node_info_to_asset_key: Callable[[dict], AssetKey] = default_node_info_to_asset_key,

140

node_info_to_group_fn: Callable[[dict], Optional[str]] = lambda _: None,

141

node_info_to_freshness_policy_fn: Callable[[dict], Optional[FreshnessPolicy]] = lambda _: None,

142

partitions_def: Optional[PartitionsDefinition] = None,

143

partition_key_to_vars_fn: Optional[Callable[[str], dict]] = None,

144

op_tags: Optional[dict] = None

145

) -> Sequence[AssetsDefinition]:

146

"""

147

Load assets from dbt Cloud job.

148

149

Parameters:

150

- dbt_cloud: dbt Cloud resource definition

151

- job_id: dbt Cloud job ID to load assets from

152

- node_info_to_asset_key: Function to generate asset keys from dbt nodes

153

- node_info_to_group_fn: Function to determine asset groups

154

- node_info_to_freshness_policy_fn: Function to set freshness policies

155

- partitions_def: Partitioning definition for assets

156

- partition_key_to_vars_fn: Function to map partition keys to dbt vars

157

- op_tags: Tags to apply to generated ops

158

159

Returns:

160

Sequence of AssetsDefinition objects

161

"""

162

```

163

164

### Operations

165

166

#### dbt_cloud_run_op

167

168

Operation for executing dbt Cloud jobs within Dagster pipelines.

169

170

```python { .api }

171

def dbt_cloud_run_op(

172

context: OpExecutionContext,

173

dbt_cloud: DbtCloudResource

174

) -> DbtCloudOutput:

175

"""

176

Execute dbt Cloud job operation.

177

178

Parameters:

179

- context: Dagster operation execution context

180

- dbt_cloud: dbt Cloud resource instance

181

182

Returns:

183

DbtCloudOutput containing job execution results

184

"""

185

```

186

187

### Output Types

188

189

#### DbtCloudOutput

190

191

Output type for dbt Cloud operations containing run results and metadata.

192

193

```python { .api }

194

class DbtCloudOutput:

195

"""

196

Output from dbt Cloud job execution.

197

198

Attributes:

199

- run_details: Complete run details from dbt Cloud API

200

- is_successful: Whether the run completed successfully

201

- run_id: dbt Cloud run ID

202

- job_id: dbt Cloud job ID

203

"""

204

205

run_details: dict

206

is_successful: bool

207

208

@property

209

def run_id(self) -> int:

210

"""Get the dbt Cloud run ID."""

211

212

@property

213

def job_id(self) -> int:

214

"""Get the dbt Cloud job ID."""

215

216

@property

217

def run_status(self) -> DbtCloudRunStatus:

218

"""Get the run status enum."""

219

220

def get_artifact(self, artifact_name: str) -> Optional[dict]:

221

"""

222

Get a specific artifact from the run.

223

224

Parameters:

225

- artifact_name: Name of the artifact to retrieve

226

227

Returns:

228

Artifact dictionary or None if not found

229

"""

230

```

231

232

#### DbtCloudRunStatus

233

234

Enumeration of possible dbt Cloud run statuses.

235

236

```python { .api }

237

class DbtCloudRunStatus(Enum):

238

"""

239

dbt Cloud run status enumeration.

240

"""

241

QUEUED = 1

242

STARTING = 2

243

RUNNING = 3

244

SUCCESS = 10

245

ERROR = 20

246

CANCELLED = 30

247

```

248

249

## Usage Examples

250

251

### Basic dbt Cloud Job Execution

252

253

```python

254

from dagster import job, op, Definitions

255

from dagster_dbt.cloud import dbt_cloud_resource, dbt_cloud_run_op

256

257

# Create resource

258

dbt_cloud = dbt_cloud_resource(

259

api_token="dbt_api_token_here",

260

account_id=12345

261

)

262

263

@op(required_resource_keys={"dbt_cloud"})

264

def run_dbt_cloud_job(context):

265

dbt_cloud = context.resources.dbt_cloud

266

return dbt_cloud.run_job_and_poll(

267

job_id=67890,

268

cause="Triggered by Dagster pipeline"

269

)

270

271

@job(resource_defs={"dbt_cloud": dbt_cloud})

272

def dbt_cloud_job():

273

run_dbt_cloud_job()

274

275

defs = Definitions(jobs=[dbt_cloud_job])

276

```

277

278

### Loading Assets from dbt Cloud Job

279

280

```python

281

from dagster import Definitions

282

from dagster_dbt.cloud import dbt_cloud_resource, load_assets_from_dbt_cloud_job

283

284

dbt_cloud = dbt_cloud_resource(

285

api_token="dbt_api_token_here",

286

account_id=12345

287

)

288

289

# Load assets from dbt Cloud job

290

dbt_cloud_assets = load_assets_from_dbt_cloud_job(

291

dbt_cloud=dbt_cloud,

292

job_id=67890

293

)

294

295

defs = Definitions(

296

assets=dbt_cloud_assets,

297

resources={"dbt_cloud": dbt_cloud}

298

)

299

```

300

301

### Custom Asset Key Mapping

302

303

```python

304

from dagster import AssetKey

305

from dagster_dbt.cloud import load_assets_from_dbt_cloud_job, dbt_cloud_resource

306

307

def custom_asset_key_fn(node_info: dict) -> AssetKey:

308

"""Custom function to generate asset keys from dbt nodes."""

309

return AssetKey([

310

"dbt_cloud",

311

node_info["database"],

312

node_info["schema"],

313

node_info["name"]

314

])

315

316

def custom_group_fn(node_info: dict) -> str:

317

"""Custom function to determine asset groups."""

318

return node_info.get("config", {}).get("materialized", "default")

319

320

dbt_cloud = dbt_cloud_resource(

321

api_token="dbt_api_token_here",

322

account_id=12345

323

)

324

325

assets = load_assets_from_dbt_cloud_job(

326

dbt_cloud=dbt_cloud,

327

job_id=67890,

328

node_info_to_asset_key=custom_asset_key_fn,

329

node_info_to_group_fn=custom_group_fn

330

)

331

```

332

333

## Migration to dbt Cloud v2

334

335

The legacy dbt Cloud integration is maintained for backward compatibility, but new projects should use the dbt Cloud v2 integration for improved features:

336

337

- Better resource management with `DbtCloudCredentials` and `DbtCloudWorkspace`

338

- Asset specifications with `load_dbt_cloud_asset_specs`

339

- Modern `@dbt_cloud_assets` decorator

340

- Polling sensors with `build_dbt_cloud_polling_sensor`

341

342

See [dbt Cloud v2](./dbt-cloud-v2.md) for the recommended modern integration.