tessl/pypi-apache-airflow-providers-openlineage

Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.


docs/utility-functions.md

Utility Functions and Helpers

General utility functions for working with OpenLineage data, including operator analysis, documentation extraction, data conversion, and helper functions for common OpenLineage operations.

Capabilities

Operator Analysis Functions

Functions for analyzing and extracting information from Airflow operators.

def get_operator_class(task: BaseOperator) -> type:
    """
    Get the operator class from a task instance.
    
    Args:
        task: Airflow task/operator instance
        
    Returns:
        type: Operator class type
    """

def get_fully_qualified_class_name(operator: BaseOperator | MappedOperator) -> str:
    """
    Get the fully qualified class name of an operator.
    
    Args:
        operator: Airflow operator instance
        
    Returns:
        str: Fully qualified class name (e.g., 'airflow.operators.python.PythonOperator')
    """

def get_operator_provider_version(operator: BaseOperator | MappedOperator) -> str | None:
    """
    Get the provider version for an operator.
    
    Args:
        operator: Airflow operator instance
        
    Returns:
        str | None: Provider version string or None if not available
    """

def is_operator_disabled(operator: BaseOperator | MappedOperator) -> bool:
    """
    Check if an operator is disabled for lineage collection.
    
    Args:
        operator: Airflow operator instance
        
    Returns:
        bool: True if operator is disabled for lineage
    """

Task and Job Identification

Functions for generating job names and identifiers for OpenLineage events.

def get_job_name(task: TaskInstance | RuntimeTaskInstance) -> str:
    """
    Get the OpenLineage job name for a task instance.
    
    Args:
        task: Task instance
        
    Returns:
        str: Formatted job name for OpenLineage events
    """

Documentation Extraction

Functions for extracting documentation and metadata from DAGs and tasks.

def get_task_documentation(operator: BaseOperator | MappedOperator | None) -> tuple[str | None, str | None]:
    """
    Extract documentation from a task/operator.
    
    Args:
        operator: Airflow operator instance
        
    Returns:
        tuple: (doc_md, description) - documentation markdown and description
    """

def get_dag_documentation(dag: DAG | None) -> tuple[str | None, str | None]:
    """
    Extract documentation from a DAG.
    
    Args:
        dag: Airflow DAG instance
        
    Returns:
        tuple: (doc_md, description) - documentation markdown and description
    """

Facet Generation Functions

Functions for generating Airflow-specific facets and metadata.

def get_task_parent_run_facet(
    task_instance: TaskInstance,
    dag_run: DagRun,
    dag: DAG
) -> dict[str, Any]:
    """
    Get parent run facet information for a task.
    
    Args:
        task_instance: Task instance
        dag_run: DAG run instance
        dag: DAG instance
        
    Returns:
        dict: Parent run facet data
    """

def get_airflow_mapped_task_facet(task_instance: TaskInstance) -> dict[str, Any]:
    """
    Get mapped task facet for dynamic task mapping.
    
    Args:
        task_instance: Task instance (mapped task)
        
    Returns:
        dict: Mapped task facet data
    """

def get_user_provided_run_facets(ti: TaskInstance, ti_state: TaskInstanceState) -> dict[str, RunFacet]:
    """
    Get user-provided custom run facets.
    
    Args:
        ti: Task instance
        ti_state: Task instance state
        
    Returns:
        dict: User-provided run facets
    """

def get_airflow_dag_run_facet(dag_run: DagRun) -> dict[str, RunFacet]:
    """
    Get Airflow DAG run facet.
    
    Args:
        dag_run: DAG run instance
        
    Returns:
        dict: DAG run facet data
    """

def get_processing_engine_facet() -> dict[str, processing_engine_run.ProcessingEngineRunFacet]:
    """
    Get processing engine facet with Airflow version information.
    
    Returns:
        dict: Processing engine facet
    """

def get_airflow_debug_facet() -> dict[str, AirflowDebugRunFacet]:
    """
    Get Airflow debug facet with system information.
    
    Returns:
        dict: Debug facet data (only if debug mode is enabled)
    """

def get_airflow_run_facet(
    dag_run: DagRun,
    dag: DAG,
    task_instance: TaskInstance,
    task: BaseOperator,
    task_uuid: str
) -> dict[str, AirflowRunFacet]:
    """
    Get comprehensive Airflow run facet.
    
    Args:
        dag_run: DAG run instance
        dag: DAG instance
        task_instance: Task instance
        task: Task/operator
        task_uuid: Unique task identifier
        
    Returns:
        dict: Comprehensive Airflow run facet
    """

def get_airflow_job_facet(dag_run: DagRun) -> dict[str, AirflowJobFacet]:
    """
    Get Airflow job facet with DAG structure.
    
    Args:
        dag_run: DAG run instance
        
    Returns:
        dict: Airflow job facet data
    """

def get_airflow_state_run_facet(
    dag_run: DagRun,
    dag: DAG,
    task_instance: TaskInstance
) -> dict[str, AirflowStateRunFacet]:
    """
    Get Airflow state run facet.
    
    Args:
        dag_run: DAG run instance
        dag: DAG instance
        task_instance: Task instance
        
    Returns:
        dict: State run facet data
    """

def get_unknown_source_attribute_run_facet(task: BaseOperator, name: str | None = None):
    """
    Get unknown operator facet for unhandled operators.
    
    Args:
        task: Airflow task/operator
        name: Optional name override
        
    Returns:
        dict: Unknown operator facet data
    """

Data Conversion Functions

Functions for converting between different data formats and structures.

def translate_airflow_asset(asset: Asset, lineage_context) -> OpenLineageDataset | None:
    """
    Convert Airflow Asset to OpenLineage Dataset.
    
    Args:
        asset: Airflow Asset instance
        lineage_context: Lineage extraction context
        
    Returns:
        OpenLineageDataset | None: Converted dataset or None if conversion fails
    """

Configuration and Environment Functions

Functions for checking configuration and environment settings.

def is_selective_lineage_enabled(obj: DAG | BaseOperator | MappedOperator) -> bool:
    """
    Check if selective lineage is enabled for a DAG or task.
    
    Args:
        obj: DAG or task instance
        
    Returns:
        bool: True if selective lineage is enabled
    """

def should_use_external_connection(hook) -> bool:
    """
    Check if external connection should be used for a hook.
    
    Args:
        hook: Database or connection hook
        
    Returns:
        bool: True if external connection should be used
    """

Utility Classes and Helpers

Helper classes for data serialization and processing.

class InfoJsonEncodable(dict):
    """Base class for JSON-serializable info objects; wraps an Airflow object and exposes a curated set of its attributes as dictionary entries."""

class DagInfo(InfoJsonEncodable):
    """DAG information encoder for JSON serialization."""

class DagRunInfo(InfoJsonEncodable):
    """DAG run information encoder for JSON serialization."""

class TaskInstanceInfo(InfoJsonEncodable):
    """Task instance information encoder for JSON serialization."""

class AssetInfo(InfoJsonEncodable):
    """Asset information encoder for JSON serialization."""

class TaskInfo(InfoJsonEncodable):
    """Task information encoder for JSON serialization."""

class TaskInfoComplete(InfoJsonEncodable):
    """Complete task information encoder for JSON serialization."""

class TaskGroupInfo(InfoJsonEncodable):
    """Task group information encoder for JSON serialization."""

class OpenLineageRedactor:
    """OpenLineage-specific secrets redactor for sensitive data masking."""

def try_import_from_string(string: str) -> Any:
    """
    Safe import utility with error handling.
    
    Args:
        string: Module or class path to import
        
    Returns:
        Any: Imported object or None if import fails
    """

def print_warning(log):
    """
    Decorator factory: the returned decorator wraps a function so that any
    exception it raises is logged as a warning via the given logger instead
    of propagating.
    
    Args:
        log: Logger instance used to emit the warning
    """

Version Compatibility

Version compatibility constants and utilities.

AIRFLOW_V_3_0_PLUS: bool
"""Boolean flag indicating Airflow 3.0+ compatibility."""

Usage Examples

Operator Analysis

from airflow.providers.openlineage.utils.utils import (
    get_operator_class,
    get_fully_qualified_class_name,
    is_operator_disabled
)
from airflow.operators.python import PythonOperator

def my_function():
    return "Hello World"

task = PythonOperator(
    task_id='example_task',
    python_callable=my_function,
    dag=dag
)

# Analyze operator
operator_class = get_operator_class(task)
class_name = get_fully_qualified_class_name(task)
is_disabled = is_operator_disabled(task)

print(f"Operator class: {operator_class}")
print(f"Class name: {class_name}")  
print(f"Is disabled: {is_disabled}")

Documentation Extraction

from airflow.providers.openlineage.utils.utils import get_task_documentation, get_dag_documentation
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# DAG with documentation
dag = DAG(
    'documented_dag',
    start_date=datetime(2023, 1, 1),
    description='A well-documented data processing pipeline',
    doc_md="""
    # Data Processing Pipeline
    
    This pipeline processes user data and generates analytics reports.
    """
)

def documented_function():
    """Process user data and return results."""
    return "Processing complete"

# Task with documentation
task = PythonOperator(
    task_id='process_data',
    python_callable=documented_function,
    doc_md='Processes user data using advanced algorithms',
    dag=dag
)

# Extract documentation
dag_doc_md, dag_description = get_dag_documentation(dag)
task_doc_md, task_description = get_task_documentation(task)

print(f"DAG documentation: {dag_doc_md}")
print(f"DAG description: {dag_description}")
print(f"Task documentation: {task_doc_md}")
print(f"Task description: {task_description}")

Facet Generation

from airflow.providers.openlineage.utils.utils import (
    get_airflow_run_facet,
    get_processing_engine_facet,
    get_airflow_debug_facet
)

def generate_comprehensive_facets(dag_run, dag, task_instance, task):
    """Generate comprehensive facets for a task execution."""
    
    # Build an illustrative task UUID (the provider's listener derives the real one)
    task_uuid = f"{dag_run.dag_id}.{task_instance.task_id}.{dag_run.run_id}.{task_instance.try_number}"
    
    # Get all facets
    facets = {}
    
    # Airflow run facet
    run_facet = get_airflow_run_facet(dag_run, dag, task_instance, task, task_uuid)
    facets.update(run_facet)
    
    # Processing engine facet
    engine_facet = get_processing_engine_facet()
    facets.update(engine_facet)
    
    # Debug facet (if enabled)
    debug_facet = get_airflow_debug_facet()
    facets.update(debug_facet)
    
    return facets

# Usage
comprehensive_facets = generate_comprehensive_facets(dag_run, dag, task_instance, task)
print(f"Generated facets: {list(comprehensive_facets.keys())}")

Data Conversion

from airflow.providers.openlineage.utils.utils import translate_airflow_asset
from airflow.sdk import Asset  # Airflow 3.x; on Airflow 2.x use airflow.datasets.Dataset

# Create Airflow Asset
asset = Asset(
    uri="s3://my-bucket/data/users.parquet",
    name="user_data"
)

# Convert to OpenLineage Dataset
lineage_context = {'namespace': 'production'}
ol_dataset = translate_airflow_asset(asset, lineage_context)

if ol_dataset:
    print(f"OpenLineage Dataset: {ol_dataset.namespace}/{ol_dataset.name}")
else:
    print("Asset conversion failed")

Safe Import Utility

from airflow.providers.openlineage.utils.utils import try_import_from_string

# Safely import optional dependencies
pandas = try_import_from_string('pandas')
if pandas:
    # Use pandas functionality
    df = pandas.DataFrame({'a': [1, 2, 3]})
    print("Pandas available")
else:
    print("Pandas not available")

# Try importing custom modules
custom_processor = try_import_from_string('my_package.processors.CustomProcessor')
if custom_processor:
    processor = custom_processor()
    processor.process()
else:
    print("Custom processor not available")

JSON Serialization

from airflow.providers.openlineage.utils.utils import DagInfo, TaskInfo, DagRunInfo

# Each encoder wraps an Airflow object and exposes a curated set of its
# attributes as a JSON-serializable dict
dag_info = DagInfo(dag)
task_info = TaskInfo(task)
dag_run_info = DagRunInfo(dag_run)

# Serialize to JSON (the info objects are dict subclasses)
import json
serialized_data = {
    'dag': dag_info,
    'task': task_info,
    'dag_run': dag_run_info
}

json_string = json.dumps(serialized_data, indent=2, default=str)
print(f"Serialized data: {json_string}")

Secrets Redaction

from airflow.providers.openlineage.utils.utils import OpenLineageRedactor
from airflow.utils.log.secrets_masker import _secrets_masker

# Build the redactor from Airflow's secrets masker
redactor = OpenLineageRedactor.from_masker(_secrets_masker())

# Sample data with sensitive information
sensitive_data = {
    'connection_string': 'postgresql://user:password@localhost/db',
    'api_key': 'secret-api-key-12345',
    'config': {
        'database_password': 'super-secret',
        'username': 'admin'
    }
}

# Redact sensitive data
redacted_data = redactor.redact(sensitive_data)
print(f"Redacted data: {redacted_data}")

Selective Lineage Checking

from airflow.providers.openlineage.utils.utils import is_selective_lineage_enabled
from airflow.providers.openlineage.utils.selective_enable import enable_lineage

# Check DAG lineage status
if is_selective_lineage_enabled(dag):
    print("Selective lineage is enabled for this DAG")
else:
    print("Selective lineage is not enabled for this DAG")

# Enable selective lineage
enabled_dag = enable_lineage(dag)

# Check again
if is_selective_lineage_enabled(enabled_dag):
    print("Selective lineage is now enabled")

Hook Connection Analysis

from airflow.providers.openlineage.utils.utils import should_use_external_connection
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Create hook
hook = PostgresHook(postgres_conn_id='analytics_db')

# Check if external connection should be used
use_external = should_use_external_connection(hook)

if use_external:
    print("Using external connection for lineage")
else:
    print("Using standard connection handling")

Integration Patterns

Custom Extractor with Utilities

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.utils.utils import (
    get_fully_qualified_class_name,
    get_task_documentation,
    get_airflow_run_facet
)

class UtilityExtractor(BaseExtractor):
    def extract(self):
        # Use utilities for comprehensive extraction
        class_name = get_fully_qualified_class_name(self._operator)
        doc_md, description = get_task_documentation(self._operator)
        
        # Create lineage with metadata
        lineage = OperatorLineage(
            inputs=[],
            outputs=[],
            run_facets={
                'operator_info': {
                    'class_name': class_name,
                    'documentation': doc_md,
                    'description': description
                }
            },
            job_facets={}
        )
        
        return lineage

Comprehensive Metadata Collection

from airflow.providers.openlineage.utils.utils import (
    get_operator_class,
    get_fully_qualified_class_name,
    get_operator_provider_version,
    is_operator_disabled,
    get_job_name,
    get_dag_documentation,
    get_task_documentation,
    get_airflow_run_facet,
    get_airflow_job_facet,
    get_airflow_state_run_facet,
    get_processing_engine_facet,
    get_airflow_debug_facet,
    is_selective_lineage_enabled,
)

def collect_comprehensive_metadata(dag, dag_run, task_instance, task):
    """Collect all available metadata using utility functions."""
    
    metadata = {
        'operator': {
            'class': get_operator_class(task),
            'fully_qualified_name': get_fully_qualified_class_name(task),
            'provider_version': get_operator_provider_version(task),
            'is_disabled': is_operator_disabled(task)
        },
        'job': {
            'name': get_job_name(task_instance)
        },
        'documentation': {
            'dag': get_dag_documentation(dag),
            'task': get_task_documentation(task)
        },
        'facets': {
            'run': get_airflow_run_facet(dag_run, dag, task_instance, task, 'uuid'),
            'job': get_airflow_job_facet(dag_run),
            'state': get_airflow_state_run_facet(dag_run, dag, task_instance),
            'engine': get_processing_engine_facet(),
            'debug': get_airflow_debug_facet()
        },
        'config': {
            'selective_enabled': is_selective_lineage_enabled(task)
        }
    }
    
    return metadata

# Usage
comprehensive_metadata = collect_comprehensive_metadata(dag, dag_run, task_instance, task)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-openlineage
