0
# dbt Cloud Hook
1
2
The `DbtCloudHook` provides a comprehensive low-level interface for interacting with the dbt Cloud API. It handles authentication and connection management, and provides methods for all major dbt Cloud operations, including account management, project operations, job execution, and artifact retrieval.
3
4
## Capabilities
5
6
### Connection and Authentication
7
8
Manages authentication and connection setup for dbt Cloud API interactions.
9
10
```python { .api }
11
class DbtCloudHook:
12
def __init__(self, dbt_cloud_conn_id: str = "dbt_cloud_default"): ...
13
14
def get_conn(self, *args, **kwargs) -> Session:
15
"""
16
Returns authenticated session for dbt Cloud API.
17
18
Returns:
19
requests.Session: Authenticated session with proper headers
20
"""
21
22
def test_connection(self) -> tuple[bool, str]:
23
"""
24
Test the dbt Cloud connection.
25
26
Returns:
27
tuple[bool, str]: (success, message) tuple indicating connection status
28
"""
29
```
30
31
### Account Management
32
33
Methods for retrieving and managing dbt Cloud account information.
34
35
```python { .api }
36
def list_accounts(self) -> list[Response]:
37
"""
38
Retrieve all authorized accounts.
39
40
Returns:
41
list[Response]: List of account information responses
42
"""
43
44
def get_account(self, account_id: int | None = None) -> Response:
45
"""
46
Get specific account metadata.
47
48
Args:
49
account_id: Account ID (defaults to connection default)
50
51
Returns:
52
Response: Account metadata response
53
"""
54
```
55
56
### Project Management
57
58
Methods for managing dbt Cloud projects within accounts.
59
60
```python { .api }
61
def list_projects(
62
self,
63
account_id: int | None = None,
64
name_contains: str | None = None
65
) -> list[Response]:
66
"""
67
Retrieve projects in an account.
68
69
Args:
70
account_id: Account ID (defaults to connection default)
71
name_contains: Filter projects by name substring
72
73
Returns:
74
list[Response]: List of project information responses
75
"""
76
77
def get_project(self, project_id: int, account_id: int | None = None) -> Response:
78
"""
79
Get specific project details.
80
81
Args:
82
project_id: Project ID to retrieve
83
account_id: Account ID (defaults to connection default)
84
85
Returns:
86
Response: Project details response
87
"""
88
```
89
90
### Environment Management
91
92
Methods for managing dbt Cloud environments within projects.
93
94
```python { .api }
95
def list_environments(
96
self,
97
project_id: int,
98
*,
99
name_contains: str | None = None,
100
account_id: int | None = None
101
) -> list[Response]:
102
"""
103
Retrieve environments for a project.
104
105
Args:
106
project_id: Project ID to query
107
name_contains: Filter environments by name substring
108
account_id: Account ID (defaults to connection default)
109
110
Returns:
111
list[Response]: List of environment information responses
112
"""
113
114
def get_environment(
115
self,
116
project_id: int,
117
environment_id: int,
118
*,
119
account_id: int | None = None
120
) -> Response:
121
"""
122
Get specific environment details.
123
124
Args:
125
project_id: Project ID containing the environment
126
environment_id: Environment ID to retrieve
127
account_id: Account ID (defaults to connection default)
128
129
Returns:
130
Response: Environment details response
131
"""
132
```
133
134
### Job Management
135
136
Methods for managing dbt Cloud jobs and their configurations.
137
138
```python { .api }
139
def list_jobs(
140
self,
141
account_id: int | None = None,
142
order_by: str | None = None,
143
project_id: int | None = None,
144
environment_id: int | None = None,
145
name_contains: str | None = None
146
) -> list[Response]:
147
"""
148
Retrieve jobs with optional filtering.
149
150
Args:
151
account_id: Account ID (defaults to connection default)
152
order_by: Field to order results by
153
project_id: Filter jobs by project ID
154
environment_id: Filter jobs by environment ID
155
name_contains: Filter jobs by name substring
156
157
Returns:
158
list[Response]: List of job information responses
159
"""
160
161
def get_job(self, job_id: int, account_id: int | None = None) -> Response:
162
"""
163
Get specific job details.
164
165
Args:
166
job_id: Job ID to retrieve
167
account_id: Account ID (defaults to connection default)
168
169
Returns:
170
Response: Job details response
171
"""
172
173
def get_job_by_name(
174
self,
175
*,
176
project_name: str,
177
environment_name: str,
178
job_name: str,
179
account_id: int | None = None
180
) -> dict:
181
"""
182
Lookup job by project, environment, and job names.
183
184
Args:
185
project_name: Name of the project containing the job
186
environment_name: Name of the environment containing the job
187
job_name: Name of the job to find
188
account_id: Account ID (defaults to connection default)
189
190
Returns:
191
dict: Job information with job_id and related metadata
192
193
Raises:
194
DbtCloudResourceLookupError: If job cannot be found
195
"""
196
```
197
198
### Job Run Execution
199
200
Methods for triggering and managing dbt Cloud job runs.
201
202
```python { .api }
203
def trigger_job_run(
204
self,
205
job_id: int,
206
cause: str,
207
account_id: int | None = None,
208
steps_override: list[str] | None = None,
209
schema_override: str | None = None,
210
retry_from_failure: bool = False,
211
additional_run_config: dict[str, Any] | None = None
212
) -> Response:
213
"""
214
Trigger execution of a dbt Cloud job.
215
216
Args:
217
job_id: ID of job to execute
218
cause: Reason for triggering the job (max 255 chars)
219
account_id: Account ID (defaults to connection default)
220
steps_override: Custom list of dbt steps to run
221
schema_override: Override default schema/dataset
222
retry_from_failure: Resume from last failure point
223
additional_run_config: Additional configuration for the run
224
225
Returns:
226
Response: Job run creation response containing run_id
227
"""
228
229
def cancel_job_run(self, run_id: int, account_id: int | None = None) -> None:
230
"""
231
Cancel a running job.
232
233
Args:
234
run_id: ID of job run to cancel
235
account_id: Account ID (defaults to connection default)
236
"""
237
238
def retry_failed_job_run(
239
self, job_id: int, account_id: int | None = None
240
) -> Response:
241
"""
242
Retry the most recent failed run of a job.
243
244
Args:
245
job_id: ID of job to retry
246
account_id: Account ID (defaults to connection default)
247
248
Returns:
249
Response: New job run creation response
250
"""
251
```
252
253
### Job Run Monitoring
254
255
Methods for monitoring and querying job run status and details.
256
257
```python { .api }
258
def list_job_runs(
259
self,
260
account_id: int | None = None,
261
include_related: list[str] | None = None,
262
job_definition_id: int | None = None,
263
order_by: str | None = None
264
) -> list[Response]:
265
"""
266
Retrieve job runs with optional filtering.
267
268
Args:
269
account_id: Account ID (defaults to connection default)
270
include_related: Related resources to include in response
271
job_definition_id: Filter runs by job ID
272
order_by: Field to order results by
273
274
Returns:
275
list[Response]: List of job run information responses
276
"""
277
278
def get_job_runs(
279
self,
280
account_id: int | None = None,
281
payload: dict[str, Any] | None = None
282
) -> Response:
283
"""
284
Get job runs with advanced filtering via payload.
285
286
Args:
287
account_id: Account ID (defaults to connection default)
288
payload: Advanced filter and pagination parameters
289
290
Returns:
291
Response: Job runs response with pagination info
292
"""
293
294
def get_job_run(
295
self,
296
run_id: int,
297
account_id: int | None = None,
298
include_related: list[str] | None = None
299
) -> Response:
300
"""
301
Get specific job run details.
302
303
Args:
304
run_id: Job run ID to retrieve
305
account_id: Account ID (defaults to connection default)
306
include_related: Related resources to include in response
307
308
Returns:
309
Response: Job run details response
310
"""
311
312
def get_job_run_status(self, run_id: int, account_id: int | None = None) -> int:
313
"""
314
Get current status of a job run.
315
316
Args:
317
run_id: Job run ID to check
318
account_id: Account ID (defaults to connection default)
319
320
Returns:
321
int: Status code from DbtCloudJobRunStatus enum
322
"""
323
324
def wait_for_job_run_status(
325
self,
326
run_id: int,
327
account_id: int | None = None,
328
expected_statuses: int | Sequence[int] | set[int] = DbtCloudJobRunStatus.SUCCESS.value,
329
check_interval: int = 60,
330
timeout: int = 60 * 60 * 24 * 7
331
) -> bool:
332
"""
333
Wait for job run to reach expected status.
334
335
Args:
336
run_id: Job run ID to monitor
337
account_id: Account ID (defaults to connection default)
338
expected_statuses: Status(es) to wait for
339
check_interval: Seconds between status checks
340
timeout: Maximum seconds to wait
341
342
Returns:
343
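bool: True if the run reached one of the expected statuses, False if it ended in a different terminal status
343
344
Raises:
345
DbtCloudJobRunException: If the run has not reached a terminal or expected status within `timeout` seconds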
347
"""
348
```
349
350
### Artifact Management
351
352
Methods for retrieving job run artifacts and outputs.
353
354
```python { .api }
355
def list_job_run_artifacts(
356
self,
357
run_id: int,
358
account_id: int | None = None,
359
step: int | None = None
360
) -> list[Response]:
361
"""
362
List available artifacts for a job run.
363
364
Args:
365
run_id: Job run ID to query
366
account_id: Account ID (defaults to connection default)
367
step: Specific step number to list artifacts for
368
369
Returns:
370
list[Response]: List of available artifact information
371
"""
372
373
def get_job_run_artifact(
374
self,
375
run_id: int,
376
path: str,
377
account_id: int | None = None,
378
step: int | None = None
379
) -> Response:
380
"""
381
Download a specific job run artifact.
382
383
Args:
384
run_id: Job run ID containing the artifact
385
path: Path to the artifact (e.g., 'manifest.json', 'run_results.json')
386
account_id: Account ID (defaults to connection default)
387
step: Specific step number to get artifact from
388
389
Returns:
390
Response: Artifact content response
391
"""
392
393
async def get_job_run_artifacts_concurrently(
394
self,
395
run_id: int,
396
artifacts: list[str],
397
account_id: int | None = None,
398
step: int | None = None
399
):
400
"""
401
Download multiple artifacts concurrently.
402
403
Args:
404
run_id: Job run ID containing the artifacts
405
artifacts: List of artifact paths to download
406
account_id: Account ID (defaults to connection default)
407
step: Specific step number to get artifacts from
408
409
Returns:
410
List of artifact content responses
411
"""
412
```
413
414
## Usage Examples
415
416
### Basic Job Execution
417
418
```python
419
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunStatus
420
421
# Initialize hook
422
hook = DbtCloudHook(dbt_cloud_conn_id='dbt_cloud_default')
423
424
# Trigger a job run
425
response = hook.trigger_job_run(
426
job_id=12345,
427
cause="Airflow scheduled run",
428
steps_override=["dbt run", "dbt test"],
429
schema_override="dev_schema"
430
)
431
run_id = response.json()['data']['id']
432
433
# Wait for completion
434
success = hook.wait_for_job_run_status(
435
run_id=run_id,
436
expected_statuses=DbtCloudJobRunStatus.SUCCESS.value,
437
check_interval=60,
438
timeout=3600
439
)
440
```
441
442
### Artifact Retrieval
443
444
```python
445
# List available artifacts
446
artifacts = hook.list_job_run_artifacts(run_id=run_id)
447
448
# Download specific artifacts
449
manifest = hook.get_job_run_artifact(run_id=run_id, path='manifest.json')
450
run_results = hook.get_job_run_artifact(run_id=run_id, path='run_results.json')
451
```
452
453
### Resource Discovery
454
455
```python
456
# Find job by name instead of ID
457
job_info = hook.get_job_by_name(
458
project_name="analytics",
459
environment_name="production",
460
job_name="daily_transform"
461
)
462
job_id = job_info['id']
463
464
# List all jobs in a project
465
jobs = hook.list_jobs(project_id=123, name_contains="daily")
466
```
467
468
## Types
469
470
```python { .api }
471
from enum import Enum
472
from typing import Any, Sequence, TypedDict
473
from requests.auth import AuthBase
474
from requests import PreparedRequest, Session, Response
475
476
class DbtCloudJobRunStatus(Enum):
477
QUEUED = 1
478
STARTING = 2
479
RUNNING = 3
480
SUCCESS = 10
481
ERROR = 20
482
CANCELLED = 30
483
NON_TERMINAL_STATUSES = (QUEUED, STARTING, RUNNING)
484
TERMINAL_STATUSES = (SUCCESS, ERROR, CANCELLED)
485
486
@classmethod
487
def check_is_valid(cls, statuses: int | Sequence[int] | set[int]) -> None: ...
488
489
@classmethod
490
def is_terminal(cls, status: int) -> bool: ...
491
492
class JobRunInfo(TypedDict):
493
account_id: int | None
494
run_id: int
495
496
class TokenAuth(AuthBase):
497
def __init__(self, token: str): ...
498
def __call__(self, request: PreparedRequest) -> PreparedRequest: ...
499
500
class DbtCloudJobRunException(Exception):
501
"""Exception raised when a dbt Cloud job run fails."""
502
503
class DbtCloudResourceLookupError(Exception):
504
"""Exception raised when a dbt Cloud resource cannot be found."""
505
```