# OpenLineage Integration

The dbt Cloud provider includes OpenLineage integration for data lineage tracking. The integration automatically generates lineage metadata from dbt Cloud job runs, providing end-to-end lineage visibility across your dbt transformations and downstream Airflow workflows.

## Capabilities

### Automatic Lineage Generation

The provider automatically generates OpenLineage events from dbt Cloud job runs when the `apache-airflow-providers-openlineage` package (version 2.3.0 or higher) is installed.
```python { .api }
def generate_openlineage_events_from_dbt_cloud_run(
    operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor,
    task_instance: TaskInstance,
) -> OperatorLineage:
    """
    Generate OpenLineage events from a dbt Cloud run.

    Retrieves information about a dbt Cloud run, including the associated job,
    project, and execution details. Processes the run's artifacts, such as the
    manifest and run results, for each step in parallel, then generates and
    emits OpenLineage events based on the executed dbt tasks.

    Args:
        operator: Instance of the dbt Cloud operator that executed the dbt tasks
        task_instance: Currently executing task instance

    Returns:
        OperatorLineage: Empty OperatorLineage object indicating completion
    """
```
### Supported Artifacts

The integration processes the following dbt artifacts to generate lineage:

- **manifest.json**: dbt project metadata and model dependencies
- **run_results.json**: Execution results and statistics
- **catalog.json**: Table and column metadata (when docs are generated)
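These are the same artifacts you can fetch yourself through the provider's hook. A minimal sketch (the run ID and connection ID below are placeholder values):

```python
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook

# Minimal sketch: fetch the artifacts of a finished run manually.
# 12345 and "dbt_cloud_default" are placeholders for your run and connection.
hook = DbtCloudHook(dbt_cloud_conn_id="dbt_cloud_default")
manifest = hook.get_job_run_artifact(run_id=12345, path="manifest.json").json()
run_results = hook.get_job_run_artifact(run_id=12345, path="run_results.json").json()
```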
### Parent Job Metadata

The integration creates proper parent-child relationships between Airflow tasks and dbt runs:

```python { .api }
class ParentRunMetadata:
    run_id: str                     # Airflow task instance run ID
    job_name: str                   # Format: "{dag_id}.{task_id}"
    job_namespace: str              # OpenLineage namespace
    root_parent_run_id: str         # DAG run ID
    root_parent_job_name: str       # DAG ID
    root_parent_job_namespace: str  # OpenLineage namespace
```
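For example, for the DAG and task used in the examples below, the job-name fields are composed like this (illustrative values only):

```python
# Illustrative composition of the job-name fields described above.
dag_id, task_id = "dbt_with_lineage", "transform_data"
job_name = f"{dag_id}.{task_id}"  # -> "dbt_with_lineage.transform_data"
root_parent_job_name = dag_id     # -> "dbt_with_lineage"
```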
## Usage Examples

### Automatic Integration

OpenLineage integration is enabled automatically when the provider packages are installed:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

dag = DAG('dbt_with_lineage', start_date=datetime(2024, 1, 1))

# OpenLineage events are generated automatically
run_dbt_job = DbtCloudRunJobOperator(
    task_id='transform_data',
    job_id=12345,
    # Lineage generation happens automatically when the job completes
    dag=dag,
)
```
### Requirements

To enable OpenLineage integration, install the required package (quoted so the shell does not interpret `>=` as a redirection):

```bash
pip install "apache-airflow-providers-openlineage>=2.3.0"
```
### Configuration

Configure OpenLineage in your Airflow environment, for example in `airflow.cfg`:

```ini
[openlineage]
namespace = production
transport = {"type": "http", "url": "http://marquez:5000"}
```

The same settings can be supplied through the standard Airflow environment variables `AIRFLOW__OPENLINEAGE__NAMESPACE` and `AIRFLOW__OPENLINEAGE__TRANSPORT`.
### Sensor Integration

The sensor also supports automatic lineage generation:

```python
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# Lineage events generated when sensor completes
monitor_dbt = DbtCloudJobRunSensor(
    task_id='wait_for_dbt',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_job') }}",
    # OpenLineage metadata extracted automatically
    dag=dag,
)
```
## Advanced Configuration

### Step-Level Processing

The integration processes individual dbt command steps from job runs:

- Filters only dbt invocation steps (e.g., "Invoke dbt with `dbt run`")
- Matches steps against the job's configured execute steps
- Processes artifacts for each step concurrently (see the sketch below)
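A rough illustration of the step-filtering idea, assuming steps are matched by their display names (the helper and step names here are illustrative, not the provider's actual internals):

```python
import re

# Illustrative only: keep steps that invoke dbt, e.g. 'Invoke dbt with `dbt run`'.
DBT_INVOCATION = re.compile(r"Invoke dbt with `dbt [a-z-]+")

def filter_dbt_steps(step_names: list[str]) -> list[tuple[int, str]]:
    """Return (step_index, step_name) pairs for dbt invocation steps."""
    return [
        (index, name)
        for index, name in enumerate(step_names, start=1)
        if DBT_INVOCATION.search(name)
    ]

steps = filter_dbt_steps([
    "Clone git repository",
    "Invoke dbt with `dbt run`",
    "Invoke dbt with `dbt test`",
])
# -> [(2, 'Invoke dbt with `dbt run`'), (3, 'Invoke dbt with `dbt test`')]
```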
### Error Handling

The integration gracefully handles missing artifacts:

```python
# Catalog artifact is optional
try:
    catalog = hook.get_job_run_artifact(run_id, path="catalog.json")
except Exception:
    # Proceeds without catalog if docs weren't generated
    catalog = None
```
### Concurrent Processing

Artifacts are retrieved concurrently for performance:

```python
# Multiple artifacts are retrieved in parallel by an internal async helper
# (shown schematically; `steps` holds the dbt invocation steps of the run).
step_artifacts = asyncio.run(
    get_artifacts_for_steps(
        steps=steps,
        artifacts=["manifest.json", "run_results.json"],
    )
)
```
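If you want to reproduce the pattern yourself, here is a self-contained sketch that runs the hook's synchronous artifact call in a thread pool (a hypothetical re-implementation, not the provider's internal helper):

```python
import asyncio
from functools import partial

# Hypothetical re-implementation of the concurrent fetch: run the hook's
# synchronous get_job_run_artifact calls in a thread-pool executor.
async def fetch_artifacts(hook, run_id, steps, artifacts):
    loop = asyncio.get_running_loop()
    calls = [
        loop.run_in_executor(
            None, partial(hook.get_job_run_artifact, run_id, path, step=step)
        )
        for step in steps
        for path in artifacts
    ]
    return await asyncio.gather(*calls)
```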
## Lineage Data Flow

### 1. Job Execution
- dbt Cloud job runs and generates artifacts
- Airflow operator/sensor monitors completion

### 2. Artifact Retrieval
- System retrieves manifest, run_results, and optional catalog
- Processes multiple job steps concurrently

### 3. Event Generation
- Creates OpenLineage events from dbt metadata
- Links to parent Airflow task and DAG run
- Emits events to configured transport
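For orientation, the parent linkage attached to emitted events looks roughly like this (a sketch based on the OpenLineage parent run facet; all IDs and names are placeholders, and the exact facet contents depend on the OpenLineage versions in use):

```python
# Sketch of the parent run facet on an emitted event (placeholder values).
parent_facet = {
    "run": {"runId": "11111111-1111-1111-1111-111111111111"},  # Airflow task run
    "job": {"namespace": "production", "name": "dbt_with_lineage.transform_data"},
    "root": {  # root DAG run, mirroring the root_parent_* fields above
        "run": {"runId": "22222222-2222-2222-2222-222222222222"},
        "job": {"namespace": "production", "name": "dbt_with_lineage"},
    },
}
```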
### 4. Lineage Tracking
- Downstream systems receive complete lineage
- dbt model dependencies tracked end-to-end
- Data transformations visible across the pipeline
## Troubleshooting

### Missing Catalog Warnings

If you see "HTTP error: Not Found" for catalog.json:

```
Openlineage could not find dbt catalog artifact, usually available when docs are generated.
Proceeding with metadata extraction.
If you see error logs above about `HTTP error: Not Found` it's safe to ignore them.
```

This is normal when dbt docs generation is not included in the job steps.
### Version Requirements

Ensure compatibility:
- `apache-airflow-providers-openlineage >= 2.3.0`
- OpenLineage-compatible dbt version
- dbt Cloud job must generate the required artifacts
### Debugging

Enable debug logging to troubleshoot:

```python
import logging

logging.getLogger('airflow.providers.dbt.cloud.utils.openlineage').setLevel(logging.DEBUG)
```
## Integration Benefits

### Complete Data Lineage
- End-to-end visibility from sources to dbt models to downstream systems
- Automatic discovery of data dependencies and transformations

### Impact Analysis
- Understand downstream effects of dbt model changes
- Track data quality issues through the transformation pipeline

### Compliance and Governance
- Automated documentation of data transformations
- Audit trail for data processing workflows
- Schema evolution tracking

### Operational Insights
- Monitor dbt job performance and resource usage
- Identify bottlenecks in the transformation pipeline
- Track data freshness across the stack