Provider package for integrating Apache Airflow with dbt Cloud for data transformation workflow orchestration
84
The DbtCloudJobRunSensor provides monitoring capabilities for dbt Cloud job runs within Airflow workflows. It can operate in both traditional polling mode and efficient deferrable mode, making it suitable for various monitoring scenarios and resource constraints.
The sensor monitors dbt Cloud job run status and waits for completion before allowing downstream tasks to proceed.
class DbtCloudJobRunSensor:
    """Sensor that waits for a dbt Cloud job run to reach a terminal status.

    Supports both traditional polling mode (``poke``) and deferrable mode,
    which hands the waiting off to an async trigger to free the worker slot.
    """

    def __init__(
        self,
        *,  # keyword-only: lets the required run_id follow defaulted params
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        run_id: int,
        account_id: int | None = None,
        deferrable: bool = False,
        **kwargs,
    ):
        """
        Monitor the status of a dbt Cloud job run.

        Args:
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            run_id: dbt Cloud job run ID to monitor
            account_id: dbt Cloud account ID (defaults to connection default)
            deferrable: Use async execution mode for resource efficiency
            **kwargs: Additional sensor parameters (timeout, poke_interval, etc.)
        """

    def poke(self, context: Context) -> bool:
        """
        Check if the job run has reached a terminal status.

        Args:
            context: Airflow task execution context

        Returns:
            bool: True if job run is complete (success/failure), False if still running

        Raises:
            DbtCloudJobRunException: If job run fails or is cancelled
        """

    def execute(self, context: Context) -> None:
        """
        Execute the sensor (used for deferrable mode).

        Args:
            context: Airflow task execution context
        """

    def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
        """
        Complete execution for deferrable sensors.

        Args:
            context: Airflow task execution context
            event: Trigger event containing job status

        Returns:
            int: dbt Cloud job run ID
        """

    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
        """
        Generate OpenLineage metadata facets for data lineage tracking.

        Args:
            task_instance: Airflow task instance

        Returns:
            OperatorLineage: OpenLineage facets for lineage tracking
        """

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
from datetime import datetime, timedelta

# Shared daily example DAG that the snippets below attach their tasks to.
dag = DAG(
    dag_id='dbt_workflow_with_monitoring',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
# Trigger the dbt Cloud job without blocking inside the operator; the
# dedicated sensor below does the waiting instead.
run_dbt_job = DbtCloudRunJobOperator(
    task_id='run_dbt_job',
    job_id=12345,
    wait_for_termination=False,  # hand the waiting off to the sensor
    dag=dag,
)

# Poll the run id (pushed to XCom by the operator above) until the job
# reaches a terminal state.
wait_for_completion = DbtCloudJobRunSensor(
    task_id='wait_for_dbt_completion',
    dbt_cloud_conn_id='dbt_cloud_default',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_job') }}",
    timeout=3600,      # give up after one hour
    poke_interval=60,  # re-check once per minute
    dag=dag,
)
# Wire the job trigger ahead of its monitoring sensor.
run_dbt_job >> wait_for_completion

# Use deferrable mode for long-running jobs: the sensor defers to an async
# trigger and releases its worker slot while waiting.
wait_for_long_job = DbtCloudJobRunSensor(
    task_id='wait_for_long_dbt_job',
    run_id="{{ task_instance.xcom_pull(task_ids='run_long_job') }}",
    deferrable=True,    # enable async monitoring
    timeout=14400,      # 4 hours timeout
    poke_interval=300,  # check every 5 minutes
    dag=dag,
)

from airflow.operators.dummy import DummyOperator
# Fan out several dbt Cloud jobs in parallel, each paired with its own
# deferrable monitoring sensor.
start = DummyOperator(task_id='start', dag=dag)

job_runs = []
for idx, dbt_job_id in enumerate([111, 222, 333], start=1):
    trigger = DbtCloudRunJobOperator(
        task_id=f'run_job_{idx}',
        job_id=dbt_job_id,
        wait_for_termination=False,
        dag=dag,
    )
    monitor = DbtCloudJobRunSensor(
        task_id=f'wait_job_{idx}',
        run_id=f"{{{{ task_instance.xcom_pull(task_ids='run_job_{idx}') }}}}",
        deferrable=True,
        dag=dag,
    )
    start >> trigger >> monitor
    job_runs.append(monitor)

# Join point: proceed only after every monitoring sensor has succeeded.
all_complete = DummyOperator(task_id='all_jobs_complete', dag=dag)
job_runs >> all_complete
# Critical long-running job: extended timeout with custom polling cadence.
monitor_critical_job = DbtCloudJobRunSensor(
    task_id='monitor_critical_job',
    run_id="{{ task_instance.xcom_pull(task_ids='run_critical_models') }}",
    account_id=12345,
    timeout=28800,      # 8 hours for critical job
    poke_interval=120,  # check every 2 minutes
    mode='poke',        # traditional polling (worker slot held while waiting)
    dag=dag,
)

from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
def check_job_failure(**context):
    """Failure-callback hook: summarize the failed dbt monitoring task.

    Args:
        **context: Airflow callback context (expects a ``task_instance`` key).

    Returns:
        str: Human-readable description of the failed run monitoring.
    """
    failed_run = context['task_instance'].xcom_pull(task_ids='run_dbt_job')
    # Hook point for custom failure analysis (log scraping, ticketing, ...).
    return f"dbt job run {failed_run} monitoring failed"
# Sensor with a failure callback attached for custom alerting logic.
monitor_with_alerts = DbtCloudJobRunSensor(
    task_id='monitor_with_alerts',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_job') }}",
    timeout=3600,
    on_failure_callback=check_job_failure,
    dag=dag,
)

# Email alert that fires only when an upstream task failed.
send_failure_alert = EmailOperator(
    task_id='send_failure_alert',
    to=['data-team@company.com'],
    subject='dbt Cloud Job Monitoring Failed',
    html_content='dbt Cloud job monitoring failed. Please check the logs.',
    trigger_rule='one_failed',  # trigger only on upstream failure
    dag=dag,
)

monitor_with_alerts >> send_failure_alert

from airflow.operators.python import BranchPythonOperator
def decide_next_task(**context):
    """Branching callable: pick the follow-up task after sensor completion.

    NOTE(review): the original wrapped a bare ``return 'process_results'`` in
    try/except returning ``'handle_failure'`` — that except branch was
    unreachable (a constant return cannot raise), so this always chose the
    success path. Failure routing is handled by trigger rules instead.

    Args:
        **context: Airflow task execution context (unused here).

    Returns:
        str: task_id of the branch to follow ('process_results').
    """
    return 'process_results'
# Conditional workflow based on the monitoring result.
branch_on_result = BranchPythonOperator(
    task_id='branch_on_result',
    python_callable=decide_next_task,
    dag=dag,
)

process_success = DummyOperator(task_id='process_results', dag=dag)
handle_failure = DummyOperator(task_id='handle_failure', dag=dag)

wait_for_completion >> branch_on_result
branch_on_result >> [process_success, handle_failure]

from airflow.providers.dbt.cloud.operators.dbt import DbtCloudGetJobRunArtifactOperator
# First wait for the model run to finish, then pull its artifacts for
# downstream validation.
wait_for_models = DbtCloudJobRunSensor(
    task_id='wait_for_models',
    run_id="{{ task_instance.xcom_pull(task_ids='run_models') }}",
    dag=dag,
)

# Download run_results.json so a follow-up task can inspect test outcomes.
get_test_results = DbtCloudGetJobRunArtifactOperator(
    task_id='get_test_results',
    run_id="{{ task_instance.xcom_pull(task_ids='run_models') }}",
    path='run_results.json',
    dag=dag,
)
def validate_test_results(**context):
    """Validate dbt test results from downloaded artifact."""
    # Placeholder: parse the downloaded run_results.json and fail the task
    # if any dbt test did not pass. (No-op as written.)
    pass
# Run the artifact validation after the download completes.
validate_tests = PythonOperator(
    task_id='validate_tests',
    python_callable=validate_test_results,
    dag=dag,
)

wait_for_models >> get_test_results >> validate_tests

from airflow.sensors.external_task import ExternalTaskSensor
# Wait for dbt job in another DAG
wait_external_dbt = ExternalTaskSensor(
task_id='wait_external_dbt',
external_dag_id='upstream_dbt_dag',
external_task_id='wait_for_dbt_completion',
dag=dag,
)
# Then run local dbt job
run_downstream_job = DbtCloudRunJobOperator(
task_id='run_downstream_job',
job_id=67890,
dag=dag,
)
wait_external_dbt >> run_downstream_jobThe sensor inherits from BaseSensorOperator and supports all standard sensor configuration:
The sensor supports Airflow templating for dynamic values in `dbt_cloud_conn_id`, `run_id`, and `account_id`.

Tuning guidance:
- Set `deferrable=True` for long-running jobs to free up worker slots.
- Tune `poke_interval` to balance responsiveness and API load.
- Use `mode='reschedule'` for very long jobs to avoid blocking workers.
- Choose `timeout` values based on expected job duration.
- Set `soft_fail=True` for non-critical monitoring tasks.
- Trigger jobs with `wait_for_termination=False` and let the sensor do the waiting.

from typing import Any, Dict
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context
# Sensor inherits from BaseSensorOperator
# All standard sensor configuration options are availableInstall with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-dbt-cloud

Evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10