# Job Execution Operators

The dbt Cloud provider includes three main operators for job execution and management. They provide high-level abstractions for common dbt Cloud operations within Airflow DAGs: triggering and monitoring job runs, retrieving run artifacts, and discovering jobs.
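
All three operators are importable from the provider's operators module:

```python
from airflow.providers.dbt.cloud.operators.dbt import (
    DbtCloudGetJobRunArtifactOperator,
    DbtCloudListJobsOperator,
    DbtCloudRunJobOperator,
)
```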

## Capabilities

### Job Execution Operator

The `DbtCloudRunJobOperator` executes dbt Cloud jobs with comprehensive configuration options and monitoring capabilities.

```python { .api }
class DbtCloudRunJobOperator:
    def __init__(
        self,
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        job_id: int | None = None,
        project_name: str | None = None,
        environment_name: str | None = None,
        job_name: str | None = None,
        account_id: int | None = None,
        trigger_reason: str | None = None,
        steps_override: list[str] | None = None,
        schema_override: str | None = None,
        wait_for_termination: bool = True,
        timeout: int = 60 * 60 * 24 * 7,
        check_interval: int = 60,
        additional_run_config: dict[str, Any] | None = None,
        reuse_existing_run: bool = False,
        retry_from_failure: bool = False,
        deferrable: bool = False,
        **kwargs
    ):
        """
        Execute a dbt Cloud job.

        Args:
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            job_id: dbt Cloud job ID to execute (mutually exclusive with name-based lookup)
            project_name: Project name for job lookup (requires environment_name and job_name)
            environment_name: Environment name for job lookup (requires project_name and job_name)
            job_name: Job name for job lookup (requires project_name and environment_name)
            account_id: dbt Cloud account ID (defaults to connection default)
            trigger_reason: Reason for triggering the job (max 255 characters)
            steps_override: Custom list of dbt commands to run
            schema_override: Override default schema/dataset name
            wait_for_termination: Whether to wait for job completion
            timeout: Maximum time to wait for job completion (seconds)
            check_interval: Time between status checks (seconds)
            additional_run_config: Additional configuration parameters
            reuse_existing_run: Use existing run if job is already running
            retry_from_failure: Resume from last failure point if retrying
            deferrable: Use async execution mode
        """

    def execute(self, context: Context) -> int:
        """
        Execute the dbt Cloud job.

        Args:
            context: Airflow task execution context

        Returns:
            int: dbt Cloud job run ID

        Raises:
            DbtCloudJobRunException: If job execution fails
        """

    def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
        """
        Complete execution for deferrable tasks.

        Args:
            context: Airflow task execution context
            event: Trigger event containing job status

        Returns:
            int: dbt Cloud job run ID
        """

    def on_kill(self) -> None:
        """Cancel the running dbt Cloud job when task is killed."""

    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
        """
        Generate OpenLineage metadata facets for data lineage tracking.

        Args:
            task_instance: Airflow task instance

        Returns:
            OperatorLineage: OpenLineage facets for lineage tracking
        """
```

### Artifact Retrieval Operator

The `DbtCloudGetJobRunArtifactOperator` downloads artifacts from completed dbt Cloud job runs.

```python { .api }
class DbtCloudGetJobRunArtifactOperator:
    def __init__(
        self,
        *,
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        run_id: int,
        path: str,
        account_id: int | None = None,
        step: int | None = None,
        output_file_name: str | None = None,
        **kwargs
    ):
        """
        Download artifacts from a dbt Cloud job run.

        Args:
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            run_id: dbt Cloud job run ID to retrieve artifacts from
            path: Path to the artifact (e.g., 'manifest.json', 'run_results.json', 'catalog.json')
            account_id: dbt Cloud account ID (defaults to connection default)
            step: Specific step number to retrieve artifact from
            output_file_name: Local filename to save artifact (defaults to artifact path basename)
        """

    def execute(self, context: Context) -> str:
        """
        Download the specified artifact.

        Args:
            context: Airflow task execution context

        Returns:
            str: Local path to the downloaded artifact file
        """
```

### Job Listing Operator

The `DbtCloudListJobsOperator` retrieves information about jobs in a dbt Cloud account or project.

```python { .api }
class DbtCloudListJobsOperator:
    def __init__(
        self,
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        account_id: int | None = None,
        project_id: int | None = None,
        order_by: str | None = None,
        **kwargs
    ):
        """
        List jobs in a dbt Cloud account or project.

        Args:
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            account_id: dbt Cloud account ID (defaults to connection default)
            project_id: Filter jobs by project ID
            order_by: Field to order results by (e.g., 'name', 'created_at')
        """

    def execute(self, context: Context) -> list:
        """
        Retrieve the list of jobs.

        Args:
            context: Airflow task execution context

        Returns:
            list: List of dbt Cloud job IDs (integers)
        """
```

## Usage Examples

### Basic Job Execution

```python
from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from datetime import datetime

dag = DAG(
    'dbt_transform_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
)

# Execute dbt job by ID
run_dbt_models = DbtCloudRunJobOperator(
    task_id='run_dbt_models',
    dbt_cloud_conn_id='dbt_cloud_prod',
    job_id=12345,
    trigger_reason='Daily scheduled transformation',
    timeout=3600,
    dag=dag,
)
```

### Job Execution with Name-based Lookup

```python
# Execute job by project/environment/job names
run_staging_models = DbtCloudRunJobOperator(
    task_id='run_staging_models',
    project_name='analytics_project',
    environment_name='production',
    job_name='staging_models_daily',
    trigger_reason='Staging data refresh',
    steps_override=['dbt run --models tag:staging', 'dbt test --models tag:staging'],
    dag=dag,
)
```

### Advanced Job Configuration

```python
# Execute with advanced configuration
run_full_pipeline = DbtCloudRunJobOperator(
    task_id='run_full_pipeline',
    job_id=54321,
    schema_override='analytics_{{ ds_nodash }}',  # Dynamic schema based on execution date
    additional_run_config={
        # Extra keys are passed through to the dbt Cloud "trigger job run" API payload
        'threads_override': 8,
        'target_name_override': 'prod',
    },
    # dbt variables can be supplied via steps_override, e.g.
    # steps_override=["dbt run --vars '{start_date: {{ ds }}, end_date: {{ next_ds }}}'"],
    retry_from_failure=True,
    reuse_existing_run=False,
    dag=dag,
)
```

### Deferrable Job Execution

```python
# Use deferrable execution for resource efficiency
run_long_job = DbtCloudRunJobOperator(
    task_id='run_long_dbt_job',
    job_id=99999,
    deferrable=True,  # Enable async execution
    timeout=14400,  # 4 hours
    check_interval=300,  # Check every 5 minutes
    dag=dag,
)
```

### Artifact Retrieval

```python
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudGetJobRunArtifactOperator

# Download job artifacts after completion
download_manifest = DbtCloudGetJobRunArtifactOperator(
    task_id='download_manifest',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    path='manifest.json',
    output_file_name='dbt_manifest_{{ ds }}.json',
    dag=dag,
)

download_run_results = DbtCloudGetJobRunArtifactOperator(
    task_id='download_run_results',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    path='run_results.json',
    dag=dag,
)

# Set task dependencies
run_dbt_models >> [download_manifest, download_run_results]
```
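
Because `execute` returns the local path of the downloaded file, a downstream task can pull it from XCom and inspect the artifact. A minimal sketch, assuming a hypothetical `summarize_run_results` callable:

```python
import json

from airflow.operators.python import PythonOperator

def summarize_run_results(**context):
    """Load the downloaded run_results.json and log each model's status."""
    path = context['task_instance'].xcom_pull(task_ids='download_run_results')
    with open(path) as f:
        run_results = json.load(f)
    for result in run_results.get('results', []):
        print(f"{result['unique_id']}: {result['status']}")

summarize_results = PythonOperator(
    task_id='summarize_results',
    python_callable=summarize_run_results,
    dag=dag,
)

download_run_results >> summarize_results
```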

### Job Discovery

```python
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudListJobsOperator

# List all jobs in an account
list_all_jobs = DbtCloudListJobsOperator(
    task_id='list_all_jobs',
    account_id=12345,
    order_by='name',
    dag=dag,
)

# List jobs for specific project
list_project_jobs = DbtCloudListJobsOperator(
    task_id='list_project_jobs',
    project_id=67890,
    dag=dag,
)
```
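
The returned job IDs are pushed to XCom, so a downstream task can act on them, for example to verify that an expected job still exists. A minimal sketch, assuming a hypothetical `check_expected_job` callable and the illustrative IDs used above:

```python
from airflow.operators.python import PythonOperator

def check_expected_job(**context):
    """Fail the task if the expected job ID is missing from the listing."""
    job_ids = context['task_instance'].xcom_pull(task_ids='list_project_jobs') or []
    if 12345 not in job_ids:
        raise ValueError("Expected dbt Cloud job 12345 not found in project 67890")

verify_job_exists = PythonOperator(
    task_id='verify_job_exists',
    python_callable=check_expected_job,
    dag=dag,
)

list_project_jobs >> verify_job_exists
```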

### Error Handling and Monitoring

```python
def handle_dbt_failure(context):
    """Custom error handling for dbt job failures."""
    run_id = context['task_instance'].xcom_pull(task_ids='run_dbt_models')
    if run_id:
        print(f"dbt job run {run_id} failed - triggering cleanup")
        # Add custom failure handling logic here

# Add failure callback
run_dbt_models = DbtCloudRunJobOperator(
    task_id='run_dbt_models',
    job_id=12345,
    on_failure_callback=handle_dbt_failure,
    dag=dag,
)
```

### Data Lineage Integration

```python
# When used with the OpenLineage provider, the operator
# automatically generates data lineage metadata
run_with_lineage = DbtCloudRunJobOperator(
    task_id='run_with_lineage',
    job_id=12345,
    # OpenLineage integration happens automatically
    # when apache-airflow-providers-openlineage >= 2.3.0 is installed
    dag=dag,
)
```

## Template Fields

All operators support Airflow templating for dynamic values; a short sketch follows the field lists below:

### DbtCloudRunJobOperator Template Fields

- `dbt_cloud_conn_id`
- `job_id`
- `project_name`
- `environment_name`
- `job_name`
- `account_id`
- `trigger_reason`
- `steps_override`
- `schema_override`
- `additional_run_config`

### DbtCloudGetJobRunArtifactOperator Template Fields

- `dbt_cloud_conn_id`
- `run_id`
- `path`
- `account_id`
- `output_file_name`

### DbtCloudListJobsOperator Template Fields

- `account_id`
- `project_id`
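
For example, a minimal templating sketch (the job ID is illustrative); Airflow renders the Jinja expressions when the task runs:

```python
run_templated = DbtCloudRunJobOperator(
    task_id='run_templated',
    job_id=12345,
    # Rendered per run, e.g. "Triggered by dbt_transform_dag on 2024-01-01"
    trigger_reason='Triggered by {{ dag.dag_id }} on {{ ds }}',
    schema_override='analytics_{{ ds_nodash }}',
    dag=dag,
)
```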

## Types

```python { .api }
from typing import Any

from airflow.models import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.context import Context

class DbtCloudRunJobOperatorLink(BaseOperatorLink):
    """Operator link for monitoring job runs in dbt Cloud UI."""
    name = "Monitor Job Run"

    def get_link(self, operator: BaseOperator, *, ti_key=None) -> str:
        """
        Generate link to dbt Cloud job run monitoring page.

        Args:
            operator: The operator instance
            ti_key: Task instance key

        Returns:
            str: URL to dbt Cloud job run page
        """
```