Core operators and hooks for Apache Airflow workflow orchestration including BashOperator, PythonOperator, EmailOperator, and essential database and HTTP connectivity
Quality: pending review — best-practices conformance has not yet been assessed.
Impact: pending — no eval scenarios have been run.
Essential operators for task execution that form the building blocks of Apache Airflow workflows. These operators handle common execution patterns including shell commands, Python functions, workflow control, and notifications.
Execute shell commands, scripts, and system operations with environment variable support and output capture capabilities.
class BashOperator(BaseOperator):
def __init__(
self,
bash_command,
xcom_push=False,
env=None,
**kwargs
):
"""
Execute a Bash script, command or set of commands.
Parameters:
- bash_command (str): The command, set of commands or reference to a bash script (must be '.sh') to be executed
- xcom_push (bool): If True, the last line written to stdout will be pushed to an XCom when the bash command completes
- env (dict): If not None, defines environment variables for the new process instead of inheriting the current process environment
- **kwargs: Additional BaseOperator parameters
"""
template_fields = ('bash_command',)
template_ext = ('.sh', '.bash')
ui_color = '#f0ede4'
def execute(self, context): ...
def on_kill(self): ...

Usage Example:
from airflow.operators.bash_operator import BashOperator
# Simple command execution
bash_task = BashOperator(
task_id='run_bash_script',
bash_command='echo "Processing started at $(date)"',
dag=dag
)
# Script execution with environment variables
script_task = BashOperator(
task_id='run_data_script',
bash_command='/path/to/process_data.sh',
env={'DATA_PATH': '/tmp/data', 'LOG_LEVEL': 'INFO'},
xcom_push=True, # Capture script output
dag=dag
)
# Templated command using Airflow variables
templated_task = BashOperator(
task_id='templated_command',
bash_command='echo "Processing data for {{ ds }}"',
dag=dag
)

Execute Python callables with parameter passing, context injection, and template support for dynamic task execution.
class PythonOperator(BaseOperator):
def __init__(
self,
python_callable,
op_args=None,
op_kwargs=None,
provide_context=False,
templates_dict=None,
templates_exts=None,
**kwargs
):
"""
Executes a Python callable.
Parameters:
- python_callable (callable): A reference to an object that is callable
- op_args (list, default=None): List of positional arguments that will get unpacked when calling your callable
- op_kwargs (dict, default=None): Dictionary of keyword arguments that will get unpacked in your function
- provide_context (bool): If True, Airflow will pass keyword arguments that can be used in your function
- templates_dict (dict): Dictionary where values are templates that will get templated by Airflow engine
- templates_exts (list): List of file extensions to resolve while processing templated fields
- **kwargs: Additional BaseOperator parameters
"""
template_fields = ('templates_dict',)
template_ext = tuple()
ui_color = '#ffefeb'
def execute(self, context): ...

Usage Examples:
from airflow.operators.python_operator import PythonOperator
# Simple function execution
def my_python_function():
print("Task executed successfully")
return "Success"
python_task = PythonOperator(
task_id='run_python_function',
python_callable=my_python_function,
dag=dag
)
# Function with parameters
def process_data(input_path, output_path, **kwargs):
print(f"Processing {input_path} -> {output_path}")
# Processing logic here
return f"Processed {input_path}"
process_task = PythonOperator(
task_id='process_data',
python_callable=process_data,
op_args=['/tmp/input'],
op_kwargs={'output_path': '/tmp/output'},
dag=dag
)
# Function with Airflow context
def context_aware_function(**context):
execution_date = context['ds']
task_instance = context['ti']
dag = context['dag']
print(f"Execution date: {execution_date}")
print(f"Task ID: {task_instance.task_id}")
return f"Processed for {execution_date}"
context_task = PythonOperator(
task_id='context_function',
python_callable=context_aware_function,
provide_context=True,
dag=dag
)

Control workflow execution paths based on runtime conditions with dynamic task selection and conditional execution.
class BranchPythonOperator(PythonOperator):
def __init__(self, **kwargs):
"""
Allows a workflow to "branch" or follow a single path following the execution of this task.
The python_callable should return the task_id to follow. The returned task_id should point
to a task directly downstream from this operator. All other "branches" or directly
downstream tasks are marked with a state of "skipped".
"""
def execute(self, context): ...

Usage Example:
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
def choose_branch(**context):
# Decision logic based on context or external conditions
execution_date = context['ds']
# Example: Branch based on day of week
from datetime import datetime
date_obj = datetime.strptime(execution_date, '%Y-%m-%d')
if date_obj.weekday() < 5: # Monday-Friday
return 'weekday_processing'
else: # Weekend
return 'weekend_processing'
branch_task = BranchPythonOperator(
task_id='branch_decision',
python_callable=choose_branch,
provide_context=True,
dag=dag
)
weekday_task = DummyOperator(
task_id='weekday_processing',
dag=dag
)
weekend_task = DummyOperator(
task_id='weekend_processing',
dag=dag
)
# Set up branching
branch_task >> [weekday_task, weekend_task]

Stop workflow execution based on conditions while skipping downstream tasks when criteria are not met.
class ShortCircuitOperator(PythonOperator):
def __init__(self, **kwargs):
"""
Allows a workflow to continue only if a condition is met. Otherwise, the workflow
"short-circuits" and downstream tasks are skipped.
The python_callable should return True to continue or False to short-circuit.
Any downstream tasks are marked with a state of "skipped" when condition is False.
"""
def execute(self, context): ...

Usage Example:
from airflow.operators.python_operator import ShortCircuitOperator
def check_data_availability(**context):
# Check if required data is available
import os
data_path = f"/data/{context['ds']}"
if os.path.exists(data_path) and os.listdir(data_path):
print(f"Data available for {context['ds']}")
return True
else:
print(f"No data available for {context['ds']}, skipping downstream tasks")
return False
condition_check = ShortCircuitOperator(
task_id='check_data',
python_callable=check_data_availability,
provide_context=True,
dag=dag
)
# These tasks will be skipped if condition_check returns False
downstream_task1 = DummyOperator(task_id='process_data', dag=dag)
downstream_task2 = DummyOperator(task_id='generate_report', dag=dag)
condition_check >> [downstream_task1, downstream_task2]

Provide structural elements for DAG organization without performing actual work, useful for workflow visualization and dependency management.
class DummyOperator(BaseOperator):
def __init__(self, **kwargs):
"""
Operator that does literally nothing. It can be used to group tasks in a DAG.
"""
template_fields = tuple()
ui_color = '#e8f7e4'
def execute(self, context): ...

Usage Example:
from airflow.operators.dummy_operator import DummyOperator
# Workflow structure and grouping
start_task = DummyOperator(
task_id='workflow_start',
dag=dag
)
data_processing_start = DummyOperator(
task_id='data_processing_start',
dag=dag
)
data_processing_end = DummyOperator(
task_id='data_processing_end',
dag=dag
)
workflow_end = DummyOperator(
task_id='workflow_end',
dag=dag
)
# Create workflow structure
start_task >> data_processing_start
data_processing_start >> [task1, task2, task3] # Parallel processing
[task1, task2, task3] >> data_processing_end
data_processing_end >> workflow_end

Send email notifications with template support for dynamic content and file attachments.
class EmailOperator(BaseOperator):
def __init__(
self,
to,
subject,
html_content,
files=None,
**kwargs
):
"""
Sends an email.
Parameters:
- to (str or list): List of emails to send the email to (comma or semicolon delimited if string)
- subject (str): Subject line for the email (templated)
- html_content (str): Content of the email (templated), html markup is allowed
- files (list, default=None): File names to attach in email
- **kwargs: Additional BaseOperator parameters
"""
template_fields = ('subject', 'html_content')
template_ext = ('.html',)
ui_color = '#e6faf9'
def execute(self, context): ...

Usage Examples:
from airflow.operators.email_operator import EmailOperator
# Simple notification
email_task = EmailOperator(
task_id='send_notification',
to=['admin@example.com', 'team@example.com'],
subject='Workflow Completed Successfully',
html_content='<h2>Daily ETL process completed at {{ ts }}</h2>',
dag=dag
)
# Detailed report with attachments
report_email = EmailOperator(
task_id='send_report',
to='reports@example.com',
subject='Daily Report - {{ ds }}',
html_content='''
<h1>Daily Processing Report</h1>
<p>Execution Date: {{ ds }}</p>
<p>Task Instance: {{ ti.task_id }}</p>
<p>DAG: {{ dag.dag_id }}</p>
<h2>Summary</h2>
<p>All tasks completed successfully.</p>
''',
files=['/tmp/daily_report.pdf', '/tmp/data_summary.csv'],
dag=dag
)
# Conditional email based on task status
def send_failure_email(**context):
return EmailOperator(
task_id='failure_email',
to=['alerts@example.com'],
subject=f'ALERT: Task Failed - {context["task_instance"].task_id}',
html_content=f'''
<h1>Task Failure Alert</h1>
<p><strong>Task:</strong> {context["task_instance"].task_id}</p>
<p><strong>DAG:</strong> {context["dag"].dag_id}</p>
<p><strong>Execution Date:</strong> {context["ds"]}</p>
<p>Please investigate the failure immediately.</p>
''',
dag=dag
).execute(context)

Execute SQL queries and statements against various database systems with connection management, parameter binding, and transaction control.
Execute SQL code in MySQL databases with connection management and parameter support.
class MySqlOperator(BaseOperator):
def __init__(
self,
sql,
mysql_conn_id='mysql_default',
parameters=None,
**kwargs
):
"""
Executes SQL code in a specific MySQL database.
Parameters:
- sql (str|list): SQL statement, list of statements, or reference to template file (.sql)
- mysql_conn_id (str): Reference to MySQL connection ID
- parameters (dict): Parameters for SQL query binding
- **kwargs: Additional BaseOperator parameters
"""
template_fields = ('sql',)
template_ext = ('.sql',)
ui_color = '#ededed'
def execute(self, context): ...

Execute SQL code in PostgreSQL databases with autocommit control and parameter binding.
class PostgresOperator(BaseOperator):
def __init__(
self,
sql,
postgres_conn_id='postgres_default',
autocommit=False,
parameters=None,
**kwargs
):
"""
Executes SQL code in a specific PostgreSQL database.
Parameters:
- sql (str|list): SQL statement, list of statements, or reference to template file (.sql)
- postgres_conn_id (str): Reference to PostgreSQL connection ID
- autocommit (bool): Enable autocommit for the SQL execution
- parameters (dict): Parameters for SQL query binding
- **kwargs: Additional BaseOperator parameters
"""
template_fields = ('sql',)
template_ext = ('.sql',)
ui_color = '#ededed'
def execute(self, context): ...

Execute SQL code in SQLite databases with lightweight database operations.
class SqliteOperator(BaseOperator):
def __init__(
self,
sql,
sqlite_conn_id='sqlite_default',
parameters=None,
**kwargs
):
"""
Executes SQL code in a specific SQLite database.
Parameters:
- sql (str|list): SQL statement, list of statements, or reference to template file (.sql)
- sqlite_conn_id (str): Reference to SQLite connection ID
- parameters (dict): Parameters for SQL query binding
- **kwargs: Additional BaseOperator parameters
"""
template_fields = ('sql',)
template_ext = ('.sql',)
ui_color = '#ededed'
def execute(self, context): ...

Usage Examples:
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.sqlite_operator import SqliteOperator
# MySQL query execution
mysql_task = MySqlOperator(
task_id='run_mysql_query',
mysql_conn_id='mysql_prod',
sql='''
INSERT INTO daily_stats (date, record_count, avg_value)
SELECT '{{ ds }}', COUNT(*), AVG(value)
FROM transactions
WHERE DATE(created_at) = '{{ ds }}'
''',
dag=dag
)
# PostgreSQL with parameters
postgres_task = PostgresOperator(
task_id='update_user_stats',
postgres_conn_id='postgres_warehouse',
sql='''
UPDATE user_metrics
SET last_login = %(login_time)s,
login_count = login_count + 1
WHERE user_id = %(user_id)s
''',
parameters={'login_time': '{{ ts }}', 'user_id': 12345},
autocommit=True,
dag=dag
)
# SQLite file operations
sqlite_task = SqliteOperator(
task_id='local_db_cleanup',
sqlite_conn_id='sqlite_local',
sql='/path/to/cleanup_script.sql',
dag=dag
)

Execute HTTP requests against external APIs and web services with response validation and customizable request parameters.
class SimpleHttpOperator(BaseOperator):
def __init__(
self,
endpoint,
method='POST',
data=None,
headers=None,
response_check=None,
extra_options=None,
http_conn_id='http_default',
**kwargs
):
"""
Calls an endpoint on an HTTP system to execute an action.
Parameters:
- endpoint (str): The relative part of the full URL
- method (str): HTTP method to use (default: 'POST')
- data (dict): Data to pass (POST/PUT data or URL params for GET)
- headers (dict): HTTP headers to add to the request
- response_check (callable): Function to validate response (returns True/False)
- extra_options (dict): Extra options for requests library (timeout, ssl, etc.)
- http_conn_id (str): Reference to HTTP connection ID
- **kwargs: Additional BaseOperator parameters
"""
template_fields = ('endpoint',)
template_ext = ()
ui_color = '#f4a460'
def execute(self, context): ...

Usage Example:
from airflow.operators.http_operator import SimpleHttpOperator
# POST request with data
api_call = SimpleHttpOperator(
task_id='api_post',
http_conn_id='api_server',
endpoint='/v1/process',
method='POST',
data={'job_id': '{{ dag_run.run_id }}', 'date': '{{ ds }}'},
headers={'Content-Type': 'application/json'},
dag=dag
)
# GET request with response validation
def check_status_code(response):
return response.status_code == 200
status_check = SimpleHttpOperator(
task_id='health_check',
http_conn_id='service_api',
endpoint='/health',
method='GET',
response_check=check_status_code,
dag=dag
)

Create complex workflows by embedding sub-DAGs within parent DAGs for modular and reusable workflow components.
class SubDagOperator(BaseOperator):
def __init__(
self,
subdag,
executor=DEFAULT_EXECUTOR,
**kwargs
):
"""
Execute a sub-DAG as part of a larger workflow.
By convention, a sub-DAG's dag_id should be prefixed by its parent and a dot,
as in 'parent.child'.
Parameters:
- subdag (DAG): The DAG object to run as a subdag of the current DAG
- executor (BaseExecutor): Executor to use for the sub-DAG
- **kwargs: Additional BaseOperator parameters (must include 'dag')
"""
template_fields = tuple()
ui_color = '#555'
ui_fgcolor = '#fff'
def execute(self, context): ...

Usage Example:
from airflow.operators.subdag_operator import SubDagOperator
from airflow import DAG
from datetime import datetime, timedelta
# Define the sub-DAG
def create_subdag(parent_dag_id, child_dag_id, start_date, schedule_interval):
subdag = DAG(
dag_id=f'{parent_dag_id}.{child_dag_id}',
start_date=start_date,
schedule_interval=schedule_interval,
)
# Add tasks to sub-DAG
task1 = DummyOperator(task_id='subtask1', dag=subdag)
task2 = DummyOperator(task_id='subtask2', dag=subdag)
task1 >> task2
return subdag
# Use SubDagOperator in main DAG
subdag_task = SubDagOperator(
task_id='parallel_processing',
subdag=create_subdag(
parent_dag_id='main_dag',
child_dag_id='parallel_processing',
start_date=datetime(2023, 1, 1),
schedule_interval='@daily'
),
dag=dag
)

Monitor external systems and wait for conditions to be met before proceeding with downstream tasks. Sensors periodically check conditions and succeed when criteria are satisfied.
Abstract foundation for all sensor operators providing polling mechanism, timeout handling, and configurable check intervals.
class BaseSensorOperator(BaseOperator):
def __init__(
self,
poke_interval=60,
timeout=60*60*24*7,
**kwargs
):
"""
Base class for all sensor operators that keep executing at intervals until criteria is met.
Parameters:
- poke_interval (int): Time in seconds between each check (default: 60)
- timeout (int): Time in seconds before the task times out and fails (default: 7 days)
- **kwargs: Additional BaseOperator parameters
"""
ui_color = '#e6f1f2'
def poke(self, context):
"""Override this method to define sensor condition check logic."""
def execute(self, context): ...

Monitor database conditions by executing SQL queries until specified criteria are met.
class SqlSensor(BaseSensorOperator):
def __init__(
self,
conn_id,
sql,
**kwargs
):
"""
Runs a SQL statement until criteria is met. Succeeds when SQL returns non-empty, non-zero result.
Parameters:
- conn_id (str): The connection ID to run the sensor against
- sql (str): SQL statement to execute. Must return at least one non-zero/non-empty cell to pass
- **kwargs: Additional BaseSensorOperator parameters
"""
template_fields = ('sql',)
template_ext = ('.hql', '.sql')
def poke(self, context): ...

Monitor file systems for the presence of files or directories before proceeding with workflow execution.
class HdfsSensor(BaseSensorOperator):
def __init__(
self,
filepath,
hdfs_conn_id='hdfs_default',
**kwargs
):
"""
Waits for a file or folder to appear in HDFS.
Parameters:
- filepath (str): Path to file or directory in HDFS
- hdfs_conn_id (str): Reference to HDFS connection ID
- **kwargs: Additional BaseSensorOperator parameters
"""
template_fields = ('filepath',)
def poke(self, context): ...
class WebHdfsSensor(BaseSensorOperator):
def __init__(
self,
filepath,
webhdfs_conn_id='webhdfs_default',
**kwargs
):
"""
Waits for a file or folder to appear in HDFS via WebHDFS API.
Parameters:
- filepath (str): Path to file or directory in HDFS
- webhdfs_conn_id (str): Reference to WebHDFS connection ID
- **kwargs: Additional BaseSensorOperator parameters
"""
template_fields = ('filepath',)
def poke(self, context): ...

Usage Examples:
from airflow.operators.sensors import BaseSensorOperator, SqlSensor, HdfsSensor
# Custom sensor implementation
class DataReadySensor(BaseSensorOperator):
def __init__(self, data_path, **kwargs):
super().__init__(**kwargs)
self.data_path = data_path
def poke(self, context):
import os
return os.path.exists(self.data_path) and os.listdir(self.data_path)
data_sensor = DataReadySensor(
task_id='wait_for_data',
data_path='/data/{{ ds }}',
poke_interval=30, # Check every 30 seconds
timeout=3600, # Timeout after 1 hour
dag=dag
)
# SQL sensor for database monitoring
db_sensor = SqlSensor(
task_id='wait_for_records',
conn_id='postgres_prod',
sql='''
SELECT COUNT(*)
FROM transactions
WHERE DATE(created_at) = '{{ ds }}'
AND status = 'completed'
''',
poke_interval=300, # Check every 5 minutes
dag=dag
)
# HDFS file sensor
file_sensor = HdfsSensor(
task_id='wait_for_hdfs_file',
filepath='/data/raw/{{ ds }}/input.parquet',
hdfs_conn_id='hdfs_cluster',
poke_interval=60,
dag=dag
)
# Chain sensors with processing tasks
data_sensor >> db_sensor >> file_sensor >> processing_task

Most operators support Jinja templating for dynamic content:
# Template variables available in operators
templated_bash = BashOperator(
task_id='templated_bash',
bash_command='echo "Processing {{ ds }} in DAG {{ dag.dag_id }}"',
dag=dag
)
templated_email = EmailOperator(
task_id='templated_email',
to=['admin@example.com'],
subject='Report for {{ ds }}',
html_content='''
<h1>Report Generated</h1>
<p>Date: {{ ds }}</p>
<p>Timestamp: {{ ts }}</p>
<p>Previous Date: {{ prev_ds }}</p>
<p>Next Date: {{ next_ds }}</p>
''',
dag=dag
)

Install with Tessl CLI
npx tessl i tessl/pypi-airflow