tessl/pypi-apache-airflow-providers-dbt-cloud

Provider package for integrating Apache Airflow with dbt Cloud for data transformation workflow orchestration

Job Monitoring Sensor

The DbtCloudJobRunSensor monitors dbt Cloud job runs from within Airflow workflows, blocking downstream tasks until a run reaches a terminal state. It can operate in both traditional polling mode and an efficient deferrable mode, making it suitable for a range of monitoring scenarios and resource constraints.

Capabilities

Job Run Status Monitoring

The sensor monitors dbt Cloud job run status and waits for completion before allowing downstream tasks to proceed.

class DbtCloudJobRunSensor:
    def __init__(
        self,
        *,  # keyword-only, so the required run_id may follow defaulted parameters
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        run_id: int,
        account_id: int | None = None,
        deferrable: bool = False,
        **kwargs,
    ):
        """
        Monitor the status of a dbt Cloud job run.
        
        Args:
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            run_id: dbt Cloud job run ID to monitor
            account_id: dbt Cloud account ID (defaults to connection default)
            deferrable: Use async execution mode for resource efficiency
            **kwargs: Additional sensor parameters (timeout, poke_interval, etc.)
        """
    
    def poke(self, context: Context) -> bool:
        """
        Check if the job run has reached a terminal status.
        
        Args:
            context: Airflow task execution context
            
        Returns:
            bool: True if the job run completed successfully, False if it is
            still running (a failed or cancelled run raises instead)
            
        Raises:
            DbtCloudJobRunException: If job run fails or is cancelled
        """
    
    def execute(self, context: Context) -> None:
        """
        Execute the sensor (used for deferrable mode).
        
        Args:
            context: Airflow task execution context
        """
    
    def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
        """
        Complete execution for deferrable sensors.
        
        Args:
            context: Airflow task execution context
            event: Trigger event containing job status
            
        Returns:
            int: dbt Cloud job run ID
        """
    
    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
        """
        Generate OpenLineage metadata facets for data lineage tracking.
        
        Args:
            task_instance: Airflow task instance
            
        Returns:
            OperatorLineage: OpenLineage facets for lineage tracking
        """

Usage Examples

Basic Job Run Monitoring

from datetime import datetime

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

dag = DAG(
    'dbt_workflow_with_monitoring',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
)

# Execute dbt job
run_dbt_job = DbtCloudRunJobOperator(
    task_id='run_dbt_job',
    job_id=12345,
    wait_for_termination=False,  # Don't wait in operator
    dag=dag,
)

# Monitor job completion with sensor
wait_for_completion = DbtCloudJobRunSensor(
    task_id='wait_for_dbt_completion',
    dbt_cloud_conn_id='dbt_cloud_default',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_job') }}",
    timeout=3600,           # 1 hour timeout
    poke_interval=60,       # Check every minute
    dag=dag,
)

run_dbt_job >> wait_for_completion

Deferrable Sensor for Resource Efficiency

# Use deferrable mode for long-running jobs
wait_for_long_job = DbtCloudJobRunSensor(
    task_id='wait_for_long_dbt_job',
    run_id="{{ task_instance.xcom_pull(task_ids='run_long_job') }}",
    deferrable=True,        # Enable async monitoring
    timeout=14400,          # 4 hours timeout
    poke_interval=300,      # Check every 5 minutes
    dag=dag,
)

Multiple Job Monitoring

from airflow.operators.empty import EmptyOperator

# Monitor multiple parallel dbt jobs
start = EmptyOperator(task_id='start', dag=dag)

# Start multiple jobs
job_sensors = []
for i, job_id in enumerate([111, 222, 333]):
    run_job = DbtCloudRunJobOperator(
        task_id=f'run_job_{i+1}',
        job_id=job_id,
        wait_for_termination=False,
        dag=dag,
    )
    
    wait_job = DbtCloudJobRunSensor(
        task_id=f'wait_job_{i+1}',
        run_id=f"{{{{ task_instance.xcom_pull(task_ids='run_job_{i+1}') }}}}",
        deferrable=True,
        dag=dag,
    )
    
    start >> run_job >> wait_job
    job_sensors.append(wait_job)

# Continue after all jobs complete
all_complete = EmptyOperator(task_id='all_jobs_complete', dag=dag)
job_sensors >> all_complete

Sensor with Custom Configuration

# Sensor with extended timeout and custom polling
monitor_critical_job = DbtCloudJobRunSensor(
    task_id='monitor_critical_job',
    run_id="{{ task_instance.xcom_pull(task_ids='run_critical_models') }}",
    account_id=12345,
    timeout=28800,          # 8 hours for critical job
    poke_interval=120,      # Check every 2 minutes
    mode='poke',            # Traditional polling mode
    dag=dag,
)

Error Handling and Alerting

from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

def check_job_failure(context):
    """Failure callback: Airflow passes the task context as a single positional dict."""
    run_id = context['task_instance'].xcom_pull(task_ids='run_dbt_job')
    # Add custom failure analysis logic here
    return f"dbt job run {run_id} monitoring failed"

# Sensor with failure handling
monitor_with_alerts = DbtCloudJobRunSensor(
    task_id='monitor_with_alerts',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_job') }}",
    timeout=3600,
    on_failure_callback=check_job_failure,
    dag=dag,
)

# Email alert on sensor failure
send_failure_alert = EmailOperator(
    task_id='send_failure_alert',
    to=['data-team@company.com'],
    subject='dbt Cloud Job Monitoring Failed',
    html_content='dbt Cloud job monitoring failed. Please check the logs.',
    trigger_rule='one_failed',  # Trigger only on upstream failure
    dag=dag,
)

monitor_with_alerts >> send_failure_alert

Conditional Logic Based on Job Status

from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunStatus

def decide_next_task(**context):
    """Branch based on the final dbt Cloud job run status."""
    run_id = context['task_instance'].xcom_pull(task_ids='run_dbt_job')
    status = DbtCloudHook('dbt_cloud_default').get_job_run_status(run_id=run_id)
    if status == DbtCloudJobRunStatus.SUCCESS.value:
        return 'process_results'
    return 'handle_failure'

# Conditional workflow based on monitoring result; trigger_rule='all_done'
# lets the branch run even when the sensor failed on a bad job run
branch_on_result = BranchPythonOperator(
    task_id='branch_on_result',
    python_callable=decide_next_task,
    trigger_rule='all_done',
    dag=dag,
)

process_success = EmptyOperator(task_id='process_results', dag=dag)
handle_failure = EmptyOperator(task_id='handle_failure', dag=dag)

wait_for_completion >> branch_on_result
branch_on_result >> [process_success, handle_failure]

Integration with Data Quality Checks

from airflow.operators.python import PythonOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudGetJobRunArtifactOperator

# Monitor job, then validate results
wait_for_models = DbtCloudJobRunSensor(
    task_id='wait_for_models',
    run_id="{{ task_instance.xcom_pull(task_ids='run_models') }}",
    dag=dag,
)

# Download test results for validation
get_test_results = DbtCloudGetJobRunArtifactOperator(
    task_id='get_test_results',
    run_id="{{ task_instance.xcom_pull(task_ids='run_models') }}",
    path='run_results.json',
    dag=dag,
)

def validate_test_results(**context):
    """Validate dbt test results from the downloaded artifact."""
    import json

    # The artifact operator returns the path it wrote to (pushed to XCom);
    # verify this behavior against your provider version
    path = context['task_instance'].xcom_pull(task_ids='get_test_results')
    with open(path) as f:
        run_results = json.load(f)
    failed = [r for r in run_results['results'] if r['status'] not in ('success', 'pass')]
    if failed:
        raise ValueError(f"{len(failed)} dbt nodes failed")

validate_tests = PythonOperator(
    task_id='validate_tests',
    python_callable=validate_test_results,
    dag=dag,
)

wait_for_models >> get_test_results >> validate_tests

Cross-DAG Dependencies

from airflow.sensors.external_task import ExternalTaskSensor

# Wait for the dbt sensor task in another DAG (both DAGs must share a schedule
# for ExternalTaskSensor's default execution-date matching)
wait_external_dbt = ExternalTaskSensor(
    task_id='wait_external_dbt',
    external_dag_id='upstream_dbt_dag',
    external_task_id='wait_for_dbt_completion',
    dag=dag,
)

# Then run local dbt job
run_downstream_job = DbtCloudRunJobOperator(
    task_id='run_downstream_job',
    job_id=67890,
    dag=dag,
)

wait_external_dbt >> run_downstream_job

Configuration Options

Sensor Parameters

The sensor inherits from BaseSensorOperator and supports all standard sensor configuration:

  • timeout: Maximum time to wait for job completion, in seconds
  • poke_interval: Time between status checks in polling mode, in seconds
  • mode: 'poke' holds a worker slot between checks; 'reschedule' releases it
  • soft_fail: Mark the task as skipped instead of failed on timeout
  • deferrable: Hand the wait off to the triggerer for async, resource-efficient monitoring
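As a sketch, the options above map onto plain keyword arguments passed through to the sensor; the values here are illustrative, not library defaults:

```python
# Common BaseSensorOperator options as they would be passed to
# DbtCloudJobRunSensor (illustrative values, not defaults)
sensor_kwargs = {
    "timeout": 4 * 60 * 60,   # give up after 4 hours (seconds)
    "poke_interval": 120,     # check dbt Cloud every 2 minutes
    "mode": "reschedule",     # release the worker slot between checks
    "soft_fail": True,        # a timeout marks the task skipped, not failed
}
```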

Template Fields

The sensor supports Airflow templating for dynamic values:

  • dbt_cloud_conn_id
  • run_id
  • account_id
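Because run_id is a template field, it is usually supplied as a Jinja expression rather than a literal. A minimal sketch of building that expression for a hypothetical upstream task ID (doubled braces in the f-string produce the literal {{ }} that Jinja expects):

```python
upstream_task_id = "run_dbt_job"  # hypothetical upstream operator task_id

# f-string escaping: {{{{ renders as {{, so the result is a Jinja template
run_id_template = f"{{{{ task_instance.xcom_pull(task_ids='{upstream_task_id}') }}}}"
```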

Best Practices

Resource Efficiency

  • Use deferrable=True for long-running jobs to free up worker slots
  • Set appropriate poke_interval to balance responsiveness and API load
  • Use mode='reschedule' for very long jobs to avoid blocking workers
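When tuning poke_interval against API load, the worst-case number of status calls is simply the timeout divided by the interval:

```python
timeout = 14400       # 4-hour timeout, in seconds
poke_interval = 300   # check every 5 minutes

# Worst-case number of dbt Cloud status API calls before the sensor times out
max_status_calls = timeout // poke_interval
```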

Error Handling

  • Set reasonable timeout values based on expected job duration
  • Use soft_fail=True for non-critical monitoring tasks
  • Implement custom failure callbacks for alert and recovery logic

Monitoring Patterns

  • Combine sensors with operators that don't wait (wait_for_termination=False)
  • Use sensors for complex conditional logic based on job outcomes
  • Monitor multiple parallel jobs with individual sensors for better error isolation

Types

from typing import Any

from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context

# DbtCloudJobRunSensor inherits from BaseSensorOperator,
# so all standard sensor configuration options are available

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-dbt-cloud
