tessl/pypi-apache-airflow-providers-openlineage

Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.

Template Macros

Template macros for accessing OpenLineage information within DAG definitions and task templates. These macros provide runtime access to lineage identifiers and metadata for use in dynamic task configurations and downstream processing.

Capabilities

Job and Run Identification Macros

Macros for accessing OpenLineage job names and run identifiers within templates.

def lineage_job_namespace() -> str:
    """
    Get the OpenLineage namespace for the current context.
    
    Returns:
        str: Configured OpenLineage namespace
    """

def lineage_job_name(task_instance: TaskInstance) -> str:
    """
    Get the OpenLineage job name for a task instance.
    
    Args:
        task_instance: Current task instance
        
    Returns:
        str: Formatted job name for OpenLineage events
    """

def lineage_run_id(task_instance: TaskInstance) -> str:
    """
    Get the OpenLineage run ID for a task instance.
    
    Args:
        task_instance: Current task instance
        
    Returns:
        str: Unique run identifier for the task execution
    """

Parent and Root Tracking Macros

Macros for accessing parent job and root execution information for hierarchical lineage tracking.

def lineage_parent_id(task_instance: TaskInstance) -> str:
    """
    Get the parent run identifier for a task instance.
    
    Used for tracking nested job relationships and DAG-level lineage.
    
    Args:
        task_instance: Current task instance
        
    Returns:
        str: Parent run identifier (typically the DAG run)
    """

def lineage_root_parent_id(task_instance: TaskInstance) -> str:
    """
    Get the root parent run identifier for a task instance.
    
    Tracks the top-level execution context across nested workflows.
    
    Args:
        task_instance: Current task instance
        
    Returns:
        str: Root parent run identifier
    """

def lineage_root_job_name(task_instance: TaskInstance) -> str:
    """
    Get the root job name for a task instance.
    
    Provides the top-level job context for nested execution hierarchies.
    
    Args:
        task_instance: Current task instance
        
    Returns:
        str: Root job name identifier
    """

def lineage_root_run_id(task_instance: TaskInstance) -> str:
    """
    Get the root run ID for a task instance.
    
    Tracks the original execution that triggered nested workflows.
    
    Args:
        task_instance: Current task instance
        
    Returns:
        str: Root run identifier
    """

Usage Examples

Basic Template Usage

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    'lineage_macro_example',
    start_date=datetime(2023, 1, 1),
    schedule='@daily'
)

# Use lineage macros in bash command
lineage_task = BashOperator(
    task_id='log_lineage_info',
    bash_command='''
    echo "Job Namespace: {{ lineage_job_namespace() }}"
    echo "Job Name: {{ lineage_job_name(task_instance) }}"
    echo "Run ID: {{ lineage_run_id(task_instance) }}"
    echo "Parent ID: {{ lineage_parent_id(task_instance) }}"
    ''',
    dag=dag
)

Python Operator Template Usage

from airflow.operators.python import PythonOperator

def process_with_lineage_info(**context):
    """Process data with lineage information."""
    
    # Access lineage info through context
    namespace = context['lineage_job_namespace']()
    job_name = context['lineage_job_name'](context['task_instance'])
    run_id = context['lineage_run_id'](context['task_instance'])
    parent_id = context['lineage_parent_id'](context['task_instance'])
    
    print(f"Processing in namespace: {namespace}")
    print(f"Job: {job_name}")
    print(f"Run ID: {run_id}")
    print(f"Parent Run: {parent_id}")
    
    # Use lineage info in processing logic
    output_path = f"/data/processed/{namespace}/{job_name}/{run_id}/result.parquet"
    
    # ... processing logic ...
    
    return output_path

python_task = PythonOperator(
    task_id='process_with_lineage',
    python_callable=process_with_lineage_info,
    dag=dag
)

SQL Operator Template Usage

from airflow.providers.postgres.operators.postgres import PostgresOperator

# Use lineage macros in SQL templates
sql_task = PostgresOperator(
    task_id='insert_lineage_metadata',
    postgres_conn_id='analytics_db',
    sql='''
    INSERT INTO job_execution_log (
        namespace,
        job_name,
        run_id,
        parent_run_id,
        execution_date,
        created_at
    ) VALUES (
        '{{ lineage_job_namespace() }}',
        '{{ lineage_job_name(task_instance) }}', 
        '{{ lineage_run_id(task_instance) }}',
        '{{ lineage_parent_id(task_instance) }}',
        '{{ ds }}',
        NOW()
    );
    ''',
    dag=dag
)

Dynamic File Path Generation

from airflow.operators.python import PythonOperator

def create_output_paths(**context):
    """Create standardized output paths using lineage information."""
    
    namespace = context['lineage_job_namespace']()
    job_name = context['lineage_job_name'](context['task_instance'])
    run_id = context['lineage_run_id'](context['task_instance'])
    
    # Create hierarchical paths
    base_path = f"/data/warehouse/{namespace}"
    job_path = f"{base_path}/{job_name}"
    run_path = f"{job_path}/{run_id}"
    
    paths = {
        'base_path': base_path,
        'job_path': job_path, 
        'run_path': run_path,
        'output_file': f"{run_path}/processed_data.parquet",
        'metadata_file': f"{run_path}/metadata.json"
    }
    
    return paths

path_task = PythonOperator(
    task_id='create_paths',
    python_callable=create_output_paths,
    dag=dag
)

# Use XCom to pass paths to downstream tasks
def process_data(**context):
    """Process data using generated paths."""
    
    paths = context['task_instance'].xcom_pull(task_ids='create_paths')
    
    print(f"Writing output to: {paths['output_file']}")
    print(f"Writing metadata to: {paths['metadata_file']}")
    
    # ... processing logic ...

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag
)

path_task >> process_task

Conditional Logic with Lineage Macros

from airflow.operators.python import BranchPythonOperator

def choose_processing_branch(**context):
    """Choose processing branch based on lineage context."""
    
    namespace = context['lineage_job_namespace']()
    job_name = context['lineage_job_name'](context['task_instance'])
    
    # Different processing for different namespaces
    if namespace == 'production':
        return 'production_processing'
    elif namespace == 'staging':
        return 'staging_processing'
    else:
        return 'development_processing'

branch_task = BranchPythonOperator(
    task_id='choose_branch',
    python_callable=choose_processing_branch,
    dag=dag
)

# Different tasks for different environments
production_task = PythonOperator(
    task_id='production_processing',
    python_callable=lambda: print("Production processing"),
    dag=dag
)

staging_task = PythonOperator(
    task_id='staging_processing', 
    python_callable=lambda: print("Staging processing"),
    dag=dag
)

development_task = PythonOperator(
    task_id='development_processing',
    python_callable=lambda: print("Development processing"),
    dag=dag
)

branch_task >> [production_task, staging_task, development_task]

External System Integration

from airflow.operators.bash import BashOperator

# Call external API with lineage information
api_call_task = BashOperator(
    task_id='notify_external_system',
    bash_command='''
    curl -X POST https://external-system.com/api/job-started \
        -H "Content-Type: application/json" \
        -d '{
            "namespace": "{{ lineage_job_namespace() }}",
            "job_name": "{{ lineage_job_name(task_instance) }}",
            "run_id": "{{ lineage_run_id(task_instance) }}",
            "parent_run_id": "{{ lineage_parent_id(task_instance) }}",
            "execution_date": "{{ ds }}",
            "dag_id": "{{ dag.dag_id }}",
            "task_id": "{{ task.task_id }}"
        }'
    ''',
    dag=dag
)

Root Context Tracking

from airflow.operators.python import PythonOperator

def track_execution_hierarchy(**context):
    """Track complete execution hierarchy using root context."""
    
    current_run = context['lineage_run_id'](context['task_instance'])
    parent_run = context['lineage_parent_id'](context['task_instance'])
    root_run = context['lineage_root_run_id'](context['task_instance'])
    root_job = context['lineage_root_job_name'](context['task_instance'])
    
    hierarchy = {
        'current_run': current_run,
        'parent_run': parent_run,
        'root_run': root_run,
        'root_job': root_job,
        'hierarchy_depth': 0 if current_run == root_run else 1
    }
    
    print(f"Execution hierarchy: {hierarchy}")
    
    # Store hierarchy for downstream processing
    return hierarchy

hierarchy_task = PythonOperator(
    task_id='track_hierarchy',
    python_callable=track_execution_hierarchy,
    dag=dag
)

Custom Macro Usage

from airflow.operators.python import PythonOperator

def custom_lineage_processing(**context):
    """Custom processing using all available lineage macros."""
    
    ti = context['task_instance']
    
    lineage_info = {
        'namespace': context['lineage_job_namespace'](),
        'job_name': context['lineage_job_name'](ti),
        'run_id': context['lineage_run_id'](ti),
        'parent_id': context['lineage_parent_id'](ti),
        'root_parent_id': context['lineage_root_parent_id'](ti),
        'root_job_name': context['lineage_root_job_name'](ti),
        'root_run_id': context['lineage_root_run_id'](ti)
    }
    
    print("Complete lineage context:")
    for key, value in lineage_info.items():
        print(f"  {key}: {value}")
    
    # Use in business logic
    unique_id = f"{lineage_info['namespace']}.{lineage_info['job_name']}.{lineage_info['run_id']}"
    
    return {
        'lineage_info': lineage_info,
        'unique_id': unique_id
    }

comprehensive_task = PythonOperator(
    task_id='comprehensive_lineage',
    python_callable=custom_lineage_processing,
    dag=dag
)

Integration Patterns

Data Pipeline Traceability

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def create_traceable_dag():
    """Create DAG with comprehensive lineage tracing."""
    
    dag = DAG(
        'traceable_pipeline',
        start_date=datetime(2023, 1, 1),
        schedule='@daily'
    )
    
    def log_execution_start(**context):
        lineage_info = {
            'namespace': context['lineage_job_namespace'](),
            'job_name': context['lineage_job_name'](context['task_instance']),
            'run_id': context['lineage_run_id'](context['task_instance']),
            'parent_id': context['lineage_parent_id'](context['task_instance'])
        }
        
        # Log to external tracing system
        print(f"TRACE: Starting execution {lineage_info}")
        return lineage_info
    
    def process_data(**context):
        lineage_info = context['task_instance'].xcom_pull(task_ids='log_start')
        
        # Use lineage info in processing
        print(f"TRACE: Processing data for {lineage_info['job_name']}")
        
        # ... data processing ...
        
        return "Processing complete"
    
    def log_execution_end(**context):
        lineage_info = context['task_instance'].xcom_pull(task_ids='log_start')
        
        print(f"TRACE: Completed execution {lineage_info}")
    
    start_task = PythonOperator(
        task_id='log_start',
        python_callable=log_execution_start,
        dag=dag
    )
    
    process_task = PythonOperator(
        task_id='process',
        python_callable=process_data,
        dag=dag
    )
    
    end_task = PythonOperator(
        task_id='log_end',
        python_callable=log_execution_end,
        dag=dag
    )
    
    start_task >> process_task >> end_task
    
    return dag

traceable_dag = create_traceable_dag()

Multi-Environment Configuration

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def create_environment_dag():
    """Create DAG with environment-specific lineage handling."""
    
    environment = os.getenv('AIRFLOW_ENV', 'development')
    
    dag = DAG(
        f'multi_env_pipeline_{environment}',
        start_date=datetime(2023, 1, 1)
    )
    
    def environment_specific_processing(**context):
        namespace = context['lineage_job_namespace']()
        job_name = context['lineage_job_name'](context['task_instance'])
        
        # Environment-specific logic
        if 'production' in namespace:
            # Production-specific processing
            output_path = f"/prod/data/{job_name}"
        elif 'staging' in namespace:
            # Staging-specific processing
            output_path = f"/staging/data/{job_name}"
        else:
            # Development processing
            output_path = f"/dev/data/{job_name}"
        
        print(f"Processing for {environment} environment: {output_path}")
        return output_path
    
    process_task = PythonOperator(
        task_id='environment_process',
        python_callable=environment_specific_processing,
        dag=dag
    )
    
    return dag

env_dag = create_environment_dag()

Macro Availability

The lineage macros are automatically available in Airflow templates when the OpenLineage provider is installed:

# Available in all template contexts:
# - Bash commands
# - SQL queries  
# - Python operator arguments
# - Email templates
# - Any Airflow template field

# Example template usage across operators:
from airflow.operators.email import EmailOperator

email_task = EmailOperator(
    task_id='send_notification',
    to=['admin@company.com'],
    subject='Job {{ lineage_job_name(task_instance) }} completed',
    html_content='''
    <h2>Job Execution Complete</h2>
    <p><strong>Namespace:</strong> {{ lineage_job_namespace() }}</p>
    <p><strong>Job:</strong> {{ lineage_job_name(task_instance) }}</p>
    <p><strong>Run ID:</strong> {{ lineage_run_id(task_instance) }}</p>
    <p><strong>Parent Run:</strong> {{ lineage_parent_id(task_instance) }}</p>
    ''',
    dag=dag
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-openlineage
