Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Utilities for selectively enabling or disabling OpenLineage lineage collection at the DAG and task level. They control which DAGs and tasks emit OpenLineage events, for example to reduce event-emission overhead or to keep sensitive pipelines out of lineage metadata.
Functions for enabling and disabling lineage collection on DAGs and tasks.
# NOTE(review): T is defined outside this excerpt; presumably a TypeVar over DAG/operator types.
def enable_lineage(obj: T) -> T:
    """
    Enable lineage collection for a DAG or task.

    Reference signature only; import the function from
    ``airflow.providers.openlineage.utils.selective_enable``.

    Args:
        obj: DAG or task instance to enable lineage for. Only DAGs and tasks are
            documented inputs; other objects (for example a TaskGroup) are not listed.
    Returns:
        T: The same object with lineage enabled (for method chaining)
    """
# NOTE(review): T is defined outside this excerpt; presumably a TypeVar over DAG/operator types.
def disable_lineage(obj: T) -> T:
    """
    Disable lineage collection for a DAG or task.

    Reference signature only; import the function from
    ``airflow.providers.openlineage.utils.selective_enable``.

    Args:
        obj: DAG or task instance to disable lineage for. Only DAGs and tasks are
            documented inputs; other objects (for example a TaskGroup) are not listed.
    Returns:
        T: The same object with lineage disabled (for method chaining)
    """
Functions to check whether lineage is enabled for specific DAGs and tasks.
def is_task_lineage_enabled(task: BaseOperator | MappedOperator) -> bool:
    """
    Check if lineage collection is enabled for a specific task.

    Reference signature only; import the function from
    ``airflow.providers.openlineage.utils.selective_enable``.

    Args:
        task: Task/operator to check (a ``BaseOperator`` or ``MappedOperator``)
    Returns:
        bool: True if lineage is enabled for this task

    NOTE(review): the result for a task that was never passed to enable_lineage()
    or disable_lineage() is not stated here; confirm against the provider before
    relying on it outside selective-enable mode.
    """
def is_dag_lineage_enabled(dag: DAG) -> bool:
    """
    Check if lineage collection is enabled for a specific DAG.

    Reference signature only; import the function from
    ``airflow.providers.openlineage.utils.selective_enable``.

    Args:
        dag: DAG to check
    Returns:
        bool: True if lineage is enabled for this DAG
    """
Constants for the lineage control parameter system.
# Key under which the lineage on/off setting is stored on a DAG or task.
ENABLE_OL_PARAM_NAME: str
"""Name of the parameter used to control OpenLineage enablement."""
# Param marking a DAG/task as enabled (presumably what enable_lineage() assigns — confirm).
ENABLE_OL_PARAM: Param
"""Parameter object for enabling OpenLineage on DAGs/tasks."""
# Param marking a DAG/task as disabled (presumably what disable_lineage() assigns — confirm).
DISABLE_OL_PARAM: Param
"""Parameter object for disabling OpenLineage on DAGs/tasks."""
from airflow import DAG
from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage
from datetime import datetime

# Enable lineage for an entire DAG. enable_lineage() returns the DAG it was given,
# so the call can wrap the constructor directly.
# `schedule` replaces `schedule_interval` (deprecated in Airflow 2.4, removed in 3.0).
dag = enable_lineage(DAG(
    'analytics_pipeline',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    description='Analytics data processing pipeline',
))
# Disable lineage for a different DAG
sensitive_dag = disable_lineage(DAG(
    'sensitive_data_processing',
    start_date=datetime(2023, 1, 1),
    schedule='@hourly',
    description='Sensitive data processing - no lineage tracking',
))

from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage


def process_data():
    """Callable for the public-data task."""
    return "Processing complete"


def sensitive_operation():
    """Callable for the sensitive-data task."""
    return "Sensitive processing complete"


# Opt the public-data task in to lineage; enable_lineage() hands back the task itself.
process_task = enable_lineage(
    PythonOperator(
        task_id='process_public_data',
        python_callable=process_data,
        dag=dag,
    )
)

# Opt the sensitive-data task out of lineage.
sensitive_task = disable_lineage(
    PythonOperator(
        task_id='process_sensitive_data',
        python_callable=sensitive_operation,
        dag=dag,
    )
)

# A regular task with no task-level setting (inherits the DAG setting).
regular_task = BashOperator(
    task_id='cleanup_temp_files',
    bash_command='rm -rf /tmp/processing/*',
    dag=dag,
)

from airflow.providers.openlineage.utils.selective_enable import (
    is_dag_lineage_enabled,
    is_task_lineage_enabled,
)

# Report the DAG-level lineage setting.
if not is_dag_lineage_enabled(dag):
    print(f"Lineage is disabled for DAG: {dag.dag_id}")
else:
    print(f"Lineage is enabled for DAG: {dag.dag_id}")

# Report the lineage setting of every task registered on the DAG.
for task_id, task in dag.task_dict.items():
    if not is_task_lineage_enabled(task):
        print(f"Task {task_id}: lineage disabled")
    else:
        print(f"Task {task_id}: lineage enabled")

from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage
def create_processing_dag(environment: str):
    """Create a daily data-processing DAG whose lineage setting depends on the environment.

    Args:
        environment: Deployment environment name. Lineage is enabled only for
            ``'production'`` and disabled for every other value.

    Returns:
        The new DAG, as returned by enable_lineage() or disable_lineage().
    """
    dag = DAG(
        f'data_processing_{environment}',
        start_date=datetime(2023, 1, 1),
        # `schedule` replaces the deprecated/removed `schedule_interval` argument.
        schedule='@daily',
    )
    # Enable lineage only for production
    if environment == 'production':
        return enable_lineage(dag)
    return disable_lineage(dag)


# Create environment-specific DAGs
prod_dag = create_processing_dag('production')  # Lineage enabled
dev_dag = create_processing_dag('development')  # Lineage disabled
test_dag = create_processing_dag('testing')  # Lineage disabled

from airflow.providers.openlineage.conf import selective_enable
from airflow.providers.openlineage.utils.selective_enable import enable_lineage

# Branch on whether selective enable mode is switched on in the OpenLineage config.
if not selective_enable():
    print("Normal mode: All DAGs/tasks emit lineage unless explicitly disabled")
    # Outside selective mode a plain DAG emits lineage by default.
    dag = DAG(
        'standard_pipeline',
        start_date=datetime(2023, 1, 1),
    )
else:
    print("Selective enable mode: Only explicitly enabled DAGs/tasks will emit lineage")
    # Selective mode needs an explicit opt-in.
    dag = enable_lineage(
        DAG(
            'important_pipeline',
            start_date=datetime(2023, 1, 1),
        )
    )

from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage
class LineageControlledDAG(DAG):
    """DAG subclass that enables or disables OpenLineage collection when constructed.

    Args:
        *args: Positional arguments forwarded to ``DAG`` (e.g. ``dag_id``).
        enable_lineage_tracking: Call ``enable_lineage`` on the new DAG when True,
            ``disable_lineage`` when False.
        **kwargs: Keyword arguments forwarded to ``DAG``.
    """

    def __init__(self, *args, enable_lineage_tracking: bool = True, **kwargs):
        # The flag is keyword-only: previously it was the first positional parameter,
        # so LineageControlledDAG('my_dag', enable_lineage_tracking=...) bound 'my_dag'
        # to the flag and then raised "got multiple values for argument".
        super().__init__(*args, **kwargs)
        if enable_lineage_tracking:
            enable_lineage(self)
        else:
            disable_lineage(self)


# Usage
production_dag = LineageControlledDAG(
    'production_pipeline',
    enable_lineage_tracking=True,
    start_date=datetime(2023, 1, 1),
)
development_dag = LineageControlledDAG(
    'development_pipeline',
    enable_lineage_tracking=False,
    start_date=datetime(2023, 1, 1),
)

from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage


# Placeholder callables so the example runs as written; replace with real logic.
def extract_function():
    """Extract the raw data (placeholder)."""


def validate_function():
    """Validate the extracted data (placeholder)."""


def process_pii_function():
    """Process personally identifiable information (placeholder)."""


def anonymize_function():
    """Anonymize the processed data (placeholder)."""


# enable_lineage()/disable_lineage() are documented for DAGs and tasks only, not for
# TaskGroups, so the setting is applied to each task as it is created.
with DAG('grouped_pipeline', start_date=datetime(2023, 1, 1)) as dag:
    # Lineage enabled for every task in the ingestion group
    with TaskGroup('data_ingestion') as ingestion_group:
        extract_task = enable_lineage(PythonOperator(
            task_id='extract_data',
            python_callable=extract_function,
        ))
        validate_task = enable_lineage(PythonOperator(
            task_id='validate_data',
            python_callable=validate_function,
        ))
    # Lineage disabled for every task in the sensitive processing group
    with TaskGroup('sensitive_processing') as sensitive_group:
        process_pii = disable_lineage(PythonOperator(
            task_id='process_pii',
            python_callable=process_pii_function,
        ))
        anonymize_data = disable_lineage(PythonOperator(
            task_id='anonymize_data',
            python_callable=anonymize_function,
        ))

from airflow.providers.openlineage.utils.selective_enable import enable_lineage, disable_lineage
from airflow.models import Variable


def create_dynamic_dag():
    """Create a daily DAG whose lineage setting comes from an Airflow Variable.

    Lineage is enabled when the ``enable_lineage_tracking`` Variable (default
    ``'true'``) equals ``'true'``, ignoring case and surrounding whitespace.

    Note:
        This runs while the DAG file is parsed. Airflow's best-practice guide advises
        against ``Variable.get`` in top-level DAG code because each call reaches the
        metadata database on every parse; prefer an environment variable or config
        option for parse-time decisions like this one.

    Returns:
        The new DAG, as returned by enable_lineage() or disable_lineage().
    """
    dag = DAG(
        'dynamic_lineage_dag',
        start_date=datetime(2023, 1, 1),
        # `schedule` replaces the deprecated/removed `schedule_interval` argument.
        schedule='@daily',
    )
    # Check Airflow Variable for lineage setting
    raw_value = Variable.get('enable_lineage_tracking', default_var='true')
    lineage_enabled = raw_value.strip().lower() == 'true'
    if lineage_enabled:
        dag = enable_lineage(dag)
        print("Lineage tracking enabled via Variable")
    else:
        dag = disable_lineage(dag)
        print("Lineage tracking disabled via Variable")
    return dag


# Usage
dynamic_dag = create_dynamic_dag()

from airflow.providers.openlineage.conf import selective_enable, disabled_operators
from airflow.providers.openlineage.extractors import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.utils.selective_enable import (
    ENABLE_OL_PARAM_NAME,
    is_task_lineage_enabled,
)


def should_collect_lineage(task) -> bool:
    """Decide whether lineage should be collected for a task.

    Checks, in order:
      1. Operators listed in ``disabled_for_operators`` never emit lineage.
      2. In selective-enable mode, only explicitly enabled tasks emit lineage.
      3. Otherwise lineage is collected unless the task was explicitly disabled.

    Args:
        task: The operator instance being considered.

    Returns:
        bool: True if lineage should be collected for the task.
    """
    # Fully-qualified class name, the format used by disabled_for_operators.
    # NOTE(review): for a mapped task this may be the wrapper's class rather than the
    # underlying operator's; confirm how disabled_for_operators treats mapped tasks.
    operator_class = f"{task.__class__.__module__}.{task.__class__.__name__}"
    if operator_class in disabled_operators():
        return False
    if selective_enable():
        # Opt-in mode: defer to the task's explicit enable setting.
        return is_task_lineage_enabled(task)
    # Opt-out mode: only an explicit disable turns collection off. Previously this branch
    # repeated the opt-in check, making the selective_enable() test above dead logic.
    # NOTE(review): assumes disable_lineage() leaves task.params[ENABLE_OL_PARAM_NAME]
    # resolving to False; confirm against the installed provider.
    return task.params.get(ENABLE_OL_PARAM_NAME) is not False


# Usage in custom extractor
class SmartExtractor(BaseExtractor):
    """Extractor that skips tasks for which should_collect_lineage() is False.

    NOTE(review): assumes BaseExtractor stores the operator as ``self.operator`` and
    declares ``get_operator_classnames`` / ``_execute_extraction`` as abstract (the
    documented custom-extractor interface); confirm against the installed provider.
    """

    @classmethod
    def get_operator_classnames(cls) -> "list[str]":
        # Placeholder: names of the operator classes this extractor handles.
        return ["MyOperator"]

    def _execute_extraction(self) -> "OperatorLineage | None":
        if not should_collect_lineage(self.operator):
            return None
        # Normal extraction logic
        return OperatorLineage(...)

# The selective enable functionality integrates with Airflow configuration:
# airflow.cfg
[openlineage]
selective_enable = true
# Fully-qualified operator class names, separated by ';'.
# EmptyOperator replaces the deprecated DummyOperator.
disabled_for_operators = airflow.operators.bash.BashOperator;airflow.operators.empty.EmptyOperator
# Enable selective mode
export AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true
# Disable specific operators
export AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS="airflow.operators.python.PythonOperator"
# Enable lineage for critical data pipelines
critical_dag = enable_lineage(DAG(
    'financial_reporting',
    start_date=datetime(2023, 1, 1),
    # `schedule` replaces the deprecated/removed `schedule_interval` argument.
    schedule='@daily',
))
# Disable for high-frequency operational tasks
operational_dag = disable_lineage(DAG(
    'system_monitoring',
    start_date=datetime(2023, 1, 1),
    schedule='*/5 * * * *',  # Every 5 minutes
))

import os
def create_environment_aware_dag(dag_id: str, **kwargs):
    """Build a DAG whose lineage setting follows the AIRFLOW_ENV environment variable.

    Lineage is enabled when AIRFLOW_ENV is 'production' or 'staging' and disabled
    otherwise; an unset AIRFLOW_ENV is treated as 'development'.

    Args:
        dag_id: ID of the new DAG.
        **kwargs: Extra keyword arguments forwarded to ``DAG``.

    Returns:
        The DAG, as returned by enable_lineage() or disable_lineage().
    """
    dag = DAG(dag_id, **kwargs)
    lineage_envs = ('production', 'staging')
    environment = os.getenv('AIRFLOW_ENV', 'development')
    apply_setting = enable_lineage if environment in lineage_envs else disable_lineage
    return apply_setting(dag)


# Disable lineage for resource-intensive DAGs in development
# An unset AIRFLOW_ENV counts as 'development', matching the default used by
# create_environment_aware_dag(), so the two examples agree on unset environments.
set_lineage = (
    disable_lineage
    if os.getenv('AIRFLOW_ENV', 'development') == 'development'
    else enable_lineage
)
# Build the DAG once and apply whichever setting was chosen.
heavy_processing_dag = set_lineage(DAG(
    'ml_model_training',
    start_date=datetime(2023, 1, 1),
))
# Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-openlineage