Common Compatibility Provider - providing compatibility code for previous Airflow versions
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Version-compatible standard Airflow operators, triggers, and utilities including PythonOperator, TimeDeltaTrigger, and virtualenv preparation functions. These components provide consistent interfaces to core Airflow functionality across different versions.
Version-compatible Python operator for executing Python functions in Airflow tasks.
class PythonOperator(BaseOperator):
"""
Version-compatible Python operator.
Maps to airflow.providers.standard.operators.python.PythonOperator in Airflow 3.0+
Maps to airflow.operators.python.PythonOperator in Airflow < 3.0
"""

Operator that allows conditional workflow execution by short-circuiting downstream tasks.
class ShortCircuitOperator(BaseOperator):
"""
Version-compatible short circuit operator.
Maps to airflow.providers.standard.operators.python.ShortCircuitOperator in Airflow 3.0+
Maps to airflow.operators.python.ShortCircuitOperator in Airflow < 3.0
"""

Serialization utilities for Python operator data persistence.
_SERIALIZERS: dict
"""
Serializers for Python operator data.
Contains serialization functions for various data types.
"""

Utilities for accessing Airflow execution context within tasks.
def get_current_context():
"""
Get the current Airflow execution context.
Returns:
Context: Current task execution context containing dag_run, task_instance, etc.
Maps to airflow.providers.standard.operators.python.get_current_context in Airflow 3.0+
Maps to airflow.operators.python.get_current_context in Airflow < 3.0
"""

Deferrable trigger that waits for a specified time duration.
class TimeDeltaTrigger:
"""
Time delta trigger for deferrable operators.
Maps to airflow.providers.standard.triggers.temporal.TimeDeltaTrigger in Airflow 3.0+
Maps to airflow.triggers.temporal.TimeDeltaTrigger in Airflow < 3.0
"""

Functions for preparing and managing Python virtual environments for task execution.
def write_python_script(...):
"""
Write Python scripts for virtualenv execution.
Maps to airflow.providers.standard.operators.python.write_python_script in Airflow 3.0+
Maps to airflow.operators.python.write_python_script in Airflow < 3.0
"""
def prepare_virtualenv(...):
"""
Prepare virtual environment for Python execution.
Maps to airflow.providers.standard.operators.python.prepare_virtualenv in Airflow 3.0+
Maps to airflow.operators.python.prepare_virtualenv in Airflow < 3.0
"""

from airflow.providers.common.compat.standard.operators import (
PythonOperator,
ShortCircuitOperator,
get_current_context
)
from airflow.providers.common.compat.standard.triggers import TimeDeltaTrigger
from airflow.providers.common.compat.standard.utils import write_python_script, prepare_virtualenv
from airflow import DAG
from datetime import datetime, timedelta
# Create the example DAG, scheduled once per day with no catch-up runs.
# `schedule` is used instead of `schedule_interval`: Airflow 3.0 no longer
# accepts `schedule_interval`, while `schedule` works on both major lines.
dag = DAG(
    "example_standard_components",
    start_date=datetime(2024, 1, 1),
    schedule=timedelta(days=1),
    catchup=False,
)
# Python task function
def my_python_function(**context):
    """Print the running task's id and run date, then report success.

    The context is fetched through ``get_current_context()`` to demonstrate
    that helper; the ``**context`` keyword arguments are accepted but unused.

    Returns:
        str: The fixed message ``"Task completed successfully"``.
    """
    current_context = get_current_context()
    print(f"Task ID: {current_context['task_instance'].task_id}")

    # "execution_date" is not part of the Airflow 3.0 context, so prefer
    # "logical_date" and only fall back to the legacy key when it is missing.
    run_date = current_context.get("logical_date")
    if run_date is None:
        run_date = current_context.get("execution_date")
    print(f"Execution date: {run_date}")

    return "Task completed successfully"
# Task that runs my_python_function as part of the example DAG.
python_task = PythonOperator(
    dag=dag,
    task_id="python_task",
    python_callable=my_python_function,
)
# Short circuit condition function
def should_continue(**context):
    """Decide whether downstream tasks should run: weekdays only.

    Args:
        **context: Task context. The run date is read from ``logical_date``
            when present, otherwise from the legacy ``execution_date`` key.

    Returns:
        bool: True when the run date falls on Monday through Friday.

    Raises:
        KeyError: If the context carries neither date key.
    """
    # "execution_date" is not part of the Airflow 3.0 context; "logical_date"
    # is the replacement, so try it first and keep the old key as a fallback.
    run_date = context.get("logical_date")
    if run_date is None:
        run_date = context["execution_date"]
    # weekday() numbers Monday as 0, so 0-4 are the working days.
    return run_date.weekday() < 5
# Gate task: downstream tasks are skipped when should_continue returns False.
gate_task = ShortCircuitOperator(
    dag=dag,
    task_id="weekday_gate",
    python_callable=should_continue,
)
# Deferrable task using TimeDeltaTrigger
from airflow.sensors.base import BaseSensorOperator
class WaitSensor(BaseSensorOperator):
    """Sensor that defers on a TimeDeltaTrigger instead of polling.

    ``poke`` always reports "not ready", so ``execute`` always defers for
    ``wait_duration`` and names ``execute_complete`` as the resume method.
    """

    def __init__(self, wait_duration: timedelta, **kwargs):
        super().__init__(**kwargs)
        # Length of the deferral handed to the trigger.
        self.wait_duration = wait_duration

    def poke(self, context):
        # Never satisfied synchronously, which forces execute() to defer.
        return False

    def execute(self, context):
        if self.poke(context):
            return
        self.defer(
            method_name="execute_complete",
            trigger=TimeDeltaTrigger(delta=self.wait_duration),
        )

    def execute_complete(self, context, event):
        # Named as the resume method in defer(); its result is the task's return value.
        return "Wait completed"
# Sensor instance that defers for five minutes before completing.
wait_task = WaitSensor(
    dag=dag,
    task_id="wait_5_minutes",
    wait_duration=timedelta(minutes=5),
)
# Virtual environment task
def use_virtualenv_utilities():
    """Create a virtualenv and render a runner script into it.

    Returns:
        str: Message naming the virtualenv directory and the script path.
    """
    venv_directory = "/tmp/my_venv"
    script_path = f"{venv_directory}/my_script.py"

    # `system_site_packages` is a required argument of prepare_virtualenv;
    # False keeps the environment isolated from the host's packages.
    # NOTE(review): the return value is, per the Airflow API docs, the path of
    # the venv's Python binary rather than the venv directory, so it is not
    # used for the message below. Confirm against the installed version.
    prepare_virtualenv(
        venv_directory=venv_directory,
        python_bin="python3.9",
        system_site_packages=False,
        requirements=["pandas==1.5.0", "numpy==1.24.0"],
    )

    # write_python_script takes the output path as `filename` and returns
    # nothing; the values for the script go inside `jinja_context`.
    # NOTE(review): the bundled template presumably needs more keys than these
    # two (e.g. the callable's name and source). Populate them before real use.
    write_python_script(
        jinja_context={"op_args": [], "op_kwargs": {}},
        filename=script_path,
    )

    return f"Prepared venv at {venv_directory}, script at {script_path}"
# Task that runs use_virtualenv_utilities as part of the example DAG.
venv_task = PythonOperator(
    dag=dag,
    task_id="virtualenv_prep",
    python_callable=use_virtualenv_utilities,
)
# Set task dependencies
gate_task >> python_task >> wait_task >> venv_task

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-common-compat