Provider package that integrates Apache Airflow with dbt Cloud for orchestrating data transformation workflows
npx @tessl/cli install tessl/pypi-apache-airflow-providers-dbt-cloud@4.4.0

An Apache Airflow provider package that enables seamless integration with dbt Cloud for orchestrating data transformation workflows. This provider offers comprehensive connectivity to dbt Cloud's analytics engineering platform, allowing teams to trigger dbt Cloud jobs, monitor their status, retrieve artifacts, and integrate with data lineage systems within Airflow DAGs.
pip install apache-airflow-providers-dbt-cloud

from airflow.providers.dbt.cloud.hooks.dbt import (
    DbtCloudHook,
    DbtCloudJobRunStatus,
    DbtCloudJobRunException,
    DbtCloudResourceLookupError,
    JobRunInfo,
)
from airflow.providers.dbt.cloud.operators.dbt import (
    DbtCloudRunJobOperator,
    DbtCloudGetJobRunArtifactOperator,
    DbtCloudListJobsOperator,
)
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
# Define default arguments
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# Create DAG
dag = DAG(
    'dbt_cloud_workflow',
    default_args=default_args,
    description='Run dbt Cloud transformation job',
    schedule=timedelta(hours=6),
    catchup=False,
)
# Trigger a dbt Cloud job run; the operator pushes the run_id to XCom
run_dbt_job = DbtCloudRunJobOperator(
    task_id='run_dbt_models',
    dbt_cloud_conn_id='dbt_cloud_default',
    job_id=12345,
    wait_for_termination=False,  # the sensor below monitors the run instead
    dag=dag,
)
# Monitor job completion with sensor
wait_for_completion = DbtCloudJobRunSensor(
    task_id='wait_for_dbt_completion',
    dbt_cloud_conn_id='dbt_cloud_default',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    timeout=3600,
    poke_interval=60,
    dag=dag,
)

run_dbt_job >> wait_for_completion

The dbt Cloud provider follows Airflow's standard provider pattern with four main component types: hooks, operators, sensors, and triggers.
The provider integrates with Airflow's connection system for secure credential management and supports both synchronous and asynchronous (deferrable) task execution patterns.
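
For example, a long-running job can be monitored without tying up a worker slot by running the operator in deferrable mode. A minimal sketch, assuming the operator accepts a deferrable flag (it is not part of the constructor sketch further below) and using placeholder connection and job IDs:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

with DAG(
    'dbt_cloud_deferrable_example',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    # Deferrable execution: the task defers to the triggerer process and frees
    # its worker slot while DbtCloudRunJobTrigger polls the run asynchronously.
    run_dbt_job_deferred = DbtCloudRunJobOperator(
        task_id='run_dbt_models_deferred',
        dbt_cloud_conn_id='dbt_cloud_default',
        job_id=12345,        # placeholder job ID
        deferrable=True,     # assumed flag; hands polling off to the trigger
        check_interval=60,   # seconds between status polls
        timeout=3600,        # give up after one hour
    )
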
Low-level interface for comprehensive dbt Cloud API interaction, providing methods for account management, project operations, job execution, and artifact retrieval.

class DbtCloudHook:
    def __init__(self, dbt_cloud_conn_id: str = "dbt_cloud_default"): ...
    def get_conn(self) -> Session: ...
    def test_connection(self) -> tuple[bool, str]: ...
    def trigger_job_run(self, job_id: int, cause: str, **kwargs) -> Response: ...
    def get_job_run_status(self, run_id: int, **kwargs) -> int: ...
    def wait_for_job_run_status(self, run_id: int, **kwargs) -> bool: ...
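
When the operators do not expose enough control, the hook can be called directly from a task. A minimal sketch that triggers a run and waits for it to succeed; the job ID is a placeholder, and the keyword arguments beyond the signatures above (expected_statuses, check_interval, timeout) are assumptions about the hook's API:

from airflow.decorators import task
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunStatus

@task
def trigger_and_wait_for_dbt_job() -> int:
    hook = DbtCloudHook(dbt_cloud_conn_id='dbt_cloud_default')

    # Trigger the job and read the run ID out of the dbt Cloud API response
    response = hook.trigger_job_run(job_id=12345, cause='Triggered from Airflow')
    run_id = response.json()['data']['id']

    # Block until the run reaches SUCCESS (raises if it fails or times out)
    hook.wait_for_job_run_status(
        run_id=run_id,
        expected_statuses=DbtCloudJobRunStatus.SUCCESS.value,
        check_interval=30,
        timeout=3600,
    )
    return run_id
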
Operators for triggering and managing dbt Cloud job executions with comprehensive configuration options and integration capabilities.

class DbtCloudRunJobOperator:
    def __init__(
        self,
        job_id: int | None = None,
        project_name: str | None = None,
        environment_name: str | None = None,
        job_name: str | None = None,
        **kwargs
    ): ...

class DbtCloudGetJobRunArtifactOperator:
    def __init__(self, run_id: int, path: str, **kwargs): ...

class DbtCloudListJobsOperator:
    def __init__(self, account_id: int | None = None, **kwargs): ...
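
To illustrate the other two operators, the snippet below extends the DAG example from earlier (reusing its dag object and the run_dbt_models task); run_results.json is one of the standard dbt artifact paths, and the account ID is a placeholder:

from airflow.providers.dbt.cloud.operators.dbt import (
    DbtCloudGetJobRunArtifactOperator,
    DbtCloudListJobsOperator,
)

# Download the run_results.json artifact produced by the run triggered above
get_run_results = DbtCloudGetJobRunArtifactOperator(
    task_id='get_run_results',
    dbt_cloud_conn_id='dbt_cloud_default',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    path='run_results.json',
    dag=dag,
)

# List the jobs configured in a dbt Cloud account
list_dbt_jobs = DbtCloudListJobsOperator(
    task_id='list_dbt_jobs',
    dbt_cloud_conn_id='dbt_cloud_default',
    account_id=12345,
    dag=dag,
)
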
Sensor for monitoring dbt Cloud job run status with support for both polling and deferrable execution modes.

class DbtCloudJobRunSensor:
    def __init__(
        self,
        run_id: int,
        account_id: int | None = None,
        deferrable: bool = False,
        **kwargs
    ): ...
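
A deferrable variant of the sensor frees the worker slot between checks instead of repeatedly poking. A minimal sketch, reusing the dag object and run_dbt_models task from the DAG example above:

from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# Same check as wait_for_completion above, but the task defers to the triggerer
# instead of occupying a worker slot between polls.
wait_for_completion_deferred = DbtCloudJobRunSensor(
    task_id='wait_for_dbt_completion_deferred',
    dbt_cloud_conn_id='dbt_cloud_default',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    deferrable=True,
    timeout=3600,
    dag=dag,
)
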
Async trigger for efficient monitoring of dbt Cloud job status in deferrable tasks.

class DbtCloudRunJobTrigger:
    def __init__(
        self,
        conn_id: str,
        run_id: int,
        end_time: float,
        poll_interval: float,
        account_id: int | None
    ): ...
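
The trigger is normally created for you when deferrable mode is enabled on the operator or sensor, but a custom deferrable task can defer to it directly. A hypothetical sketch (the operator class and event handling are illustrative, not part of the provider):

import time

from airflow.models.baseoperator import BaseOperator
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger

class WaitForDbtCloudRun(BaseOperator):
    """Hypothetical operator that defers until a given dbt Cloud run finishes."""

    def __init__(self, *, run_id: int, timeout: int = 3600, **kwargs):
        super().__init__(**kwargs)
        self.run_id = run_id
        self.timeout = timeout

    def execute(self, context):
        # Hand control to the triggerer; Airflow calls execute_complete with the
        # event payload once the trigger fires.
        self.defer(
            trigger=DbtCloudRunJobTrigger(
                conn_id='dbt_cloud_default',
                run_id=self.run_id,
                end_time=time.time() + self.timeout,
                poll_interval=60,
                account_id=None,
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event):
        # The exact event payload shape is an assumption; inspect it in your logs.
        self.log.info("dbt Cloud run %s finished: %s", self.run_id, event)
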
Automatic data lineage tracking integration with OpenLineage for comprehensive visibility across dbt transformations and Airflow workflows.

def generate_openlineage_events_from_dbt_cloud_run(
    operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor,
    task_instance: TaskInstance,
) -> OperatorLineage: ...
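
Within the provider, this helper is invoked from the OpenLineage hook method on the run-job operator and sensor. A hedged sketch of that wiring, using an illustrative subclass and assuming the helper is importable from airflow.providers.dbt.cloud.utils.openlineage:

from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.dbt.cloud.utils.openlineage import (
    generate_openlineage_events_from_dbt_cloud_run,
)

class LineageAwareDbtCloudRunJobOperator(DbtCloudRunJobOperator):
    """Illustrative subclass showing where the OpenLineage helper plugs in."""

    def get_openlineage_facets_on_complete(self, task_instance):
        # Fetches run metadata and dbt artifacts from dbt Cloud, emits OpenLineage
        # events for the completed models/tests, and returns operator-level lineage.
        return generate_openlineage_events_from_dbt_cloud_run(
            operator=self, task_instance=task_instance
        )
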
Create a dbt Cloud connection in Airflow with the following parameters:

Connection ID: dbt_cloud_default (or custom name)
Connection Type: dbt Cloud
Host: cloud.getdbt.com (or your custom dbt Cloud instance; defaults to cloud.getdbt.com)
Extra (JSON):

{
    "account_id": 12345,
    "proxies": {
        "http": "http://proxy.company.com:8080",
        "https": "https://proxy.company.com:8080"
    }
}

The connection form in the Airflow UI maps these values onto Airflow's standard connection fields.
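
For deployments managed as code, the same connection can also be supplied programmatically rather than through the UI. A minimal sketch using Airflow's AIRFLOW_CONN_<CONN_ID> environment-variable convention; the values are placeholders, and the dbt Cloud API token is assumed to go in the connection's password field:

import json
import os

# Airflow reads connections from AIRFLOW_CONN_<CONN_ID> environment variables;
# the JSON form below mirrors the parameters listed above.
os.environ["AIRFLOW_CONN_DBT_CLOUD_DEFAULT"] = json.dumps(
    {
        "conn_type": "dbt_cloud",
        "host": "cloud.getdbt.com",            # or your custom dbt Cloud instance
        "password": "<your dbt Cloud API token>",
        "extra": {
            "account_id": 12345,               # placeholder account ID
            "proxies": {
                "http": "http://proxy.company.com:8080",
                "https": "https://proxy.company.com:8080",
            },
        },
    }
)
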
from enum import Enum
from typing import TypedDict, Sequence
from airflow.exceptions import AirflowException
from requests import PreparedRequest
class DbtCloudJobRunStatus(Enum):
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELLED = 30
    NON_TERMINAL_STATUSES = (QUEUED, STARTING, RUNNING)
    TERMINAL_STATUSES = (SUCCESS, ERROR, CANCELLED)

    @classmethod
    def check_is_valid(cls, statuses: int | Sequence[int] | set[int]) -> None: ...

    @classmethod
    def is_terminal(cls, status: int) -> bool: ...

class JobRunInfo(TypedDict):
    account_id: int | None
    run_id: int

class DbtCloudJobRunException(AirflowException):
    """Exception raised when a dbt Cloud job run fails."""

class DbtCloudResourceLookupError(AirflowException):
    """Exception raised when a dbt Cloud resource cannot be found."""

class TokenAuth:
    """Helper class for Auth when executing requests."""

    def __init__(self, token: str): ...

    def __call__(self, request: PreparedRequest) -> PreparedRequest: ...
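
As an example of how these types fit together, the snippet below polls a run once through the hook and interprets the integer status code with the enum (the run ID is a placeholder):

from airflow.providers.dbt.cloud.hooks.dbt import (
    DbtCloudHook,
    DbtCloudJobRunException,
    DbtCloudJobRunStatus,
)

hook = DbtCloudHook(dbt_cloud_conn_id='dbt_cloud_default')

# Poll the run once; the hook returns the raw integer status code
status = hook.get_job_run_status(run_id=67890)

if DbtCloudJobRunStatus.is_terminal(status):
    if status == DbtCloudJobRunStatus.ERROR.value:
        raise DbtCloudJobRunException(f"dbt Cloud run 67890 failed (status {status}).")
    print(f"Run finished with status {DbtCloudJobRunStatus(status).name}.")
else:
    print(f"Run still in progress: {DbtCloudJobRunStatus(status).name}.")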