# Apache Airflow OpenLineage Provider

A comprehensive provider package for Apache Airflow that enables OpenLineage data lineage tracking and observability for data pipelines. The provider integrates with the OpenLineage ecosystem to automatically collect and emit metadata about data transformations, job executions, and data flows across a range of data processing engines and databases.

## Package Information

- **Package Name**: apache-airflow-providers-openlineage
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-openlineage`
- **Minimum Airflow Version**: 2.10.0

## Core Imports

```python
from airflow.providers.openlineage import __version__
```

Configuration access:

```python
from airflow.providers.openlineage.conf import (
    is_disabled, namespace, transport, selective_enable, custom_extractors
)
```

Plugin integration (automatic via Airflow):

```python
# Automatically loaded when the provider is installed
# Plugin class: airflow.providers.openlineage.plugins.openlineage.OpenLineageProviderPlugin
```

## Basic Usage

### Enabling OpenLineage for DAGs

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.openlineage.utils.selective_enable import enable_lineage

# Enable lineage for the entire DAG
dag = enable_lineage(DAG(
    'example_dag',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
))

# Tasks automatically emit lineage events
task = EmptyOperator(task_id='example_task', dag=dag)
```

### Using OpenLineage-aware Operators

```python
from airflow.providers.openlineage.operators.empty import EmptyOperator

# OpenLineage-aware empty operator
empty_task = EmptyOperator(
    task_id='openlineage_empty',
    dag=dag,
)
```

### Custom Lineage Extraction

```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset

class CustomExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls):
        return ['MyCustomOperator']

    def extract(self):
        inputs = [Dataset(namespace="example", name="input_table")]
        outputs = [Dataset(namespace="example", name="output_table")]
        return OperatorLineage(inputs=inputs, outputs=outputs)
```

## Architecture

The OpenLineage provider uses a plugin-based architecture for collecting and emitting lineage events:

- **Plugin System**: Automatically integrates with Airflow's plugin mechanism to capture DAG and task lifecycle events
- **Extractor Framework**: Modular system for extracting lineage metadata from different operator types
- **Event Listener**: Captures Airflow events (DAG runs, task instances) and transforms them into OpenLineage events
- **Transport Layer**: Configurable transport mechanisms (HTTP, Kafka, File, Console) for sending events to OpenLineage backends
- **Facet System**: Extensible metadata enrichment through custom facets for additional context
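
The way these pieces hand off to each other can be sketched with plain Python stand-ins. None of the class or function names below belong to the provider's API; they only illustrate the listener → extractor → transport flow:

```python
class ConsoleTransport:
    """Stand-in for a configured transport; it just records emitted events."""
    def __init__(self):
        self.emitted = []

    def emit(self, event: dict) -> None:
        self.emitted.append(event)

class ToyExtractor:
    """Stand-in for an extractor: maps a task to input/output datasets."""
    def extract(self, task: dict) -> dict:
        return {"inputs": task.get("inputs", []), "outputs": task.get("outputs", [])}

def on_task_success(task: dict, transport: ConsoleTransport) -> None:
    """Stand-in for the listener: on a lifecycle event, run the extractor
    and push the resulting event through the transport."""
    lineage = ToyExtractor().extract(task)
    transport.emit({"eventType": "COMPLETE", "job": task["task_id"], **lineage})

transport = ConsoleTransport()
on_task_success(
    {"task_id": "load", "inputs": ["raw.users"], "outputs": ["dw.users"]},
    transport,
)
print(transport.emitted[0]["eventType"])  # COMPLETE
```

The real components are wired together automatically by the plugin; this sketch only shows the direction of data flow.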

## Capabilities

### Configuration Management

Access and control OpenLineage settings, including transport configuration, selective enabling, custom extractors, and debugging options.

```python { .api }
def config_path(check_legacy_env_var: bool = True) -> str: ...
def is_source_enabled() -> bool: ...
def disabled_operators() -> set[str]: ...
def selective_enable() -> bool: ...
def spark_inject_parent_job_info() -> bool: ...
def spark_inject_transport_info() -> bool: ...
def custom_extractors() -> set[str]: ...
def custom_run_facets() -> set[str]: ...
def namespace() -> str: ...
def transport() -> dict[str, Any]: ...
def is_disabled() -> bool: ...
def dag_state_change_process_pool_size() -> int: ...
def execution_timeout() -> int: ...
def include_full_task_info() -> bool: ...
def debug_mode() -> bool: ...
```

[Configuration](./configuration.md)

### SQL Parsing and Analysis

Parse SQL statements to extract data lineage, including table dependencies, column mappings, and database schema information.

```python { .api }
class SQLParser:
    def __init__(self, dialect: str | None = None, default_schema: str | None = None): ...
    def parse(self, sql: list[str] | str) -> SqlMeta | None: ...
    def generate_openlineage_metadata_from_sql(...) -> OperatorLineage: ...

class DatabaseInfo:
    scheme: str
    authority: str | None
    database: str | None
    # ... additional configuration attributes
```

[SQL Parsing](./sql-parsing.md)
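
To make the input/output-table idea concrete, here is a deliberately toy, regex-based illustration. It is not the provider's parser: `SQLParser` handles dialects, CTEs, quoting, and column-level lineage, none of which this sketch attempts:

```python
import re

def toy_sql_lineage(sql: str) -> dict[str, list[str]]:
    """Toy illustration only: tables after FROM/JOIN are inputs,
    tables after INSERT INTO are outputs."""
    inputs = re.findall(r"(?:FROM|JOIN)\s+([\w.]+)", sql, flags=re.IGNORECASE)
    outputs = re.findall(r"INSERT\s+INTO\s+([\w.]+)", sql, flags=re.IGNORECASE)
    return {"inputs": sorted(set(inputs)), "outputs": sorted(set(outputs))}

lineage = toy_sql_lineage(
    "INSERT INTO dw.orders SELECT * FROM raw.orders o JOIN raw.customers c ON o.cid = c.id"
)
print(lineage)  # {'inputs': ['raw.customers', 'raw.orders'], 'outputs': ['dw.orders']}
```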

134

135

### Lineage Extraction Framework

136

137

Extensible framework for extracting lineage metadata from Airflow operators, including base classes, built-in extractors, and custom extractor registration.

138

139

```python { .api }

140

class BaseExtractor:

141

def __init__(self, operator): ...

142

def extract() -> OperatorLineage | None: ...

143

def extract_on_complete(task_instance) -> OperatorLineage | None: ...

144

def extract_on_failure(task_instance) -> OperatorLineage | None: ...

145

146

class ExtractorManager:

147

def __init__(self): ...

148

def add_extractor(operator_class: str, extractor: type[BaseExtractor]): ...

149

def extract_metadata(dagrun, task, task_instance_state, task_instance=None) -> OperatorLineage: ...

150

151

class OperatorLineage:

152

inputs: list[Dataset]

153

outputs: list[Dataset]

154

run_facets: dict[str, BaseFacet]

155

job_facets: dict[str, BaseFacet]

156

```

157

158

[Lineage Extraction](./lineage-extraction.md)

159

160

### OpenLineage Plugin Integration

161

162

Core plugin components for Airflow integration, including event listeners, adapters, and automatic event emission.

163

164

```python { .api }

165

class OpenLineageAdapter:

166

def __init__(self, client: OpenLineageClient | None = None, secrets_masker: SecretsMasker | None = None): ...

167

def emit(event: RunEvent) -> RunEvent: ...

168

def start_task(...) -> RunEvent: ...

169

def complete_task(...) -> RunEvent: ...

170

def fail_task(...) -> RunEvent: ...

171

172

class OpenLineageListener:

173

# Event listener methods for DAG and task lifecycle

174

pass

175

176

def get_openlineage_listener() -> OpenLineageListener: ...

177

```

178

179

[Plugin Integration](./plugin-integration.md)
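
OpenLineage run events carry a standard shape (`eventType` of START/COMPLETE/FAIL, an `eventTime`, and run/job coordinates), and the adapter emits a START/COMPLETE pair sharing one run id across a task's lifecycle. A sketch of that pairing with plain dicts standing in for `RunEvent`:

```python
from datetime import datetime, timezone
from uuid import uuid4

def make_event(event_type: str, job_name: str, run_id: str) -> dict:
    """Plain-dict stand-in for an OpenLineage RunEvent (structure per the spec)."""
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": "default", "name": job_name},
    }

run_id = str(uuid4())
start = make_event("START", "example_dag.example_task", run_id)
complete = make_event("COMPLETE", "example_dag.example_task", run_id)

# The same runId ties the START and COMPLETE events into one run.
assert start["run"]["runId"] == complete["run"]["runId"]
```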

180

181

### Facets and Metadata Enrichment

182

183

Custom facet definitions for enriching OpenLineage events with Airflow-specific metadata, including DAG information, task states, and debug data.

184

185

```python { .api }

186

class AirflowRunFacet:

187

dag: dict

188

dagRun: dict

189

task: dict

190

taskInstance: dict

191

taskUuid: str

192

193

class AirflowJobFacet:

194

taskTree: dict

195

taskGroups: dict

196

tasks: dict

197

198

class AirflowStateRunFacet:

199

dagRunState: str

200

tasksState: dict[str, str]

201

202

class AirflowDebugRunFacet:

203

packages: dict

204

```

205

206

[Facets and Metadata](./facets-metadata.md)
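
As a concrete illustration, a payload in the shape of `AirflowStateRunFacet` summarizing a finished run might look like the following (all task names and states are made up for the example):

```python
# Illustrative payload in the shape of AirflowStateRunFacet (values made up).
state_facet = {
    "dagRunState": "success",
    "tasksState": {
        "extract": "success",
        "transform": "success",
        "load": "failed",
    },
}

# A consumer could, e.g., pick out the tasks that failed.
failed = [t for t, s in state_facet["tasksState"].items() if s == "failed"]
print(failed)  # ['load']
```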

### Template Macros

Template macros for accessing OpenLineage information within DAG definitions and task templates.

```python { .api }
def lineage_job_namespace() -> str: ...
def lineage_job_name(task_instance: TaskInstance) -> str: ...
def lineage_run_id(task_instance: TaskInstance) -> str: ...
def lineage_parent_id(task_instance: TaskInstance) -> str: ...
def lineage_root_parent_id(task_instance: TaskInstance) -> str: ...
def lineage_root_job_name(task_instance: TaskInstance) -> str: ...
def lineage_root_run_id(task_instance: TaskInstance) -> str: ...
```

[Template Macros](./template-macros.md)
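
These macros are typically used to hand the parent run's coordinates to a child job. Conceptually the parent identifier combines the namespace, job name, and run id; the sketch below illustrates that composition only (`toy_lineage_parent_id` is a hypothetical name, and the exact separator/format used by `lineage_parent_id` should be checked against the macro docs):

```python
def toy_lineage_parent_id(namespace: str, job_name: str, run_id: str) -> str:
    """Sketch: a parent id built from namespace, job name, and run id."""
    return f"{namespace}/{job_name}/{run_id}"

parent = toy_lineage_parent_id("default", "example_dag.example_task", "abc-123")
print(parent)  # default/example_dag.example_task/abc-123
```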

223

224

### Selective Lineage Control

225

226

Utilities for fine-grained control over lineage collection, allowing selective enabling/disabling at DAG and task levels.

227

228

```python { .api }

229

def enable_lineage(obj: T) -> T: ...

230

def disable_lineage(obj: T) -> T: ...

231

def is_task_lineage_enabled(task: BaseOperator | MappedOperator) -> bool: ...

232

def is_dag_lineage_enabled(dag: DAG) -> bool: ...

233

```

234

235

[Selective Control](./selective-control.md)

236

237

### SQL Utilities

238

239

Specialized utilities for SQL-based lineage extraction, including schema analysis, table discovery, and information schema querying.

240

241

```python { .api }

242

class TableSchema:

243

def to_dataset(namespace: str, database: str | None = None, schema: str | None = None) -> Dataset: ...

244

245

def get_table_schemas(...) -> tuple[list[Dataset], list[Dataset]]: ...

246

def parse_query_result(cursor) -> list[TableSchema]: ...

247

def create_information_schema_query(...) -> str: ...

248

```

249

250

[SQL Utilities](./sql-utilities.md)

251

252

### Spark Integration

253

254

Utilities for integrating with Spark applications, including automatic injection of OpenLineage configuration into Spark properties.

255

256

```python { .api }

257

def inject_parent_job_information_into_spark_properties(properties: dict, context: Context) -> dict: ...

258

def inject_transport_information_into_spark_properties(properties: dict, context: Context) -> dict: ...

259

```

260

261

[Spark Integration](./spark-integration.md)
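
Injection amounts to merging extra `spark.openlineage.*` keys into the job's Spark properties. The sketch below illustrates the idea with a hypothetical function; the key names follow the OpenLineage Spark integration's convention, but the provider's actual function takes the Airflow task `context` and derives the values itself:

```python
def toy_inject_parent_job_info(properties: dict, namespace: str,
                               job_name: str, run_id: str) -> dict:
    """Sketch: return a copy of the Spark properties with parent-job keys added."""
    return {
        **properties,
        "spark.openlineage.parentJobNamespace": namespace,
        "spark.openlineage.parentJobName": job_name,
        "spark.openlineage.parentRunId": run_id,
    }

props = toy_inject_parent_job_info(
    {"spark.executor.memory": "2g"}, "default", "example_dag.spark_task", "abc-123"
)
print(props["spark.openlineage.parentJobName"])  # example_dag.spark_task
```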

262

263

### Utility Functions and Helpers

264

265

General utility functions for working with OpenLineage data, including operator analysis, documentation extraction, and data conversion.

266

267

```python { .api }

268

def get_job_name(task: TaskInstance | RuntimeTaskInstance) -> str: ...

269

def get_operator_class(task: BaseOperator) -> type: ...

270

def is_operator_disabled(operator: BaseOperator | MappedOperator) -> bool: ...

271

def get_fully_qualified_class_name(operator: BaseOperator | MappedOperator) -> str: ...

272

def translate_airflow_asset(asset: Asset, lineage_context) -> OpenLineageDataset | None: ...

273

```

274

275

[Utility Functions](./utility-functions.md)
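
A fully qualified class name in the `get_fully_qualified_class_name` sense reduces to the class's module path plus its name. A standalone sketch of that idea (hypothetical function name, shown on a builtin rather than an operator):

```python
def toy_fully_qualified_class_name(obj) -> str:
    """Sketch: join the object's class module and qualified name."""
    cls = type(obj)
    return f"{cls.__module__}.{cls.__qualname__}"

print(toy_fully_qualified_class_name("hello"))  # builtins.str
```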

276

277

## Common Use Cases

278

279

### Setting Up Transport Configuration

280

281

```python

282

# In airflow.cfg or environment variables

283

[openlineage]

284

transport = {"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}

285

namespace = my_airflow_instance

286

```
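
The same settings can be supplied through Airflow's standard `AIRFLOW__<SECTION>__<KEY>` environment-variable convention, for example:

```shell
# Environment-variable equivalents of the [openlineage] settings above
export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
export AIRFLOW__OPENLINEAGE__NAMESPACE='my_airflow_instance'
```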

287

288

### Selective Lineage Enabling

289

290

```python

291

from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

292

293

# Enable for specific DAG

294

dag = enable_lineage(DAG(...))

295

296

# Disable for specific task

297

task = disable_lineage(PythonOperator(...))

298

```

299

300

### Custom Extractor Registration

301

302

```python

303

# In airflow.cfg

304

[openlineage]

305

extractors = my_package.extractors.CustomSQLExtractor;my_package.extractors.KafkaExtractor

306

```