CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-airflow

Core operators and hooks for Apache Airflow workflow orchestration including BashOperator, PythonOperator, EmailOperator, and essential database and HTTP connectivity

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/core.md

Core Framework

Foundation classes and utilities that provide the essential framework for Apache Airflow operator development, state management, error handling, and workflow control. These components form the building blocks for creating custom operators and managing task execution.

Capabilities

Base Operator Framework

Abstract foundation class that all operators inherit from, providing core task functionality, dependency management, templating support, and DAG integration.

class BaseOperator:
    def __init__(
        self,
        task_id,
        owner,
        email=None,
        email_on_retry=True,
        email_on_failure=True,
        retries=0,
        retry_delay=timedelta(seconds=300),
        start_date=None,
        end_date=None,
        schedule_interval=None,
        depends_on_past=False,
        wait_for_downstream=False,
        dag=None,
        params=None,
        default_args=None,
        adhoc=False,
        priority_weight=1,
        queue='default',
        pool=None,
        sla=None,
        execution_timeout=None,
        on_failure_callback=None,
        on_success_callback=None,
        on_retry_callback=None,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        **kwargs
    ):
        """
        Abstract base class for all operators. Contains recursive methods for DAG crawling behavior.
        
        Key Parameters:
        - task_id (str): Unique, meaningful identifier for the task
        - owner (str): Owner of the task (unix username recommended)
        - retries (int): Number of retries before task failure
        - retry_delay (timedelta): Delay between retries
        - start_date (datetime): Task start date
        - depends_on_past (bool): Task instance depends on success of previous schedule
        - dag (DAG): The DAG this task belongs to
        - trigger_rule (str): Rule for triggering task based on upstream states
        - pool (str): Resource pool to use for task execution
        - priority_weight (int): Priority weight for task scheduling
        """

    template_fields = ()
    template_ext = ()
    ui_color = '#fff'
    ui_fgcolor = '#000'

    def execute(self, context):
        """
        Execute the task logic (must be implemented by subclasses).
        
        Parameters:
        - context (dict): Task execution context containing runtime information
        
        Raises:
        - NotImplementedError: If not implemented by subclass
        """

    def pre_execute(self, context):
        """Hook called before task execution."""

    def post_execute(self, context, result):
        """Hook called after task execution."""

    def on_kill(self):
        """Override to perform cleanup when task is killed."""

    def set_upstream(self, task_or_task_list):
        """Set upstream task dependencies."""

    def set_downstream(self, task_or_task_list):
        """Set downstream task dependencies."""

    def __rshift__(self, other):
        """Implement >> operator for task dependencies."""

    def __lshift__(self, other):
        """Implement << operator for task dependencies."""

Usage Example:

from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from datetime import datetime, timedelta

class CustomOperator(BaseOperator):
    # Define template fields and UI colors
    template_fields = ('input_path', 'output_path')
    ui_color = '#87CEEB'
    
    @apply_defaults
    def __init__(
        self,
        input_path,
        output_path,
        processing_options=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.input_path = input_path
        self.output_path = output_path
        self.processing_options = processing_options or {}
    
    def execute(self, context):
        """Implement custom task logic."""
        print(f"Processing {self.input_path} -> {self.output_path}")
        print(f"Execution date: {context['ds']}")
        
        # Custom processing logic here
        result = self._process_data()
        
        # Return value for XCom
        return result
    
    def _process_data(self):
        # Implementation details
        return "Processing completed"
    
    def on_kill(self):
        """Cleanup when task is killed."""
        print("Task was killed, performing cleanup")

# Using the custom operator
custom_task = CustomOperator(
    task_id='custom_processing',
    input_path='/data/input/{{ ds }}',  # Templated
    output_path='/data/output/{{ ds }}', # Templated
    processing_options={'threads': 4},
    retries=2,
    retry_delay=timedelta(minutes=5),
    dag=dag
)

Task State Management

Constants and utilities for managing task instance states throughout the execution lifecycle with color coding for UI display.

class State:
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    SHUTDOWN = "shutdown"
    FAILED = "failed"
    UP_FOR_RETRY = "up_for_retry"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"
    NONE = "none"
    
    @classmethod
    def color(cls, state):
        """
        Get UI color for a given state.
        
        Parameters:
        - state (str): State name
        
        Returns:
        - str: Color string for UI display
        """
    
    @classmethod
    def runnable(cls):
        """
        Get list of states that are considered runnable.
        
        Returns:
        - list: List of runnable state values
        """

Usage Examples:

from airflow.utils import State

def check_task_state(**context):
    task_instance = context['ti']
    
    # Check current state
    if task_instance.state == State.RUNNING:
        print("Task is currently running")
    elif task_instance.state == State.SUCCESS:
        print("Task completed successfully")
    elif task_instance.state == State.FAILED:
        print("Task failed")
    
    # Get UI color for state
    color = State.color(task_instance.state)
    print(f"UI color for state '{task_instance.state}': {color}")
    
    # Check if state is runnable
    runnable_states = State.runnable()
    if task_instance.state in runnable_states:
        print("Task is in a runnable state")

# Custom state checking in operators
class StateAwareOperator(BaseOperator):
    @apply_defaults
    def __init__(self, check_upstream_states=False, **kwargs):
        super().__init__(**kwargs)
        self.check_upstream_states = check_upstream_states
    
    def execute(self, context):
        if self.check_upstream_states:
            dag = context['dag']
            execution_date = context['execution_date']
            
            for upstream_task_id in self.upstream_task_ids:
                ti = dag.get_task(upstream_task_id).get_task_instance(
                    execution_date=execution_date
                )
                
                if ti.state != State.SUCCESS:
                    raise AirflowException(
                        f"Upstream task {upstream_task_id} is in state {ti.state}"
                    )
        
        # Continue with task execution
        return "Task completed"

Trigger Rules for Task Dependencies

Constants defining when tasks should be triggered based on upstream task completion states, enabling complex workflow control patterns.

class TriggerRule:
    ALL_SUCCESS = 'all_success'
    ALL_FAILED = 'all_failed'
    ALL_DONE = 'all_done'
    ONE_SUCCESS = 'one_success'
    ONE_FAILED = 'one_failed'
    DUMMY = 'dummy'

Usage Examples:

from airflow.utils import TriggerRule
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# Task that runs only if all upstream tasks succeed (default)
success_task = DummyOperator(
    task_id='all_success_task',
    trigger_rule=TriggerRule.ALL_SUCCESS,  # Default behavior
    dag=dag
)

# Task that runs if any upstream task fails (error handling)
error_handler = PythonOperator(
    task_id='error_handler',
    python_callable=handle_errors,
    trigger_rule=TriggerRule.ONE_FAILED,
    dag=dag
)

# Task that runs regardless of upstream task states (cleanup)
cleanup_task = DummyOperator(
    task_id='cleanup',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

# Task that runs if at least one upstream task succeeds
# (Note: TriggerRule.NONE_FAILED is only available in later Airflow versions
# and is not among the TriggerRule constants defined above.)
continue_task = DummyOperator(
    task_id='continue_processing',
    trigger_rule=TriggerRule.ONE_SUCCESS,
    dag=dag
)

# Always run (ignore dependencies)
monitoring_task = PythonOperator(
    task_id='monitoring',
    python_callable=send_metrics,
    trigger_rule=TriggerRule.DUMMY,
    dag=dag
)

# Complex workflow with multiple trigger rules
task_a = DummyOperator(task_id='task_a', dag=dag)
task_b = DummyOperator(task_id='task_b', dag=dag) 
task_c = DummyOperator(task_id='task_c', dag=dag)

# Success path - continues only if all upstream succeed
success_path = DummyOperator(
    task_id='success_path',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag
)

# Failure path - handles any upstream failures
failure_path = PythonOperator(
    task_id='failure_path',
    python_callable=handle_failure,
    trigger_rule=TriggerRule.ONE_FAILED,
    dag=dag
)

# Cleanup - always runs at the end
final_cleanup = DummyOperator(
    task_id='final_cleanup',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

# Set dependencies
[task_a, task_b, task_c] >> success_path
[task_a, task_b, task_c] >> failure_path
[success_path, failure_path] >> final_cleanup

Exception Handling

Base exception class for Airflow-specific errors with proper error propagation and logging integration.

class AirflowException(Exception):
    """
    Base exception class for Airflow-specific errors.
    
    All Airflow operators and hooks should raise this exception type
    for proper error handling by the scheduler and executor.
    """
    pass

Usage Examples:

from airflow.utils import AirflowException

def validate_input_data(**context):
    data_path = f"/data/{context['ds']}"
    
    # Check if data exists
    import os
    if not os.path.exists(data_path):
        raise AirflowException(f"Input data not found at {data_path}")
    
    # Check data quality
    import pandas as pd
    df = pd.read_csv(data_path)
    
    if df.empty:
        raise AirflowException(f"Data file {data_path} is empty")
    
    if df.isnull().sum().sum() > len(df) * 0.1:  # >10% missing values
        raise AirflowException(f"Data quality check failed: too many missing values")
    
    return f"Data validation passed for {len(df)} records"

def safe_api_call(url, **context):
    import requests
    
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        
        if not response.json():
            raise AirflowException("API returned empty response")
        
        return response.json()
        
    except requests.exceptions.Timeout:
        raise AirflowException(f"API call to {url} timed out after 30 seconds")
    except requests.exceptions.ConnectionError:
        raise AirflowException(f"Failed to connect to API at {url}")
    except requests.exceptions.HTTPError as e:
        raise AirflowException(f"HTTP error {e.response.status_code}: {e.response.text}")
    except Exception as e:
        raise AirflowException(f"Unexpected error calling API: {str(e)}")

# Custom operator with proper exception handling
class DataValidationOperator(BaseOperator):
    @apply_defaults
    def __init__(self, validation_rules=None, **kwargs):
        super().__init__(**kwargs)
        self.validation_rules = validation_rules or {}
    
    def execute(self, context):
        try:
            # Perform validation
            result = self._validate_data(context)
            
            if not result['valid']:
                raise AirflowException(
                    f"Data validation failed: {result['errors']}"
                )
            
            return result
            
        except FileNotFoundError as e:
            raise AirflowException(f"Required file not found: {e}")
        except ValueError as e:
            raise AirflowException(f"Data validation error: {e}")
        except Exception as e:
            # Wrap unexpected exceptions
            raise AirflowException(f"Validation failed unexpectedly: {str(e)}")
    
    def _validate_data(self, context):
        # Implementation details
        return {'valid': True, 'errors': []}

Default Arguments Decorator

Function decorator that automatically applies default arguments from DAG configuration, enabling consistent operator parameter management across workflows.

def apply_defaults(func):
    """
    Function decorator that looks for an argument named "default_args" and fills 
    unspecified arguments from it.
    
    Features:
    - Searches for "default_args" parameter and applies missing arguments
    - Provides specific information about missing arguments for debugging
    - Enforces keyword argument usage when initializing operators
    - Integrates with DAG-level default arguments
    
    Parameters:
    - func (callable): Function to decorate (typically __init__ method)
    
    Returns:
    - callable: Decorated function with default argument application
    """

Usage Examples:

from airflow.utils import apply_defaults
from airflow.models import BaseOperator
from datetime import timedelta

class MyCustomOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        my_param,
        optional_param=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.my_param = my_param
        self.optional_param = optional_param
    
    def execute(self, context):
        return f"Executed with {self.my_param}"

# DAG with default arguments
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email_on_retry': False,
    'email': ['admin@example.com']
}

dag = DAG(
    'example_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1)
)

# Operator automatically inherits default_args
task1 = MyCustomOperator(
    task_id='task1',
    my_param='value1',
    dag=dag
    # owner, retries, retry_delay, etc. are applied automatically
)

# Override specific defaults
task2 = MyCustomOperator(
    task_id='task2',
    my_param='value2',
    retries=5,  # Override default retries
    owner='specific_owner',  # Override default owner
    dag=dag
)

# Complex operator with multiple parameter types
class AdvancedOperator(BaseOperator):
    template_fields = ('input_template', 'output_template')
    
    @apply_defaults
    def __init__(
        self,
        input_path,
        output_path,
        processing_config=None,
        input_template=None,
        output_template=None,
        validation_enabled=True,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.input_path = input_path
        self.output_path = output_path
        self.processing_config = processing_config or {}
        self.input_template = input_template
        self.output_template = output_template
        self.validation_enabled = validation_enabled
    
    def execute(self, context):
        # Implementation uses all parameters
        print(f"Processing {self.input_path} -> {self.output_path}")
        print(f"Config: {self.processing_config}")
        return "Processing complete"

# Enhanced default arguments with custom parameters
enhanced_defaults = {
    'owner': 'data_pipeline',
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
    'processing_config': {'threads': 4, 'memory_limit': '2GB'},
    'validation_enabled': True,
    'email_on_failure': True
}

enhanced_dag = DAG(
    'enhanced_pipeline',
    default_args=enhanced_defaults,
    schedule_interval='@daily'
)

# Operator inherits both standard and custom defaults
advanced_task = AdvancedOperator(
    task_id='advanced_processing',
    input_path='/data/{{ ds }}',
    output_path='/processed/{{ ds }}',
    dag=enhanced_dag
    # All default_args are applied automatically
)

Framework Integration Patterns

Custom Operator Development

from airflow.models import BaseOperator
from airflow.utils import apply_defaults, AirflowException
from airflow.hooks.base_hook import BaseHook

class DatabaseETLOperator(BaseOperator):
    """
    Custom operator that combines multiple framework components.
    """
    template_fields = ('source_sql', 'target_table')
    ui_color = '#4CAF50'
    
    @apply_defaults
    def __init__(
        self,
        source_conn_id,
        target_conn_id,
        source_sql,
        target_table,
        chunk_size=10000,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.source_conn_id = source_conn_id
        self.target_conn_id = target_conn_id
        self.source_sql = source_sql
        self.target_table = target_table
        self.chunk_size = chunk_size
    
    def execute(self, context):
        try:
            # Use hooks for database connectivity
            source_hook = BaseHook.get_hook(self.source_conn_id)
            target_hook = BaseHook.get_hook(self.target_conn_id)
            
            # Extract data
            data = source_hook.get_records(self.source_sql)
            
            if not data:
                raise AirflowException("No data returned from source query")
            
            # Load data in chunks
            for i in range(0, len(data), self.chunk_size):
                chunk = data[i:i + self.chunk_size]
                target_hook.insert_rows(
                    table=self.target_table,
                    rows=chunk
                )
            
            return f"Loaded {len(data)} records to {self.target_table}"
            
        except Exception as e:
            raise AirflowException(f"ETL operation failed: {str(e)}")
    
    def on_kill(self):
        # Cleanup resources when task is killed
        print("ETL operation was killed, performing cleanup")

State and Trigger Rule Combinations

# Complex workflow with multiple paths and trigger rules
def create_robust_workflow():
    # Data processing tasks
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
        dag=dag
    )
    
    validate_task = PythonOperator(
        task_id='validate_data',
        python_callable=validate_data,
        dag=dag
    )
    
    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        dag=dag
    )
    
    # Success path
    load_task = PythonOperator(
        task_id='load_data',
        python_callable=load_data,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag
    )
    
    # Error handling
    error_notification = EmailOperator(
        task_id='error_notification',
        to=['admin@example.com'],
        subject='Pipeline Failed - {{ ds }}',
        html_content='Pipeline failed at task {{ ti.task_id }}',
        trigger_rule=TriggerRule.ONE_FAILED,
        dag=dag
    )
    
    # Cleanup (always runs)
    cleanup_task = BashOperator(
        task_id='cleanup',
        bash_command='rm -rf /tmp/pipeline_{{ ds }}',
        trigger_rule=TriggerRule.ALL_DONE,
        dag=dag
    )
    
    # Set up dependencies
    extract_task >> validate_task >> transform_task >> load_task
    [extract_task, validate_task, transform_task, load_task] >> error_notification
    [load_task, error_notification] >> cleanup_task
    
    return dag

Install with Tessl CLI

npx tessl i tessl/pypi-airflow

docs

core.md

hooks.md

index.md

operators.md

tile.json