tessl/pypi-apache-airflow-providers-openlineage

Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.


Facets and Metadata Enrichment

Custom facet definitions for enriching OpenLineage events with Airflow-specific metadata, including DAG information, task states, debug data, and unknown operator handling. Facets provide extensible metadata enrichment for comprehensive lineage tracking.
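As a point of reference, every facet ultimately serializes to a namespaced JSON object under the event's `run.facets` map. The following minimal sketch uses a plain dataclass to show that shape; the `airflowState` key and field names mirror the state facet documented below, but this stand-in is illustrative only, not the provider's implementation:

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class AirflowStateRunFacet:
    # Mirrors the provider facet's fields; plain dataclass for illustration only
    dagRunState: str
    tasksState: dict

facet = AirflowStateRunFacet(
    dagRunState="running",
    tasksState={"extract": "success", "load": "running"},
)

# Facets appear under a namespaced key inside the event's run.facets map
event_fragment = {"run": {"facets": {"airflowState": asdict(facet)}}}
print(json.dumps(event_fragment, indent=2))
```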

Capabilities

Airflow Run Facets

Comprehensive runtime metadata facets that capture Airflow-specific execution context.

class AirflowRunFacet:
    """
    Composite run facet containing comprehensive Airflow execution metadata.
    """
    
    dag: dict  # DAG metadata and configuration
    dagRun: dict  # DAG run instance information
    task: dict  # Task/operator metadata
    taskInstance: dict  # Task instance execution details
    taskUuid: str  # Unique task identifier

class AirflowDagRunFacet:
    """
    DAG run specific facet with DAG and run metadata.
    """
    
    dag: dict  # DAG configuration and properties
    dagRun: dict  # DAG run execution details

class AirflowStateRunFacet:
    """
    DAG and task state information facet for tracking execution states.
    """
    
    dagRunState: str  # Current DAG run state
    tasksState: dict[str, str]  # Mapping of task IDs to their current states

Airflow Job Facets

Job-level metadata facets that describe DAG structure and task organization.

class AirflowJobFacet:
    """
    Composite job facet with task tree, groups, and task metadata.
    """
    
    taskTree: dict  # Hierarchical task dependency structure
    taskGroups: dict  # Task group organization and metadata
    tasks: dict  # Individual task metadata and configuration

Mapped Task Facets

Facets specific to dynamic task mapping functionality in Airflow.

class AirflowMappedTaskRunFacet:
    """
    Facet for mapped task information and dynamic task execution metadata.
    """
    
    mapIndex: int  # Index of this mapped task instance
    operatorClass: str  # Fully qualified operator class name
    
    @classmethod
    def from_task_instance(cls, task_instance):
        """
        Create facet from task instance.
        
        Args:
            task_instance: Airflow task instance
            
        Returns:
            AirflowMappedTaskRunFacet: Facet with mapped task metadata
        """

Debug and Development Facets

Facets for debugging, development, and operational visibility.

class AirflowDebugRunFacet:
    """
    Debug information facet with package details and system information.
    
    Includes comprehensive debugging metadata when debug mode is enabled,
    potentially creating large events with detailed system information.
    """
    
    packages: dict  # Installed packages and their versions

class UnknownOperatorInstance:
    """
    Descriptor for unknown or unrecognized operators.
    """
    
    name: str  # Operator name or identifier
    properties: dict[str, object]  # Operator properties and attributes
    type: str  # Operator type classification

class UnknownOperatorAttributeRunFacet:
    """
    Facet for capturing information about unknown or unhandled operators.
    
    Provides visibility into operators that don't have specific extractors
    while still capturing basic metadata for lineage tracking.
    """
    
    unknownItems: list[UnknownOperatorInstance]  # List of unknown operator instances

Usage Examples

Creating Airflow Run Facets

from airflow.providers.openlineage.plugins.facets import AirflowRunFacet
from airflow.models import TaskInstance, DagRun

def create_run_facet(task_instance: TaskInstance, dag_run: DagRun):
    """Create comprehensive Airflow run facet."""
    
    facet = AirflowRunFacet(
        dag={
            'dag_id': dag_run.dag_id,
            'schedule_interval': str(dag_run.dag.schedule_interval),
            'start_date': dag_run.dag.start_date.isoformat(),
            'tags': dag_run.dag.tags,
            'owner': dag_run.dag.owner
        },
        dagRun={
            'run_id': dag_run.run_id,
            'execution_date': dag_run.execution_date.isoformat(),
            'start_date': dag_run.start_date.isoformat() if dag_run.start_date else None,
            'end_date': dag_run.end_date.isoformat() if dag_run.end_date else None,
            'state': dag_run.state,
            'run_type': dag_run.run_type
        },
        task={
            'task_id': task_instance.task_id,
            'operator_class': task_instance.operator.__class__.__name__,
            'pool': task_instance.pool,
            'queue': task_instance.queue,
            'priority_weight': task_instance.priority_weight
        },
        taskInstance={
            'try_number': task_instance.try_number,
            'max_tries': task_instance.max_tries,
            'start_date': task_instance.start_date.isoformat() if task_instance.start_date else None,
            'end_date': task_instance.end_date.isoformat() if task_instance.end_date else None,
            'duration': task_instance.duration,
            'state': task_instance.state,
            'hostname': task_instance.hostname,
            'pid': task_instance.pid
        },
        taskUuid=f"{dag_run.dag_id}.{task_instance.task_id}.{dag_run.logical_date}.{task_instance.try_number}"
    )
    
    return facet

Working with State Facets

from airflow.providers.openlineage.plugins.facets import AirflowStateRunFacet
from airflow.models import DagRun

def create_state_facet(dag_run: DagRun):
    """Create state tracking facet."""
    
    # Get task states from DAG run
    task_states = {}
    for task_instance in dag_run.get_task_instances():
        task_states[task_instance.task_id] = task_instance.state or 'none'
    
    facet = AirflowStateRunFacet(
        dagRunState=dag_run.state or 'none',
        tasksState=task_states
    )
    
    return facet

# Usage in lineage extraction
state_facet = create_state_facet(dag_run)
run_facets = {
    'airflow_state': state_facet
}
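On the consuming side, the same facet payload can be summarized without any Airflow dependency. A hypothetical sketch (`summarize_states` is illustrative, not part of the provider):

```python
from collections import Counter

def summarize_states(state_facet: dict) -> dict:
    """Summarize an AirflowStateRunFacet-shaped payload into per-state counts."""
    counts = Counter(state_facet["tasksState"].values())
    return {"dag_state": state_facet["dagRunState"], "task_counts": dict(counts)}

summary = summarize_states({
    "dagRunState": "running",
    "tasksState": {"extract": "success", "transform": "running", "load": "none"},
})
print(summary)
```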

Mapped Task Facets

from airflow.providers.openlineage.plugins.facets import AirflowMappedTaskRunFacet
from airflow.models import TaskInstance

def handle_mapped_task(task_instance: TaskInstance):
    """Handle mapped task metadata extraction."""
    
    if hasattr(task_instance, 'map_index') and task_instance.map_index is not None:
        # Create mapped task facet
        mapped_facet = AirflowMappedTaskRunFacet.from_task_instance(task_instance)
        
        print(f"Mapped task index: {mapped_facet.mapIndex}")
        print(f"Operator class: {mapped_facet.operatorClass}")
        
        return {
            'airflow_mapped_task': mapped_facet
        }
    
    return {}

# Example with a mapped operator
from airflow.decorators import dag
from airflow.operators.python import PythonOperator

@dag
def mapped_dag():
    def process_item(item):
        return f"Processed {item}"
    
    # Dynamic task mapping creates one task instance per expanded input
    PythonOperator.partial(
        task_id='process_items',
        python_callable=process_item
    ).expand(op_kwargs=[{'item': i} for i in range(5)])

mapped_dag()

# Each mapped instance gets its own facet with map_index
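To make the one-facet-per-instance behavior concrete, here is a plain-Python sketch (hypothetical helper, no Airflow required) of the payload each mapped instance would carry:

```python
def mapped_task_facet_payloads(operator_class: str, num_instances: int) -> list[dict]:
    # One payload per map index, mirroring AirflowMappedTaskRunFacet's fields
    return [
        {"mapIndex": i, "operatorClass": operator_class}
        for i in range(num_instances)
    ]

payloads = mapped_task_facet_payloads("airflow.operators.python.PythonOperator", 5)
print(payloads[0])
```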

Job Structure Facets

from airflow.providers.openlineage.plugins.facets import AirflowJobFacet
from airflow.models import DAG, DagRun

def create_job_facet(dag: DAG, dag_run: DagRun):
    """Create job facet with DAG structure information."""
    
    # Build task tree
    task_tree = {}
    for task_id, task in dag.task_dict.items():
        task_tree[task_id] = {
            'operator_class': task.__class__.__name__,
            'upstream_task_ids': list(task.upstream_task_ids),
            'downstream_task_ids': list(task.downstream_task_ids)
        }
    
    # Build task groups
    task_groups = {}
    if hasattr(dag, 'task_group_dict'):
        for group_id, group in dag.task_group_dict.items():
            task_groups[group_id] = {
                'tooltip': group.tooltip,
                'ui_color': group.ui_color,
                'ui_fgcolor': group.ui_fgcolor,
                'children': list(group.children.keys())
            }
    
    # Build tasks metadata
    tasks = {}
    for task_id, task in dag.task_dict.items():
        tasks[task_id] = {
            'operator_class': task.__class__.__name__,
            'template_fields': list(task.template_fields) if task.template_fields else [],
            'pool': task.pool,
            'queue': task.queue,
            'retries': task.retries,
            'retry_delay': str(task.retry_delay) if task.retry_delay else None
        }
    
    facet = AirflowJobFacet(
        taskTree=task_tree,
        taskGroups=task_groups,
        tasks=tasks
    )
    
    return facet

Unknown Operator Handling

from airflow.providers.openlineage.plugins.facets import (
    UnknownOperatorInstance, 
    UnknownOperatorAttributeRunFacet
)
from airflow.models import BaseOperator

def handle_unknown_operator(operator: BaseOperator):
    """Handle operators without specific extractors."""
    
    # Extract basic operator properties
    properties = {}
    for attr in dir(operator):
        if attr.startswith('_'):
            continue
        try:
            value = getattr(operator, attr)
        except Exception:
            continue  # Skip attributes that raise on access
        if callable(value):
            continue
        # Only include serializable values
        if isinstance(value, (str, int, float, bool, list, dict)):
            properties[attr] = value
    
    # Create unknown operator instance
    unknown_instance = UnknownOperatorInstance(
        name=operator.task_id,
        properties=properties,
        type=operator.__class__.__name__
    )
    
    # Create facet
    facet = UnknownOperatorAttributeRunFacet(
        unknownItems=[unknown_instance]
    )
    
    return {
        'airflow_unknown_operator': facet
    }

# Usage in custom extractor
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage

class GenericExtractor(BaseExtractor):
    def extract(self):
        # Try specific extraction first
        if hasattr(self._operator, 'get_openlineage_facets_on_start'):
            return self._operator.get_openlineage_facets_on_start()
        
        # Fall back to unknown operator handling
        unknown_facets = handle_unknown_operator(self._operator)
        
        return OperatorLineage(
            inputs=[],
            outputs=[],
            run_facets=unknown_facets,
            job_facets={}
        )

Debug Facets

from airflow.providers.openlineage.plugins.facets import AirflowDebugRunFacet
from importlib.metadata import distributions

def create_debug_facet():
    """Create debug facet with system information."""
    
    # Collect installed packages (importlib.metadata replaces the
    # deprecated pkg_resources API)
    packages = {dist.metadata['Name']: dist.version for dist in distributions()}
    
    facet = AirflowDebugRunFacet(
        packages=packages
    )
    
    return facet

# Usage with configuration check
from airflow.providers.openlineage.conf import debug_mode

def get_debug_facets():
    """Get debug facets if debug mode is enabled."""
    
    if debug_mode():
        return {
            'airflow_debug': create_debug_facet()
        }
    return {}

Integration with Lineage Extraction

Using Facets in Extractors

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.plugins.facets import AirflowRunFacet

class CustomExtractor(BaseExtractor):
    def extract(self):
        # Basic lineage
        lineage = OperatorLineage(
            inputs=[],
            outputs=[],
            run_facets={},
            job_facets={}
        )
        
        # Add Airflow-specific facets
        if hasattr(self, '_task_instance') and hasattr(self, '_dag_run'):
            airflow_facet = create_run_facet(self._task_instance, self._dag_run)
            lineage.run_facets['airflow'] = airflow_facet
        
        return lineage

Facet Utilities

from airflow.providers.openlineage.utils.utils import (
    get_airflow_run_facet,
    get_airflow_job_facet,
    get_airflow_state_run_facet,
    get_airflow_debug_facet
)

def get_comprehensive_facets(task_instance, dag_run, dag):
    """Get all available Airflow facets."""
    
    facets = {}
    
    # Add run facet
    run_facet = get_airflow_run_facet(
        dag_run=dag_run,
        dag=dag,
        task_instance=task_instance,
        task=task_instance.task,
        task_uuid=f"{dag_run.dag_id}.{task_instance.task_id}"
    )
    facets.update(run_facet)
    
    # Add job facet
    job_facet = get_airflow_job_facet(dag_run)
    facets.update(job_facet)
    
    # Add state facet
    state_facet = get_airflow_state_run_facet(dag_run, dag, task_instance)
    facets.update(state_facet)
    
    # Add debug facet if enabled
    debug_facet = get_airflow_debug_facet()
    facets.update(debug_facet)
    
    return facets

Custom Facet Development

Creating Custom Facets

from openlineage.client.facet import RunFacet
from dataclasses import dataclass
from typing import Dict, Any

@dataclass
class CustomProcessingRunFacet(RunFacet):
    """Custom facet for processing statistics."""
    
    records_processed: int
    processing_time_seconds: float
    memory_usage_mb: float
    error_count: int
    warnings: list[str]
    
    @staticmethod 
    def _get_schema() -> str:
        return "https://my-company.com/schemas/custom_processing_run_facet.json"

# Usage in extractor
def extract_with_custom_facet(self):
    # ... extraction logic ...
    
    custom_facet = CustomProcessingRunFacet(
        records_processed=1000,
        processing_time_seconds=45.2,
        memory_usage_mb=128,
        error_count=0,
        warnings=[]
    )
    
    return OperatorLineage(
        inputs=inputs,
        outputs=outputs,
        run_facets={
            'custom_processing': custom_facet
        },
        job_facets={}
    )

Facet Registration

# Register custom facets via configuration
# In airflow.cfg:
[openlineage]
custom_run_facets = my_package.facets.custom_processing_facet;my_package.facets.performance_facet

# The functions should return dict[str, RunFacet]
def custom_processing_facet(task_instance, **kwargs):
    return {
        'custom_processing': CustomProcessingRunFacet(...)
    }
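Conceptually, the provider imports each semicolon-separated function and merges the returned dictionaries into the event's run facets. A simplified sketch of that merge (illustrative only, not the provider's actual code):

```python
def collect_custom_run_facets(task_instance, facet_functions) -> dict:
    """Call each registered function and merge the resulting facet dicts."""
    facets = {}
    for fn in facet_functions:
        try:
            result = fn(task_instance)
        except Exception:
            continue  # A failing custom facet function should not break the event
        if result:
            facets.update(result)  # Later functions win on duplicate keys
    return facets

merged = collect_custom_run_facets(
    None,
    [lambda ti: {"custom_processing": {"records": 1000}},
     lambda ti: {"performance": {"cpu_seconds": 12.5}}],
)
print(sorted(merged))
```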

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-openlineage
