Provider package for integrating Apache Airflow with dbt Cloud for data transformation workflow orchestration
84
The DbtCloudHook provides a comprehensive low-level interface for interacting with the dbt Cloud API. It handles authentication and connection management, and exposes methods for all major dbt Cloud operations, including account management, project operations, job execution, and artifact retrieval.
Manages authentication and connection setup for dbt Cloud API interactions.
class DbtCloudHook:
    """
    Low-level interface to the dbt Cloud API.

    This is an API reference stub: each method documents its signature and
    contract, but its implementation body is not shown here.
    """

    def __init__(self, dbt_cloud_conn_id: str = "dbt_cloud_default") -> None: ...

    def get_conn(self, *args, **kwargs) -> Session:
        """
        Return an authenticated session for the dbt Cloud API.

        Returns:
            requests.Session: Authenticated session with the required headers.
        """

    def test_connection(self) -> tuple[bool, str]:
        """
        Test the dbt Cloud connection.

        Returns:
            tuple[bool, str]: ``(success, message)`` describing the connection status.
        """

    # Methods for retrieving and managing dbt Cloud account information.

    def list_accounts(self) -> list[Response]:
        """
        Retrieve all authorized accounts.

        Returns:
            list[Response]: List of account information responses.
        """

    def get_account(self, account_id: int | None = None) -> Response:
        """
        Get metadata for a specific account.

        Args:
            account_id: Account ID (defaults to connection default).

        Returns:
            Response: Account metadata response.
        """

    # Methods for managing dbt Cloud projects within accounts.

    def list_projects(
        self,
        account_id: int | None = None,
        name_contains: str | None = None,
    ) -> list[Response]:
        """
        Retrieve the projects in an account.

        Args:
            account_id: Account ID (defaults to connection default).
            name_contains: Filter projects by name substring.

        Returns:
            list[Response]: List of project information responses.
        """

    def get_project(self, project_id: int, account_id: int | None = None) -> Response:
        """
        Get details for a specific project.

        Args:
            project_id: Project ID to retrieve.
            account_id: Account ID (defaults to connection default).

        Returns:
            Response: Project details response.
        """

    # Methods for managing dbt Cloud environments within projects.

    def list_environments(
        self,
        project_id: int,
        *,
        name_contains: str | None = None,
        account_id: int | None = None,
    ) -> list[Response]:
        """
        Retrieve the environments of a project.

        Args:
            project_id: Project ID to query.
            name_contains: Filter environments by name substring.
            account_id: Account ID (defaults to connection default).

        Returns:
            list[Response]: List of environment information responses.
        """

    def get_environment(
        self,
        project_id: int,
        environment_id: int,
        *,
        account_id: int | None = None,
    ) -> Response:
        """
        Get details for a specific environment.

        Args:
            project_id: Project ID containing the environment.
            environment_id: Environment ID to retrieve.
            account_id: Account ID (defaults to connection default).

        Returns:
            Response: Environment details response.
        """

    # Methods for managing dbt Cloud jobs and their configurations.

    def list_jobs(
        self,
        account_id: int | None = None,
        order_by: str | None = None,
        project_id: int | None = None,
        environment_id: int | None = None,
        name_contains: str | None = None,
    ) -> list[Response]:
        """
        Retrieve jobs, with optional filtering.

        Args:
            account_id: Account ID (defaults to connection default).
            order_by: Field to order results by.
            project_id: Filter jobs by project ID.
            environment_id: Filter jobs by environment ID.
            name_contains: Filter jobs by name substring.

        Returns:
            list[Response]: List of job information responses.
        """

    def get_job(self, job_id: int, account_id: int | None = None) -> Response:
        """
        Get details for a specific job.

        Args:
            job_id: Job ID to retrieve.
            account_id: Account ID (defaults to connection default).

        Returns:
            Response: Job details response.
        """

    def get_job_by_name(
        self,
        *,
        project_name: str,
        environment_name: str,
        job_name: str,
        account_id: int | None = None,
    ) -> dict:
        """
        Look up a job by its project, environment, and job names.

        Args:
            project_name: Name of the project containing the job.
            environment_name: Name of the environment containing the job.
            job_name: Name of the job to find.
            account_id: Account ID (defaults to connection default).

        Returns:
            dict: The matching job object as returned by the dbt Cloud API.
            The job's ID is under the ``"id"`` key (there is no ``"job_id"`` key).

        Raises:
            DbtCloudResourceLookupError: If the project, environment, or job name
                does not match exactly one resource.
        """

    # Methods for triggering and managing dbt Cloud job runs.

    def trigger_job_run(
        self,
        job_id: int,
        cause: str,
        account_id: int | None = None,
        steps_override: list[str] | None = None,
        schema_override: str | None = None,
        retry_from_failure: bool = False,
        additional_run_config: dict[str, Any] | None = None,
    ) -> Response:
        """
        Trigger execution of a dbt Cloud job.

        Args:
            job_id: ID of the job to execute.
            cause: Reason for triggering the job (dbt Cloud limits this to 255 characters).
            account_id: Account ID (defaults to connection default).
            steps_override: Custom list of dbt steps to run.
            schema_override: Override the default schema/dataset.
            retry_from_failure: Resume from the last failure point.
            additional_run_config: Additional configuration for the run.

        Returns:
            Response: Job run creation response; the new run ID is at
            ``response.json()["data"]["id"]``.
        """

    def cancel_job_run(self, run_id: int, account_id: int | None = None) -> None:
        """
        Cancel a running job.

        Args:
            run_id: ID of the job run to cancel.
            account_id: Account ID (defaults to connection default).
        """

    def retry_failed_job_run(
        self, job_id: int, account_id: int | None = None
    ) -> Response:
        """
        Retry the most recent failed run of a job.

        Args:
            job_id: ID of the job to retry.
            account_id: Account ID (defaults to connection default).

        Returns:
            Response: New job run creation response.
        """

    # Methods for monitoring and querying job run status and details.

    def list_job_runs(
        self,
        account_id: int | None = None,
        include_related: list[str] | None = None,
        job_definition_id: int | None = None,
        order_by: str | None = None,
    ) -> list[Response]:
        """
        Retrieve job runs, with optional filtering.

        Args:
            account_id: Account ID (defaults to connection default).
            include_related: Related resources to include in the response.
            job_definition_id: Filter runs by job ID.
            order_by: Field to order results by.

        Returns:
            list[Response]: List of job run information responses.
        """

    def get_job_runs(
        self,
        account_id: int | None = None,
        payload: dict[str, Any] | None = None,
    ) -> Response:
        """
        Get job runs, using a payload for advanced filtering.

        Args:
            account_id: Account ID (defaults to connection default).
            payload: Advanced filter and pagination parameters.

        Returns:
            Response: Job runs response, including pagination info.
        """

    def get_job_run(
        self,
        run_id: int,
        account_id: int | None = None,
        include_related: list[str] | None = None,
    ) -> Response:
        """
        Get details for a specific job run.

        Args:
            run_id: Job run ID to retrieve.
            account_id: Account ID (defaults to connection default).
            include_related: Related resources to include in the response.

        Returns:
            Response: Job run details response.
        """

    def get_job_run_status(self, run_id: int, account_id: int | None = None) -> int:
        """
        Get the current status of a job run.

        Args:
            run_id: Job run ID to check.
            account_id: Account ID (defaults to connection default).

        Returns:
            int: Raw status code; compare it with ``DbtCloudJobRunStatus.<MEMBER>.value``.
        """

    def wait_for_job_run_status(
        self,
        run_id: int,
        account_id: int | None = None,
        expected_statuses: int | Sequence[int] | set[int] = DbtCloudJobRunStatus.SUCCESS.value,
        check_interval: int = 60,
        timeout: int = 60 * 60 * 24 * 7,
    ) -> bool:
        """
        Wait for a job run to reach one of the expected statuses.

        Args:
            run_id: Job run ID to monitor.
            account_id: Account ID (defaults to connection default).
            expected_statuses: Integer status code(s) to wait for, e.g.
                ``DbtCloudJobRunStatus.SUCCESS.value``. Pass raw ints (the
                enum's ``.value``), not enum members: ``DbtCloudJobRunStatus``
                is a plain ``Enum``, so its members are not ints.
            check_interval: Seconds between status checks.
            timeout: Maximum seconds to wait (default: 7 days).

        Returns:
            bool: True if the run's status is one of ``expected_statuses``;
            False if the run stopped in a different terminal status.

        Raises:
            DbtCloudJobRunException: If the run has not reached a terminal or
                expected status within ``timeout`` seconds.
            ValueError: If ``expected_statuses`` contains an unknown status value.
        """

    # Methods for retrieving job run artifacts and outputs.

    def list_job_run_artifacts(
        self,
        run_id: int,
        account_id: int | None = None,
        step: int | None = None,
    ) -> list[Response]:
        """
        List the artifacts available for a job run.

        Args:
            run_id: Job run ID to query.
            account_id: Account ID (defaults to connection default).
            step: Specific step number to list artifacts for.

        Returns:
            list[Response]: List of available artifact information.
        """

    def get_job_run_artifact(
        self,
        run_id: int,
        path: str,
        account_id: int | None = None,
        step: int | None = None,
    ) -> Response:
        """
        Download a specific job run artifact.

        Args:
            run_id: Job run ID containing the artifact.
            path: Path to the artifact (e.g. ``'manifest.json'``, ``'run_results.json'``).
            account_id: Account ID (defaults to connection default).
            step: Specific step number to get the artifact from.

        Returns:
            Response: Artifact content response.
        """

    async def get_job_run_artifacts_concurrently(
        self,
        run_id: int,
        artifacts: list[str],
        account_id: int | None = None,
        step: int | None = None,
    ):
        """
        Download multiple artifacts concurrently.

        Args:
            run_id: Job run ID containing the artifacts.
            artifacts: List of artifact paths to download.
            account_id: Account ID (defaults to connection default).
            step: Specific step number to get the artifacts from.

        Returns:
            The downloaded artifact contents. NOTE(review): the upstream provider
            appears to return a dict keyed by artifact path (values are parsed
            JSON) rather than a list of responses. Confirm before relying on the shape.
        """


# --- Usage example ---
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook
# The status enum is needed for ``expected_statuses`` below.
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudJobRunStatus

# Initialize hook
hook = DbtCloudHook(dbt_cloud_conn_id='dbt_cloud_default')

# Trigger a job run
response = hook.trigger_job_run(
    job_id=12345,
    cause="Airflow scheduled run",
    steps_override=["dbt run", "dbt test"],
    schema_override="dev_schema",
)
run_id = response.json()['data']['id']

# Wait for completion. ``expected_statuses`` is typed ``int | Sequence[int]``,
# so pass the member's ``.value``. DbtCloudJobRunStatus is a plain Enum, so a
# bare member is not an int and would not match the raw status codes.
success = hook.wait_for_job_run_status(
    run_id=run_id,
    expected_statuses=DbtCloudJobRunStatus.SUCCESS.value,
    check_interval=60,
    timeout=3600,
)
if not success:
    raise RuntimeError(f"dbt Cloud job run {run_id} did not reach SUCCESS")

# List available artifacts
artifacts = hook.list_job_run_artifacts(run_id=run_id)

# Download specific artifacts (each call returns a requests Response)
manifest = hook.get_job_run_artifact(run_id=run_id, path='manifest.json')
run_results = hook.get_job_run_artifact(run_id=run_id, path='run_results.json')

# Find job by name instead of ID
job_info = hook.get_job_by_name(
    project_name="analytics",
    environment_name="production",
    job_name="daily_transform",
)
# The returned dict is the dbt Cloud job object; its identifier is under "id".
job_id = job_info['id']

# List all jobs in a project
jobs = hook.list_jobs(project_id=123, name_contains="daily")


# --- Supporting types and exceptions ---
from enum import IntEnum
from collections.abc import Sequence
from enum import Enum
from typing import TypedDict

from requests import PreparedRequest, Session, Response
from requests.auth import AuthBase
class DbtCloudJobRunStatus(Enum):
QUEUED = 1
STARTING = 2
RUNNING = 3
SUCCESS = 10
ERROR = 20
CANCELLED = 30
NON_TERMINAL_STATUSES = (QUEUED, STARTING, RUNNING)
TERMINAL_STATUSES = (SUCCESS, ERROR, CANCELLED)
@classmethod
def check_is_valid(cls, statuses: int | Sequence[int] | set[int]) -> None: ...
@classmethod
def is_terminal(cls, status: int) -> bool: ...
class JobRunInfo(TypedDict):
account_id: int | None
run_id: int
class TokenAuth(AuthBase):
    """``requests`` auth hook that attaches a dbt Cloud API token to each request."""

    def __init__(self, token: str) -> None:
        self.token = token

    def __call__(self, request: PreparedRequest) -> PreparedRequest:
        """
        Set the JSON content-type and ``Token`` authorization headers on ``request``.

        NOTE(review): the upstream provider also sets a provider-specific
        ``User-Agent`` header; add it here if that is required.
        """
        request.headers["Content-Type"] = "application/json"
        request.headers["Authorization"] = f"Token {self.token}"
        # The requests auth protocol sends whatever this returns, so the
        # mutated request must be handed back (returning None would break it).
        return request
class DbtCloudJobRunException(Exception):
    """
    Signals that a dbt Cloud job run failed.
    """
class DbtCloudResourceLookupError(Exception):
    """
    Signals that a dbt Cloud resource (for example a project, environment,
    or job looked up by name) could not be found.
    """


# Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-dbt-cloud
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10