CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-dbt-cloud

Provider package for integrating Apache Airflow with dbt Cloud for data transformation workflow orchestration

84

1.00x
Overview
Eval results
Files

operators.mddocs/

Job Execution Operators

The dbt Cloud provider includes three main operators for job execution and management. These operators provide high-level abstractions for common dbt Cloud operations within Airflow DAGs, handling job execution scheduling, artifact retrieval, and job discovery.

Capabilities

Job Execution Operator

The DbtCloudRunJobOperator executes dbt Cloud jobs with comprehensive configuration options and monitoring capabilities.

class DbtCloudRunJobOperator:
    def __init__(
        self,
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        job_id: int | None = None,
        project_name: str | None = None,
        environment_name: str | None = None,
        job_name: str | None = None,
        account_id: int | None = None,
        trigger_reason: str | None = None,
        steps_override: list[str] | None = None,
        schema_override: str | None = None,
        wait_for_termination: bool = True,
        timeout: int = 60 * 60 * 24 * 7,
        check_interval: int = 60,
        additional_run_config: dict[str, Any] | None = None,
        reuse_existing_run: bool = False,
        retry_from_failure: bool = False,
        deferrable: bool = False,
        **kwargs
    ):
        """
        Execute a dbt Cloud job.
        
        Args:
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            job_id: dbt Cloud job ID to execute (mutually exclusive with name-based lookup)
            project_name: Project name for job lookup (requires environment_name and job_name)
            environment_name: Environment name for job lookup (requires project_name and job_name)
            job_name: Job name for job lookup (requires project_name and environment_name)
            account_id: dbt Cloud account ID (defaults to connection default)
            trigger_reason: Reason for triggering the job (max 255 characters)
            steps_override: Custom list of dbt commands to run
            schema_override: Override default schema/dataset name
            wait_for_termination: Whether to wait for job completion
            timeout: Maximum time to wait for job completion (seconds)
            check_interval: Time between status checks (seconds)
            additional_run_config: Additional configuration parameters
            reuse_existing_run: Use existing run if job is already running
            retry_from_failure: Resume from last failure point if retrying
            deferrable: Use async execution mode
        """
    
    def execute(self, context: Context) -> int:
        """
        Execute the dbt Cloud job.
        
        Args:
            context: Airflow task execution context
            
        Returns:
            int: dbt Cloud job run ID
            
        Raises:
            DbtCloudJobRunException: If job execution fails
        """
    
    def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
        """
        Complete execution for deferrable tasks.
        
        Args:
            context: Airflow task execution context
            event: Trigger event containing job status
            
        Returns:
            int: dbt Cloud job run ID
        """
    
    def on_kill(self) -> None:
        """Cancel the running dbt Cloud job when task is killed."""
    
    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
        """
        Generate OpenLineage metadata facets for data lineage tracking.
        
        Args:
            task_instance: Airflow task instance
            
        Returns:
            OperatorLineage: OpenLineage facets for lineage tracking
        """

Artifact Retrieval Operator

The DbtCloudGetJobRunArtifactOperator downloads artifacts from completed dbt Cloud job runs.

class DbtCloudGetJobRunArtifactOperator:
    def __init__(
        self,
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        run_id: int,
        path: str,
        account_id: int | None = None,
        step: int | None = None,
        output_file_name: str | None = None,
        **kwargs
    ):
        """
        Download artifacts from a dbt Cloud job run.
        
        Args:
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            run_id: dbt Cloud job run ID to retrieve artifacts from
            path: Path to the artifact (e.g., 'manifest.json', 'run_results.json', 'catalog.json')
            account_id: dbt Cloud account ID (defaults to connection default)
            step: Specific step number to retrieve artifact from
            output_file_name: Local filename to save artifact (defaults to artifact path basename)
        """
    
    def execute(self, context: Context) -> str:
        """
        Download the specified artifact.
        
        Args:
            context: Airflow task execution context
            
        Returns:
            str: Local path to the downloaded artifact file
        """

Job Listing Operator

The DbtCloudListJobsOperator retrieves information about jobs in a dbt Cloud account or project.

class DbtCloudListJobsOperator:
    def __init__(
        self,
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        account_id: int | None = None,
        project_id: int | None = None,
        order_by: str | None = None,
        **kwargs
    ):
        """
        List jobs in a dbt Cloud account or project.
        
        Args:
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            account_id: dbt Cloud account ID (defaults to connection default)
            project_id: Filter jobs by project ID
            order_by: Field to order results by (e.g., 'name', 'created_at')
        """
    
    def execute(self, context: Context) -> list:
        """
        Retrieve the list of jobs.
        
        Args:
            context: Airflow task execution context
            
        Returns:
            list: List of dbt Cloud job IDs (integers)
        """

Usage Examples

Basic Job Execution

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from datetime import datetime, timedelta

dag = DAG(
    'dbt_transform_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)

# Execute dbt job by ID
run_dbt_models = DbtCloudRunJobOperator(
    task_id='run_dbt_models',
    dbt_cloud_conn_id='dbt_cloud_prod',
    job_id=12345,
    trigger_reason='Daily scheduled transformation',
    timeout=3600,
    dag=dag,
)

Job Execution with Name-based Lookup

# Execute job by project/environment/job names
run_staging_models = DbtCloudRunJobOperator(
    task_id='run_staging_models',
    project_name='analytics_project',
    environment_name='production',
    job_name='staging_models_daily',
    trigger_reason='Staging data refresh',
    steps_override=['dbt run --models tag:staging', 'dbt test --models tag:staging'],
    dag=dag,
)

Advanced Job Configuration

# Execute with advanced configuration
run_full_pipeline = DbtCloudRunJobOperator(
    task_id='run_full_pipeline',
    job_id=54321,
    schema_override='analytics_{{ ds_nodash }}',  # Dynamic schema based on execution date
    additional_run_config={
        'threads': 8,
        'target': 'prod',
        'vars': {
            'start_date': '{{ ds }}',
            'end_date': '{{ next_ds }}'
        }
    },
    retry_from_failure=True,
    reuse_existing_run=False,
    dag=dag,
)

Deferrable Job Execution

# Use deferrable execution for resource efficiency
run_long_job = DbtCloudRunJobOperator(
    task_id='run_long_dbt_job',
    job_id=99999,
    deferrable=True,  # Enable async execution
    timeout=14400,    # 4 hours
    check_interval=300,  # Check every 5 minutes
    dag=dag,
)

Artifact Retrieval

from airflow.providers.dbt.cloud.operators.dbt import DbtCloudGetJobRunArtifactOperator

# Download job artifacts after completion
download_manifest = DbtCloudGetJobRunArtifactOperator(
    task_id='download_manifest',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    path='manifest.json',
    output_file_name='dbt_manifest_{{ ds }}.json',
    dag=dag,
)

download_run_results = DbtCloudGetJobRunArtifactOperator(
    task_id='download_run_results',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    path='run_results.json',
    dag=dag,
)

# Set task dependencies
run_dbt_models >> [download_manifest, download_run_results]

Job Discovery

from airflow.providers.dbt.cloud.operators.dbt import DbtCloudListJobsOperator

# List all jobs in an account
list_all_jobs = DbtCloudListJobsOperator(
    task_id='list_all_jobs',
    account_id=12345,
    order_by='name',
    dag=dag,
)

# List jobs for specific project
list_project_jobs = DbtCloudListJobsOperator(
    task_id='list_project_jobs',
    project_id=67890,
    dag=dag,
)

Error Handling and Monitoring

from airflow.operators.python import PythonOperator

def handle_dbt_failure(**context):
    """Custom error handling for dbt job failures."""
    run_id = context['task_instance'].xcom_pull(task_ids='run_dbt_models')
    if run_id:
        print(f"dbt job run {run_id} failed - triggering cleanup")
        # Add custom failure handling logic here

# Add failure callback
run_dbt_models = DbtCloudRunJobOperator(
    task_id='run_dbt_models',
    job_id=12345,
    on_failure_callback=handle_dbt_failure,
    dag=dag,
)

Data Lineage Integration

# When using with OpenLineage provider for data lineage
# The operator automatically generates lineage metadata
run_with_lineage = DbtCloudRunJobOperator(
    task_id='run_with_lineage',
    job_id=12345,
    # OpenLineage integration happens automatically
    # when apache-airflow-providers-openlineage >= 2.3.0 is installed
    dag=dag,
)

Template Fields

All operators support Airflow templating for dynamic values:

DbtCloudRunJobOperator Template Fields

  • dbt_cloud_conn_id
  • job_id
  • project_name
  • environment_name
  • job_name
  • account_id
  • trigger_reason
  • steps_override
  • schema_override
  • additional_run_config

DbtCloudGetJobRunArtifactOperator Template Fields

  • dbt_cloud_conn_id
  • run_id
  • path
  • account_id
  • output_file_name

DbtCloudListJobsOperator Template Fields

  • account_id
  • project_id

Types

from typing import Any, Dict, List, Optional
from airflow.models import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.utils.context import Context

class DbtCloudRunJobOperatorLink(BaseOperatorLink):
    """Operator link for monitoring job runs in dbt Cloud UI."""
    name = "Monitor Job Run"
    
    def get_link(self, operator: BaseOperator, *, ti_key=None) -> str:
        """
        Generate link to dbt Cloud job run monitoring page.
        
        Args:
            operator: The operator instance
            ti_key: Task instance key
            
        Returns:
            str: URL to dbt Cloud job run page
        """

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-dbt-cloud

docs

hooks.md

index.md

openlineage.md

operators.md

sensors.md

triggers.md

tile.json