
# OpenLineage Integration


The dbt Cloud provider includes OpenLineage integration for data lineage tracking. The integration automatically generates lineage metadata from dbt Cloud job runs, giving end-to-end lineage visibility across your dbt transformations and downstream Airflow workflows.


## Capabilities


### Automatic Lineage Generation


The provider automatically generates OpenLineage events from dbt Cloud job runs when the `apache-airflow-providers-openlineage` package (version 2.3.0 or higher) is installed.


```python { .api }
def generate_openlineage_events_from_dbt_cloud_run(
    operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor,
    task_instance: TaskInstance
) -> OperatorLineage:
    """
    Generate OpenLineage events from a dbt Cloud run.

    Retrieves information about a dbt Cloud run, including the associated job,
    project, and execution details. It processes the run's artifacts, such as
    the manifest and run results, in parallel for many steps. Then it generates
    and emits OpenLineage events based on the executed dbt tasks.

    Args:
        operator: Instance of dbt Cloud operator that executed dbt tasks
        task_instance: Currently executed task instance

    Returns:
        OperatorLineage: Empty OperatorLineage object indicating completion
    """
```
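The provider's operator and sensor already wire this function up internally; you do not call it yourself. For orientation, the integration point follows the `get_openlineage_facets_on_complete` convention used by the OpenLineage provider, roughly (a simplified sketch, not the provider's exact implementation):

```python
from airflow.providers.dbt.cloud.utils.openlineage import (
    generate_openlineage_events_from_dbt_cloud_run,
)

# Sketch of a method on the operator/sensor class: the OpenLineage provider
# invokes it when the task finishes, and the utility does the rest.
def get_openlineage_facets_on_complete(self, task_instance):
    return generate_openlineage_events_from_dbt_cloud_run(
        operator=self, task_instance=task_instance
    )
```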


### Supported Artifacts


The integration processes the following dbt artifacts to generate lineage:

- **manifest.json**: dbt project metadata and model dependencies
- **run_results.json**: Execution results and statistics
- **catalog.json**: Table and column metadata (when docs are generated)
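For ad-hoc inspection, the same artifacts can be fetched through the provider's hook; a minimal sketch (the `run_id` value is a placeholder):

```python
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook

hook = DbtCloudHook(dbt_cloud_conn_id="dbt_cloud_default")

# Each call returns an HTTP response whose body is the artifact JSON.
manifest = hook.get_job_run_artifact(run_id=12345, path="manifest.json").json()
run_results = hook.get_job_run_artifact(run_id=12345, path="run_results.json").json()
```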

### Parent Job Metadata


The integration creates proper parent-child relationships between Airflow tasks and dbt runs:

```python { .api }
class ParentRunMetadata:
    run_id: str                      # Airflow task instance run ID
    job_name: str                    # Format: "{dag_id}.{task_id}"
    job_namespace: str               # OpenLineage namespace
    root_parent_run_id: str          # DAG run ID
    root_parent_job_name: str        # DAG ID
    root_parent_job_namespace: str   # OpenLineage namespace
```
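For illustration, here is how those fields map onto Airflow identifiers (hypothetical values; the integration constructs this object internally, so you would not normally build one yourself):

```python
# Hypothetical values, mirroring the field comments above.
parent_run = ParentRunMetadata(
    run_id="manual__2024-01-01T00:00:00+00:00",              # task instance run ID
    job_name="dbt_with_lineage.transform_data",              # "{dag_id}.{task_id}"
    job_namespace="production",                              # OpenLineage namespace
    root_parent_run_id="manual__2024-01-01T00:00:00+00:00",  # DAG run ID
    root_parent_job_name="dbt_with_lineage",                 # DAG ID
    root_parent_job_namespace="production",                  # OpenLineage namespace
)
```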


## Usage Examples


### Automatic Integration

OpenLineage integration is enabled automatically when both the dbt Cloud and OpenLineage provider packages are installed:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

dag = DAG('dbt_with_lineage', start_date=datetime(2024, 1, 1))

# OpenLineage events are generated automatically
run_dbt_job = DbtCloudRunJobOperator(
    task_id='transform_data',
    job_id=12345,
    # Lineage generation happens automatically when the job completes
    dag=dag,
)
```


### Requirements

To enable OpenLineage integration, install the required package (quoted so the shell does not treat `>=` as a redirect):

```bash
pip install "apache-airflow-providers-openlineage>=2.3.0"
```
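To verify the installed version meets the minimum, a quick check (assumes a standard Python environment):

```python
from importlib.metadata import version

# Should print 2.3.0 or higher for lineage generation to activate.
print(version("apache-airflow-providers-openlineage"))
```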


### Configuration

Configure OpenLineage in your Airflow environment:

```ini
# airflow.cfg (or the equivalent AIRFLOW__OPENLINEAGE__* environment variables)
[openlineage]
namespace = production
transport = {"type": "http", "url": "http://marquez:5000"}
```


### Sensor Integration

The sensor also supports automatic lineage generation:

```python
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# Lineage events generated when sensor completes
monitor_dbt = DbtCloudJobRunSensor(
    task_id='wait_for_dbt',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_job') }}",
    # OpenLineage metadata extracted automatically
    dag=dag,
)
```
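This pattern pairs naturally with a non-blocking trigger: set `wait_for_termination=False` on the operator so the sensor does the waiting. A sketch:

```python
# The operator returns the run ID, which Airflow pushes to XCom;
# the sensor above pulls it via task_instance.xcom_pull(...).
run_dbt_job = DbtCloudRunJobOperator(
    task_id='run_dbt_job',
    job_id=12345,
    wait_for_termination=False,  # defer completion-waiting to the sensor
    dag=dag,
)
```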


## Advanced Configuration


### Step-Level Processing


The integration processes individual dbt command steps from job runs:

- Filters only dbt invocation steps (e.g., "Invoke dbt with `dbt run`"); see the sketch after this list
- Matches steps against the job's configured execute steps
- Processes artifacts for each step concurrently
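A rough sketch of that filtering (illustrative only; `step_names` is a placeholder for the run's step names, and the provider's actual matching logic is more involved):

```python
# Keep only the steps that are dbt invocations, e.g. "Invoke dbt with `dbt run`".
dbt_steps = [name for name in step_names if name.startswith("Invoke dbt with `dbt")]
```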


### Error Handling


The integration gracefully handles missing artifacts:

```python
# Catalog artifact is optional
try:
    catalog = hook.get_job_run_artifact(run_id, path="catalog.json")
except Exception:
    # Proceeds without catalog if docs weren't generated
    catalog = None
```


### Concurrent Processing


Artifacts are retrieved concurrently for performance:

```python
# Multiple artifacts retrieved in parallel
step_artifacts = asyncio.run(
    get_artifacts_for_steps(
        steps=steps,
        artifacts=["manifest.json", "run_results.json"]
    )
)
```
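Conceptually, the retrieval fans out one request per step/artifact pair, along the lines of this sketch (assuming a hypothetical async helper `fetch_artifact`; the provider's implementation differs in detail):

```python
import asyncio

async def get_artifacts_for_steps(steps, artifacts):
    # One concurrent fetch per (step, artifact) combination.
    tasks = [
        fetch_artifact(step=step, path=path)  # hypothetical async helper
        for step in steps
        for path in artifacts
    ]
    return await asyncio.gather(*tasks)
```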


## Lineage Data Flow

### 1. Job Execution
- dbt Cloud job runs and generates artifacts
- Airflow operator/sensor monitors completion

### 2. Artifact Retrieval
- System retrieves manifest, run_results, and optional catalog
- Processes multiple job steps concurrently

### 3. Event Generation
- Creates OpenLineage events from dbt metadata
- Links to parent Airflow task and DAG run
- Emits events to configured transport (an example payload follows this list)

### 4. Lineage Tracking
- Downstream systems receive complete lineage
- dbt model dependencies tracked end-to-end
- Data transformations visible across pipeline
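For orientation, an emitted event carries a parent facet encoding the relationship described above; an abridged, illustrative payload (not the full OpenLineage schema):

```python
# Abridged and illustrative; real events also include eventTime, producer,
# schema URLs, and input/output datasets.
event = {
    "eventType": "COMPLETE",
    "job": {"namespace": "production", "name": "dbt-run-job"},
    "run": {
        "runId": "<uuid>",
        "facets": {
            "parent": {
                "run": {"runId": "<airflow task instance run id>"},
                "job": {"namespace": "production", "name": "dbt_with_lineage.transform_data"},
            }
        },
    },
}
```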


## Troubleshooting


### Missing Catalog Warnings


If you see "HTTP error: Not Found" for `catalog.json`:

```
Openlineage could not find dbt catalog artifact, usually available when docs are generated.
Proceeding with metadata extraction.
If you see error logs above about `HTTP error: Not Found` it's safe to ignore them.
```


This is normal when dbt docs generation is not included in the job steps; adding a `dbt docs generate` step to the job produces the catalog.


### Version Requirements

Ensure compatibility:

- `apache-airflow-providers-openlineage >= 2.3.0`
- OpenLineage-compatible dbt version
- dbt Cloud job must generate required artifacts


### Debugging


Enable debug logging to troubleshoot:

```python
import logging

logging.getLogger('airflow.providers.dbt.cloud.utils.openlineage').setLevel(logging.DEBUG)
```


## Integration Benefits

### Complete Data Lineage
- End-to-end visibility from sources to dbt models to downstream systems
- Automatic discovery of data dependencies and transformations

### Impact Analysis
- Understand downstream effects of dbt model changes
- Track data quality issues through the transformation pipeline

### Compliance and Governance
- Automated documentation of data transformations
- Audit trail for data processing workflows
- Schema evolution tracking

### Operational Insights
- Monitor dbt job performance and resource usage
- Identify bottlenecks in the transformation pipeline
- Track data freshness across the stack