tessl/pypi-apache-airflow-providers-common-compat

Common Compatibility Provider - providing compatibility code for previous Airflow versions

Standard Components

Version-compatible standard Airflow operators, triggers, and utilities including PythonOperator, TimeDeltaTrigger, and virtualenv preparation functions. These components provide consistent interfaces to core Airflow functionality across different versions.

Capabilities

Python Operator

Version-compatible Python operator for executing Python functions in Airflow tasks.

class PythonOperator(BaseOperator):
    """
    Version-compatible Python operator.
    
    Maps to airflow.providers.standard.operators.python.PythonOperator in Airflow 3.0+
    Maps to airflow.operators.python.PythonOperator in Airflow < 3.0
    """

Short Circuit Operator

Operator that skips its downstream tasks when its Python callable returns a falsy value, and lets the workflow continue otherwise.

class ShortCircuitOperator(BaseOperator):
    """
    Version-compatible short circuit operator.
    
    Maps to airflow.providers.standard.operators.python.ShortCircuitOperator in Airflow 3.0+
    Maps to airflow.operators.python.ShortCircuitOperator in Airflow < 3.0
    """

Serializers

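Mapping of serializer names to the serialization modules used by the virtualenv-based Python operators. The leading underscore marks it as a private name, so its contents may change between releases.

_SERIALIZERS: dict
    """
    Serializers for Python operator data.
    Maps a serializer name (such as "pickle", "cloudpickle", or "dill") to the module that implements it.
    """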

Context Functions

Utilities for accessing Airflow execution context within tasks.

def get_current_context():
    """
    Get the current Airflow execution context.
    
    Returns:
        Context: Current task execution context containing dag_run, task_instance, etc.
        
    Maps to airflow.providers.standard.operators.python.get_current_context in Airflow 3.0+
    Maps to airflow.operators.python.get_current_context in Airflow < 3.0
    """

Time Delta Trigger

Trigger that fires once a specified time duration has elapsed, for use with deferrable operators.

class TimeDeltaTrigger:
    """
    Time delta trigger for deferrable operators.
    
    Maps to airflow.providers.standard.triggers.temporal.TimeDeltaTrigger in Airflow 3.0+
    Maps to airflow.triggers.temporal.TimeDeltaTrigger in Airflow < 3.0
    """

Virtual Environment Utilities

Functions for preparing and managing Python virtual environments for task execution.

def write_python_script(...):
    """
    Write Python scripts for virtualenv execution.
    
    Maps to airflow.providers.standard.utils.python_virtualenv.write_python_script in Airflow 3.0+
    Maps to airflow.utils.python_virtualenv.write_python_script in Airflow < 3.0
    """

def prepare_virtualenv(...):
    """
    Prepare virtual environment for Python execution.
    
    Maps to airflow.providers.standard.utils.python_virtualenv.prepare_virtualenv in Airflow 3.0+
    Maps to airflow.utils.python_virtualenv.prepare_virtualenv in Airflow < 3.0
    """

Usage Examples

from airflow.providers.common.compat.standard.operators import (
    PythonOperator,
    ShortCircuitOperator,
    get_current_context
)
from airflow.providers.common.compat.standard.triggers import TimeDeltaTrigger
from airflow.providers.common.compat.standard.utils import write_python_script, prepare_virtualenv

from airflow import DAG
from datetime import datetime, timedelta

# Create DAG
dag = DAG(
    "example_standard_components",
    start_date=datetime(2024, 1, 1),
    schedule=timedelta(days=1),  # "schedule_interval" was removed in Airflow 3.0
    catchup=False
)

# Python task function
def my_python_function(**context):
    # Access current context
    current_context = get_current_context()
    
    print(f"Task ID: {current_context['task_instance'].task_id}")
    # "logical_date" is used because "execution_date" was removed in Airflow 3.0
    print(f"Logical date: {current_context['logical_date']}")
    
    return "Task completed successfully"

# Create PythonOperator task
python_task = PythonOperator(
    task_id="python_task",
    python_callable=my_python_function,
    dag=dag
)

# Short circuit condition function
def should_continue(**context):
    # Some business logic to determine if workflow should continue.
    # "logical_date" is used because "execution_date" was removed in Airflow 3.0
    logical_date = context['logical_date']
    return logical_date.weekday() < 5  # Only run on weekdays

# Create ShortCircuitOperator task
gate_task = ShortCircuitOperator(
    task_id="weekday_gate",
    python_callable=should_continue,
    dag=dag
)

# Deferrable task using TimeDeltaTrigger
from airflow.sensors.base import BaseSensorOperator

class WaitSensor(BaseSensorOperator):
    def __init__(self, wait_duration: timedelta, **kwargs):
        super().__init__(**kwargs)
        self.wait_duration = wait_duration
    
    def execute(self, context):
        if not self.poke(context):
            self.defer(
                trigger=TimeDeltaTrigger(delta=self.wait_duration),
                method_name="execute_complete"
            )
    
    def poke(self, context):
        return False  # Always defer
    
    def execute_complete(self, context, event):
        return "Wait completed"

wait_task = WaitSensor(
    task_id="wait_5_minutes",
    wait_duration=timedelta(minutes=5),
    dag=dag
)

# Virtual environment task
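def use_virtualenv_utilities():
    # Create the virtual environment. The return value is the path to the
    # environment's Python interpreter, not the environment directory.
    venv_python = prepare_virtualenv(
        venv_directory="/tmp/my_venv",
        python_bin="python3.9",
        system_site_packages=False,  # Required argument
        requirements=["pandas==1.5.0", "numpy==1.24.0"]
    )

    # Render the bundled virtualenv script template to a file. The function
    # takes the output path as "filename" and returns nothing. The empty
    # jinja_context is a placeholder: a real call must supply the variables
    # the template expects, which vary by Airflow version.
    script_path = "/tmp/my_venv/script.py"
    write_python_script(jinja_context={}, filename=script_path)

    return f"Venv interpreter at {venv_python}, script at {script_path}"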

venv_task = PythonOperator(
    task_id="virtualenv_prep",
    python_callable=use_virtualenv_utilities,
    dag=dag
)

# Set task dependencies
gate_task >> python_task >> wait_task >> venv_task

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-common-compat
