Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.
—
Quality: Pending — best-practices conformance has not yet been assessed.
Impact: Pending — no eval scenarios have been run.
Core plugin components for Airflow integration, including event listeners, adapters, automatic event emission, and OpenLineage client management. This integration provides seamless data lineage tracking across the entire Airflow ecosystem.
Core adapter for creating and emitting OpenLineage events, managing the OpenLineage client, and coordinating event lifecycle.
class OpenLineageAdapter:
    """
    Core adapter for OpenLineage event creation and emission.

    Manages OpenLineage client instances, event building, and emission to
    configured transport backends.
    """

    def __init__(self, client: OpenLineageClient | None = None, secrets_masker: SecretsMasker | None = None):
        """
        Initialize adapter with optional client and secrets masker.

        Args:
            client: Pre-configured OpenLineage client. When omitted, a client
                is obtained later via ``get_or_create_openlineage_client``.
            secrets_masker: Secrets masking utility for sensitive data.
        """

    def get_or_create_openlineage_client(self) -> OpenLineageClient:
        """
        Get the existing OpenLineage client, or create one from configuration.

        Returns:
            OpenLineageClient: Configured client for event emission.
        """

    def get_openlineage_config(self) -> dict | None:
        """
        Get the complete OpenLineage configuration dictionary.

        Returns:
            dict: Configuration settings, or None if not configured.
        """

    @staticmethod
    def build_dag_run_id(dag_id: str, logical_date: datetime, clear_number: int) -> str:
        """
        Build a unique DAG run identifier for OpenLineage events.

        Args:
            dag_id: DAG identifier.
            logical_date: DAG run logical execution date.
            clear_number: Clear/retry number for the run.

        Returns:
            str: Unique DAG run identifier.
        """

    @staticmethod
    def build_task_instance_run_id(
        dag_id: str,
        task_id: str,
        execution_date: datetime,
        try_number: int
    ) -> str:
        """
        Build a unique task instance run identifier.

        Args:
            dag_id: DAG identifier.
            task_id: Task identifier.
            execution_date: Task execution date.
            try_number: Task attempt number.

        Returns:
            str: Unique task run identifier.
        """

    def emit(self, event: RunEvent) -> RunEvent:
        """
        Emit an OpenLineage event to the configured transport.

        Args:
            event: OpenLineage run event to emit.

        Returns:
            RunEvent: The emitted event (for chaining/logging).
        """

    def start_task(
        self,
        run_id: str,
        job_name: str,
        job_description: str,
        event_time: datetime,
        parent_run_id: str | None,
        code_location: str | None,
        nominal_start_time: datetime | None,
        inputs: list[Dataset],
        outputs: list[Dataset],
        run_facets: dict[str, RunFacet] | None = None,
        job_facets: dict[str, JobFacet] | None = None
    ) -> RunEvent:
        """
        Create and emit a task start event.

        Args:
            run_id: Unique run identifier.
            job_name: Job name.
            job_description: Job description.
            event_time: Event timestamp.
            parent_run_id: Parent run identifier (for nested jobs).
            code_location: Source code location.
            nominal_start_time: Scheduled start time.
            inputs: Input datasets.
            outputs: Output datasets.
            run_facets: Runtime metadata facets.
            job_facets: Job-level metadata facets.

        Returns:
            RunEvent: Created and emitted start event.
        """

    def complete_task(
        self,
        run_id: str,
        job_name: str,
        job_description: str,
        event_time: datetime,
        parent_run_id: str | None,
        code_location: str | None,
        nominal_start_time: datetime | None,
        inputs: list[Dataset],
        outputs: list[Dataset],
        run_facets: dict[str, RunFacet] | None = None,
        job_facets: dict[str, JobFacet] | None = None
    ) -> RunEvent:
        """
        Create and emit a task completion event.

        Args:
            run_id: Unique run identifier.
            job_name: Job name.
            job_description: Job description.
            event_time: Event timestamp.
            parent_run_id: Parent run identifier.
            code_location: Source code location.
            nominal_start_time: Scheduled start time.
            inputs: Input datasets.
            outputs: Output datasets.
            run_facets: Runtime metadata facets.
            job_facets: Job-level metadata facets.

        Returns:
            RunEvent: Created and emitted completion event.
        """

    def fail_task(
        self,
        run_id: str,
        job_name: str,
        job_description: str,
        event_time: datetime,
        parent_run_id: str | None,
        code_location: str | None,
        nominal_start_time: datetime | None,
        inputs: list[Dataset],
        outputs: list[Dataset],
        run_facets: dict[str, RunFacet] | None = None,
        job_facets: dict[str, JobFacet] | None = None
    ) -> RunEvent:
        """
        Create and emit a task failure event.

        Args:
            run_id: Unique run identifier.
            job_name: Job name.
            job_description: Job description.
            event_time: Event timestamp.
            parent_run_id: Parent run identifier.
            code_location: Source code location.
            nominal_start_time: Scheduled start time.
            inputs: Input datasets.
            outputs: Output datasets.
            run_facets: Runtime metadata facets.
            job_facets: Job-level metadata facets.

        Returns:
            RunEvent: Created and emitted failure event.
        """

    def dag_started(
        self,
        dag_run: DagRun,
        msg: str,
        nominal_start_time: datetime,
        dag: DAG
    ):
        """
        Handle a DAG start event and emit the corresponding OpenLineage event.

        Args:
            dag_run: Started DAG run instance.
            msg: Event message.
            nominal_start_time: Scheduled start time.
            dag: DAG instance.
        """

    def dag_success(
        self,
        dag_run: DagRun,
        msg: str,
        nominal_start_time: datetime,
        dag: DAG
    ):
        """
        Handle a DAG success event and emit the corresponding OpenLineage event.

        Args:
            dag_run: Successful DAG run instance.
            msg: Event message.
            nominal_start_time: Scheduled start time.
            dag: DAG instance.
        """

    def dag_failed(
        self,
        dag_run: DagRun,
        msg: str,
        nominal_start_time: datetime,
        dag: DAG
    ):
        """
        Handle a DAG failure event and emit the corresponding OpenLineage event.

        Args:
            dag_run: Failed DAG run instance.
            msg: Event message.
            nominal_start_time: Scheduled start time.
            dag: DAG instance.
        """

# Event listener that captures Airflow DAG and task lifecycle events and
# coordinates with the adapter for OpenLineage event emission.
class OpenLineageListener:
    """
    Listener for Airflow DAG and task lifecycle events.

    Hooks into Airflow's event system so that DAG runs, task instances,
    and their state transitions are captured automatically for lineage
    tracking.
    """

    # Implementation details are internal - provides event listening capabilities
def get_openlineage_listener() -> OpenLineageListener:
    """
    Get the singleton instance of the OpenLineage event listener.

    Returns:
        OpenLineageListener: Global listener instance for event capture.
    """

# Primary Airflow plugin that registers OpenLineage functionality with the
# Airflow ecosystem.
class OpenLineageProviderPlugin:
    """
    Main Airflow plugin class for OpenLineage integration.

    Automatically registers OpenLineage listeners, macros, and other
    components when the provider package is installed.
    """

# Enumeration for OpenLineage run states used in event creation.
class RunState(Enum):
    """
    Enumeration for OpenLineage run states.

    Values:
        START: Job/task is starting
        RUNNING: Job/task is currently running
        COMPLETE: Job/task completed successfully
        ABORT: Job/task was aborted
        FAIL: Job/task failed with error
    """

    START = "START"
    RUNNING = "RUNNING"
    COMPLETE = "COMPLETE"
    ABORT = "ABORT"
    FAIL = "FAIL"

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
from openlineage.client.event_v2 import Dataset
from datetime import datetime, timezone

# Initialize adapter
adapter = OpenLineageAdapter()

# Create datasets
inputs = [Dataset(namespace='db', name='raw.users')]
outputs = [Dataset(namespace='db', name='analytics.user_summary')]

# Emit task start event; use timezone-aware timestamps (datetime.utcnow() is deprecated)
start_event = adapter.start_task(
    run_id='my-task-run-123',
    job_name='process_users',
    job_description='Process user data for analytics',
    event_time=datetime.now(timezone.utc),
    parent_run_id=None,
    code_location='dags/user_processing.py',
    nominal_start_time=datetime.now(timezone.utc),
    inputs=inputs,
    outputs=outputs,
    run_facets={},
    job_facets={}
)
print(f"Emitted start event: {start_event.eventType}")

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
from openlineage.client.event_v2 import RunEvent, Run, Job
from datetime import datetime, timezone

adapter = OpenLineageAdapter()

# Create custom run event; use timezone-aware timestamps (datetime.utcnow() is deprecated)
event = RunEvent(
    eventTime=datetime.now(timezone.utc),
    eventType='START',
    run=Run(runId='custom-run-456'),
    job=Job(namespace='my-namespace', name='custom-job'),
    inputs=[],
    outputs=[],
    producer='airflow-openlineage-provider'
)

# Emit event
emitted_event = adapter.emit(event)
print(f"Emitted custom event: {emitted_event}")

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
# Initialize adapter (will create client from config)
adapter = OpenLineageAdapter()

# Get client for direct usage
client = adapter.get_or_create_openlineage_client()
print(f"Client transport: {client.transport}")

# Get configuration
config = adapter.get_openlineage_config()
if config:
    print(f"Transport type: {config.get('transport', {}).get('type')}")
    print(f"Namespace: {config.get('namespace')}")

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
from datetime import datetime

# Generate DAG run ID
dag_run_id = OpenLineageAdapter.build_dag_run_id(
    dag_id='user_processing_dag',
    logical_date=datetime(2023, 12, 1),
    clear_number=0
)
print(f"DAG run ID: {dag_run_id}")

# Generate task run ID
task_run_id = OpenLineageAdapter.build_task_instance_run_id(
    dag_id='user_processing_dag',
    task_id='extract_users',
    execution_date=datetime(2023, 12, 1),
    try_number=1
)
print(f"Task run ID: {task_run_id}")

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
from airflow.models import DagRun, DAG
from datetime import datetime, timezone

adapter = OpenLineageAdapter()

# Handle DAG lifecycle events; use timezone-aware timestamps
# (datetime.utcnow() is deprecated)
dag_run = DagRun(
    dag_id='my_dag',
    execution_date=datetime.now(timezone.utc),
    run_id='manual_2023-12-01'
)
dag = DAG('my_dag', start_date=datetime(2023, 1, 1))

# Emit DAG start event
adapter.dag_started(
    dag_run=dag_run,
    msg='DAG started execution',
    nominal_start_time=datetime.now(timezone.utc),
    dag=dag
)

# Later, emit DAG completion
adapter.dag_success(
    dag_run=dag_run,
    msg='DAG completed successfully',
    nominal_start_time=datetime.now(timezone.utc),
    dag=dag
)

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
from airflow.operators.python import PythonOperator

def my_task_function(**context):
    """Build and print the OpenLineage run ID for the current task instance."""
    # Access adapter within task
    adapter = OpenLineageAdapter()

    # Get task instance info
    task_instance = context['task_instance']

    # Build run ID for current task
    run_id = OpenLineageAdapter.build_task_instance_run_id(
        dag_id=task_instance.dag_id,
        task_id=task_instance.task_id,
        execution_date=task_instance.execution_date,
        try_number=task_instance.try_number
    )
    print(f"Current task run ID: {run_id}")
    # Task logic here...

# Use in DAG (``dag`` is assumed to be defined elsewhere).
# Note: ``provide_context=True`` was removed in Airflow 2.x; the context
# is passed to the callable automatically.
task = PythonOperator(
    task_id='my_task',
    python_callable=my_task_function,
    dag=dag
)

# The plugin integrates automatically when the provider is installed:
# In pyproject.toml
[project.entry-points."airflow.plugins"]
openlineage = "airflow.providers.openlineage.plugins.openlineage:OpenLineageProviderPlugin"

The plugin automatically registers the OpenLineage listener with Airflow's event system:
# Automatic registration in Airflow
from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
# Listener is automatically registered to capture:
# - DAG run events (start, success, failure)
# - Task instance events (start, success, failure, retry)
# - Task state changes

The adapter automatically reads configuration from Airflow settings:
# airflow.cfg
[openlineage]
transport = {"type": "http", "url": "http://marquez:5000"}
namespace = production_airflow
disabled = false

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
adapter = OpenLineageAdapter()
try:
    event = adapter.start_task(...)
    print("Event emitted successfully")
except Exception as e:
    # Broad catch is deliberate here: lineage emission must never fail a task
    print(f"Failed to emit event: {e}")
    # Task continues executing - lineage failures don't break DAGs

# Adapter handles client failures gracefully
adapter = OpenLineageAdapter()

# If client creation fails, adapter continues with no-op behavior
client = adapter.get_or_create_openlineage_client()

# Events may be silently dropped if transport is unavailable
# This ensures DAG execution is not impacted by lineage issues

from openlineage.client import OpenLineageClient
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

# Create custom client
custom_client = OpenLineageClient(
    url='http://my-lineage-backend:8080',
    session_timeout=30
)

# Use with adapter
adapter = OpenLineageAdapter(client=custom_client)

from openlineage.client.secrets import SecretsMasker
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

# Create custom secrets masker
masker = SecretsMasker(
    patterns=['password', 'api_key', 'token'],
    replacement='***MASKED***'
)

# Use with adapter
adapter = OpenLineageAdapter(secrets_masker=masker)

# Install with Tessl CLI:
npx tessl i tessl/pypi-apache-airflow-providers-openlineage