Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Custom facet definitions for enriching OpenLineage events with Airflow-specific metadata, including DAG information, task states, debug data, and unknown operator handling. Facets provide extensible metadata enrichment for comprehensive lineage tracking.
Comprehensive runtime metadata facets that capture Airflow-specific execution context.
class AirflowRunFacet:
"""
Composite run facet containing comprehensive Airflow execution metadata.

Bundles DAG, DAG-run, task, and task-instance metadata dictionaries into a
single OpenLineage run facet, keyed by a unique per-task identifier.
"""
dag: dict # DAG metadata and configuration
dagRun: dict # DAG run instance information
task: dict # Task/operator metadata
taskInstance: dict # Task instance execution details
taskUuid: str # Unique task identifier
class AirflowDagRunFacet:
"""
DAG-run-specific facet pairing DAG configuration with run execution details.
"""
dag: dict # DAG configuration and properties
dagRun: dict # DAG run execution details
class AirflowStateRunFacet:
"""
DAG and task state information facet for tracking execution states.

Captures the overall DAG run state together with a per-task state map so
lineage consumers can inspect execution progress from emitted events.
"""
dagRunState: str # Current DAG run state
tasksState: dict[str, str] # Mapping of task IDs to their current states

Job-level metadata facets that describe DAG structure and task organization.
class AirflowJobFacet:
"""
Composite job facet describing DAG structure: task tree, groups, and
per-task metadata.
"""
taskTree: dict # Hierarchical task dependency structure
taskGroups: dict # Task group organization and metadata
tasks: dict # Individual task metadata and configuration

Facets specific to dynamic task mapping functionality in Airflow.
class AirflowMappedTaskRunFacet:
"""
Facet carrying dynamic task-mapping metadata for a single mapped task instance.
"""
mapIndex: int # Index of this mapped task instance
operatorClass: str # Fully qualified operator class name
@classmethod
def from_task_instance(cls, task_instance):
"""
Create the facet from an Airflow task instance.
Args:
task_instance: Airflow task instance (expected to be a mapped instance)
Returns:
AirflowMappedTaskRunFacet: Facet with mapped task metadata
"""Facets for debugging, development, and operational visibility.
class AirflowDebugRunFacet:
"""
Debug information facet with package details and system information.
Includes comprehensive debugging metadata when debug mode is enabled,
potentially creating large events with detailed system information.
"""
packages: dict # Installed packages and their versions
class UnknownOperatorInstance:
"""
Descriptor for an operator that has no dedicated OpenLineage extractor.
"""
name: str # Operator name or identifier
properties: dict[str, object] # Operator properties and attributes
type: str # Operator type classification (class name)
class UnknownOperatorAttributeRunFacet:
"""
Facet for capturing information about unknown or unhandled operators.
Provides visibility into operators that don't have specific extractors
while still capturing basic metadata for lineage tracking.
"""
unknownItems: list[UnknownOperatorInstance] # List of unknown operator instances

from airflow.providers.openlineage.plugins.facets import AirflowRunFacet
from airflow.models import TaskInstance, DagRun
def create_run_facet(task_instance: TaskInstance, dag_run: DagRun):
    """Create a comprehensive Airflow run facet for one task-instance run.

    Args:
        task_instance: The task instance being executed.
        dag_run: The DAG run that the task instance belongs to.

    Returns:
        AirflowRunFacet populated with DAG, DAG-run, task, and
        task-instance metadata plus a unique task identifier.
    """
    dag = dag_run.dag
    dag_info = {
        'dag_id': dag_run.dag_id,
        'schedule_interval': str(dag.schedule_interval),
        'start_date': dag.start_date.isoformat(),
        'tags': dag.tags,
        'owner': dag.owner,
    }
    run_info = {
        'run_id': dag_run.run_id,
        'execution_date': dag_run.execution_date.isoformat(),
        # Start/end may be unset while the run is pending or in flight.
        'start_date': dag_run.start_date.isoformat() if dag_run.start_date else None,
        'end_date': dag_run.end_date.isoformat() if dag_run.end_date else None,
        'state': dag_run.state,
        'run_type': dag_run.run_type,
    }
    task_info = {
        'task_id': task_instance.task_id,
        'operator_class': task_instance.operator.__class__.__name__,
        'pool': task_instance.pool,
        'queue': task_instance.queue,
        'priority_weight': task_instance.priority_weight,
    }
    instance_info = {
        'try_number': task_instance.try_number,
        'max_tries': task_instance.max_tries,
        'start_date': task_instance.start_date.isoformat() if task_instance.start_date else None,
        'end_date': task_instance.end_date.isoformat() if task_instance.end_date else None,
        'duration': task_instance.duration,
        'state': task_instance.state,
        'hostname': task_instance.hostname,
        'pid': task_instance.pid,
    }
    facet = AirflowRunFacet(
        dag=dag_info,
        dagRun=run_info,
        task=task_info,
        taskInstance=instance_info,
        taskUuid=f"{dag_run.dag_id}.{task_instance.task_id}.{task_instance.execution_date}.{task_instance.try_number}",
    )
    return facet

from airflow.providers.openlineage.plugins.facets import AirflowStateRunFacet
from airflow.models import DagRun
def create_state_facet(dag_run: DagRun):
    """Build a state-tracking facet for the given DAG run.

    Args:
        dag_run: DAG run whose overall and per-task states are collected.

    Returns:
        AirflowStateRunFacet mapping each task ID to its current state,
        with 'none' substituted for unset states.
    """
    # Map every task instance to its state, defaulting to 'none' when unset.
    task_states = {
        ti.task_id: ti.state or 'none'
        for ti in dag_run.get_task_instances()
    }
    facet = AirflowStateRunFacet(
        dagRunState=dag_run.state or 'none',
        tasksState=task_states,
    )
    return facet
# Usage in lineage extraction
state_facet = create_state_facet(dag_run)  # dag_run supplied by the caller/listener context
run_facets = {
    'airflow_state': state_facet
}

from airflow.providers.openlineage.plugins.facets import AirflowMappedTaskRunFacet
from airflow.models import TaskInstance
def handle_mapped_task(task_instance: TaskInstance):
    """Extract mapped-task metadata as run facets, when applicable.

    Args:
        task_instance: Task instance that may be a dynamically mapped task.

    Returns:
        Dict with an 'airflow_mapped_task' facet for mapped instances,
        otherwise an empty dict.
    """
    # Guard clause: only instances exposing a non-None map_index are mapped.
    map_index = getattr(task_instance, 'map_index', None)
    if map_index is None:
        return {}
    mapped_facet = AirflowMappedTaskRunFacet.from_task_instance(task_instance)
    print(f"Mapped task index: {mapped_facet.mapIndex}")
    print(f"Operator class: {mapped_facet.operatorClass}")
    return {
        'airflow_mapped_task': mapped_facet
    }
# Example with mapped operator
from airflow.operators.python import PythonOperator
@dag
def mapped_dag():
    """Example DAG demonstrating dynamic task mapping with .partial()/.expand()."""
    def process_item(item):
        return f"Processed {item}"
    # Mapped task creates multiple instances — one per expanded kwargs entry.
    mapped_task = PythonOperator.partial(
        task_id='process_items',
        python_callable=process_item
    ).expand(op_kwargs=[{'item': i} for i in range(5)])
    return mapped_task
# Each mapped instance gets its own facet with map_index

from airflow.providers.openlineage.plugins.facets import AirflowJobFacet
from airflow.models import DAG, DagRun
def create_job_facet(dag: DAG, dag_run: DagRun):
    """Create a job facet describing the DAG's structure.

    Args:
        dag: DAG whose task tree, task groups, and task metadata are captured.
        dag_run: Associated DAG run (accepted for interface parity; unused here).

    Returns:
        AirflowJobFacet with taskTree, taskGroups, and tasks metadata.
    """
    task_tree = {}
    tasks = {}
    # One pass over the DAG's tasks fills both the dependency tree
    # and the per-task metadata map.
    for task_id, task in dag.task_dict.items():
        task_tree[task_id] = {
            'operator_class': task.__class__.__name__,
            'upstream_task_ids': list(task.upstream_task_ids),
            'downstream_task_ids': list(task.downstream_task_ids),
        }
        tasks[task_id] = {
            'operator_class': task.__class__.__name__,
            'template_fields': list(task.template_fields) if task.template_fields else [],
            'pool': task.pool,
            'queue': task.queue,
            'retries': task.retries,
            'retry_delay': str(task.retry_delay) if task.retry_delay else None,
        }
    # Task groups are optional — older DAG objects may not expose them.
    task_groups = {}
    if hasattr(dag, 'task_group_dict'):
        for group_id, group in dag.task_group_dict.items():
            task_groups[group_id] = {
                'tooltip': group.tooltip,
                'ui_color': group.ui_color,
                'ui_fgcolor': group.ui_fgcolor,
                'children': list(group.children.keys()),
            }
    facet = AirflowJobFacet(
        taskTree=task_tree,
        taskGroups=task_groups,
        tasks=tasks,
    )
    return facet

from airflow.providers.openlineage.plugins.facets import (
UnknownOperatorInstance,
UnknownOperatorAttributeRunFacet
)
from airflow.models import BaseOperator
def handle_unknown_operator(operator: BaseOperator):
    """Build a fallback run facet for operators without a dedicated extractor.

    Captures the operator's public, JSON-serializable attributes so lineage
    events still carry basic metadata for unrecognized operator types.

    Args:
        operator: Operator whose public attributes are inspected.

    Returns:
        Dict with an 'airflow_unknown_operator' UnknownOperatorAttributeRunFacet.
    """
    # Extract basic operator properties.
    properties = {}
    for attr in dir(operator):
        if attr.startswith('_'):
            continue
        try:
            # Fix: single getattr INSIDE the try — the original evaluated
            # getattr() in the callable() test before the try block, so a
            # raising property crashed the whole function (and every
            # attribute was fetched twice).
            value = getattr(operator, attr)
        except Exception:
            continue  # Skip problematic attributes
        # Only include plain serializable values; methods and other
        # callables are never lineage metadata.
        if not callable(value) and isinstance(value, (str, int, float, bool, list, dict)):
            properties[attr] = value
    # Create unknown operator instance descriptor.
    unknown_instance = UnknownOperatorInstance(
        name=operator.task_id,
        properties=properties,
        type=operator.__class__.__name__
    )
    # Wrap it in the run facet.
    facet = UnknownOperatorAttributeRunFacet(
        unknownItems=[unknown_instance]
    )
    return {
        'airflow_unknown_operator': facet
    }
# Usage in custom extractor
class GenericExtractor(BaseExtractor):
    """Fallback extractor: prefer operator-provided OpenLineage facets,
    otherwise capture basic metadata via handle_unknown_operator."""

    def extract(self):
        # Try specific extraction first — operators may implement the
        # OpenLineage on-start hook themselves.
        if hasattr(self._operator, 'get_openlineage_facets_on_start'):
            return self._operator.get_openlineage_facets_on_start()
        # Fall back to unknown operator handling.
        unknown_facets = handle_unknown_operator(self._operator)
        return OperatorLineage(
            inputs=[],
            outputs=[],
            run_facets=unknown_facets,
            job_facets={}
        )

from airflow.providers.openlineage.plugins.facets import AirflowDebugRunFacet
import pkg_resources
def create_debug_facet():
    """Create a debug facet listing installed packages and their versions.

    Returns:
        AirflowDebugRunFacet whose ``packages`` dict maps distribution
        name to version string.
    """
    # importlib.metadata (stdlib, 3.8+) replaces the deprecated
    # pkg_resources API, which setuptools has slated for removal.
    from importlib import metadata

    packages = {}
    for dist in metadata.distributions():
        name = dist.metadata['Name']
        if name:  # Some broken installs lack metadata; skip them.
            packages[name] = dist.version
    facet = AirflowDebugRunFacet(
        packages=packages
    )
    return facet
# Usage with configuration check
from airflow.providers.openlineage.conf import debug_mode
def get_debug_facets():
    """Get debug facets if debug mode is enabled.

    Returns:
        Dict with an 'airflow_debug' facet when debug mode is on;
        falls through to an empty dict otherwise.
    """
    if debug_mode():
        return {
            'airflow_debug': create_debug_facet()
        }
    return {}

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.plugins.facets import AirflowRunFacet
class CustomExtractor(BaseExtractor):
    """Extractor that augments empty base lineage with an Airflow run facet."""

    def extract(self):
        # Basic lineage: start from an empty OperatorLineage skeleton.
        lineage = OperatorLineage(
            inputs=[],
            outputs=[],
            run_facets={},
            job_facets={}
        )
        # Add Airflow-specific facets only when both the task instance and
        # DAG run were populated on this extractor by the listener.
        if hasattr(self, '_task_instance') and hasattr(self, '_dag_run'):
            airflow_facet = create_run_facet(self._task_instance, self._dag_run)
            lineage.run_facets['airflow'] = airflow_facet
        return lineage

from airflow.providers.openlineage.utils.utils import (
get_airflow_run_facet,
get_airflow_job_facet,
get_airflow_state_run_facet,
get_airflow_debug_facet
)
def get_comprehensive_facets(task_instance, dag_run, dag):
    """Collect every available Airflow facet for a task-instance run.

    Args:
        task_instance: Task instance being executed.
        dag_run: DAG run the instance belongs to.
        dag: The DAG object itself.

    Returns:
        Merged dict of run, job, state, and (optionally) debug facets.
    """
    run_facet = get_airflow_run_facet(
        dag_run=dag_run,
        dag=dag,
        task_instance=task_instance,
        task=task_instance.task,
        task_uuid=f"{dag_run.dag_id}.{task_instance.task_id}"
    )
    facets = {}
    # Each helper returns a partial dict; merge them all into one mapping.
    for partial_facets in (
        run_facet,
        get_airflow_job_facet(dag_run),
        get_airflow_state_run_facet(dag_run, dag, task_instance),
        get_airflow_debug_facet(),  # Empty unless debug mode is enabled.
    ):
        facets.update(partial_facets)
    return facets

from openlineage.client.facet import RunFacet
from dataclasses import dataclass
from typing import Dict, Any
@dataclass
class CustomProcessingRunFacet(RunFacet):
    """Custom facet for processing statistics."""

    records_processed: int  # Number of records handled in this run
    processing_time_seconds: float  # Wall-clock processing duration
    memory_usage_mb: float  # Memory used, in megabytes
    error_count: int  # Number of errors encountered
    warnings: list[str]  # Warning messages collected during processing

    @staticmethod
    def _get_schema() -> str:
        # Schema URL identifying this facet's JSON schema to consumers.
        return "https://my-company.com/schemas/custom_processing_run_facet.json"
# Usage in extractor
def extract_with_custom_facet(self):
    """Example extract() showing how to attach a custom run facet."""
    # ... extraction logic ...
    custom_facet = CustomProcessingRunFacet(
        records_processed=1000,
        processing_time_seconds=45.2,
        memory_usage_mb=128,
        error_count=0,
        warnings=[]
    )
    return OperatorLineage(
        inputs=inputs,  # presumably produced by the elided extraction logic above
        outputs=outputs,
        run_facets={
            'custom_processing': custom_facet
        },
        job_facets={}
    )

# Register custom facets via configuration
# In airflow.cfg:
[openlineage]
custom_run_facets = my_package.facets.custom_processing_facet;my_package.facets.performance_facet
# The functions should return dict[str, RunFacet]
def custom_processing_facet(task_instance, **kwargs):
    """Return custom run facets keyed by facet name (dict[str, RunFacet])."""
    return {
        'custom_processing': CustomProcessingRunFacet(...)
    }

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-openlineage