Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Extensible framework for extracting lineage metadata from Airflow operators, including base classes, built-in extractors, custom extractor registration, and centralized management. This framework enables automatic discovery of data flows and transformations across different operator types.
Core classes that define the extraction interface and data structures for lineage metadata.
class OperatorLineage:
    """
    Generic container for lineage data including inputs, outputs, and metadata facets.
    """

    inputs: list[Dataset]  # Input datasets consumed by the operation
    outputs: list[Dataset]  # Output datasets produced by the operation
    run_facets: dict[str, BaseFacet]  # Runtime (per-run) metadata facets
    job_facets: dict[str, BaseFacet]  # Job-level metadata facets
class BaseExtractor:
    """
    Abstract base class for implementing custom lineage extractors.

    Subclasses declare which operators they handle via get_operator_classnames()
    and build OperatorLineage from the wrapped operator in the extract hooks.
    """

    def __init__(self, operator):
        """
        Initialize extractor with operator instance.

        Args:
            operator: Airflow operator instance to extract lineage from
        """

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """
        Return list of operator class names this extractor handles.

        Returns:
            list[str]: Fully qualified operator class names
        """

    def extract(self) -> OperatorLineage | None:
        """
        Extract lineage metadata for task start events.

        Returns:
            OperatorLineage: Extracted lineage metadata, or None if no extraction possible
        """

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """
        Extract lineage metadata for task completion events.

        Args:
            task_instance: Completed task instance

        Returns:
            OperatorLineage: Extracted lineage metadata or None
        """

    def extract_on_failure(self, task_instance) -> OperatorLineage | None:
        """
        Extract lineage metadata for task failure events.

        Args:
            task_instance: Failed task instance

        Returns:
            OperatorLineage: Extracted lineage metadata or None
        """
class DefaultExtractor(BaseExtractor):
    """
    Default implementation of BaseExtractor that provides fallback extraction.

    Uses operator's built-in OpenLineage methods if available, otherwise
    attempts to extract basic lineage from operator properties.
    """

Central management system for registering extractors and coordinating lineage extraction.
class ExtractorManager:
"""
Central manager for registering and executing lineage extractors.
"""
def __init__(self):
"""Initialize extractor manager with built-in extractors."""
def add_extractor(self, operator_class: str, extractor: type[BaseExtractor]):
"""
Register a custom extractor for specific operator class.
Args:
operator_class: Fully qualified operator class name
extractor: Extractor class to handle the operator
"""
def extract_metadata(
self,
dagrun,
task,
task_instance_state: TaskInstanceState,
task_instance=None
) -> OperatorLineage:
"""
Extract lineage metadata for a task using appropriate extractor.
Args:
dagrun: DAG run instance
task: Task/operator instance
task_instance_state: Current task instance state
task_instance: Task instance (optional)
Returns:
OperatorLineage: Extracted lineage metadata
"""
def get_extractor_class(self, task: Operator) -> type[BaseExtractor] | None:
"""
Get appropriate extractor class for a task.
Args:
task: Airflow task/operator
Returns:
type[BaseExtractor]: Extractor class or None if no match
"""
def extract_inlets_and_outlets(self, task_metadata: OperatorLineage, task):
"""
Extract additional lineage from task's inlets and outlets properties.
Args:
task_metadata: Existing task lineage metadata
task: Airflow task instance
"""
def get_hook_lineage(self) -> tuple[list[Dataset], list[Dataset]] | None:
"""
Extract lineage from database hooks if available.
Returns:
tuple: (input_datasets, output_datasets) or None
"""
@staticmethod
def convert_to_ol_dataset_from_object_storage_uri(uri: str) -> Dataset | None:
"""
Convert object storage URI to OpenLineage Dataset.
Args:
uri: Object storage URI (s3://, gcs://, etc.)
Returns:
Dataset: OpenLineage dataset or None if conversion fails
"""
@staticmethod
def convert_to_ol_dataset_from_table(table: Table) -> Dataset:
"""
Convert Airflow Table object to OpenLineage Dataset.
Args:
table: Airflow Table instance
Returns:
Dataset: OpenLineage dataset representation
"""
@staticmethod
def convert_to_ol_dataset(obj) -> Dataset | None:
"""
Convert various object types to OpenLineage Dataset.
Args:
obj: Object to convert (URI string, Table, etc.)
Returns:
Dataset: OpenLineage dataset or None if conversion not supported
"""
def validate_task_metadata(self, task_metadata) -> OperatorLineage | None:
"""
Validate and normalize task metadata.
Args:
task_metadata: Raw task metadata to validate
Returns:
OperatorLineage: Validated metadata or None if invalid
"""Pre-built extractors for common Airflow operator types.
class BashExtractor(BaseExtractor):
"""
Extractor for BashOperator tasks that captures command execution metadata.
Extracts command text, working directory, and environment variables
when source code inclusion is enabled.
"""
@classmethod
def get_operator_classnames(cls) -> list[str]:
"""Returns: ['airflow.operators.bash.BashOperator']"""
class PythonExtractor(BaseExtractor):
"""
Extractor for PythonOperator tasks that captures function execution metadata.
Extracts function source code, callable information, and context variables
when source code inclusion is enabled.
"""
@classmethod
def get_operator_classnames(cls) -> list[str]:
"""Returns: ['airflow.operators.python.PythonOperator']"""Method name constants for operator-level OpenLineage integration.
OL_METHOD_NAME_START: str = "get_openlineage_facets_on_start"
"""Method name for start event lineage extraction in operators."""
OL_METHOD_NAME_COMPLETE: str = "get_openlineage_facets_on_complete"
"""Method name for completion event lineage extraction in operators."""
OL_METHOD_NAME_FAIL: str = "get_openlineage_facets_on_failure"
"""Method name for failure event lineage extraction in operators."""from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
from my_package.operators import CustomDataOperator
class CustomDataExtractor(BaseExtractor):
    """Custom extractor for CustomDataOperator."""

    @classmethod
    def get_operator_classnames(cls):
        return ['my_package.operators.CustomDataOperator']

    def extract(self):
        # Build lineage from the operator's optional path attributes.
        op = self._operator

        def _file_datasets(attr_name):
            # One file-namespace Dataset when the attribute exists, else none.
            if hasattr(op, attr_name):
                return [Dataset(namespace='file', name=getattr(op, attr_name))]
            return []

        return OperatorLineage(
            inputs=_file_datasets('input_path'),
            outputs=_file_datasets('output_path'),
            run_facets={},
            job_facets={},
        )
# Register custom extractor
from airflow.providers.openlineage.extractors.manager import ExtractorManager
manager = ExtractorManager()
manager.add_extractor('my_package.operators.CustomDataOperator', CustomDataExtractor)

from airflow.providers.openlineage.extractors.manager import ExtractorManager
from airflow.operators.python import PythonOperator
from airflow.utils.state import TaskInstanceState
# Initialize manager
manager = ExtractorManager()
# Create sample task
def my_function():
return "Hello World"
task = PythonOperator(
task_id='python_task',
python_callable=my_function,
dag=dag
)
# Extract lineage metadata
lineage = manager.extract_metadata(
dagrun=dag_run,
task=task,
task_instance_state=TaskInstanceState.RUNNING,
task_instance=task_instance
)
print(f"Inputs: {lineage.inputs}")
print(f"Outputs: {lineage.outputs}")
print(f"Run facets: {lineage.run_facets}")from airflow.providers.openlineage.extractors.manager import ExtractorManager
# NOTE(review): the Table lineage entity lives in airflow.lineage.entities,
# not airflow.models — the original import path does not resolve.
from airflow.lineage.entities import Table
# Convert S3 URI to dataset
s3_uri = "s3://my-bucket/data/users.parquet"
s3_dataset = ExtractorManager.convert_to_ol_dataset_from_object_storage_uri(s3_uri)
print(f"S3 Dataset: {s3_dataset}")
# Convert Airflow Table to dataset
table = Table(
name='users',
schema='public',
database='analytics'
)
table_dataset = ExtractorManager.convert_to_ol_dataset_from_table(table)
print(f"Table Dataset: {table_dataset}")
# Generic conversion
mixed_objects = [
    "s3://bucket/file.csv",
    table,
    {"name": "custom_dataset"}
]
# Convert each object exactly once (the original called the converter twice
# per item — once in the filter, once in the expression) and drop objects
# the converter cannot handle (it returns None for those).
datasets = [
    ds
    for obj in mixed_objects
    if (ds := ExtractorManager.convert_to_ol_dataset(obj)) is not None
]
print(f"Converted datasets: {datasets}")Custom operators can implement OpenLineage methods directly:
# NOTE(review): BaseOperator is not exported from the top-level airflow
# package; import it from airflow.models instead.
from airflow.models import BaseOperator
from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
class MyCustomOperator(BaseOperator):
def __init__(self, input_file: str, output_file: str, **kwargs):
super().__init__(**kwargs)
self.input_file = input_file
self.output_file = output_file
def execute(self, context):
# Operator logic here
pass
def get_openlineage_facets_on_start(self) -> OperatorLineage:
"""Called when task starts."""
return OperatorLineage(
inputs=[Dataset(namespace='file', name=self.input_file)],
outputs=[Dataset(namespace='file', name=self.output_file)],
run_facets={},
job_facets={}
)
def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
"""Called when task completes successfully."""
# Can include actual file sizes, row counts, etc.
return OperatorLineage(
inputs=[Dataset(namespace='file', name=self.input_file)],
outputs=[Dataset(namespace='file', name=self.output_file)],
run_facets={
'processing_stats': {
'rows_processed': 1000,
'execution_time': task_instance.duration
}
},
job_facets={}
)

from airflow.providers.openlineage.extractors.manager import ExtractorManager
# Extract from task's inlets/outlets
task_with_lineage = PythonOperator(
task_id='process_data',
python_callable=my_function,
inlets=[
Dataset(namespace='db', name='raw.users'),
Dataset(namespace='db', name='raw.orders')
],
outlets=[
Dataset(namespace='db', name='analytics.user_metrics')
],
dag=dag
)
manager = ExtractorManager()
base_lineage = OperatorLineage(inputs=[], outputs=[], run_facets={}, job_facets={})
# This will merge inlets/outlets into the lineage
manager.extract_inlets_and_outlets(base_lineage, task_with_lineage)
print(f"Final inputs: {base_lineage.inputs}")
print(f"Final outputs: {base_lineage.outputs}")Register extractors via Airflow configuration:
[openlineage]
extractors = my_package.extractors.CustomSQLExtractor;my_package.extractors.KafkaExtractor

from airflow.providers.openlineage.extractors.manager import ExtractorManager
# Get the global manager instance
manager = ExtractorManager()
# Register multiple extractors
extractors = {
'my_package.operators.S3Operator': 'my_package.extractors.S3Extractor',
'my_package.operators.KafkaOperator': 'my_package.extractors.KafkaExtractor',
}
for operator_class, extractor_class in extractors.items():
# Import and register
extractor = __import__(extractor_class, fromlist=[''])
manager.add_extractor(operator_class, extractor)class ConditionalExtractor(BaseExtractor):
def extract(self):
# Only extract if certain conditions are met
if not hasattr(self._operator, 'enable_lineage') or not self._operator.enable_lineage:
return None
# Normal extraction logic
return OperatorLineage(...)

class FileExtractor(BaseExtractor):
def extract(self):
file_path = self._operator.file_path
# Determine format and create appropriate metadata
if file_path.endswith('.parquet'):
namespace = 'parquet'
elif file_path.endswith('.csv'):
namespace = 'csv'
else:
namespace = 'file'
return OperatorLineage(
inputs=[Dataset(namespace=namespace, name=file_path)],
outputs=[],
run_facets={},
job_facets={}
)

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-openlineage