Provider package that integrates Apache Airflow with dbt Cloud for orchestrating data transformation workflows.
The dbt Cloud provider includes comprehensive OpenLineage integration for data lineage tracking. This integration automatically generates lineage metadata from dbt Cloud job runs, enabling complete data lineage visibility across your dbt transformations and downstream Airflow workflows.
The provider automatically generates OpenLineage events from dbt Cloud job runs when the apache-airflow-providers-openlineage package (version 2.3.0 or higher) is installed.
```python
def generate_openlineage_events_from_dbt_cloud_run(
    operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor,
    task_instance: TaskInstance,
) -> OperatorLineage:
    """
    Generate OpenLineage events from a dbt Cloud run.

    Retrieves information about a dbt Cloud run, including the associated job,
    project, and execution details. It processes the run's artifacts, such as
    the manifest and run results, in parallel for many steps. Then it generates
    and emits OpenLineage events based on the executed dbt tasks.

    Args:
        operator: Instance of the dbt Cloud operator or sensor that executed dbt tasks
        task_instance: Currently executed task instance

    Returns:
        OperatorLineage: Empty OperatorLineage object indicating completion
    """
```

The integration processes the following dbt artifacts to generate lineage:

- `manifest.json`: dbt project structure and model dependencies
- `run_results.json`: execution results for each dbt step
- `catalog.json`: column-level metadata (optional; present only when docs are generated)
The integration creates proper parent-child relationships between Airflow tasks and dbt runs:
```python
class ParentRunMetadata:
    run_id: str                     # Airflow task instance run ID
    job_name: str                   # Format: "{dag_id}.{task_id}"
    job_namespace: str              # OpenLineage namespace
    root_parent_run_id: str         # DAG run ID
    root_parent_job_name: str       # DAG ID
    root_parent_job_namespace: str  # OpenLineage namespace
```

OpenLineage integration is enabled automatically when the provider packages are installed:
```python
from datetime import datetime

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

dag = DAG('dbt_with_lineage', start_date=datetime(2024, 1, 1))

# OpenLineage events are generated automatically
run_dbt_job = DbtCloudRunJobOperator(
    task_id='transform_data',
    job_id=12345,
    # Lineage generation happens automatically when the job completes
    dag=dag,
)
```

To enable OpenLineage integration, install the required packages:
```shell
pip install "apache-airflow-providers-openlineage>=2.3.0"
```

(The quotes keep the shell from treating `>=` as a redirect.) Configure OpenLineage in your Airflow environment:
```ini
# airflow.cfg or environment variables
[openlineage]
namespace = production
transport = {"type": "http", "url": "http://marquez:5000"}
```

The sensor also supports automatic lineage generation:
```python
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# Lineage events are generated when the sensor completes
monitor_dbt = DbtCloudJobRunSensor(
    task_id='wait_for_dbt',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_job') }}",
    # OpenLineage metadata is extracted automatically
    dag=dag,
)
```

The integration processes individual dbt command steps from job runs:
dbt run")The integration gracefully handles missing artifacts:
```python
# Catalog artifact is optional
try:
    catalog = hook.get_job_run_artifact(run_id, path="catalog.json")
except Exception:
    # Proceed without the catalog if docs weren't generated
    catalog = None
```

Artifacts are retrieved concurrently for performance:
```python
# Multiple artifacts are retrieved in parallel
step_artifacts = asyncio.run(
    get_artifacts_for_steps(
        steps=steps,
        artifacts=["manifest.json", "run_results.json"],
    )
)
```

If you see "HTTP error: Not Found" for catalog.json:
```
Openlineage could not find dbt catalog artifact, usually available when docs are generated.
Proceeding with metadata extraction.
If you see error logs above about `HTTP error: Not Found` it's safe to ignore them.
```

This is normal when dbt docs generation is not included in the job steps.
Ensure compatibility:

```
apache-airflow-providers-openlineage >= 2.3.0
```

Enable debug logging to troubleshoot:
```python
import logging

logging.getLogger('airflow.providers.dbt.cloud.utils.openlineage').setLevel(logging.DEBUG)
```

Install with Tessl CLI
```shell
npx tessl i tessl/pypi-apache-airflow-providers-dbt-cloud
```