A comprehensive provider package for Apache Airflow that enables OpenLineage data lineage tracking and observability for data pipelines. This provider integrates with the OpenLineage ecosystem to automatically collect and emit metadata about data transformations, job executions, and data flows across various data processing engines and databases.

```shell
npx @tessl/cli install tessl/pypi-apache-airflow-providers-openlineage@2.6.0
```
```shell
pip install apache-airflow-providers-openlineage
```

```python
from airflow.providers.openlineage import __version__
```

Configuration access:
```python
from airflow.providers.openlineage.conf import (
    is_disabled,
    namespace,
    transport,
    selective_enable,
    custom_extractors,
)
```

Plugin integration (automatic via Airflow):
```python
# Automatically loaded when the provider is installed
# Plugin class: airflow.providers.openlineage.plugins.openlineage.OpenLineageProviderPlugin
```

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.openlineage.utils.selective_enable import enable_lineage

# Enable lineage for the entire DAG
dag = enable_lineage(DAG(
    'example_dag',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
))

# Tasks automatically emit lineage events
task = EmptyOperator(task_id='example_task', dag=dag)
```

```python
from airflow.providers.openlineage.operators.empty import EmptyOperator
# OpenLineage-aware empty operator
empty_task = EmptyOperator(
    task_id='openlineage_empty',
    dag=dag,
)
```

```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class CustomExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls):
        return ['MyCustomOperator']

    def extract(self):
        inputs = [Dataset(namespace="example", name="input_table")]
        outputs = [Dataset(namespace="example", name="output_table")]
        return OperatorLineage(inputs=inputs, outputs=outputs)
```

The OpenLineage provider uses a plugin-based architecture for collecting and emitting lineage events:
Access and control OpenLineage settings, including transport configuration, selective enabling, custom extractors, and debugging options.
```python
def config_path(check_legacy_env_var: bool = True) -> str: ...
def is_source_enabled() -> bool: ...
def disabled_operators() -> set[str]: ...
def selective_enable() -> bool: ...
def spark_inject_parent_job_info() -> bool: ...
def spark_inject_transport_info() -> bool: ...
def custom_extractors() -> set[str]: ...
def custom_run_facets() -> set[str]: ...
def namespace() -> str: ...
def transport() -> dict[str, Any]: ...
def is_disabled() -> bool: ...
def dag_state_change_process_pool_size() -> int: ...
def execution_timeout() -> int: ...
def include_full_task_info() -> bool: ...
def debug_mode() -> bool: ...
```
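For example, code running inside Airflow can consult these settings at runtime; a minimal sketch using only the functions listed above:

```python
from airflow.providers.openlineage.conf import is_disabled, namespace, transport

# Skip lineage-related work entirely when the integration is disabled
if not is_disabled():
    print(f"Lineage namespace: {namespace()!r}")
    print(f"Transport config: {transport()}")
```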
Parse SQL statements to extract data lineage, including table dependencies, column mappings, and database schema information.

```python
class SQLParser:
    def __init__(self, dialect: str | None = None, default_schema: str | None = None): ...
    def parse(self, sql: list[str] | str) -> SqlMeta | None: ...
    def generate_openlineage_metadata_from_sql(...) -> OperatorLineage: ...

class DatabaseInfo:
    scheme: str
    authority: str | None
    database: str | None
    # ... additional configuration attributes
```
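A minimal sketch of standalone parsing (assumes the `airflow.providers.openlineage.sqlparser` module path and a dialect supported by the underlying openlineage-sql parser):

```python
from airflow.providers.openlineage.sqlparser import SQLParser

parser = SQLParser(dialect="postgres", default_schema="public")
meta = parser.parse("INSERT INTO analytics.daily SELECT id FROM events")
if meta is not None:
    # Assumption: SqlMeta exposes in_tables/out_tables as in openlineage-sql
    print(meta.in_tables, meta.out_tables)
```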
Extensible framework for extracting lineage metadata from Airflow operators, including base classes, built-in extractors, and custom extractor registration.

```python
class BaseExtractor:
    def __init__(self, operator): ...
    def extract() -> OperatorLineage | None: ...
    def extract_on_complete(task_instance) -> OperatorLineage | None: ...
    def extract_on_failure(task_instance) -> OperatorLineage | None: ...

class ExtractorManager:
    def __init__(self): ...
    def add_extractor(operator_class: str, extractor: type[BaseExtractor]): ...
    def extract_metadata(dagrun, task, task_instance_state, task_instance=None) -> OperatorLineage: ...

class OperatorLineage:
    inputs: list[Dataset]
    outputs: list[Dataset]
    run_facets: dict[str, BaseFacet]
    job_facets: dict[str, BaseFacet]
```
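A sketch of registering an extractor programmatically (assumes ExtractorManager is importable from `airflow.providers.openlineage.extractors.manager`):

```python
from airflow.providers.openlineage.extractors.manager import ExtractorManager

manager = ExtractorManager()
# CustomExtractor is the class defined in the earlier example
manager.add_extractor("MyCustomOperator", CustomExtractor)
```

In practice, extractors are usually registered declaratively via the `extractors` configuration option shown at the end of this document.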
Core plugin components for Airflow integration, including event listeners, adapters, and automatic event emission.

```python
class OpenLineageAdapter:
    def __init__(self, client: OpenLineageClient | None = None, secrets_masker: SecretsMasker | None = None): ...
    def emit(event: RunEvent) -> RunEvent: ...
    def start_task(...) -> RunEvent: ...
    def complete_task(...) -> RunEvent: ...
    def fail_task(...) -> RunEvent: ...

class OpenLineageListener:
    # Event listener methods for DAG and task lifecycle
    pass

def get_openlineage_listener() -> OpenLineageListener: ...
```
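A sketch of obtaining the active listener (assumes `get_openlineage_listener` lives in `airflow.providers.openlineage.plugins.listener`):

```python
from airflow.providers.openlineage.plugins.listener import get_openlineage_listener

listener = get_openlineage_listener()
# Assumption: the listener keeps its OpenLineageAdapter on an `adapter` attribute
adapter = listener.adapter
```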
Custom facet definitions for enriching OpenLineage events with Airflow-specific metadata, including DAG information, task states, and debug data.

```python
class AirflowRunFacet:
    dag: dict
    dagRun: dict
    task: dict
    taskInstance: dict
    taskUuid: str

class AirflowJobFacet:
    taskTree: dict
    taskGroups: dict
    tasks: dict

class AirflowStateRunFacet:
    dagRunState: str
    tasksState: dict[str, str]

class AirflowDebugRunFacet:
    packages: dict
```
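Beyond these built-ins, extra run facets can be supplied through the `custom_run_facets` setting listed earlier; a sketch of such a provider function (module and function names are hypothetical):

```python
# Hypothetical module my_company/facets.py, referenced in airflow.cfg as:
#   [openlineage]
#   custom_run_facets = my_company.facets.extra_run_facets

def extra_run_facets(task_instance, *args, **kwargs) -> dict:
    # Return additional run facets keyed by facet name; values should be
    # OpenLineage facet objects (this sketch returns none)
    return {}
```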
Template macros for accessing OpenLineage information within DAG definitions and task templates.

```python
def lineage_job_namespace() -> str: ...
def lineage_job_name(task_instance: TaskInstance) -> str: ...
def lineage_run_id(task_instance: TaskInstance) -> str: ...
def lineage_parent_id(task_instance: TaskInstance) -> str: ...
def lineage_root_parent_id(task_instance: TaskInstance) -> str: ...
def lineage_root_job_name(task_instance: TaskInstance) -> str: ...
def lineage_root_run_id(task_instance: TaskInstance) -> str: ...
```
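In templated fields these macros are reachable through the plugin's macro namespace; a sketch (the `macros.OpenLineageProviderPlugin` prefix follows Airflow's plugin-macro convention and is an assumption here):

```python
from airflow.operators.bash import BashOperator

# Forward the current task's lineage parent id to a downstream process
# (define inside a DAG context as usual)
emit_parent = BashOperator(
    task_id='emit_parent_id',
    bash_command=(
        "echo parent_id="
        "'{{ macros.OpenLineageProviderPlugin.lineage_parent_id(task_instance) }}'"
    ),
)
```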
Utilities for fine-grained control over lineage collection, allowing selective enabling/disabling at DAG and task levels (see the usage example at the end of this document).

```python
def enable_lineage(obj: T) -> T: ...
def disable_lineage(obj: T) -> T: ...
def is_task_lineage_enabled(task: BaseOperator | MappedOperator) -> bool: ...
def is_dag_lineage_enabled(dag: DAG) -> bool: ...
```
Specialized utilities for SQL-based lineage extraction, including schema analysis, table discovery, and information schema querying.

```python
class TableSchema:
    def to_dataset(namespace: str, database: str | None = None, schema: str | None = None) -> Dataset: ...

def get_table_schemas(...) -> tuple[list[Dataset], list[Dataset]]: ...
def parse_query_result(cursor) -> list[TableSchema]: ...
def create_information_schema_query(...) -> str: ...
```
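A sketch tying these together (assumes the `airflow.providers.openlineage.utils.sql` module path and a DB-API cursor supplied by the caller):

```python
from airflow.providers.openlineage.utils.sql import parse_query_result

def schemas_to_datasets(cursor) -> list:
    # cursor: a DB-API cursor that has already executed an information_schema
    # query, e.g. one built with create_information_schema_query(...)
    table_schemas = parse_query_result(cursor)
    return [ts.to_dataset(namespace="postgres://db-host:5432") for ts in table_schemas]
```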
Utilities for integrating with Spark applications, including automatic injection of OpenLineage configuration into Spark properties.

```python
def inject_parent_job_information_into_spark_properties(properties: dict, context: Context) -> dict: ...
def inject_transport_information_into_spark_properties(properties: dict, context: Context) -> dict: ...
```
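A sketch of enriching Spark submit properties from within an operator (the `airflow.providers.openlineage.utils.spark` module path is an assumption):

```python
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)

def enrich_spark_properties(properties: dict, context) -> dict:
    # Adds spark.openlineage.* parent-job and transport properties so the
    # Spark integration can link its events back to this Airflow task
    properties = inject_parent_job_information_into_spark_properties(properties, context)
    return inject_transport_information_into_spark_properties(properties, context)
```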
General utility functions for working with OpenLineage data, including operator analysis, documentation extraction, and data conversion.

```python
def get_job_name(task: TaskInstance | RuntimeTaskInstance) -> str: ...
def get_operator_class(task: BaseOperator) -> type: ...
def is_operator_disabled(operator: BaseOperator | MappedOperator) -> bool: ...
def get_fully_qualified_class_name(operator: BaseOperator | MappedOperator) -> str: ...
def translate_airflow_asset(asset: Asset, lineage_context) -> OpenLineageDataset | None: ...
```
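For instance, a custom extractor might combine these helpers (a sketch; assumes `airflow.providers.openlineage.utils.utils` as the module path):

```python
from airflow.providers.openlineage.utils.utils import (
    get_fully_qualified_class_name,
    is_operator_disabled,
)

def describe_operator(operator) -> str | None:
    # Skip operators the user disabled via the disabled-operators setting
    if is_operator_disabled(operator):
        return None
    return get_fully_qualified_class_name(operator)
```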
```ini
# In airflow.cfg or via environment variables
[openlineage]
transport = {"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}
namespace = my_airflow_instance
```

```python
from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage
# Enable for a specific DAG
dag = enable_lineage(DAG(...))

# Disable for a specific task
task = disable_lineage(PythonOperator(...))
```
```ini
# In airflow.cfg
[openlineage]
extractors = my_package.extractors.CustomSQLExtractor;my_package.extractors.KafkaExtractor
```