Core operators and hooks for Apache Airflow workflow orchestration including BashOperator, PythonOperator, EmailOperator, and essential database and HTTP connectivity
npx @tessl/cli install tessl/pypi-airflow@1.6.0

A platform to programmatically author, schedule, and monitor workflows. Airflow allows you to author workflows as Directed Acyclic Graphs (DAGs) of tasks and execute them on an array of workers while following the specified dependencies, and it provides rich monitoring and troubleshooting capabilities.

pip install airflow

Basic operator imports:
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.sqlite_operator import SqliteOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.sensors import BaseSensorOperator, SqlSensor, HdfsSensor

Hook imports:
from airflow.hooks.base_hook import BaseHook
from airflow.hooks.dbapi_hook import DbApiHook
from airflow.hooks.http_hook import HttpHook

Core utilities:
from airflow.models import BaseOperator, DAG
from airflow.utils import State, TriggerRule, AirflowException, apply_defaults

Example usage:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
# Define DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'example_workflow',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False
)

# Define tasks
start_task = DummyOperator(
    task_id='start',
    dag=dag
)

def process_data(**context):
    # 'ds' is the execution date string made available via provide_context
    print("Processing data for {}".format(context['ds']))
    return "Data processed successfully"

python_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    provide_context=True,
    dag=dag
)

bash_task = BashOperator(
    task_id='cleanup',
    bash_command='echo "Cleanup completed"',
    dag=dag
)

# Set dependencies
start_task >> python_task >> bash_task

Apache Airflow's architecture centers around several key concepts:
Operators vs Hooks: Operators focus on task execution and workflow logic, while hooks abstract external system connectivity. Operators often use hooks internally for data source interactions.
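To make this split concrete, here is a minimal sketch of a custom operator that delegates all connectivity to HttpHook. The class name ApiHealthCheckOperator is hypothetical, and the assumption that HttpHook accepts an http_conn_id constructor argument is illustrative rather than part of the signatures documented below.

from airflow.models import BaseOperator
from airflow.utils import apply_defaults, AirflowException
from airflow.hooks.http_hook import HttpHook

class ApiHealthCheckOperator(BaseOperator):
    """Hypothetical operator: workflow logic lives here, connectivity in the hook."""

    @apply_defaults
    def __init__(self, endpoint, http_conn_id='http_default', *args, **kwargs):
        super(ApiHealthCheckOperator, self).__init__(*args, **kwargs)
        self.endpoint = endpoint
        self.http_conn_id = http_conn_id

    def execute(self, context):
        # The hook resolves the connection and performs the request;
        # the operator only decides what to call and what counts as failure.
        hook = HttpHook(http_conn_id=self.http_conn_id)
        response = hook.run(self.endpoint)
        if response is None:
            raise AirflowException('No response from %s' % self.endpoint)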
Task Lifecycle: Tasks progress through states (queued → running → success/failed) with support for retries, upstream dependency checking, and conditional execution via trigger rules.
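For instance, a cleanup step can be configured to run regardless of upstream outcomes. A sketch reusing dag and bash_task from the example above; the task id and bash command are illustrative:

from datetime import timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.utils import TriggerRule

# Runs once every upstream task has finished, whether it succeeded or failed,
# and is retried twice before being marked failed.
final_cleanup = BashOperator(
    task_id='always_cleanup',
    bash_command='rm -rf /tmp/example_workflow_scratch',
    trigger_rule=TriggerRule.ALL_DONE,
    retries=2,
    retry_delay=timedelta(minutes=1),
    dag=dag
)

bash_task >> final_cleanup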
Essential operators for task execution including bash commands, Python functions, workflow branching, and email notifications. These operators form the building blocks of most Airflow workflows.
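Branching is handled by BranchPythonOperator (imported above): the callable returns the task_id of the branch to run, and the other branches are skipped. A sketch reusing the dag object from the example above, with hypothetical task ids:

from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

def choose_path(**context):
    # Return the task_id of the branch that should run; the other branch is skipped.
    if context['execution_date'].weekday() < 5:
        return 'weekday_report'
    return 'weekend_report'

branch = BranchPythonOperator(
    task_id='branch_on_day',
    python_callable=choose_path,
    provide_context=True,
    dag=dag
)

weekday_report = DummyOperator(task_id='weekday_report', dag=dag)
weekend_report = DummyOperator(task_id='weekend_report', dag=dag)

branch >> weekday_report
branch >> weekend_report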
class BashOperator(BaseOperator):
    def __init__(self, bash_command, xcom_push=False, env=None, **kwargs): ...

class PythonOperator(BaseOperator):
    def __init__(self, python_callable, op_args=[], op_kwargs={}, provide_context=False, **kwargs): ...

class DummyOperator(BaseOperator):
    def __init__(self, **kwargs): ...

class EmailOperator(BaseOperator):
    def __init__(self, to, subject, html_content, files=None, **kwargs): ...

SQL execution operators for various database systems including MySQL, PostgreSQL, and SQLite with connection management, parameter binding, and transaction control.
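A sketch of a parameterized PostgresOperator task, reusing dag from the example above; the connection id analytics_db, table, and parameter values are illustrative, and the %(name)s binding style assumes a psycopg2-style driver:

from airflow.operators.postgres_operator import PostgresOperator

# Parameter values are passed to the database driver rather than interpolated
# into the SQL string, which avoids quoting and injection problems.
purge_stale = PostgresOperator(
    task_id='purge_stale_rows',
    postgres_conn_id='analytics_db',
    sql="DELETE FROM daily_summary WHERE category = %(category)s",
    parameters={'category': 'stale'},
    autocommit=True,
    dag=dag
)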
class MySqlOperator(BaseOperator):
    def __init__(self, sql, mysql_conn_id='mysql_default', parameters=None, **kwargs): ...

class PostgresOperator(BaseOperator):
    def __init__(self, sql, postgres_conn_id='postgres_default', autocommit=False, parameters=None, **kwargs): ...

class SqliteOperator(BaseOperator):
    def __init__(self, sql, sqlite_conn_id='sqlite_default', parameters=None, **kwargs): ...

Execute HTTP requests against external APIs and web services with response validation and customizable request parameters.
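A sketch of a SimpleHttpOperator call, reusing dag from the example above; the endpoint, payload, and the assumption that response_check receives a requests-style response object are illustrative:

from airflow.operators.http_operator import SimpleHttpOperator

# response_check receives the HTTP response; returning False fails the task.
post_event = SimpleHttpOperator(
    task_id='post_event',
    http_conn_id='http_default',
    endpoint='api/v1/events',
    method='POST',
    data='{"source": "airflow"}',
    headers={'Content-Type': 'application/json'},
    response_check=lambda response: response.status_code == 200,
    dag=dag
)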
class SimpleHttpOperator(BaseOperator):
    def __init__(self, endpoint, method='POST', data=None, headers=None,
                 response_check=None, http_conn_id='http_default', **kwargs): ...

Monitor external systems and wait for conditions to be met with configurable polling intervals and timeout handling.
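A sketch of a SqlSensor that waits for rows to appear, reusing dag from the example above; the connection id, table, and query are illustrative:

from airflow.operators.sensors import SqlSensor

# Pokes the database every 5 minutes and gives up after 6 hours; the sensor
# succeeds once the query returns a non-empty, non-zero first cell.
wait_for_rows = SqlSensor(
    task_id='wait_for_todays_rows',
    conn_id='analytics_db',
    sql="SELECT COUNT(*) FROM events WHERE ds = '{{ ds }}'",
    poke_interval=300,
    timeout=6 * 60 * 60,
    dag=dag
)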
class BaseSensorOperator(BaseOperator):
    def __init__(self, poke_interval=60, timeout=60*60*24*7, **kwargs): ...

class SqlSensor(BaseSensorOperator):
    def __init__(self, conn_id, sql, **kwargs): ...

class HdfsSensor(BaseSensorOperator):
    def __init__(self, filepath, hdfs_conn_id='hdfs_default', **kwargs): ...

Create complex workflows with sub-DAGs for modular and reusable workflow components.
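A sketch of a sub-DAG factory wired into SubDagOperator, reusing dag and default_args from the example above; the child task ids and table names are illustrative. The sub-DAG's dag_id must follow the '<parent_dag_id>.<task_id>' convention:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

def load_tables_subdag(parent_dag_id, task_id, args):
    # The sub-DAG's dag_id must be '<parent_dag_id>.<task_id>'.
    subdag = DAG(
        dag_id='%s.%s' % (parent_dag_id, task_id),
        default_args=args,
        schedule_interval=dag.schedule_interval
    )
    for table in ('users', 'orders', 'payments'):
        DummyOperator(task_id='load_%s' % table, dag=subdag)
    return subdag

load_tables = SubDagOperator(
    task_id='load_tables',
    subdag=load_tables_subdag('example_workflow', 'load_tables', default_args),
    dag=dag
)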
class SubDagOperator(BaseOperator):
    def __init__(self, subdag, executor=DEFAULT_EXECUTOR, **kwargs): ...

Hooks provide standardized interfaces for connecting to external systems including databases, HTTP APIs, and custom services with built-in connection management and error handling.
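A sketch showing connection lookup through BaseHook.get_connection inside a PythonOperator callable, reusing dag from the example above; the postgres_default connection is assumed to already be configured in Airflow's connection store:

from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator

def describe_connection():
    # Connections are configured centrally and looked up by id at runtime,
    # so credentials never need to live in DAG code.
    conn = BaseHook.get_connection('postgres_default')
    print('Would connect to %s:%s as %s' % (conn.host, conn.port, conn.login))

show_connection = PythonOperator(
    task_id='describe_connection',
    python_callable=describe_connection,
    dag=dag
)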
class BaseHook:
    @classmethod
    def get_connection(cls, conn_id): ...

    @classmethod
    def get_connections(cls, conn_id): ...

class DbApiHook(BaseHook):
    def get_records(self, sql, parameters=None): ...
    def run(self, sql, autocommit=False, parameters=None): ...

class HttpHook(BaseHook):
    def run(self, endpoint, data=None, headers=None, extra_options=None): ...

Base classes and utility functions that provide the core framework for operator development, state management, error handling, and workflow control.
class BaseOperator:
    def __init__(self, task_id, owner='airflow', retries=0, **kwargs): ...
    def execute(self, context): ...

class State:
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"

# apply_defaults decorates an operator's __init__ so that unspecified
# arguments are filled in from default_args automatically.
@apply_defaults
def operator_constructor(self, **kwargs): ...
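A sketch of explicit failure handling inside a task, reusing dag from the example above; the validation logic and task id are illustrative:

from airflow.operators.python_operator import PythonOperator
from airflow.utils import AirflowException

def check_row_count(**context):
    row_count = 0  # illustrative value; normally computed from real data
    if row_count == 0:
        # Raising AirflowException fails the task instance; the scheduler
        # retries it up to `retries` times and then marks it State.FAILED.
        raise AirflowException('No rows found for %s' % context['ds'])
    return row_count

validate = PythonOperator(
    task_id='validate_row_count',
    python_callable=check_row_count,
    provide_context=True,
    retries=2,
    dag=dag
)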