tessl/pypi-airflow

Core operators and hooks for Apache Airflow workflow orchestration, including BashOperator, PythonOperator, EmailOperator, and essential database and HTTP connectivity.

Core Operators

Essential operators for task execution that form the building blocks of Apache Airflow workflows. These operators handle common execution patterns including shell commands, Python functions, workflow control, and notifications.

Capabilities

Bash Command Execution

Execute shell commands, scripts, and system operations with environment variable support and output capture capabilities.

class BashOperator(BaseOperator):
    def __init__(
        self, 
        bash_command,
        xcom_push=False,
        env=None,
        **kwargs
    ):
        """
        Execute a Bash script, command or set of commands.

        Parameters:
        - bash_command (str): The command, set of commands, or reference to a bash script (ending in '.sh' or '.bash') to be executed
        - xcom_push (bool): If True, the last line written to stdout will be pushed to an XCom when the bash command completes
        - env (dict): If not None, defines environment variables for the new process instead of inheriting the current process environment
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('bash_command',)
    template_ext = ('.sh', '.bash')
    ui_color = '#f0ede4'

    def execute(self, context): ...
    def on_kill(self): ...

Usage Example:

from airflow.operators.bash_operator import BashOperator

# Simple command execution
bash_task = BashOperator(
    task_id='run_bash_script',
    bash_command='echo "Processing started at $(date)"',
    dag=dag
)

# Script execution with environment variables
script_task = BashOperator(
    task_id='run_data_script',
    bash_command='/path/to/process_data.sh',
    env={'DATA_PATH': '/tmp/data', 'LOG_LEVEL': 'INFO'},
    xcom_push=True,  # Capture script output
    dag=dag
)

# Templated command using Airflow variables
templated_task = BashOperator(
    task_id='templated_command',
    bash_command='echo "Processing data for {{ ds }}"',
    dag=dag
)
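What xcom_push=True captures can be sketched outside Airflow with plain subprocess: the operator runs the command and keeps the last line written to stdout. This is a simplification for illustration, not Airflow's actual implementation:

```python
import subprocess

# Sketch of what xcom_push=True captures: the last line the command writes to stdout.
result = subprocess.run(
    ['bash', '-c', 'echo "step 1 done"; echo "42 rows processed"'],
    capture_output=True, text=True, check=True,
)
last_line = result.stdout.strip().splitlines()[-1]
# A downstream task would receive this value via ti.xcom_pull(task_ids='run_data_script')
print(last_line)
```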

Python Function Execution

Execute Python callables with parameter passing, context injection, and template support for dynamic task execution.

class PythonOperator(BaseOperator):
    def __init__(
        self,
        python_callable,
        op_args=None,
        op_kwargs=None,
        provide_context=False,
        templates_dict=None,
        templates_exts=None,
        **kwargs
    ):
        """
        Executes a Python callable.

        Parameters:
        - python_callable (callable): A reference to an object that is callable
        - op_args (list, default=None): List of positional arguments that will get unpacked when calling your callable
        - op_kwargs (dict, default=None): Dictionary of keyword arguments that will get unpacked in your function
        - provide_context (bool): If True, Airflow will pass keyword arguments that can be used in your function
        - templates_dict (dict): Dictionary where values are templates that will get templated by Airflow engine
        - templates_exts (list): List of file extensions to resolve while processing templated fields
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('templates_dict',)
    template_ext = tuple()
    ui_color = '#ffefeb'

    def execute(self, context): ...

Usage Examples:

from airflow.operators.python_operator import PythonOperator

# Simple function execution
def my_python_function():
    print("Task executed successfully")
    return "Success"

python_task = PythonOperator(
    task_id='run_python_function',
    python_callable=my_python_function,
    dag=dag
)

# Function with parameters
def process_data(input_path, output_path, **kwargs):
    print(f"Processing {input_path} -> {output_path}")
    # Processing logic here
    return f"Processed {input_path}"

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    op_args=['/tmp/input'],
    op_kwargs={'output_path': '/tmp/output'},
    dag=dag
)

# Function with Airflow context
def context_aware_function(**context):
    execution_date = context['ds']
    task_instance = context['ti']
    dag = context['dag']
    
    print(f"Execution date: {execution_date}")
    print(f"Task ID: {task_instance.task_id}")
    return f"Processed for {execution_date}"

context_task = PythonOperator(
    task_id='context_function',
    python_callable=context_aware_function,
    provide_context=True,
    dag=dag
)
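To illustrate what templates_dict provides, here is a toy renderer for the {{ ds }}-style placeholders. Real Airflow renders these values with the Jinja engine before invoking the callable; this string-substitution stand-in is only for illustration:

```python
def render(template, context):
    # Toy stand-in for Airflow's Jinja rendering of templates_dict values
    out = template
    for key, value in context.items():
        out = out.replace('{{ ' + key + ' }}', str(value))
    return out

# Values in templates_dict are rendered before the callable runs,
# then passed to it as the 'templates_dict' keyword argument.
templates_dict = {'query': "SELECT * FROM sales WHERE day = '{{ ds }}'"}
rendered = {k: render(v, {'ds': '2023-01-01'}) for k, v in templates_dict.items()}
print(rendered['query'])
```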

Workflow Branching

Control workflow execution paths based on runtime conditions with dynamic task selection and conditional execution.

class BranchPythonOperator(PythonOperator):
    def __init__(self, **kwargs):
        """
        Allows a workflow to "branch", following a single path after this task executes.

        The python_callable should return the task_id of the branch to follow; it must be a
        task directly downstream of this operator. All other directly downstream tasks are
        marked with a state of "skipped".
        """

    def execute(self, context): ...

Usage Example:

from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

def choose_branch(**context):
    # Decision logic based on context or external conditions
    execution_date = context['ds']
    
    # Example: Branch based on day of week
    from datetime import datetime
    date_obj = datetime.strptime(execution_date, '%Y-%m-%d')
    
    if date_obj.weekday() < 5:  # Monday-Friday
        return 'weekday_processing'
    else:  # Weekend
        return 'weekend_processing'

branch_task = BranchPythonOperator(
    task_id='branch_decision',
    python_callable=choose_branch,
    provide_context=True,
    dag=dag
)

weekday_task = DummyOperator(
    task_id='weekday_processing',
    dag=dag
)

weekend_task = DummyOperator(
    task_id='weekend_processing', 
    dag=dag
)

# Set up branching
branch_task >> [weekday_task, weekend_task]

Conditional Workflow Continuation

Stop workflow execution based on conditions while skipping downstream tasks when criteria are not met.

class ShortCircuitOperator(PythonOperator):
    def __init__(self, **kwargs):
        """
        Allows a workflow to continue only if a condition is met. Otherwise, the workflow 
        "short-circuits" and downstream tasks are skipped.
        
        The python_callable should return True to continue or False to short-circuit.
        Any downstream tasks are marked with a state of "skipped" when condition is False.
        """

    def execute(self, context): ...

Usage Example:

from airflow.operators.python_operator import ShortCircuitOperator

def check_data_availability(**context):
    # Check if required data is available
    import os
    data_path = f"/data/{context['ds']}"
    
    if os.path.exists(data_path) and os.listdir(data_path):
        print(f"Data available for {context['ds']}")
        return True
    else:
        print(f"No data available for {context['ds']}, skipping downstream tasks")
        return False

condition_check = ShortCircuitOperator(
    task_id='check_data',
    python_callable=check_data_availability,
    provide_context=True,
    dag=dag
)

# These tasks will be skipped if condition_check returns False
downstream_task1 = DummyOperator(task_id='process_data', dag=dag)
downstream_task2 = DummyOperator(task_id='generate_report', dag=dag)

condition_check >> [downstream_task1, downstream_task2]

Workflow Placeholders and Grouping

Provide structural elements for DAG organization without performing actual work, useful for workflow visualization and dependency management.

class DummyOperator(BaseOperator):
    def __init__(self, **kwargs):
        """
        Operator that does literally nothing. It can be used to group tasks in a DAG.
        """

    template_fields = tuple()
    ui_color = '#e8f7e4'

    def execute(self, context): ...

Usage Example:

from airflow.operators.dummy_operator import DummyOperator

# Workflow structure and grouping
start_task = DummyOperator(
    task_id='workflow_start',
    dag=dag
)

data_processing_start = DummyOperator(
    task_id='data_processing_start',
    dag=dag
)

data_processing_end = DummyOperator(
    task_id='data_processing_end',
    dag=dag
)

workflow_end = DummyOperator(
    task_id='workflow_end',
    dag=dag
)

# Create workflow structure
start_task >> data_processing_start
data_processing_start >> [task1, task2, task3]  # Parallel processing tasks defined elsewhere
[task1, task2, task3] >> data_processing_end
data_processing_end >> workflow_end

Email Notifications

Send email notifications with template support for dynamic content and file attachments.

class EmailOperator(BaseOperator):
    def __init__(
        self,
        to,
        subject,
        html_content,
        files=None,
        **kwargs
    ):
        """
        Sends an email.

        Parameters:
        - to (str or list): Email address(es) to send to; a list, or a comma- or semicolon-delimited string
        - subject (str): Subject line for the email (templated)
        - html_content (str): Content of the email (templated), html markup is allowed
        - files (list, default=None): File names to attach in email
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('subject', 'html_content')
    template_ext = ('.html',)
    ui_color = '#e6faf9'

    def execute(self, context): ...

Usage Examples:

from airflow.operators.email_operator import EmailOperator

# Simple notification
email_task = EmailOperator(
    task_id='send_notification',
    to=['admin@example.com', 'team@example.com'],
    subject='Workflow Completed Successfully',
    html_content='<h2>Daily ETL process completed at {{ ts }}</h2>',
    dag=dag
)

# Detailed report with attachments
report_email = EmailOperator(
    task_id='send_report',
    to='reports@example.com',
    subject='Daily Report - {{ ds }}',
    html_content='''
    <h1>Daily Processing Report</h1>
    <p>Execution Date: {{ ds }}</p>
    <p>Task Instance: {{ ti.task_id }}</p>
    <p>DAG: {{ dag.dag_id }}</p>
    <h2>Summary</h2>
    <p>All tasks completed successfully.</p>
    ''',
    files=['/tmp/daily_report.pdf', '/tmp/data_summary.csv'],
    dag=dag
)

# Failure alert, intended for use as an on_failure_callback rather than a scheduled task
def send_failure_email(context):
    EmailOperator(
        task_id='failure_email',
        to=['alerts@example.com'],
        subject=f'ALERT: Task Failed - {context["task_instance"].task_id}',
        html_content=f'''
        <h1>Task Failure Alert</h1>
        <p><strong>Task:</strong> {context["task_instance"].task_id}</p>
        <p><strong>DAG:</strong> {context["dag"].dag_id}</p>
        <p><strong>Execution Date:</strong> {context["ds"]}</p>
        <p>Please investigate the failure immediately.</p>
        ''',
    ).execute(context)  # executed directly; do not pass dag= here

# Register it on any task, e.g.:
# risky_task = BashOperator(..., on_failure_callback=send_failure_email, dag=dag)

Database Operations

Execute SQL queries and statements against various database systems with connection management, parameter binding, and transaction control.

MySQL Operations

Execute SQL code in MySQL databases with connection management and parameter support.

class MySqlOperator(BaseOperator):
    def __init__(
        self,
        sql,
        mysql_conn_id='mysql_default',
        parameters=None,
        **kwargs
    ):
        """
        Executes SQL code in a specific MySQL database.

        Parameters:
        - sql (str|list): SQL statement, list of statements, or reference to template file (.sql)
        - mysql_conn_id (str): Reference to MySQL connection ID
        - parameters (dict): Parameters for SQL query binding
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('sql',)
    template_ext = ('.sql',)
    ui_color = '#ededed'

    def execute(self, context): ...

PostgreSQL Operations

Execute SQL code in PostgreSQL databases with autocommit control and parameter binding.

class PostgresOperator(BaseOperator):
    def __init__(
        self,
        sql,
        postgres_conn_id='postgres_default',
        autocommit=False,
        parameters=None,
        **kwargs
    ):
        """
        Executes SQL code in a specific PostgreSQL database.

        Parameters:
        - sql (str|list): SQL statement, list of statements, or reference to template file (.sql)
        - postgres_conn_id (str): Reference to PostgreSQL connection ID
        - autocommit (bool): Enable autocommit for the SQL execution
        - parameters (dict): Parameters for SQL query binding
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('sql',)
    template_ext = ('.sql',)
    ui_color = '#ededed'

    def execute(self, context): ...

SQLite Operations

Execute SQL code in SQLite databases with lightweight database operations.

class SqliteOperator(BaseOperator):
    def __init__(
        self,
        sql,
        sqlite_conn_id='sqlite_default',
        parameters=None,
        **kwargs
    ):
        """
        Executes SQL code in a specific SQLite database.

        Parameters:
        - sql (str|list): SQL statement, list of statements, or reference to template file (.sql)
        - sqlite_conn_id (str): Reference to SQLite connection ID
        - parameters (dict): Parameters for SQL query binding
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('sql',)
    template_ext = ('.sql',)
    ui_color = '#ededed'

    def execute(self, context): ...

Usage Examples:

from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.sqlite_operator import SqliteOperator

# MySQL query execution
mysql_task = MySqlOperator(
    task_id='run_mysql_query',
    mysql_conn_id='mysql_prod',
    sql='''
        INSERT INTO daily_stats (date, record_count, avg_value)
        SELECT '{{ ds }}', COUNT(*), AVG(value)
        FROM transactions
        WHERE DATE(created_at) = '{{ ds }}'
    ''',
    dag=dag
)

# PostgreSQL with parameters
postgres_task = PostgresOperator(
    task_id='update_user_stats',
    postgres_conn_id='postgres_warehouse',
    sql='''
        UPDATE user_metrics 
        SET last_login = %(login_time)s,
            login_count = login_count + 1
        WHERE user_id = %(user_id)s
    ''',
    parameters={'login_time': '{{ ts }}', 'user_id': 12345},
    autocommit=True,
    dag=dag
)

# SQLite file operations
sqlite_task = SqliteOperator(
    task_id='local_db_cleanup',
    sqlite_conn_id='sqlite_local',
    sql='/path/to/cleanup_script.sql',
    dag=dag
)
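The parameter binding shown above can be exercised directly with the standard library's sqlite3 module, which is essentially what the SQLite hook delegates to. Note that sqlite3 uses :name placeholders rather than the %(name)s style shown for PostgreSQL:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE user_metrics (user_id INTEGER, login_count INTEGER)')
conn.execute('INSERT INTO user_metrics VALUES (:user_id, 0)', {'user_id': 12345})
# Parameter binding keeps values out of the SQL text (no manual quoting or escaping)
conn.execute(
    'UPDATE user_metrics SET login_count = login_count + 1 WHERE user_id = :user_id',
    {'user_id': 12345},
)
count = conn.execute(
    'SELECT login_count FROM user_metrics WHERE user_id = :user_id',
    {'user_id': 12345},
).fetchone()[0]
print(count)
```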

HTTP Operations

Execute HTTP requests against external APIs and web services with response validation and customizable request parameters.

class SimpleHttpOperator(BaseOperator):
    def __init__(
        self,
        endpoint,
        method='POST',
        data=None,
        headers=None,
        response_check=None,
        extra_options=None,
        http_conn_id='http_default',
        **kwargs
    ):
        """
        Calls an endpoint on an HTTP system to execute an action.

        Parameters:
        - endpoint (str): The relative part of the full URL
        - method (str): HTTP method to use (default: 'POST')
        - data (dict): Data to pass (POST/PUT data or URL params for GET)
        - headers (dict): HTTP headers to add to the request
        - response_check (callable): Function to validate response (returns True/False)
        - extra_options (dict): Extra options for requests library (timeout, ssl, etc.)
        - http_conn_id (str): Reference to HTTP connection ID
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('endpoint',)
    template_ext = ()
    ui_color = '#f4a460'

    def execute(self, context): ...

Usage Example:

from airflow.operators.http_operator import SimpleHttpOperator

# POST request with data
api_call = SimpleHttpOperator(
    task_id='api_post',
    http_conn_id='api_server',
    endpoint='/v1/process',
    method='POST',
    data={'job_id': '{{ dag_run.run_id }}', 'date': '{{ ds }}'},
    headers={'Content-Type': 'application/json'},
    dag=dag
)

# GET request with response validation
def check_status_code(response):
    return response.status_code == 200

status_check = SimpleHttpOperator(
    task_id='health_check',
    http_conn_id='service_api',
    endpoint='/health',
    method='GET',
    response_check=check_status_code,
    dag=dag
)
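The response_check contract can be seen with a stub standing in for requests.Response (a hypothetical stub, for illustration only; the operator passes the real response object and fails the task when the check returns False):

```python
class StubResponse:
    # Hypothetical stand-in for requests.Response, just for illustration
    def __init__(self, status_code, text=''):
        self.status_code = status_code
        self.text = text

def response_check(response):
    # Return True to let the task succeed, False to fail it
    return response.status_code == 200 and 'error' not in response.text

ok = response_check(StubResponse(200, '{"status": "healthy"}'))
bad = response_check(StubResponse(503, 'error: upstream down'))
print(ok, bad)
```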

Workflow Composition

Create complex workflows by embedding sub-DAGs within parent DAGs for modular and reusable workflow components.

class SubDagOperator(BaseOperator):
    def __init__(
        self,
        subdag,
        executor=DEFAULT_EXECUTOR,
        **kwargs
    ):
        """
        Execute a sub-DAG as part of a larger workflow.
        
        By convention, a sub-DAG's dag_id should be prefixed by its parent and a dot,
        as in 'parent.child'.

        Parameters:
        - subdag (DAG): The DAG object to run as a subdag of the current DAG
        - executor (BaseExecutor): Executor to use for the sub-DAG
        - **kwargs: Additional BaseOperator parameters (must include 'dag')
        """

    template_fields = tuple()
    ui_color = '#555'
    ui_fgcolor = '#fff'

    def execute(self, context): ...

Usage Example:

from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow import DAG
from datetime import datetime

# Define the sub-DAG
def create_subdag(parent_dag_id, child_dag_id, start_date, schedule_interval):
    subdag = DAG(
        dag_id=f'{parent_dag_id}.{child_dag_id}',
        start_date=start_date,
        schedule_interval=schedule_interval,
    )
    
    # Add tasks to sub-DAG
    task1 = DummyOperator(task_id='subtask1', dag=subdag)
    task2 = DummyOperator(task_id='subtask2', dag=subdag)
    task1 >> task2
    
    return subdag

# Use SubDagOperator in main DAG
subdag_task = SubDagOperator(
    task_id='parallel_processing',
    subdag=create_subdag(
        parent_dag_id='main_dag',
        child_dag_id='parallel_processing',
        start_date=datetime(2023, 1, 1),
        schedule_interval='@daily'
    ),
    dag=dag
)

Sensor Operations

Monitor external systems and wait for conditions to be met before proceeding with downstream tasks. Sensors periodically check conditions and succeed when criteria are satisfied.

Base Sensor Framework

Abstract foundation for all sensor operators providing polling mechanism, timeout handling, and configurable check intervals.

class BaseSensorOperator(BaseOperator):
    def __init__(
        self,
        poke_interval=60,
        timeout=60*60*24*7,
        **kwargs
    ):
        """
        Base class for all sensor operators, which keep executing at intervals until their criteria are met.

        Parameters:
        - poke_interval (int): Time in seconds between each check (default: 60)
        - timeout (int): Time in seconds before the task times out and fails (default: 7 days)
        - **kwargs: Additional BaseOperator parameters
        """

    ui_color = '#e6f1f2'

    def poke(self, context):
        """Override this method to define sensor condition check logic."""
        
    def execute(self, context): ...
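The polling contract described above can be sketched as a simple loop. This is a toy model, not Airflow's scheduler-aware implementation; the injectable clock and sleep functions exist only to keep the sketch testable:

```python
import time

def run_sensor(poke, poke_interval=60, timeout=60 * 60 * 24 * 7,
               now=time.monotonic, sleep=time.sleep):
    # Call poke() every poke_interval seconds until it returns True,
    # raising a timeout error after `timeout` seconds.
    start = now()
    while not poke():
        if now() - start > timeout:
            raise TimeoutError('sensor timed out')
        sleep(poke_interval)
    return True

# Example: a condition that becomes true on the third check
checks = iter([False, False, True])
result = run_sensor(lambda: next(checks), poke_interval=0, sleep=lambda s: None)
print(result)
```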

SQL-based Sensors

Monitor database conditions by executing SQL queries until specified criteria are met.

class SqlSensor(BaseSensorOperator):
    def __init__(
        self,
        conn_id,
        sql,
        **kwargs
    ):
        """
        Runs a SQL statement until the criteria are met. Succeeds when the SQL returns a non-empty, non-zero result.

        Parameters:
        - conn_id (str): The connection ID to run the sensor against
        - sql (str): SQL statement to execute. Must return at least one non-zero/non-empty cell to pass
        - **kwargs: Additional BaseSensorOperator parameters
        """

    template_fields = ('sql',)
    template_ext = ('.hql', '.sql')

    def poke(self, context): ...

File System Sensors

Monitor file systems for the presence of files or directories before proceeding with workflow execution.

class HdfsSensor(BaseSensorOperator):
    def __init__(
        self,
        filepath,
        hdfs_conn_id='hdfs_default',
        **kwargs
    ):
        """
        Waits for a file or folder to appear in HDFS.

        Parameters:
        - filepath (str): Path to file or directory in HDFS
        - hdfs_conn_id (str): Reference to HDFS connection ID
        - **kwargs: Additional BaseSensorOperator parameters
        """

    template_fields = ('filepath',)

    def poke(self, context): ...

class WebHdfsSensor(BaseSensorOperator):
    def __init__(
        self,
        filepath,
        webhdfs_conn_id='webhdfs_default',
        **kwargs
    ):
        """
        Waits for a file or folder to appear in HDFS via WebHDFS API.

        Parameters:
        - filepath (str): Path to file or directory in HDFS
        - webhdfs_conn_id (str): Reference to WebHDFS connection ID
        - **kwargs: Additional BaseSensorOperator parameters
        """

    template_fields = ('filepath',)

    def poke(self, context): ...

Usage Examples:

from airflow.operators.sensors import BaseSensorOperator, SqlSensor, HdfsSensor

# Custom sensor implementation
class DataReadySensor(BaseSensorOperator):
    def __init__(self, data_path, **kwargs):
        super().__init__(**kwargs)
        self.data_path = data_path
    
    def poke(self, context):
        import os
        # Return a plain bool: os.listdir() yields a list, not a boolean
        return bool(os.path.exists(self.data_path) and os.listdir(self.data_path))

data_sensor = DataReadySensor(
    task_id='wait_for_data',
    data_path='/data/{{ ds }}',
    poke_interval=30,  # Check every 30 seconds
    timeout=3600,      # Timeout after 1 hour
    dag=dag
)

# SQL sensor for database monitoring
db_sensor = SqlSensor(
    task_id='wait_for_records',
    conn_id='postgres_prod',
    sql='''
        SELECT COUNT(*) 
        FROM transactions 
        WHERE DATE(created_at) = '{{ ds }}' 
        AND status = 'completed'
    ''',
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

# HDFS file sensor
file_sensor = HdfsSensor(
    task_id='wait_for_hdfs_file',
    filepath='/data/raw/{{ ds }}/input.parquet',
    hdfs_conn_id='hdfs_cluster',
    poke_interval=60,
    dag=dag
)

# Chain sensors ahead of a downstream processing task (defined elsewhere)
data_sensor >> db_sensor >> file_sensor >> processing_task

Template Support

Most operators support Jinja templating for dynamic content:

# Template variables available in operators
templated_bash = BashOperator(
    task_id='templated_bash',
    bash_command='echo "Processing {{ ds }} in DAG {{ dag.dag_id }}"',
    dag=dag
)

templated_email = EmailOperator(
    task_id='templated_email',
    to=['admin@example.com'],
    subject='Report for {{ ds }}',
    html_content='''
    <h1>Report Generated</h1>
    <p>Date: {{ ds }}</p>
    <p>Timestamp: {{ ts }}</p>
    <p>Previous Date: {{ prev_ds }}</p>
    <p>Next Date: {{ next_ds }}</p>
    ''',
    dag=dag
)

Install with Tessl CLI

npx tessl i tessl/pypi-airflow
