tessl/pypi-apache-airflow-providers-dbt-cloud

Provider package for integrating Apache Airflow with dbt Cloud for data transformation workflow orchestration


OpenLineage Integration

The dbt Cloud provider includes comprehensive OpenLineage integration for data lineage tracking. This integration automatically generates lineage metadata from dbt Cloud job runs, enabling complete data lineage visibility across your dbt transformations and downstream Airflow workflows.

Capabilities

Automatic Lineage Generation

The provider automatically generates OpenLineage events from dbt Cloud job runs when the apache-airflow-providers-openlineage package (version 2.3.0 or higher) is installed.

def generate_openlineage_events_from_dbt_cloud_run(
    operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor, 
    task_instance: TaskInstance
) -> OperatorLineage:
    """
    Generate OpenLineage events from a dbt Cloud run.
    
    Retrieves information about a dbt Cloud run, including the associated job,
    project, and execution details. It processes the run's artifacts, such as 
    the manifest and run results, in parallel for many steps. Then it generates 
    and emits OpenLineage events based on the executed dbt tasks.
    
    Args:
        operator: Instance of dbt Cloud operator that executed dbt tasks
        task_instance: Currently executed task instance
        
    Returns:
        OperatorLineage: Empty OperatorLineage object indicating completion
    """

Supported Artifacts

The integration processes the following dbt artifacts to generate lineage:

  • manifest.json: dbt project metadata and model dependencies
  • run_results.json: Execution results and statistics
  • catalog.json: Table and column metadata (when docs are generated)
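Of these, manifest.json carries the model dependency graph. A minimal sketch of how dependencies could be read from a manifest-shaped structure (the `parent_map` key is part of dbt's manifest schema; the project and model names below are illustrative):

```python
def model_dependencies(manifest: dict) -> dict:
    """Map each dbt model to its upstream model dependencies using the
    manifest's parent_map (part of dbt's manifest.json schema)."""
    return {
        node: [p for p in parents if p.startswith("model.")]
        for node, parents in manifest.get("parent_map", {}).items()
        if node.startswith("model.")
    }

# Illustrative manifest fragment (hypothetical project/model names)
manifest = {
    "parent_map": {
        "model.jaffle.orders": ["model.jaffle.stg_orders", "source.jaffle.raw"],
        "model.jaffle.stg_orders": [],
    }
}
deps = model_dependencies(manifest)
```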

Parent Job Metadata

The integration creates proper parent-child relationships between Airflow tasks and dbt runs:

class ParentRunMetadata:
    run_id: str                      # Airflow task instance run ID
    job_name: str                    # Format: "{dag_id}.{task_id}"
    job_namespace: str               # OpenLineage namespace
    root_parent_run_id: str          # DAG run ID
    root_parent_job_name: str        # DAG ID
    root_parent_job_namespace: str   # OpenLineage namespace
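The naming conventions above can be sketched with a plain dataclass; `build_parent_metadata` is a hypothetical helper, not the provider's API, shown only to make the `"{dag_id}.{task_id}"` convention concrete:

```python
from dataclasses import dataclass

@dataclass
class ParentRunMetadata:  # sketch mirroring the fields listed above
    run_id: str
    job_name: str
    job_namespace: str
    root_parent_run_id: str
    root_parent_job_name: str
    root_parent_job_namespace: str

def build_parent_metadata(dag_id, task_id, ti_run_id, dag_run_id,
                          namespace="default"):
    # job_name follows the "{dag_id}.{task_id}" convention described above;
    # the root parent points at the owning DAG run
    return ParentRunMetadata(
        run_id=ti_run_id,
        job_name=f"{dag_id}.{task_id}",
        job_namespace=namespace,
        root_parent_run_id=dag_run_id,
        root_parent_job_name=dag_id,
        root_parent_job_namespace=namespace,
    )

meta = build_parent_metadata("dbt_with_lineage", "transform_data",
                             "ti-run-1", "dag-run-1")
```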

Usage Examples

Automatic Integration

OpenLineage integration is enabled automatically when the provider packages are installed:

from datetime import datetime

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

dag = DAG('dbt_with_lineage', start_date=datetime(2024, 1, 1))

# OpenLineage events are generated automatically
run_dbt_job = DbtCloudRunJobOperator(
    task_id='transform_data',
    job_id=12345,
    # Lineage generation happens automatically when job completes
    dag=dag,
)

Requirements

To enable OpenLineage integration, install the required packages:

pip install 'apache-airflow-providers-openlineage>=2.3.0'

Configuration

Configure OpenLineage in your Airflow environment:

# airflow.cfg or environment variables
[openlineage]
namespace = production
transport = {"type": "http", "url": "http://marquez:5000"}
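The same settings can be supplied via environment variables using Airflow's `AIRFLOW__SECTION__KEY` convention:

```shell
# Equivalent environment-variable form of the [openlineage] section above
export AIRFLOW__OPENLINEAGE__NAMESPACE=production
export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://marquez:5000"}'
```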

Sensor Integration

The sensor also supports automatic lineage generation:

from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# Lineage events generated when sensor completes
monitor_dbt = DbtCloudJobRunSensor(
    task_id='wait_for_dbt',
    run_id="{{ task_instance.xcom_pull(task_ids='transform_data') }}",
    # OpenLineage metadata extracted automatically
    dag=dag,
)

Advanced Configuration

Step-Level Processing

The integration processes individual dbt command steps from job runs:

  • Filters only dbt invocation steps (e.g., "Invoke dbt with dbt run")
  • Matches steps against job's configured execute steps
  • Processes artifacts for each step concurrently
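The filtering step can be sketched as follows; the exact matching logic and step names are illustrative assumptions, not the provider's implementation:

```python
def dbt_invocation_steps(step_names):
    """Keep only steps that invoke dbt itself; a dbt Cloud run also
    contains setup steps such as cloning the repo or creating a profile."""
    return [s for s in step_names if s.startswith("Invoke dbt with")]

steps = [
    "Clone repository",
    "Create profile from connection",
    "Invoke dbt with `dbt run`",
    "Invoke dbt with `dbt test`",
]
invocations = dbt_invocation_steps(steps)
```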

Error Handling

The integration gracefully handles missing artifacts:

# Catalog artifact is optional
try:
    catalog = hook.get_job_run_artifact(run_id, path="catalog.json")
except Exception:
    # Proceeds without catalog if docs weren't generated
    catalog = None

Concurrent Processing

Artifacts are retrieved concurrently for performance:

# Multiple artifacts retrieved in parallel
step_artifacts = asyncio.run(
    get_artifacts_for_steps(
        steps=steps, 
        artifacts=["manifest.json", "run_results.json"]
    )
)
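A self-contained sketch of this fan-out pattern, with a stub coroutine standing in for the HTTP call to the dbt Cloud artifacts endpoint (`fetch_artifact` and its return shape are assumptions for illustration):

```python
import asyncio

async def fetch_artifact(step, path):
    # Stand-in for an async HTTP request to the dbt Cloud artifacts endpoint
    await asyncio.sleep(0)
    return (step, path, {"ok": True})

async def get_artifacts_for_steps(steps, artifacts):
    # One coroutine per (step, artifact) pair, awaited concurrently
    tasks = [fetch_artifact(s, a) for s in steps for a in artifacts]
    return await asyncio.gather(*tasks)

results = asyncio.run(
    get_artifacts_for_steps(
        steps=[1, 2],
        artifacts=["manifest.json", "run_results.json"],
    )
)
```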

Lineage Data Flow

1. Job Execution

  • dbt Cloud job runs and generates artifacts
  • Airflow operator/sensor monitors completion

2. Artifact Retrieval

  • System retrieves manifest, run_results, and optional catalog
  • Processes multiple job steps concurrently

3. Event Generation

  • Creates OpenLineage events from dbt metadata
  • Links to parent Airflow task and DAG run
  • Emits events to configured transport

4. Lineage Tracking

  • Downstream systems receive complete lineage
  • dbt model dependencies tracked end-to-end
  • Data transformations visible across pipeline
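The four stages above can be condensed into a small end-to-end sketch. Every function and payload here is a stub (the event keys `eventType`, `run`, `job` follow the OpenLineage event schema; the rest is illustrative):

```python
def run_lineage_flow(run_id):
    # 1. Job execution: assume the dbt Cloud run has finished (stubbed)
    steps = ["Invoke dbt with `dbt run`"]

    # 2. Artifact retrieval: manifest/run_results per step (stubbed payloads)
    artifacts = {s: {"manifest": {}, "run_results": {}} for s in steps}

    # 3. Event generation: one OpenLineage-style event per step,
    #    linked back to the parent Airflow task
    events = [
        {
            "eventType": "COMPLETE",
            "run": {"runId": str(run_id)},
            "job": {"name": step},
            "parent": "dag_id.task_id",  # hypothetical parent reference
        }
        for step in artifacts
    ]

    # 4. Lineage tracking: emit to the configured transport (stubbed as return)
    return events

events = run_lineage_flow(12345)
```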

Troubleshooting

Missing Catalog Warnings

If you see "HTTP error: Not Found" for catalog.json:

Openlineage could not find dbt catalog artifact, usually available when docs are generated.
Proceeding with metadata extraction.
If you see error logs above about `HTTP error: Not Found` it's safe to ignore them.

This is normal when dbt docs generation is not included in the job steps.

Version Requirements

Ensure compatibility:

  • apache-airflow-providers-openlineage >= 2.3.0
  • OpenLineage-compatible dbt version
  • dbt Cloud job must generate required artifacts

Debugging

Enable debug logging to troubleshoot:

import logging
logging.getLogger('airflow.providers.dbt.cloud.utils.openlineage').setLevel(logging.DEBUG)

Integration Benefits

Complete Data Lineage

  • End-to-end visibility from source to dbt models to downstream systems
  • Automatic discovery of data dependencies and transformations

Impact Analysis

  • Understand downstream effects of dbt model changes
  • Track data quality issues through transformation pipeline

Compliance and Governance

  • Automated documentation of data transformations
  • Audit trail for data processing workflows
  • Schema evolution tracking

Operational Insights

  • Monitor dbt job performance and resource usage
  • Identify bottlenecks in transformation pipeline
  • Track data freshness across the stack

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-dbt-cloud
