Core operators and hooks for Apache Airflow workflow orchestration, including BashOperator, PythonOperator, EmailOperator, and essential database and HTTP connectivity.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Foundation classes and utilities that provide the essential framework for Apache Airflow operator development, state management, error handling, and workflow control. These components form the building blocks for creating custom operators and managing task execution.
Abstract foundation class that all operators inherit from, providing core task functionality, dependency management, templating support, and DAG integration.
# NOTE(review): requires TriggerRule (documented later in this page) and
# datetime.timedelta to be in scope when the class body is evaluated.
class BaseOperator:

    def __init__(
        self,
        task_id,
        owner,
        email=None,
        email_on_retry=True,
        email_on_failure=True,
        retries=0,
        retry_delay=timedelta(seconds=300),
        start_date=None,
        end_date=None,
        schedule_interval=None,
        depends_on_past=False,
        wait_for_downstream=False,
        dag=None,
        params=None,
        default_args=None,
        adhoc=False,
        priority_weight=1,
        queue='default',
        pool=None,
        sla=None,
        execution_timeout=None,
        on_failure_callback=None,
        on_success_callback=None,
        on_retry_callback=None,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        **kwargs
    ):
        """
        Abstract base class for all operators. Contains recursive methods for DAG crawling behavior.

        Key Parameters:
        - task_id (str): Unique, meaningful identifier for the task
        - owner (str): Owner of the task (unix username recommended)
        - retries (int): Number of retries before task failure
        - retry_delay (timedelta): Delay between retries
        - start_date (datetime): Task start date
        - depends_on_past (bool): Task instance depends on success of previous schedule
        - dag (DAG): The DAG this task belongs to
        - trigger_rule (str): Rule for triggering task based on upstream states
        - pool (str): Resource pool to use for task execution
        - priority_weight (int): Priority weight for task scheduling
        """

    # Class-level attributes that subclasses override to enable Jinja
    # templating of selected fields and to customise web-UI rendering.
    template_fields = ()
    template_ext = ()
    ui_color = '#fff'
    ui_fgcolor = '#000'

    def execute(self, context):
        """
        Execute the task logic (must be implemented by subclasses).

        Parameters:
        - context (dict): Task execution context containing runtime information

        Raises:
        - NotImplementedError: If not implemented by subclass
        """
        # The docstring promises this exception; raise it so a subclass that
        # forgets to override execute() fails loudly instead of silently
        # returning None.
        raise NotImplementedError()

    def pre_execute(self, context):
        """Hook called before task execution."""

    def post_execute(self, context, result):
        """Hook called after task execution."""

    def on_kill(self):
        """Override to perform cleanup when task is killed."""

    def set_upstream(self, task_or_task_list):
        """Set upstream task dependencies."""

    def set_downstream(self, task_or_task_list):
        """Set downstream task dependencies."""

    def __rshift__(self, other):
        """Implement >> operator for task dependencies."""

    def __lshift__(self, other):
        """Implement << operator for task dependencies."""


# Usage Example:
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from datetime import datetime, timedelta


class CustomOperator(BaseOperator):
    # Define template fields and UI colors
    template_fields = ('input_path', 'output_path')
    ui_color = '#87CEEB'

    @apply_defaults
    def __init__(
        self,
        input_path,
        output_path,
        processing_options=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.input_path = input_path
        self.output_path = output_path
        # Avoid a shared mutable default by normalising None to a fresh dict.
        self.processing_options = processing_options or {}

    def execute(self, context):
        """Implement custom task logic."""
        print(f"Processing {self.input_path} -> {self.output_path}")
        print(f"Execution date: {context['ds']}")
        # Custom processing logic here
        result = self._process_data()
        # Return value for XCom
        return result

    def _process_data(self):
        # Implementation details
        return "Processing completed"

    def on_kill(self):
        """Cleanup when task is killed."""
        print("Task was killed, performing cleanup")


# Using the custom operator
# NOTE(review): assumes a `dag` object is defined elsewhere in the module.
custom_task = CustomOperator(
    task_id='custom_processing',
    input_path='/data/input/{{ ds }}',  # Templated
    output_path='/data/output/{{ ds }}',  # Templated
    processing_options={'threads': 4},
    retries=2,
    retry_delay=timedelta(minutes=5),
    dag=dag
)
# Constants and utilities for managing task instance states throughout the execution lifecycle with color coding for UI display.
class State:
    """Constants for the lifecycle states a task instance can be in."""

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    SHUTDOWN = "shutdown"
    FAILED = "failed"
    UP_FOR_RETRY = "up_for_retry"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"
    NONE = "none"

    @classmethod
    def color(cls, state):
        """
        Get UI color for a given state.

        Parameters:
        - state (str): State name

        Returns:
        - str: Color string for UI display
        """

    @classmethod
    def runnable(cls):
        """
        Get list of states that are considered runnable.

        Returns:
        - list: List of runnable state values
        """


# Usage Examples:
from airflow.utils import State

# NOTE(review): this snippet also needs BaseOperator, apply_defaults and
# AirflowException to be imported for the operator below to work.


def check_task_state(**context):
    task_instance = context['ti']
    # Check current state
    if task_instance.state == State.RUNNING:
        print("Task is currently running")
    elif task_instance.state == State.SUCCESS:
        print("Task completed successfully")
    elif task_instance.state == State.FAILED:
        print("Task failed")
    # Get UI color for state
    color = State.color(task_instance.state)
    print(f"UI color for state '{task_instance.state}': {color}")
    # Check if state is runnable
    runnable_states = State.runnable()
    if task_instance.state in runnable_states:
        print("Task is in a runnable state")


# Custom state checking in operators
class StateAwareOperator(BaseOperator):

    @apply_defaults
    def __init__(self, check_upstream_states=False, **kwargs):
        super().__init__(**kwargs)
        self.check_upstream_states = check_upstream_states

    def execute(self, context):
        if self.check_upstream_states:
            dag = context['dag']
            execution_date = context['execution_date']
            for upstream_task_id in self.upstream_task_ids:
                ti = dag.get_task(upstream_task_id).get_task_instance(
                    execution_date=execution_date
                )
                if ti.state != State.SUCCESS:
                    raise AirflowException(
                        f"Upstream task {upstream_task_id} is in state {ti.state}"
                    )
        # Continue with task execution
        return "Task completed"


# Constants defining when tasks should be triggered based on upstream task completion states, enabling complex workflow control patterns.
class TriggerRule:
    """Upstream-state conditions that gate when a task instance is triggered."""

    ALL_SUCCESS = 'all_success'  # all upstream tasks succeeded (the default)
    ALL_FAILED = 'all_failed'    # all upstream tasks failed
    ALL_DONE = 'all_done'        # all upstream tasks finished, whatever the outcome
    ONE_SUCCESS = 'one_success'  # at least one upstream task succeeded
    ONE_FAILED = 'one_failed'    # at least one upstream task failed
    DUMMY = 'dummy'              # upstream dependencies are ignored entirely


# Usage Examples:
from airflow.utils import TriggerRule
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# NOTE(review): these examples assume a `dag` object and the callables
# handle_errors, send_metrics and handle_failure are defined elsewhere.

# Task that runs only if all upstream tasks succeed (default)
success_task = DummyOperator(
    task_id='all_success_task',
    trigger_rule=TriggerRule.ALL_SUCCESS,  # Default behavior
    dag=dag
)

# Task that runs if any upstream task fails (error handling)
error_handler = PythonOperator(
    task_id='error_handler',
    python_callable=handle_errors,
    trigger_rule=TriggerRule.ONE_FAILED,
    dag=dag
)

# Task that runs regardless of upstream task states (cleanup)
cleanup_task = DummyOperator(
    task_id='cleanup',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

# Task that runs if no upstream tasks failed
# NOTE(review): NONE_FAILED is not among the TriggerRule constants documented
# on this page -- it only exists in newer Airflow releases; confirm
# availability before using it.
continue_task = DummyOperator(
    task_id='continue_processing',
    trigger_rule=TriggerRule.NONE_FAILED,
    dag=dag
)

# Always run (ignore dependencies)
monitoring_task = PythonOperator(
    task_id='monitoring',
    python_callable=send_metrics,
    trigger_rule=TriggerRule.DUMMY,
    dag=dag
)

# Complex workflow with multiple trigger rules
task_a = DummyOperator(task_id='task_a', dag=dag)
task_b = DummyOperator(task_id='task_b', dag=dag)
task_c = DummyOperator(task_id='task_c', dag=dag)

# Success path - continues only if all upstream succeed
success_path = DummyOperator(
    task_id='success_path',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag
)

# Failure path - handles any upstream failures
failure_path = PythonOperator(
    task_id='failure_path',
    python_callable=handle_failure,
    trigger_rule=TriggerRule.ONE_FAILED,
    dag=dag
)

# Cleanup - always runs at the end
final_cleanup = DummyOperator(
    task_id='final_cleanup',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

# Set dependencies
[task_a, task_b, task_c] >> success_path
[task_a, task_b, task_c] >> failure_path
[success_path, failure_path] >> final_cleanup
# Base exception class for Airflow-specific errors with proper error propagation and logging integration.
class AirflowException(Exception):
    """
    Base exception class for Airflow-specific errors.

    All Airflow operators and hooks should raise this exception type
    for proper error handling by the scheduler and executor.
    """
    pass


# Usage Examples:
from airflow.utils import AirflowException


def validate_input_data(**context):
    data_path = f"/data/{context['ds']}"
    # Check if data exists
    import os
    if not os.path.exists(data_path):
        raise AirflowException(f"Input data not found at {data_path}")
    # Check data quality
    import pandas as pd
    df = pd.read_csv(data_path)
    if df.empty:
        raise AirflowException(f"Data file {data_path} is empty")
    if df.isnull().sum().sum() > len(df) * 0.1:  # >10% missing values
        raise AirflowException(f"Data quality check failed: too many missing values")
    return f"Data validation passed for {len(df)} records"


def safe_api_call(url, **context):
    import requests
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        if not response.json():
            raise AirflowException("API returned empty response")
        return response.json()
    except requests.exceptions.Timeout:
        raise AirflowException(f"API call to {url} timed out after 30 seconds")
    except requests.exceptions.ConnectionError:
        raise AirflowException(f"Failed to connect to API at {url}")
    except requests.exceptions.HTTPError as e:
        raise AirflowException(f"HTTP error {e.response.status_code}: {e.response.text}")
    except AirflowException:
        # Re-raise our own "empty response" error unchanged; without this it
        # would be swallowed and re-wrapped by the generic handler below.
        raise
    except Exception as e:
        raise AirflowException(f"Unexpected error calling API: {str(e)}")


# Custom operator with proper exception handling
class DataValidationOperator(BaseOperator):

    @apply_defaults
    def __init__(self, validation_rules=None, **kwargs):
        super().__init__(**kwargs)
        self.validation_rules = validation_rules or {}

    def execute(self, context):
        try:
            # Perform validation
            result = self._validate_data(context)
            if not result['valid']:
                raise AirflowException(
                    f"Data validation failed: {result['errors']}"
                )
            return result
        except AirflowException:
            # Preserve the specific validation message instead of letting the
            # generic wrapper below obscure it.
            raise
        except FileNotFoundError as e:
            raise AirflowException(f"Required file not found: {e}")
        except ValueError as e:
            raise AirflowException(f"Data validation error: {e}")
        except Exception as e:
            # Wrap unexpected exceptions
            raise AirflowException(f"Validation failed unexpectedly: {str(e)}")

    def _validate_data(self, context):
        # Implementation details
        return {'valid': True, 'errors': []}
# Function decorator that automatically applies default arguments from DAG configuration, enabling consistent operator parameter management across workflows.
def apply_defaults(func):
    """
    Function decorator that looks for an argument named "default_args" and fills
    unspecified arguments from it.

    Features:
    - Searches for "default_args" parameter and applies missing arguments
    - Provides specific information about missing arguments for debugging
    - Enforces keyword argument usage when initializing operators
    - Integrates with DAG-level default arguments

    Parameters:
    - func (callable): Function to decorate (typically __init__ method)

    Returns:
    - callable: Decorated function with default argument application
    """
    # The original stub returned None, which would replace every decorated
    # __init__ with None (the examples on this page decorate with it).
    # Provide a minimal working reference implementation instead.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Consume "default_args" and use it to fill in any keyword arguments
        # the caller did not pass explicitly.
        # NOTE(review): the real Airflow implementation additionally reports
        # missing required arguments and rejects positional arguments --
        # confirm before relying on those behaviors here.
        defaults = kwargs.pop('default_args', None) or {}
        for name, value in defaults.items():
            kwargs.setdefault(name, value)
        return func(*args, **kwargs)

    return wrapper


# Usage Examples:
from airflow.utils import apply_defaults
from airflow.models import BaseOperator
from datetime import timedelta

# NOTE(review): `DAG` and `datetime` are used below but not imported in the
# original snippet; both must be in scope for it to run.


class MyCustomOperator(BaseOperator):

    @apply_defaults
    def __init__(
        self,
        my_param,
        optional_param=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.my_param = my_param
        self.optional_param = optional_param

    def execute(self, context):
        return f"Executed with {self.my_param}"


# DAG with default arguments
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email_on_retry': False,
    'email': ['admin@example.com']
}

dag = DAG(
    'example_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1)
)

# Operator automatically inherits default_args
task1 = MyCustomOperator(
    task_id='task1',
    my_param='value1',
    dag=dag
    # owner, retries, retry_delay, etc. are applied automatically
)

# Override specific defaults
task2 = MyCustomOperator(
    task_id='task2',
    my_param='value2',
    retries=5,  # Override default retries
    owner='specific_owner',  # Override default owner
    dag=dag
)


# Complex operator with multiple parameter types
class AdvancedOperator(BaseOperator):
    template_fields = ('input_template', 'output_template')

    @apply_defaults
    def __init__(
        self,
        input_path,
        output_path,
        processing_config=None,
        input_template=None,
        output_template=None,
        validation_enabled=True,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.input_path = input_path
        self.output_path = output_path
        self.processing_config = processing_config or {}
        self.input_template = input_template
        self.output_template = output_template
        self.validation_enabled = validation_enabled

    def execute(self, context):
        # Implementation uses all parameters
        print(f"Processing {self.input_path} -> {self.output_path}")
        print(f"Config: {self.processing_config}")
        return "Processing complete"


# Enhanced default arguments with custom parameters
enhanced_defaults = {
    'owner': 'data_pipeline',
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
    'processing_config': {'threads': 4, 'memory_limit': '2GB'},
    'validation_enabled': True,
    'email_on_failure': True
}

enhanced_dag = DAG(
    'enhanced_pipeline',
    default_args=enhanced_defaults,
    schedule_interval='@daily'
)

# Operator inherits both standard and custom defaults
advanced_task = AdvancedOperator(
    task_id='advanced_processing',
    input_path='/data/{{ ds }}',
    output_path='/processed/{{ ds }}',
    dag=enhanced_dag
    # All default_args are applied automatically
)
from airflow.models import BaseOperator
from airflow.utils import apply_defaults, AirflowException
from airflow.hooks.base_hook import BaseHook

# NOTE(review): this snippet also needs `from airflow.models import
# BaseOperator`, which appears immediately above it in the original page.


class DatabaseETLOperator(BaseOperator):
    """
    Custom operator that combines multiple framework components.
    """
    template_fields = ('source_sql', 'target_table')
    ui_color = '#4CAF50'

    @apply_defaults
    def __init__(
        self,
        source_conn_id,
        target_conn_id,
        source_sql,
        target_table,
        chunk_size=10000,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.source_conn_id = source_conn_id
        self.target_conn_id = target_conn_id
        self.source_sql = source_sql
        self.target_table = target_table
        self.chunk_size = chunk_size

    def execute(self, context):
        try:
            # Use hooks for database connectivity
            source_hook = BaseHook.get_hook(self.source_conn_id)
            target_hook = BaseHook.get_hook(self.target_conn_id)
            # Extract data
            data = source_hook.get_records(self.source_sql)
            if not data:
                raise AirflowException("No data returned from source query")
            # Load data in chunks
            for i in range(0, len(data), self.chunk_size):
                chunk = data[i:i + self.chunk_size]
                target_hook.insert_rows(
                    table=self.target_table,
                    rows=chunk
                )
            return f"Loaded {len(data)} records to {self.target_table}"
        except AirflowException:
            # Keep the specific "no data" message; without this clause it
            # would be swallowed and re-wrapped by the generic handler below.
            raise
        except Exception as e:
            raise AirflowException(f"ETL operation failed: {str(e)}")

    def on_kill(self):
        # Cleanup resources when task is killed
        print("ETL operation was killed, performing cleanup")


# Complex workflow with multiple paths and trigger rules
def create_robust_workflow():
    # NOTE(review): relies on a module-level `dag`, on PythonOperator,
    # EmailOperator, BashOperator and TriggerRule being imported, and on the
    # callables (extract_data, validate_data, transform_data, load_data)
    # being defined elsewhere.

    # Data processing tasks
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
        dag=dag
    )
    validate_task = PythonOperator(
        task_id='validate_data',
        python_callable=validate_data,
        dag=dag
    )
    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        dag=dag
    )

    # Success path
    load_task = PythonOperator(
        task_id='load_data',
        python_callable=load_data,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag
    )

    # Error handling
    error_notification = EmailOperator(
        task_id='error_notification',
        to=['admin@example.com'],
        subject='Pipeline Failed - {{ ds }}',
        html_content='Pipeline failed at task {{ ti.task_id }}',
        trigger_rule=TriggerRule.ONE_FAILED,
        dag=dag
    )

    # Cleanup (always runs)
    cleanup_task = BashOperator(
        task_id='cleanup',
        bash_command='rm -rf /tmp/pipeline_{{ ds }}',
        trigger_rule=TriggerRule.ALL_DONE,
        dag=dag
    )

    # Set up dependencies
    extract_task >> validate_task >> transform_task >> load_task
    [extract_task, validate_task, transform_task, load_task] >> error_notification
    [load_task, error_notification] >> cleanup_task

    return dag


# Install with Tessl CLI
npx tessl i tessl/pypi-airflow