tessl/pypi-apache-airflow-providers-openlineage

Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.

docs/lineage-extraction.md

Lineage Extraction Framework

Extensible framework for extracting lineage metadata from Airflow operators, including base classes, built-in extractors, custom extractor registration, and centralized management. This framework enables automatic discovery of data flows and transformations across different operator types.

Capabilities

Base Extractor Classes

Core classes that define the extraction interface and data structures for lineage metadata.

class OperatorLineage:
    """
    Generic container for lineage data including inputs, outputs, and metadata facets.
    """
    
    inputs: list[Dataset]  # Input datasets consumed by the operation
    outputs: list[Dataset]  # Output datasets produced by the operation  
    run_facets: dict[str, BaseFacet]  # Runtime metadata facets
    job_facets: dict[str, BaseFacet]  # Job-level metadata facets

class BaseExtractor:
    """
    Abstract base class for implementing custom lineage extractors.
    """
    
    def __init__(self, operator):
        """
        Initialize extractor with operator instance.
        
        Args:
            operator: Airflow operator instance to extract lineage from
        """
    
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """
        Return list of operator class names this extractor handles.
        
        Returns:
            list[str]: Fully qualified operator class names
        """
    
    def extract(self) -> OperatorLineage | None:
        """
        Extract lineage metadata for task start events.
        
        Returns:
            OperatorLineage: Extracted lineage metadata or None if no extraction possible
        """
    
    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """
        Extract lineage metadata for task completion events.
        
        Args:
            task_instance: Completed task instance
            
        Returns:
            OperatorLineage: Extracted lineage metadata or None
        """
    
    def extract_on_failure(self, task_instance) -> OperatorLineage | None:
        """
        Extract lineage metadata for task failure events.
        
        Args:
            task_instance: Failed task instance
            
        Returns:
            OperatorLineage: Extracted lineage metadata or None
        """

class DefaultExtractor(BaseExtractor):
    """
    Default implementation of BaseExtractor that provides fallback extraction.
    
    Uses operator's built-in OpenLineage methods if available, otherwise
    attempts to extract basic lineage from operator properties.
    """

Extractor Manager

Central management system for registering extractors and coordinating lineage extraction.

class ExtractorManager:
    """
    Central manager for registering and executing lineage extractors.
    """
    
    def __init__(self):
        """Initialize extractor manager with built-in extractors."""
    
    def add_extractor(self, operator_class: str, extractor: type[BaseExtractor]):
        """
        Register a custom extractor for specific operator class.
        
        Args:
            operator_class: Fully qualified operator class name
            extractor: Extractor class to handle the operator
        """
    
    def extract_metadata(
        self,
        dagrun,
        task,
        task_instance_state: TaskInstanceState,
        task_instance=None
    ) -> OperatorLineage:
        """
        Extract lineage metadata for a task using appropriate extractor.
        
        Args:
            dagrun: DAG run instance
            task: Task/operator instance
            task_instance_state: Current task instance state
            task_instance: Task instance (optional)
            
        Returns:
            OperatorLineage: Extracted lineage metadata
        """
    
    def get_extractor_class(self, task: Operator) -> type[BaseExtractor] | None:
        """
        Get appropriate extractor class for a task.
        
        Args:
            task: Airflow task/operator
            
        Returns:
            type[BaseExtractor]: Extractor class or None if no match
        """
    
    def extract_inlets_and_outlets(self, task_metadata: OperatorLineage, task):
        """
        Extract additional lineage from task's inlets and outlets properties.
        
        Args:
            task_metadata: Existing task lineage metadata
            task: Airflow task instance
        """
    
    def get_hook_lineage(self) -> tuple[list[Dataset], list[Dataset]] | None:
        """
        Extract lineage from database hooks if available.
        
        Returns:
            tuple: (input_datasets, output_datasets) or None
        """
    
    @staticmethod
    def convert_to_ol_dataset_from_object_storage_uri(uri: str) -> Dataset | None:
        """
        Convert object storage URI to OpenLineage Dataset.
        
        Args:
            uri: Object storage URI (e.g. s3://, gs://)
            
        Returns:
            Dataset: OpenLineage dataset or None if conversion fails
        """
    
    @staticmethod  
    def convert_to_ol_dataset_from_table(table: Table) -> Dataset:
        """
        Convert Airflow Table object to OpenLineage Dataset.
        
        Args:
            table: Airflow Table instance
            
        Returns:
            Dataset: OpenLineage dataset representation
        """
    
    @staticmethod
    def convert_to_ol_dataset(obj) -> Dataset | None:
        """
        Convert various object types to OpenLineage Dataset.
        
        Args:
            obj: Object to convert (URI string, Table, etc.)
            
        Returns:
            Dataset: OpenLineage dataset or None if conversion not supported
        """
    
    def validate_task_metadata(self, task_metadata) -> OperatorLineage | None:
        """
        Validate and normalize task metadata.
        
        Args:
            task_metadata: Raw task metadata to validate
            
        Returns:
            OperatorLineage: Validated metadata or None if invalid
        """

Built-in Extractors

Pre-built extractors for common Airflow operator types.

class BashExtractor(BaseExtractor):
    """
    Extractor for BashOperator tasks that captures command execution metadata.
    
    Extracts command text, working directory, and environment variables
    when source code inclusion is enabled.
    """
    
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Returns: ['airflow.operators.bash.BashOperator']"""

class PythonExtractor(BaseExtractor):
    """
    Extractor for PythonOperator tasks that captures function execution metadata.
    
    Extracts function source code, callable information, and context variables
    when source code inclusion is enabled.
    """
    
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Returns: ['airflow.operators.python.PythonOperator']"""

Extraction Constants

Method name constants for operator-level OpenLineage integration.

OL_METHOD_NAME_START: str = "get_openlineage_facets_on_start"
"""Method name for start event lineage extraction in operators."""

OL_METHOD_NAME_COMPLETE: str = "get_openlineage_facets_on_complete" 
"""Method name for completion event lineage extraction in operators."""

OL_METHOD_NAME_FAIL: str = "get_openlineage_facets_on_failure"
"""Method name for failure event lineage extraction in operators."""

Usage Examples

Creating Custom Extractor

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
from my_package.operators import CustomDataOperator

class CustomDataExtractor(BaseExtractor):
    """Custom extractor for CustomDataOperator."""
    
    @classmethod
    def get_operator_classnames(cls):
        return ['my_package.operators.CustomDataOperator']
    
    def extract(self):
        # Extract lineage from operator properties
        operator = self._operator
        
        inputs = []
        if hasattr(operator, 'input_path'):
            inputs.append(Dataset(
                namespace='file',
                name=operator.input_path
            ))
        
        outputs = []
        if hasattr(operator, 'output_path'):
            outputs.append(Dataset(
                namespace='file',
                name=operator.output_path
            ))
        
        return OperatorLineage(
            inputs=inputs,
            outputs=outputs,
            run_facets={},
            job_facets={}
        )

# Register custom extractor
from airflow.providers.openlineage.extractors.manager import ExtractorManager

manager = ExtractorManager()
manager.add_extractor('my_package.operators.CustomDataOperator', CustomDataExtractor)

Using Extractor Manager

from airflow.providers.openlineage.extractors.manager import ExtractorManager
from airflow.operators.python import PythonOperator
from airflow.utils.state import TaskInstanceState

# Initialize manager
manager = ExtractorManager()

# Create sample task
def my_function():
    return "Hello World"

task = PythonOperator(
    task_id='python_task',
    python_callable=my_function,
    dag=dag
)

# Extract lineage metadata
lineage = manager.extract_metadata(
    dagrun=dag_run,
    task=task,
    task_instance_state=TaskInstanceState.RUNNING,
    task_instance=task_instance
)

print(f"Inputs: {lineage.inputs}")
print(f"Outputs: {lineage.outputs}")
print(f"Run facets: {lineage.run_facets}")

Dataset Conversion Utilities

from airflow.providers.openlineage.extractors.manager import ExtractorManager
from airflow.lineage.entities import Table

# Convert S3 URI to dataset
s3_uri = "s3://my-bucket/data/users.parquet"
s3_dataset = ExtractorManager.convert_to_ol_dataset_from_object_storage_uri(s3_uri)
print(f"S3 Dataset: {s3_dataset}")

# Convert Airflow Table to dataset
table = Table(
    database='analytics',
    cluster='warehouse',
    name='users'
)
table_dataset = ExtractorManager.convert_to_ol_dataset_from_table(table)
print(f"Table Dataset: {table_dataset}")

# Generic conversion
mixed_objects = [
    "s3://bucket/file.csv",
    table,
    {"name": "custom_dataset"}
]

datasets = [
    ds
    for obj in mixed_objects
    if (ds := ExtractorManager.convert_to_ol_dataset(obj)) is not None
]
print(f"Converted datasets: {datasets}")

Operator Integration

Custom operators can implement OpenLineage methods directly:

from airflow.models import BaseOperator
from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset

class MyCustomOperator(BaseOperator):
    def __init__(self, input_file: str, output_file: str, **kwargs):
        super().__init__(**kwargs)
        self.input_file = input_file
        self.output_file = output_file
    
    def execute(self, context):
        # Operator logic here
        pass
    
    def get_openlineage_facets_on_start(self) -> OperatorLineage:
        """Called when task starts."""
        return OperatorLineage(
            inputs=[Dataset(namespace='file', name=self.input_file)],
            outputs=[Dataset(namespace='file', name=self.output_file)],
            run_facets={},
            job_facets={}
        )
    
    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
        """Called when task completes successfully."""
        # Can include actual file sizes, row counts, etc.
        return OperatorLineage(
            inputs=[Dataset(namespace='file', name=self.input_file)],
            outputs=[Dataset(namespace='file', name=self.output_file)],
            run_facets={
                'processing_stats': {
                    'rows_processed': 1000,
                    'execution_time': task_instance.duration
                }
            },
            job_facets={}
        )

Extracting from Task Properties

from airflow.operators.python import PythonOperator
from airflow.providers.openlineage.extractors.base import OperatorLineage
from airflow.providers.openlineage.extractors.manager import ExtractorManager
from openlineage.client.event_v2 import Dataset

# Extract from task's inlets/outlets
task_with_lineage = PythonOperator(
    task_id='process_data',
    python_callable=my_function,
    inlets=[
        Dataset(namespace='db', name='raw.users'),
        Dataset(namespace='db', name='raw.orders')
    ],
    outlets=[
        Dataset(namespace='db', name='analytics.user_metrics')
    ],
    dag=dag
)

manager = ExtractorManager()
base_lineage = OperatorLineage(inputs=[], outputs=[], run_facets={}, job_facets={})

# This will merge inlets/outlets into the lineage
manager.extract_inlets_and_outlets(base_lineage, task_with_lineage)

print(f"Final inputs: {base_lineage.inputs}")
print(f"Final outputs: {base_lineage.outputs}")

Extractor Registration

Configuration-based Registration

Register extractors via Airflow configuration:

[openlineage]
extractors = my_package.extractors.CustomSQLExtractor;my_package.extractors.KafkaExtractor
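Since Airflow maps any `[section] option` pair to an `AIRFLOW__SECTION__OPTION` environment variable, the same registration can be done without editing `airflow.cfg`:

```shell
# Environment-variable form of the [openlineage] extractors option
export AIRFLOW__OPENLINEAGE__EXTRACTORS="my_package.extractors.CustomSQLExtractor;my_package.extractors.KafkaExtractor"
```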

Programmatic Registration

from airflow.providers.openlineage.extractors.manager import ExtractorManager

# Get the global manager instance
manager = ExtractorManager()

# Register multiple extractors
extractors = {
    'my_package.operators.S3Operator': 'my_package.extractors.S3Extractor',
    'my_package.operators.KafkaOperator': 'my_package.extractors.KafkaExtractor',
}

import importlib

for operator_class, extractor_path in extractors.items():
    # Split "pkg.module.ClassName" into module path and class name, then import
    module_path, class_name = extractor_path.rsplit('.', 1)
    extractor = getattr(importlib.import_module(module_path), class_name)
    manager.add_extractor(operator_class, extractor)

Advanced Patterns

Conditional Extraction

class ConditionalExtractor(BaseExtractor):
    def extract(self):
        # Only extract when lineage is explicitly enabled on the operator
        if not getattr(self._operator, 'enable_lineage', False):
            return None

        # Normal extraction logic
        return OperatorLineage(inputs=[], outputs=[], run_facets={}, job_facets={})

Multi-format Support

class FileExtractor(BaseExtractor):
    def extract(self):
        file_path = self._operator.file_path
        
        # Determine format and create appropriate metadata
        if file_path.endswith('.parquet'):
            namespace = 'parquet'
        elif file_path.endswith('.csv'):
            namespace = 'csv'
        else:
            namespace = 'file'
            
        return OperatorLineage(
            inputs=[Dataset(namespace=namespace, name=file_path)],
            outputs=[],
            run_facets={},
            job_facets={}
        )
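The format-detection logic in `FileExtractor` can be exercised without a running Airflow scheduler by lifting it into a pure function and testing it against plain paths. The helper below is an illustrative stand-alone sketch, not part of the provider:

```python
# Stand-alone sketch of the extension-based namespace selection used by
# the FileExtractor above; namespace_for is illustrative, not provider API.
def namespace_for(file_path: str) -> str:
    if file_path.endswith('.parquet'):
        return 'parquet'
    if file_path.endswith('.csv'):
        return 'csv'
    return 'file'

print(namespace_for('data/users.parquet'))  # parquet
```

Keeping decisions like this in small pure functions makes custom extractors easy to unit-test before wiring them into the `ExtractorManager`.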

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-openlineage
