Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.
—
Quality: Pending — a best-practices review has not yet been performed.
Impact: Pending — no eval scenarios have been run.
Template macros for accessing OpenLineage information within DAG definitions and task templates. These macros provide runtime access to lineage identifiers and metadata for use in dynamic task configurations and downstream processing.
Macros for accessing OpenLineage job names and run identifiers within templates.
def lineage_job_namespace() -> str:
    """
    Get the OpenLineage namespace for the current context.

    Returns:
        str: Configured OpenLineage namespace
    """
    # NOTE(review): signature-only stub; the implementation ships with the
    # OpenLineage provider, which injects these macros into the template
    # context at runtime — confirm against the installed provider version.
def lineage_job_name(task_instance: TaskInstance) -> str:
    """
    Get the OpenLineage job name for a task instance.

    Args:
        task_instance: Current task instance

    Returns:
        str: Formatted job name for OpenLineage events
    """
def lineage_run_id(task_instance: TaskInstance) -> str:
    """
    Get the OpenLineage run ID for a task instance.

    Args:
        task_instance: Current task instance

    Returns:
        str: Unique run identifier for the task execution
    """


# Macros for accessing parent job and root execution information for
# hierarchical lineage tracking.
def lineage_parent_id(task_instance: TaskInstance) -> str:
    """
    Get the parent run identifier for a task instance.

    Used for tracking nested job relationships and DAG-level lineage.

    Args:
        task_instance: Current task instance

    Returns:
        str: Parent run identifier (typically the DAG run)
    """
def lineage_root_parent_id(task_instance: TaskInstance) -> str:
    """
    Get the root parent run identifier for a task instance.

    Tracks the top-level execution context across nested workflows.

    Args:
        task_instance: Current task instance

    Returns:
        str: Root parent run identifier
    """
def lineage_root_job_name(task_instance: TaskInstance) -> str:
    """
    Get the root job name for a task instance.

    Provides the top-level job context for nested execution hierarchies.

    Args:
        task_instance: Current task instance

    Returns:
        str: Root job name identifier
    """
def lineage_root_run_id(task_instance: TaskInstance) -> str:
    """
    Get the root run ID for a task instance.

    Tracks the original execution that triggered nested workflows.

    Args:
        task_instance: Current task instance

    Returns:
        str: Root run identifier
    """


from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Example DAG demonstrating the OpenLineage template macros in a bash command.
dag = DAG(
    'lineage_macro_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily'
)

# Use lineage macros in bash command; each {{ ... }} expression is rendered
# by Jinja before the command runs.
lineage_task = BashOperator(
    task_id='log_lineage_info',
    bash_command='''
echo "Job Namespace: {{ lineage_job_namespace() }}"
echo "Job Name: {{ lineage_job_name(task_instance) }}"
echo "Run ID: {{ lineage_run_id(task_instance) }}"
echo "Parent ID: {{ lineage_parent_id(task_instance) }}"
''',
    dag=dag
)

from airflow.operators.python import PythonOperator
def process_with_lineage_info(**context):
    """Process data using the lineage macros available in the context.

    Reads the OpenLineage macro callables injected into the Airflow template
    context, logs the identifiers, and derives a run-scoped output path.

    Returns:
        str: Path of the form /data/processed/<ns>/<job>/<run>/result.parquet
    """
    ti = context['task_instance']
    ns = context['lineage_job_namespace']()
    job = context['lineage_job_name'](ti)
    run = context['lineage_run_id'](ti)
    parent = context['lineage_parent_id'](ti)

    for line in (
        f"Processing in namespace: {ns}",
        f"Job: {job}",
        f"Run ID: {run}",
        f"Parent Run: {parent}",
    ):
        print(line)

    # Use lineage info in processing logic
    # ... processing logic ...
    return f"/data/processed/{ns}/{job}/{run}/result.parquet"
# NOTE(review): provide_context=True is an Airflow 1.x flag; Airflow 2+
# always passes the context to the callable — confirm the target version.
python_task = PythonOperator(
    task_id='process_with_lineage',
    python_callable=process_with_lineage_info,
    provide_context=True,
    dag=dag
)

from airflow.providers.postgres.operators.postgres import PostgresOperator
# Use lineage macros in SQL templates.
# NOTE(review): the macro values are spliced into the SQL text by Jinja
# rather than bound as query parameters; acceptable for trusted macro
# output, but not a pattern to copy for user-supplied values.
sql_task = PostgresOperator(
    task_id='insert_lineage_metadata',
    postgres_conn_id='analytics_db',
    sql='''
INSERT INTO job_execution_log (
namespace,
job_name,
run_id,
parent_run_id,
execution_date,
created_at
) VALUES (
'{{ lineage_job_namespace() }}',
'{{ lineage_job_name(task_instance) }}',
'{{ lineage_run_id(task_instance) }}',
'{{ lineage_parent_id(task_instance) }}',
'{{ ds }}',
NOW()
);
''',
    dag=dag
)

from airflow.operators.python import PythonOperator
def create_output_paths(**context):
    """Build the standardized warehouse paths for the current run.

    Derives a namespace/job/run directory hierarchy from the OpenLineage
    macros in the context and returns each level plus the output and
    metadata file locations.

    Returns:
        dict: base_path, job_path, run_path, output_file, metadata_file.
    """
    ti = context['task_instance']
    namespace = context['lineage_job_namespace']()
    job_name = context['lineage_job_name'](ti)
    run_id = context['lineage_run_id'](ti)

    # Create hierarchical paths: warehouse -> namespace -> job -> run.
    base_path = '/'.join(['/data/warehouse', namespace])
    job_path = '/'.join([base_path, job_name])
    run_path = '/'.join([job_path, run_id])

    return {
        'base_path': base_path,
        'job_path': job_path,
        'run_path': run_path,
        'output_file': run_path + '/processed_data.parquet',
        'metadata_file': run_path + '/metadata.json',
    }
path_task = PythonOperator(
    task_id='create_paths',
    python_callable=create_output_paths,
    dag=dag
)

# Use XCom to pass paths to downstream tasks.
def process_data(**context):
    """Process data using generated paths."""
    # Pull the dict returned by create_output_paths (its return value was
    # pushed to XCom automatically under task_id 'create_paths').
    paths = context['task_instance'].xcom_pull(task_ids='create_paths')
    print(f"Writing output to: {paths['output_file']}")
    print(f"Writing metadata to: {paths['metadata_file']}")
    # ... processing logic ...

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag
)

path_task >> process_task

from airflow.operators.python import BranchPythonOperator
def choose_processing_branch(**context):
    """Select the downstream task to run based on the lineage namespace.

    Args:
        context: Airflow template context containing the OpenLineage macros.

    Returns:
        str: task_id of the branch to follow — 'production_processing',
        'staging_processing', or 'development_processing' (default).
    """
    namespace = context['lineage_job_namespace']()
    # Different processing for different namespaces.
    # (Fix: the original also fetched lineage_job_name but never used it.)
    if namespace == 'production':
        return 'production_processing'
    if namespace == 'staging':
        return 'staging_processing'
    return 'development_processing'
branch_task = BranchPythonOperator(
    task_id='choose_branch',
    python_callable=choose_processing_branch,
    dag=dag
)

# Different tasks for different environments.
production_task = PythonOperator(
    task_id='production_processing',
    python_callable=lambda: print("Production processing"),
    dag=dag
)
staging_task = PythonOperator(
    task_id='staging_processing',
    python_callable=lambda: print("Staging processing"),
    dag=dag
)
development_task = PythonOperator(
    task_id='development_processing',
    python_callable=lambda: print("Development processing"),
    dag=dag
)

# Only the task whose id is returned by choose_processing_branch runs;
# the other two are skipped.
branch_task >> [production_task, staging_task, development_task]

from airflow.operators.bash import BashOperator
# Call external API with lineage information; the JSON payload is rendered
# by Jinja before the curl command executes.
api_call_task = BashOperator(
    task_id='notify_external_system',
    bash_command='''
curl -X POST https://external-system.com/api/job-started \
-H "Content-Type: application/json" \
-d '{
"namespace": "{{ lineage_job_namespace() }}",
"job_name": "{{ lineage_job_name(task_instance) }}",
"run_id": "{{ lineage_run_id(task_instance) }}",
"parent_run_id": "{{ lineage_parent_id(task_instance) }}",
"execution_date": "{{ ds }}",
"dag_id": "{{ dag.dag_id }}",
"task_id": "{{ task.task_id }}"
}'
''',
    dag=dag
)

from airflow.operators.python import PythonOperator
def track_execution_hierarchy(**context):
    """Collect the current/parent/root lineage identifiers for this task.

    Returns:
        dict: current_run, parent_run, root_run, root_job, and a
        hierarchy_depth of 0 when this task IS the root execution, else 1.
    """
    ti = context['task_instance']
    current_run = context['lineage_run_id'](ti)
    parent_run = context['lineage_parent_id'](ti)
    root_run = context['lineage_root_run_id'](ti)
    root_job = context['lineage_root_job_name'](ti)

    hierarchy = dict(
        current_run=current_run,
        parent_run=parent_run,
        root_run=root_run,
        root_job=root_job,
        hierarchy_depth=0 if current_run == root_run else 1,
    )
    print(f"Execution hierarchy: {hierarchy}")
    # Store hierarchy for downstream processing (return value goes to XCom).
    return hierarchy
hierarchy_task = PythonOperator(
    task_id='track_hierarchy',
    python_callable=track_execution_hierarchy,
    dag=dag
)

from airflow.operators.python import PythonOperator
def custom_lineage_processing(**context):
    """Gather every available OpenLineage macro value for this task.

    Returns:
        dict: {'lineage_info': <all macro values keyed by short name>,
        'unique_id': '<namespace>.<job_name>.<run_id>'}
    """
    ti = context['task_instance']
    lineage_info = {'namespace': context['lineage_job_namespace']()}
    # Map each short key to the macro that produces its value.
    for key, macro in (
        ('job_name', 'lineage_job_name'),
        ('run_id', 'lineage_run_id'),
        ('parent_id', 'lineage_parent_id'),
        ('root_parent_id', 'lineage_root_parent_id'),
        ('root_job_name', 'lineage_root_job_name'),
        ('root_run_id', 'lineage_root_run_id'),
    ):
        lineage_info[key] = context[macro](ti)

    print("Complete lineage context:")
    for key, value in lineage_info.items():
        print(f"  {key}: {value}")

    # Use in business logic
    unique_id = '.'.join(
        (lineage_info['namespace'], lineage_info['job_name'], lineage_info['run_id'])
    )
    return {
        'lineage_info': lineage_info,
        'unique_id': unique_id,
    }
comprehensive_task = PythonOperator(
    task_id='comprehensive_lineage',
    python_callable=custom_lineage_processing,
    dag=dag
)

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime


def create_traceable_dag():
    """Create DAG with comprehensive lineage tracing.

    Wires three tasks (log start -> process -> log end) that share the
    lineage identifiers captured at the start via XCom.
    """
    dag = DAG(
        'traceable_pipeline',
        start_date=datetime(2023, 1, 1),
        schedule_interval='@daily'
    )

    def log_execution_start(**context):
        # Capture the lineage identifiers once; the returned dict is pushed
        # to XCom under task_id 'log_start' for the downstream tasks.
        lineage_info = {
            'namespace': context['lineage_job_namespace'](),
            'job_name': context['lineage_job_name'](context['task_instance']),
            'run_id': context['lineage_run_id'](context['task_instance']),
            'parent_id': context['lineage_parent_id'](context['task_instance'])
        }
        # Log to external tracing system
        print(f"TRACE: Starting execution {lineage_info}")
        return lineage_info

    def process_data(**context):
        # Retrieve the identifiers captured by log_execution_start.
        lineage_info = context['task_instance'].xcom_pull(task_ids='log_start')
        # Use lineage info in processing
        print(f"TRACE: Processing data for {lineage_info['job_name']}")
        # ... data processing ...
        return "Processing complete"

    def log_execution_end(**context):
        lineage_info = context['task_instance'].xcom_pull(task_ids='log_start')
        print(f"TRACE: Completed execution {lineage_info}")

    start_task = PythonOperator(
        task_id='log_start',
        python_callable=log_execution_start,
        dag=dag
    )
    process_task = PythonOperator(
        task_id='process',
        python_callable=process_data,
        dag=dag
    )
    end_task = PythonOperator(
        task_id='log_end',
        python_callable=log_execution_end,
        dag=dag
    )
    start_task >> process_task >> end_task
    return dag


traceable_dag = create_traceable_dag()

import os
def create_environment_dag():
    """Create DAG with environment-specific lineage handling.

    The DAG id embeds the AIRFLOW_ENV value (default 'development'), and the
    single task routes its output path by matching substrings of the
    OpenLineage namespace.
    """
    environment = os.getenv('AIRFLOW_ENV', 'development')
    dag = DAG(
        f'multi_env_pipeline_{environment}',
        start_date=datetime(2023, 1, 1)
    )

    def environment_specific_processing(**context):
        namespace = context['lineage_job_namespace']()
        job_name = context['lineage_job_name'](context['task_instance'])
        # Environment-specific logic: substring match on the namespace.
        if 'production' in namespace:
            # Production-specific processing
            output_path = f"/prod/data/{job_name}"
        elif 'staging' in namespace:
            # Staging-specific processing
            output_path = f"/staging/data/{job_name}"
        else:
            # Development processing
            output_path = f"/dev/data/{job_name}"
        print(f"Processing for {environment} environment: {output_path}")
        return output_path

    process_task = PythonOperator(
        task_id='environment_process',
        python_callable=environment_specific_processing,
        dag=dag
    )
    return dag


env_dag = create_environment_dag()

# The lineage macros are automatically available in Airflow templates when
# the OpenLineage provider is installed:
# Available in all template contexts:
# - Bash commands
# - SQL queries
# - Python operator arguments
# - Email templates
# - Any Airflow template field

# Example template usage across operators:
# NOTE(review): EmailOperator is not imported in these snippets — it lives
# at airflow.operators.email.EmailOperator; confirm the import before use.
email_task = EmailOperator(
    task_id='send_notification',
    to=['admin@company.com'],
    subject='Job {{ lineage_job_name(task_instance) }} completed',
    html_content='''
<h2>Job Execution Complete</h2>
<p><strong>Namespace:</strong> {{ lineage_job_namespace() }}</p>
<p><strong>Job:</strong> {{ lineage_job_name(task_instance) }}</p>
<p><strong>Run ID:</strong> {{ lineage_run_id(task_instance) }}</p>
<p><strong>Parent Run:</strong> {{ lineage_parent_id(task_instance) }}</p>
''',
    dag=dag
)

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-apache-airflow-providers-openlineage