Provider package that integrates Apache Airflow with dbt Cloud to orchestrate data transformation workflows.
The DbtCloudRunJobTrigger provides asynchronous monitoring of dbt Cloud job runs for deferrable Airflow tasks. It runs in the triggerer and polls job status without occupying a worker slot, which makes it well suited to long-running dbt Cloud jobs. The trigger emits a TriggerEvent when the run reaches a terminal state or the monitoring deadline passes.
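For orientation, a deferrable operator hands control to this trigger by calling BaseOperator.defer() and suspending itself. A minimal sketch of that hand-off (attribute names such as dbt_cloud_conn_id and the resume method name are illustrative assumptions, not verbatim provider source):

import time

from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger

# Inside a deferrable operator's execute() - illustrative sketch only
def execute(self, context):
    run_id = ...  # start the dbt Cloud job run first and capture its run ID
    self.defer(
        trigger=DbtCloudRunJobTrigger(
            conn_id=self.dbt_cloud_conn_id,  # assumed attribute name
            run_id=run_id,
            end_time=time.time() + self.timeout,
            poll_interval=self.check_interval,
            account_id=self.account_id,
        ),
        method_name="execute_complete",  # resumed with the TriggerEvent payload
    )

The trigger's interface is outlined below.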
from typing import Any, AsyncIterator

from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook
from airflow.triggers.base import BaseTrigger, TriggerEvent

class DbtCloudRunJobTrigger(BaseTrigger):
    def __init__(
        self,
        conn_id: str,
        run_id: int,
        end_time: float,
        poll_interval: float,
        account_id: int | None,
    ):
        """
        Async trigger for monitoring dbt Cloud job status.

        Args:
            conn_id: Airflow connection ID for dbt Cloud
            run_id: dbt Cloud job run ID to monitor
            end_time: Unix timestamp after which monitoring times out
            poll_interval: Seconds between status checks
            account_id: dbt Cloud account ID (optional)
        """
    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize the trigger for persistence across Airflow restarts.

        Returns:
            tuple[str, dict]: (classpath, kwargs) used to reconstruct the trigger
        """
    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Main async execution loop that monitors job status.

        Yields:
            TriggerEvent: Events reporting job status updates and completion
        """
    async def is_still_running(self, hook: DbtCloudHook) -> bool:
        """
        Check whether the job run is still in progress.

        Args:
            hook: DbtCloudHook instance for API communication

        Returns:
            bool: True if the job is still running, False once a terminal state is reached
        """
The trigger is typically used internally by deferrable operators, but understanding its behavior helps with debugging and advanced configurations.

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from datetime import datetime
dag = DAG(
    'deferrable_dbt_workflow',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
# The trigger is automatically used when deferrable=True
run_dbt_job = DbtCloudRunJobOperator(
    task_id='run_dbt_job',
    job_id=12345,
    deferrable=True,     # This enables the trigger
    timeout=7200,        # 2 hours
    check_interval=300,  # Check every 5 minutes
    dag=dag,
)

from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
# Sensor using the trigger for async monitoring
monitor_job = DbtCloudJobRunSensor(
    task_id='monitor_dbt_job',
    run_id="{{ task_instance.xcom_pull(task_ids='start_job') }}",
    deferrable=True,    # Enables trigger usage
    timeout=14400,      # 4 hours
    poke_interval=180,  # Check every 3 minutes
    dag=dag,
)

While the trigger is typically managed automatically, understanding its configuration helps with troubleshooting:
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
import time
# Example of trigger parameters (normally handled automatically)
trigger = DbtCloudRunJobTrigger(
    conn_id='dbt_cloud_default',
    run_id=98765,
    end_time=time.time() + 3600,  # 1 hour from now
    poll_interval=60,             # Check every minute
    account_id=12345,
)

from airflow.operators.python import PythonOperator
def handle_dbt_completion(**context):
    """Inspect a dbt trigger completion payload.

    Note: TriggerEvent payloads are delivered to the deferring task's own
    completion method, not to downstream tasks' context; this callable
    illustrates how such a payload is typically inspected.
    """
    event = context.get('event', {})
    run_id = event.get('run_id')
    status = event.get('status')
    print(f"dbt job run {run_id} completed with status: {status}")
    if status == 'success':
        return "Job completed successfully"
    raise Exception(f"Job failed with status: {status}")
# Process trigger completion
process_completion = PythonOperator(
    task_id='process_completion',
    python_callable=handle_dbt_completion,
    dag=dag,
)
run_dbt_job >> process_completion

The trigger emits events with the following structure:
{
    "status": "success" | "error" | "cancelled" | "timeout",
    "run_id": int,
    "message": str,
    "job_run_url": str  # Optional URL to dbt Cloud UI
}

A successful run produces:

{
"status": "success",
"run_id": 12345,
"message": "dbt Cloud job run completed successfully",
"job_run_url": "https://cloud.getdbt.com/deploy/1234/projects/5678/runs/12345"
}{
"status": "error",
"run_id": 12345,
"message": "dbt Cloud job run failed",
"job_run_url": "https://cloud.getdbt.com/deploy/1234/projects/5678/runs/12345"
}{
"status": "timeout",
"run_id": 12345,
"message": "dbt Cloud job run monitoring timed out after 3600 seconds"
}# Monitor multiple jobs with separate triggers (automatic)
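When the trigger fires, the event is delivered back to the deferring task, which resumes in its completion callback. A minimal sketch of that handling (assuming the conventional method_name='execute_complete' resume point; the provider's exact implementation may differ):

from airflow.exceptions import AirflowException

# Sketch of how a deferring operator consumes the event payload - illustrative
def execute_complete(self, context, event):
    # event is the payload dict shown above
    if event["status"] == "success":
        return event["run_id"]
    raise AirflowException(event.get("message", "dbt Cloud job run failed"))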
# Monitor multiple jobs with separate triggers (automatic)
jobs = [111, 222, 333]
for i, job_id in enumerate(jobs):
    run_job = DbtCloudRunJobOperator(
        task_id=f'run_job_{i}',
        job_id=job_id,
        deferrable=True,  # Each gets its own trigger
        wait_for_termination=True,
        dag=dag,
    )

# Fast polling for critical jobs
critical_job = DbtCloudRunJobOperator(
    task_id='critical_job',
    job_id=99999,
    deferrable=True,
    check_interval=30,  # Check every 30 seconds
    timeout=1800,       # 30 minutes max
    dag=dag,
)
# Slow polling for batch jobs
batch_job = DbtCloudRunJobOperator(
    task_id='batch_job',
    job_id=11111,
    deferrable=True,
    check_interval=600,  # Check every 10 minutes
    timeout=28800,       # 8 hours max
    dag=dag,
)

def handle_trigger_failure(**context):
"""Custom error handling for trigger failures."""
task_instance = context['task_instance']
exception = context.get('exception')
if 'timeout' in str(exception).lower():
print("Job monitoring timed out - job may still be running")
# Implement custom timeout handling
else:
print(f"Job monitoring failed: {exception}")
# Implement custom error handling
long_running_job = DbtCloudRunJobOperator(
    task_id='long_running_job',
    job_id=55555,
    deferrable=True,
    timeout=43200,  # 12 hours
    on_failure_callback=handle_trigger_failure,
    dag=dag,
)

from airflow.operators.empty import EmptyOperator
# Start multiple long-running jobs without blocking workers
start = EmptyOperator(task_id='start', dag=dag)
# All these jobs run concurrently without occupying worker slots
deferrable_jobs = []
for i in range(10):  # 10 parallel jobs
    job = DbtCloudRunJobOperator(
        task_id=f'dbt_job_{i}',
        job_id=10000 + i,
        deferrable=True,  # Uses trigger - no worker blocking
        dag=dag,
    )
    start >> job
    deferrable_jobs.append(job)
# Proceed after all complete
finish = EmptyOperator(task_id='finish', dag=dag)
deferrable_jobs >> finish

Trigger activity is logged by the triggerer, not by workers. To troubleshoot, check the process logs of the `airflow triggerer` service; newer Airflow versions also forward triggerer messages into the deferred task's log in the UI.

from typing import Any, Dict
from airflow.triggers.base import BaseTrigger, TriggerEvent
# Simplified view of TriggerEvent for reference: it wraps a payload dict
class TriggerEvent:
    """Event emitted by triggers to notify task completion."""
    payload: Dict[str, Any]

# DbtCloudRunJobTrigger inherits from BaseTrigger, so the standard
# trigger serialization and async patterns apply
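For reference, a trigger constructs an event by passing the payload dict directly to TriggerEvent:

event = TriggerEvent({
    "status": "success",
    "run_id": 12345,
    "message": "dbt Cloud job run completed successfully",
})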
Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-dbt-cloud