tessl/pypi-apache-airflow-providers-openlineage

Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/apache-airflow-providers-openlineage@2.6.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-openlineage@2.6.0


Apache Airflow OpenLineage Provider

A comprehensive provider package for Apache Airflow that enables OpenLineage data lineage tracking and observability for data pipelines. This provider integrates with the OpenLineage ecosystem to automatically collect and emit metadata about data transformations, job executions, and data flows across various data processing engines and databases.

Package Information

  • Package Name: apache-airflow-providers-openlineage
  • Language: Python
  • Installation: pip install apache-airflow-providers-openlineage
  • Minimum Airflow Version: 2.10.0

Core Imports

from airflow.providers.openlineage import __version__

Configuration access:

from airflow.providers.openlineage.conf import (
    is_disabled, namespace, transport, selective_enable, custom_extractors
)

Plugin integration (automatic via Airflow):

# Automatically loaded when provider is installed
# Plugin class: airflow.providers.openlineage.plugins.openlineage.OpenLineageProviderPlugin

Basic Usage

Enabling OpenLineage for DAGs

from airflow import DAG
from airflow.providers.openlineage.utils.selective_enable import enable_lineage
from airflow.operators.empty import EmptyOperator
from datetime import datetime

# Enable lineage for entire DAG
dag = enable_lineage(DAG(
    'example_dag',
    start_date=datetime(2023, 1, 1),
    schedule='@daily'
))

# Tasks automatically emit lineage events
task = EmptyOperator(task_id='example_task', dag=dag)

Using OpenLineage-aware Operators

from airflow.providers.openlineage.operators.empty import EmptyOperator

# OpenLineage-aware empty operator
empty_task = EmptyOperator(
    task_id='openlineage_empty',
    dag=dag
)

Custom Lineage Extraction

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset

class CustomExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls):
        return ['MyCustomOperator']
    
    def extract(self):
        inputs = [Dataset(namespace="example", name="input_table")]
        outputs = [Dataset(namespace="example", name="output_table")]
        return OperatorLineage(inputs=inputs, outputs=outputs)

Architecture

The OpenLineage provider uses a plugin-based architecture for collecting and emitting lineage events:

  • Plugin System: Automatically integrates with Airflow's plugin mechanism to capture DAG and task lifecycle events
  • Extractor Framework: Modular system for extracting lineage metadata from different operator types
  • Event Listener: Captures Airflow events (DAG runs, task instances) and transforms them into OpenLineage events
  • Transport Layer: Configurable transport mechanisms (HTTP, Kafka, File, Console) for sending events to OpenLineage backends
  • Facet System: Extensible metadata enrichment through custom facets for additional context

Capabilities

Configuration Management

Access and control OpenLineage settings, including transport configuration, selective enabling, custom extractors, and debugging options.

def config_path(check_legacy_env_var: bool = True) -> str: ...
def is_source_enabled() -> bool: ...
def disabled_operators() -> set[str]: ...
def selective_enable() -> bool: ...
def spark_inject_parent_job_info() -> bool: ...
def spark_inject_transport_info() -> bool: ...
def custom_extractors() -> set[str]: ...
def custom_run_facets() -> set[str]: ...
def namespace() -> str: ...
def transport() -> dict[str, Any]: ...
def is_disabled() -> bool: ...
def dag_state_change_process_pool_size() -> int: ...
def execution_timeout() -> int: ...
def include_full_task_info() -> bool: ...
def debug_mode() -> bool: ...
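
These accessors read the [openlineage] section of airflow.cfg, which Airflow can also populate from environment variables named AIRFLOW__OPENLINEAGE__&lt;OPTION&gt;. A minimal sketch of that lookup convention in plain Python (no Airflow required; openlineage_option is an illustrative helper, not a provider function):

```python
import json
import os

# Airflow's convention: any [openlineage] option can be supplied as an
# AIRFLOW__OPENLINEAGE__<OPTION> environment variable.
os.environ["AIRFLOW__OPENLINEAGE__NAMESPACE"] = "my_airflow_instance"
os.environ["AIRFLOW__OPENLINEAGE__TRANSPORT"] = json.dumps(
    {"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}
)

def openlineage_option(name, default=None):
    """Toy lookup mirroring the env-var naming convention."""
    return os.environ.get(f"AIRFLOW__OPENLINEAGE__{name.upper()}", default)

# transport() returns the parsed dict form of this JSON option.
transport_config = json.loads(openlineage_option("transport"))
```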

Configuration

SQL Parsing and Analysis

Parse SQL statements to extract data lineage, including table dependencies, column mappings, and database schema information.

class SQLParser:
    def __init__(self, dialect: str | None = None, default_schema: str | None = None): ...
    def parse(self, sql: list[str] | str) -> SqlMeta | None: ...
    def generate_openlineage_metadata_from_sql(...) -> OperatorLineage: ...

class DatabaseInfo:
    scheme: str
    authority: str | None
    database: str | None
    # ... additional configuration attributes
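
As a rough illustration of what SQL-based lineage extraction produces (a toy sketch, not the provider's implementation), a few regexes can pull input and output table names out of a simple INSERT ... SELECT statement; SQLParser handles real dialects, CTEs, and schema resolution:

```python
import re

def toy_sql_lineage(sql):
    """Toy illustration only: extract table names from a simple
    INSERT INTO ... SELECT ... FROM/JOIN statement with regexes."""
    outputs = re.findall(r"insert\s+into\s+([\w.]+)", sql, re.IGNORECASE)
    inputs = re.findall(r"(?:from|join)\s+([\w.]+)", sql, re.IGNORECASE)
    return {"inputs": sorted(set(inputs)), "outputs": sorted(set(outputs))}

lineage = toy_sql_lineage(
    "INSERT INTO analytics.daily_totals "
    "SELECT * FROM raw.orders JOIN raw.customers ON orders.cid = customers.id"
)
# {'inputs': ['raw.customers', 'raw.orders'], 'outputs': ['analytics.daily_totals']}
```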

SQL Parsing

Lineage Extraction Framework

Extensible framework for extracting lineage metadata from Airflow operators, including base classes, built-in extractors, and custom extractor registration.

class BaseExtractor:
    def __init__(self, operator): ...
    def extract(self) -> OperatorLineage | None: ...
    def extract_on_complete(self, task_instance) -> OperatorLineage | None: ...
    def extract_on_failure(self, task_instance) -> OperatorLineage | None: ...

class ExtractorManager:
    def __init__(self): ...
    def add_extractor(self, operator_class: str, extractor: type[BaseExtractor]): ...
    def extract_metadata(self, dagrun, task, task_instance_state, task_instance=None) -> OperatorLineage: ...

class OperatorLineage:
    inputs: list[Dataset]
    outputs: list[Dataset] 
    run_facets: dict[str, BaseFacet]
    job_facets: dict[str, BaseFacet]
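
The registration pattern above can be sketched as a classname-to-extractor mapping (a toy sketch of the idea, not the provider's internals):

```python
# Toy registry mirroring the ExtractorManager pattern: extractors are
# keyed by operator class name and resolved per task at runtime.
class ToyExtractorRegistry:
    def __init__(self):
        self._extractors = {}

    def add_extractor(self, operator_classname, extractor_cls):
        self._extractors[operator_classname] = extractor_cls

    def resolve(self, operator):
        """Look up the extractor registered for this operator's class."""
        return self._extractors.get(type(operator).__name__)

class MyCustomOperator:  # stand-in for an Airflow operator
    pass

class MyCustomExtractor:  # stand-in for a BaseExtractor subclass
    def __init__(self, operator):
        self.operator = operator

registry = ToyExtractorRegistry()
registry.add_extractor("MyCustomOperator", MyCustomExtractor)
extractor_cls = registry.resolve(MyCustomOperator())
```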

Lineage Extraction

OpenLineage Plugin Integration

Core plugin components for Airflow integration, including event listeners, adapters, and automatic event emission.

class OpenLineageAdapter:
    def __init__(self, client: OpenLineageClient | None = None, secrets_masker: SecretsMasker | None = None): ...
    def emit(self, event: RunEvent) -> RunEvent: ...
    def start_task(self, ...) -> RunEvent: ...
    def complete_task(self, ...) -> RunEvent: ...
    def fail_task(self, ...) -> RunEvent: ...

class OpenLineageListener:
    # Event listener methods for DAG and task lifecycle
    pass

def get_openlineage_listener() -> OpenLineageListener: ...

Plugin Integration

Facets and Metadata Enrichment

Custom facet definitions for enriching OpenLineage events with Airflow-specific metadata, including DAG information, task states, and debug data.

class AirflowRunFacet:
    dag: dict
    dagRun: dict
    task: dict
    taskInstance: dict
    taskUuid: str

class AirflowJobFacet:
    taskTree: dict
    taskGroups: dict 
    tasks: dict

class AirflowStateRunFacet:
    dagRunState: str
    tasksState: dict[str, str]

class AirflowDebugRunFacet:
    packages: dict
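
On the wire, these facets are plain JSON objects attached to the run event. A hand-built example of the shape an AirflowStateRunFacet payload takes (values are illustrative):

```python
import json

# Illustrative payload shaped like AirflowStateRunFacet: the DAG run's
# terminal state plus the state of each task instance.
state_facet = {
    "dagRunState": "success",
    "tasksState": {
        "extract_task": "success",
        "load_task": "failed",
    },
}
serialized = json.dumps(state_facet)
```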

Facets and Metadata

Template Macros

Template macros for accessing OpenLineage information within DAG definitions and task templates.

def lineage_job_namespace() -> str: ...
def lineage_job_name(task_instance: TaskInstance) -> str: ...
def lineage_run_id(task_instance: TaskInstance) -> str: ...
def lineage_parent_id(task_instance: TaskInstance) -> str: ...
def lineage_root_parent_id(task_instance: TaskInstance) -> str: ...
def lineage_root_job_name(task_instance: TaskInstance) -> str: ...
def lineage_root_run_id(task_instance: TaskInstance) -> str: ...
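
These macros are exposed through the provider's Airflow plugin, so they can be referenced from any templated field. A sketch, assuming the macros are registered under the plugin's name (macros.OpenLineageProviderPlugin):

```
# Templated field on any operator, e.g. a BashOperator's bash_command:
echo "job={{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}"
echo "run={{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}"
```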

Template Macros

Selective Lineage Control

Utilities for fine-grained control over lineage collection, allowing selective enabling/disabling at DAG and task levels.

def enable_lineage(obj: T) -> T: ...
def disable_lineage(obj: T) -> T: ...
def is_task_lineage_enabled(task: BaseOperator | MappedOperator) -> bool: ...
def is_dag_lineage_enabled(dag: DAG) -> bool: ...

Selective Control

SQL Utilities

Specialized utilities for SQL-based lineage extraction, including schema analysis, table discovery, and information schema querying.

class TableSchema:
    def to_dataset(self, namespace: str, database: str | None = None, schema: str | None = None) -> Dataset: ...

def get_table_schemas(...) -> tuple[list[Dataset], list[Dataset]]: ...
def parse_query_result(cursor) -> list[TableSchema]: ...
def create_information_schema_query(...) -> str: ...
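
The cursor-to-schema step can be sketched with plain tuples standing in for an information_schema query result (a toy sketch, assuming rows of (schema, table, column, data_type); parse_query_result works on a real DB-API cursor):

```python
from collections import defaultdict

def toy_parse_query_result(rows):
    """Toy sketch: group information_schema-style rows
    (schema, table, column, data_type) into per-table column lists."""
    tables = defaultdict(list)
    for schema, table, column, data_type in rows:
        tables[f"{schema}.{table}"].append((column, data_type))
    return dict(tables)

schemas = toy_parse_query_result([
    ("public", "orders", "id", "integer"),
    ("public", "orders", "total", "numeric"),
    ("public", "customers", "id", "integer"),
])
```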

SQL Utilities

Spark Integration

Utilities for integrating with Spark applications, including automatic injection of OpenLineage configuration into Spark properties.

def inject_parent_job_information_into_spark_properties(properties: dict, context: Context) -> dict: ...
def inject_transport_information_into_spark_properties(properties: dict, context: Context) -> dict: ...
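
Conceptually, these helpers merge OpenLineage parent-job metadata into the Spark conf an operator is about to submit. A toy sketch of that merge (the property names below are illustrative; the provider derives the real keys and values from the Airflow task context):

```python
def inject_parent_job_info(properties, job_namespace, job_name, run_id):
    """Toy sketch: return a copy of the Spark properties with
    OpenLineage parent-job metadata merged in."""
    injected = dict(properties)  # leave the caller's dict untouched
    injected.update({
        "spark.openlineage.parentJobNamespace": job_namespace,
        "spark.openlineage.parentJobName": job_name,
        "spark.openlineage.parentRunId": run_id,
    })
    return injected

props = inject_parent_job_info(
    {"spark.executor.memory": "2g"},
    job_namespace="my_airflow_instance",
    job_name="example_dag.spark_task",
    run_id="01890a5d-ac68-7d43-9f29-3f1d6e2c8b01",
)
```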

Spark Integration

Utility Functions and Helpers

General utility functions for working with OpenLineage data, including operator analysis, documentation extraction, and data conversion.

def get_job_name(task: TaskInstance | RuntimeTaskInstance) -> str: ...
def get_operator_class(task: BaseOperator) -> type: ...
def is_operator_disabled(operator: BaseOperator | MappedOperator) -> bool: ...
def get_fully_qualified_class_name(operator: BaseOperator | MappedOperator) -> str: ...
def translate_airflow_asset(asset: Asset, lineage_context) -> OpenLineageDataset | None: ...

Utility Functions

Common Use Cases

Setting Up Transport Configuration

# In airflow.cfg or environment variables
[openlineage]
transport = {"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}
namespace = my_airflow_instance
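
The same options can be supplied as environment variables using Airflow's AIRFLOW__&lt;SECTION&gt;__&lt;KEY&gt; convention:

```shell
export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
export AIRFLOW__OPENLINEAGE__NAMESPACE='my_airflow_instance'
```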

Selective Lineage Enabling

from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage

# Enable for specific DAG
dag = enable_lineage(DAG(...))

# Disable for specific task
task = disable_lineage(PythonOperator(...))

Custom Extractor Registration

# In airflow.cfg
[openlineage]
extractors = my_package.extractors.CustomSQLExtractor;my_package.extractors.KafkaExtractor