tessl/pypi-apache-airflow-providers-dbt-cloud

Provider package for integrating Apache Airflow with dbt Cloud for data transformation workflow orchestration


Async Job Trigger

The DbtCloudRunJobTrigger provides asynchronous monitoring capabilities for dbt Cloud job runs in deferrable Airflow tasks. This trigger enables efficient resource utilization by monitoring job status without occupying worker slots, making it ideal for long-running dbt Cloud jobs.

Capabilities

Async Job Status Monitoring

The trigger runs asynchronously to monitor dbt Cloud job status and emit events when job states change.

class DbtCloudRunJobTrigger:
    def __init__(
        self,
        conn_id: str,
        run_id: int,
        end_time: float,
        poll_interval: float,
        account_id: int | None
    ):
        """
        Async trigger for monitoring dbt Cloud job status.
        
        Args:
            conn_id: Airflow connection ID for dbt Cloud
            run_id: dbt Cloud job run ID to monitor
            end_time: Unix timestamp when monitoring should timeout
            poll_interval: Seconds between status checks
            account_id: dbt Cloud account ID (optional)
        """
    
    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize trigger for persistence across Airflow restarts.
        
        Returns:
            tuple[str, dict]: (class_path, kwargs) for trigger reconstruction
        """
    
    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Main async execution loop that monitors job status.
        
        Yields:
            TriggerEvent: Events containing job status updates and completion
        """
    
    async def is_still_running(self, hook: DbtCloudHook) -> bool:
        """
        Check if the job run is still in progress.
        
        Args:
            hook: DbtCloudHook instance for API communication
            
        Returns:
            bool: True if job is still running, False if terminal state reached
        """

Usage Examples

Basic Deferrable Operator Usage

The trigger is typically used internally by deferrable operators, but understanding its behavior helps with debugging and advanced configurations.

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from datetime import datetime

dag = DAG(
    'deferrable_dbt_workflow',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # 'schedule_interval' is deprecated since Airflow 2.4
)

# The trigger is automatically used when deferrable=True
run_dbt_job = DbtCloudRunJobOperator(
    task_id='run_dbt_job',
    job_id=12345,
    deferrable=True,        # This enables the trigger
    timeout=7200,           # 2 hours
    check_interval=300,     # Check every 5 minutes
    dag=dag,
)

Sensor with Trigger

from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# Sensor using the trigger for async monitoring
monitor_job = DbtCloudJobRunSensor(
    task_id='monitor_dbt_job',
    run_id="{{ task_instance.xcom_pull(task_ids='start_job') }}",
    deferrable=True,        # Enables trigger usage
    timeout=14400,          # 4 hours
    poke_interval=180,      # Check every 3 minutes
    dag=dag,
)

Custom Trigger Configuration

While the trigger is typically managed automatically, understanding its configuration helps with troubleshooting:

from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
import time

# Example of trigger parameters (normally handled automatically)
trigger = DbtCloudRunJobTrigger(
    conn_id='dbt_cloud_default',
    run_id=98765,
    end_time=time.time() + 3600,  # 1 hour from now
    poll_interval=60,             # Check every minute
    account_id=12345
)
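serialize() exists so the triggerer can persist the trigger and rebuild it after a restart from the (class_path, kwargs) pair. A hypothetical sketch of that round-trip, using a stdlib class as a stand-in for the trigger so the snippet runs without Airflow:

```python
import importlib

def rebuild_trigger(classpath: str, kwargs: dict):
    """Reconstruct an object from serialize()-style (classpath, kwargs) output."""
    module_path, _, class_name = classpath.rpartition(".")
    cls = getattr(importlib.import_module(module_path), class_name)
    return cls(**kwargs)

# Stand-in for the real ("airflow.providers.dbt.cloud.triggers.dbt
# .DbtCloudRunJobTrigger", {...}) pair returned by serialize()
rebuilt = rebuild_trigger(
    "types.SimpleNamespace",
    {"conn_id": "dbt_cloud_default", "run_id": 98765},
)
```

This is why every constructor argument must appear in the serialized kwargs: anything omitted is lost when the triggerer reconstructs the trigger.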

Monitoring Trigger Events

The trigger's event payload is delivered to the deferred task's own completion callback (execute_complete on the operator), not to downstream tasks — a downstream PythonOperator never receives a context['event']. Downstream tasks can still react to the finished run via XCom, since DbtCloudRunJobOperator pushes the run ID as its return value:

from airflow.operators.python import PythonOperator

def handle_dbt_completion(**context):
    """React to the completed dbt job run."""
    # The operator's return value (the run ID) is pushed to XCom
    run_id = context['task_instance'].xcom_pull(task_ids='run_dbt_job')
    print(f"dbt job run {run_id} completed successfully")
    # If the trigger reported failure, run_dbt_job itself fails and this
    # task does not run (with default trigger rules)

# Process the completed run
process_completion = PythonOperator(
    task_id='process_completion',
    python_callable=handle_dbt_completion,
    dag=dag,
)

run_dbt_job >> process_completion

Trigger Event Structure

The trigger emits events with the following structure:

{
    "status": "success" | "error" | "cancelled" | "timeout",
    "run_id": int,
    "message": str,
    "job_run_url": str  # Optional URL to dbt Cloud UI
}

Event Types

Success Event

{
    "status": "success",
    "run_id": 12345,
    "message": "dbt Cloud job run completed successfully",
    "job_run_url": "https://cloud.getdbt.com/deploy/1234/projects/5678/runs/12345"
}

Error Event

{
    "status": "error", 
    "run_id": 12345,
    "message": "dbt Cloud job run failed",
    "job_run_url": "https://cloud.getdbt.com/deploy/1234/projects/5678/runs/12345"
}

Timeout Event

{
    "status": "timeout",
    "run_id": 12345,
    "message": "dbt Cloud job run monitoring timed out after 3600 seconds"
}
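A small dispatcher over these event payloads — an illustrative helper, not part of the provider — shows how each terminal status maps to an outcome:

```python
def handle_event(event: dict) -> str:
    """Branch on the trigger event structure documented above."""
    status = event["status"]
    if status == "success":
        return f"run {event['run_id']} completed"
    if status == "timeout":
        # The dbt Cloud job may still be running; only monitoring stopped
        raise TimeoutError(event.get("message", "monitoring timed out"))
    # "error" and "cancelled" are both terminal failures
    raise RuntimeError(event.get("message", f"run ended with status {status}"))
```

Note the timeout case: a timeout event means monitoring gave up, not that the job itself failed or stopped.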

Advanced Usage Patterns

Multiple Job Monitoring

# Monitor multiple jobs with separate triggers (automatic)
jobs = [111, 222, 333]

for i, job_id in enumerate(jobs):
    run_job = DbtCloudRunJobOperator(
        task_id=f'run_job_{i}',
        job_id=job_id,
        deferrable=True,        # Each gets its own trigger
        wait_for_termination=True,
        dag=dag,
    )

Trigger with Custom Polling Intervals

# Fast polling for critical jobs
critical_job = DbtCloudRunJobOperator(
    task_id='critical_job',
    job_id=99999,
    deferrable=True,
    check_interval=30,      # Check every 30 seconds
    timeout=1800,           # 30 minutes max
    dag=dag,
)

# Slow polling for batch jobs
batch_job = DbtCloudRunJobOperator(
    task_id='batch_job',
    job_id=11111,
    deferrable=True,
    check_interval=600,     # Check every 10 minutes
    timeout=28800,          # 8 hours max
    dag=dag,
)

Error Handling with Triggers

def handle_trigger_failure(**context):
    """Custom error handling for trigger failures."""
    task_instance = context['task_instance']
    exception = context.get('exception')
    
    if 'timeout' in str(exception).lower():
        print("Job monitoring timed out - job may still be running")
        # Implement custom timeout handling
    else:
        print(f"Job monitoring failed: {exception}")
        # Implement custom error handling

long_running_job = DbtCloudRunJobOperator(
    task_id='long_running_job',
    job_id=55555,
    deferrable=True,
    timeout=43200,          # 12 hours
    on_failure_callback=handle_trigger_failure,
    dag=dag,
)

Resource-Efficient Workflows

from airflow.operators.empty import EmptyOperator  # DummyOperator is deprecated

# Start multiple long-running jobs without blocking workers
start = EmptyOperator(task_id='start', dag=dag)

# All these jobs run concurrently without occupying worker slots
deferrable_jobs = []
for i in range(10):  # 10 parallel jobs
    job = DbtCloudRunJobOperator(
        task_id=f'dbt_job_{i}',
        job_id=10000 + i,
        deferrable=True,    # Uses trigger - no worker blocking
        dag=dag,
    )
    start >> job
    deferrable_jobs.append(job)

# Proceed after all complete
finish = EmptyOperator(task_id='finish', dag=dag)
deferrable_jobs >> finish

Performance Considerations

Trigger Efficiency

  • Resource Usage: Triggers run in the Airflow triggerer process, not worker processes
  • Scale: Hundreds of triggers can run concurrently with minimal resource impact
  • Persistence: Triggers survive Airflow restarts and maintain state

Optimal Configuration

  • Poll Interval: Balance between responsiveness and API load
    • Critical jobs: 30-60 seconds
    • Regular jobs: 60-300 seconds
    • Batch jobs: 300-600 seconds
  • Timeout: Set based on expected job duration plus buffer
    • Add 50-100% buffer to expected runtime
    • Consider downstream task dependencies
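The timeout guidance above can be expressed as a tiny helper (hypothetical, for illustration only):

```python
def monitoring_timeout(expected_runtime_s: float, buffer: float = 0.75) -> int:
    """Timeout = expected runtime plus a 50-100% buffer (75% shown here)."""
    return int(expected_runtime_s * (1 + buffer))

# A job expected to take one hour gets a 105-minute monitoring window
timeout_s = monitoring_timeout(3600)
```

Pass the result as the operator's timeout and remember that hitting it produces a timeout event, while the dbt Cloud job itself keeps running.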

Best Practices

  • Use deferrable mode for jobs longer than 5 minutes
  • Set reasonable poll intervals to avoid API rate limits
  • Monitor trigger performance in Airflow UI's "Triggers" view
  • Use triggers for I/O-bound operations (API polling, file watching)

Debugging Triggers

Trigger Logs

Triggers run inside the triggerer process, so check that process's log output (wherever the airflow triggerer command is running). In recent Airflow versions, trigger log lines are also surfaced inline in the deferring task's log in the UI.

Common Issues

  1. Connection Errors: Check dbt Cloud connection configuration
  2. Timeout Issues: Verify timeout values and job duration expectations
  3. API Rate Limits: Adjust poll intervals if hitting rate limits
  4. Trigger Restart: Triggers resume after Airflow restarts automatically

Types

from typing import Any, AsyncIterator
from airflow.triggers.base import BaseTrigger, TriggerEvent

# TriggerEvent (imported above) wraps a payload dict, e.g.:
#     TriggerEvent({"status": "success", "run_id": 12345, ...})
# DbtCloudRunJobTrigger inherits from BaseTrigger, so the standard trigger
# serialization and async patterns apply.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-dbt-cloud
