Provider package for integrating Apache Airflow with dbt Cloud for data transformation workflow orchestration
84
The dbt Cloud provider includes three main operators for job execution and management. These operators provide high-level abstractions for common dbt Cloud operations within Airflow DAGs, handling job execution scheduling, artifact retrieval, and job discovery.
The DbtCloudRunJobOperator executes dbt Cloud jobs with comprehensive configuration options and monitoring capabilities.
class DbtCloudRunJobOperator:
    """Execute a dbt Cloud job, optionally waiting for it to finish."""

    def __init__(
        self,
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        job_id: int | None = None,
        project_name: str | None = None,
        environment_name: str | None = None,
        job_name: str | None = None,
        account_id: int | None = None,
        trigger_reason: str | None = None,
        steps_override: list[str] | None = None,
        schema_override: str | None = None,
        wait_for_termination: bool = True,
        timeout: int = 60 * 60 * 24 * 7,  # 7 days
        check_interval: int = 60,
        additional_run_config: dict[str, Any] | None = None,
        reuse_existing_run: bool = False,
        retry_from_failure: bool = False,
        deferrable: bool = False,
        **kwargs,
    ):
        """
        Execute a dbt Cloud job.

        Args:
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            job_id: dbt Cloud job ID to execute (mutually exclusive with name-based lookup)
            project_name: Project name for job lookup (requires environment_name and job_name)
            environment_name: Environment name for job lookup (requires project_name and job_name)
            job_name: Job name for job lookup (requires project_name and environment_name)
            account_id: dbt Cloud account ID (defaults to connection default)
            trigger_reason: Reason for triggering the job (max 255 characters)
            steps_override: Custom list of dbt commands to run
            schema_override: Override default schema/dataset name
            wait_for_termination: Whether to wait for job completion
            timeout: Maximum time to wait for job completion (seconds)
            check_interval: Time between status checks (seconds)
            additional_run_config: Additional configuration parameters
            reuse_existing_run: Use existing run if job is already running
            retry_from_failure: Resume from last failure point if retrying
            deferrable: Use async execution mode
        """

    def execute(self, context: Context) -> int:
        """
        Execute the dbt Cloud job.

        Args:
            context: Airflow task execution context

        Returns:
            int: dbt Cloud job run ID

        Raises:
            DbtCloudJobRunException: If job execution fails
        """

    def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
        """
        Complete execution for deferrable tasks.

        Args:
            context: Airflow task execution context
            event: Trigger event containing job status

        Returns:
            int: dbt Cloud job run ID
        """

    def on_kill(self) -> None:
        """Cancel the running dbt Cloud job when task is killed."""

    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
        """
        Generate OpenLineage metadata facets for data lineage tracking.

        Args:
            task_instance: Airflow task instance

        Returns:
            OperatorLineage: OpenLineage facets for lineage tracking
        """


# The DbtCloudGetJobRunArtifactOperator downloads artifacts from completed dbt Cloud job runs.
class DbtCloudGetJobRunArtifactOperator:
    """Download a single artifact produced by a dbt Cloud job run."""

    def __init__(
        self,
        run_id: int,
        path: str,
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        account_id: int | None = None,
        step: int | None = None,
        output_file_name: str | None = None,
        **kwargs,
    ):
        """
        Download artifacts from a dbt Cloud job run.

        Note: the required parameters (run_id, path) are listed before the
        defaulted ones — the previous ordering was a SyntaxError in Python
        (non-default argument follows default argument). All example calls
        use keyword arguments, so this reordering is backward-compatible.

        Args:
            run_id: dbt Cloud job run ID to retrieve artifacts from
            path: Path to the artifact (e.g., 'manifest.json', 'run_results.json', 'catalog.json')
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            account_id: dbt Cloud account ID (defaults to connection default)
            step: Specific step number to retrieve artifact from
            output_file_name: Local filename to save artifact (defaults to artifact path basename)
        """

    def execute(self, context: Context) -> str:
        """
        Download the specified artifact.

        Args:
            context: Airflow task execution context

        Returns:
            str: Local path to the downloaded artifact file
        """


# The DbtCloudListJobsOperator retrieves information about jobs in a dbt Cloud account or project.
class DbtCloudListJobsOperator:
    """List jobs defined in a dbt Cloud account, optionally filtered by project."""

    def __init__(
        self,
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        account_id: int | None = None,
        project_id: int | None = None,
        order_by: str | None = None,
        **kwargs,
    ):
        """
        List jobs in a dbt Cloud account or project.

        Args:
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            account_id: dbt Cloud account ID (defaults to connection default)
            project_id: Filter jobs by project ID
            order_by: Field to order results by (e.g., 'name', 'created_at')
        """

    def execute(self, context: Context) -> list:
        """
        Retrieve the list of jobs.

        Args:
            context: Airflow task execution context

        Returns:
            list: List of dbt Cloud job IDs (integers)
        """


from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from datetime import datetime, timedelta

dag = DAG(
    'dbt_transform_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)

# Execute dbt job by ID
run_dbt_models = DbtCloudRunJobOperator(
    task_id='run_dbt_models',
    dbt_cloud_conn_id='dbt_cloud_prod',
    job_id=12345,
    trigger_reason='Daily scheduled transformation',
    timeout=3600,
    dag=dag,
)

# Execute job by project/environment/job names
run_staging_models = DbtCloudRunJobOperator(
    task_id='run_staging_models',
    project_name='analytics_project',
    environment_name='production',
    job_name='staging_models_daily',
    trigger_reason='Staging data refresh',
    steps_override=['dbt run --models tag:staging', 'dbt test --models tag:staging'],
    dag=dag,
)

# Execute with advanced configuration
run_full_pipeline = DbtCloudRunJobOperator(
    task_id='run_full_pipeline',
    job_id=54321,
    schema_override='analytics_{{ ds_nodash }}',  # Dynamic schema based on execution date
    additional_run_config={
        'threads': 8,
        'target': 'prod',
        'vars': {
            'start_date': '{{ ds }}',
            'end_date': '{{ next_ds }}'
        }
    },
    retry_from_failure=True,
    reuse_existing_run=False,
    dag=dag,
)

# Use deferrable execution for resource efficiency
run_long_job = DbtCloudRunJobOperator(
    task_id='run_long_dbt_job',
    job_id=99999,
    deferrable=True,  # Enable async execution
    timeout=14400,  # 4 hours
    check_interval=300,  # Check every 5 minutes
    dag=dag,
)

from airflow.providers.dbt.cloud.operators.dbt import DbtCloudGetJobRunArtifactOperator
# Download job artifacts after completion
download_manifest = DbtCloudGetJobRunArtifactOperator(
    task_id='download_manifest',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    path='manifest.json',
    output_file_name='dbt_manifest_{{ ds }}.json',
    dag=dag,
)

download_run_results = DbtCloudGetJobRunArtifactOperator(
    task_id='download_run_results',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    path='run_results.json',
    dag=dag,
)

# Set task dependencies
run_dbt_models >> [download_manifest, download_run_results]

from airflow.providers.dbt.cloud.operators.dbt import DbtCloudListJobsOperator

# List all jobs in an account
list_all_jobs = DbtCloudListJobsOperator(
    task_id='list_all_jobs',
    account_id=12345,
    order_by='name',
    dag=dag,
)

# List jobs for specific project
list_project_jobs = DbtCloudListJobsOperator(
    task_id='list_project_jobs',
    project_id=67890,
    dag=dag,
)

from airflow.operators.python import PythonOperator
def handle_dbt_failure(**context):
    """Custom error handling for dbt job failures."""
    run_id = context["task_instance"].xcom_pull(task_ids="run_dbt_models")
    # Guard clause: nothing to clean up when no run ID was pushed to XCom.
    if not run_id:
        return
    print(f"dbt job run {run_id} failed - triggering cleanup")
    # Add custom failure handling logic here
# Add failure callback
run_dbt_models = DbtCloudRunJobOperator(
    task_id='run_dbt_models',
    job_id=12345,
    on_failure_callback=handle_dbt_failure,
    dag=dag,
)

# When using with OpenLineage provider for data lineage
# The operator automatically generates lineage metadata
run_with_lineage = DbtCloudRunJobOperator(
    task_id='run_with_lineage',
    job_id=12345,
    # OpenLineage integration happens automatically
    # when apache-airflow-providers-openlineage >= 2.3.0 is installed
    dag=dag,
)

# All operators support Airflow templating for dynamic values:
Templated fields per operator:

- DbtCloudRunJobOperator: dbt_cloud_conn_id, job_id, project_name, environment_name, job_name, account_id, trigger_reason, steps_override, schema_override, additional_run_config
- DbtCloudGetJobRunArtifactOperator: dbt_cloud_conn_id, run_id, path, account_id, output_file_name
- DbtCloudListJobsOperator: account_id, project_id

from typing import Any, Dict, List, Optional
from airflow.models import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.utils.context import Context


class DbtCloudRunJobOperatorLink(BaseOperatorLink):
    """Operator link for monitoring job runs in dbt Cloud UI."""

    # Label shown on the task instance page in the Airflow UI.
    name = "Monitor Job Run"

    def get_link(self, operator: BaseOperator, *, ti_key=None) -> str:
        """
        Generate link to dbt Cloud job run monitoring page.

        Args:
            operator: The operator instance
            ti_key: Task instance key

        Returns:
            str: URL to dbt Cloud job run page
        """


# Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-dbt-cloud

evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10