Programmatically author, schedule, and monitor data pipelines
npx @tessl/cli install tessl/pypi-apache-airflow@3.0.0

Apache Airflow is a comprehensive platform for programmatically authoring, scheduling, and monitoring data workflows and pipelines. It enables developers to define workflows as directed acyclic graphs (DAGs) of tasks with rich dependency management, featuring a powerful scheduler that executes tasks across arrays of workers while respecting the specified dependencies.
pip install apache-airflow

import airflow
from airflow import DAG, Asset, XComArg

Common imports for working with DAGs and tasks:
from airflow import DAG
from airflow.decorators import dag, task
from airflow.models import BaseOperator
A complete example using the TaskFlow API:

from datetime import datetime, timedelta

from airflow.decorators import dag, task
# Define default arguments
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# Create DAG using decorator
@dag(
    dag_id='example_workflow',
    default_args=default_args,
    description='A simple example DAG',
    schedule=timedelta(days=1),
    catchup=False,
    tags=['example'],
)
def example_workflow():
    @task
    def extract_data():
        """Extract data from source"""
        return {'data': [1, 2, 3, 4, 5]}

    @task
    def transform_data(data):
        """Transform the extracted data"""
        return {'transformed': [x * 2 for x in data['data']]}

    @task
    def load_data(data):
        """Load transformed data"""
        print(f"Loading: {data}")
        return True

    # Define task dependencies
    raw_data = extract_data()
    transformed_data = transform_data(raw_data)
    load_data(transformed_data)
# Instantiate the DAG
dag_instance = example_workflow()

Apache Airflow follows a distributed architecture with several key components: a scheduler that triggers DAG runs and queues work, a DAG processor that parses DAG files, workers (managed by an executor) that run the tasks, an API server that serves the UI, and a metadata database that tracks state.
The platform transforms workflow definitions into versionable, testable, and collaborative code, making it ideal for data engineering teams building complex data processing pipelines, ETL workflows, machine learning orchestration, and automated task scheduling.
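Because DAGs are plain Python, they can be exercised with ordinary unit tests. A minimal pytest-style sketch, assuming the example above is saved in a dags/ folder:

from airflow.models import DagBag

def test_dag_integrity():
    # Parse only project DAGs, not Airflow's bundled examples
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    # Any import error means a DAG file failed to parse
    assert not dagbag.import_errors, dagbag.import_errors
    dag = dagbag.get_dag("example_workflow")
    assert dag is not None
    assert len(dag.tasks) == 3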
Core functionality for defining, scheduling, and managing directed acyclic graphs of tasks, including DAG decorators, task groups, and dependency management.
class DAG:
    def __init__(
        self,
        dag_id: str,
        description: Optional[str] = None,
        schedule: Optional[Union[str, datetime.timedelta]] = None,
        start_date: Optional[datetime.datetime] = None,
        end_date: Optional[datetime.datetime] = None,
        **kwargs
    ): ...
@dag(
    dag_id: str,
    description: Optional[str] = None,
    schedule: Optional[Union[str, timedelta]] = None,
    start_date: Optional[datetime] = None,
    **kwargs
) -> Callable: ...
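The DAG class can also be used directly as a context manager, as an alternative to the @dag decorator. A minimal sketch; the PythonOperator import path assumes the standard provider bundled with Airflow 3, and the DAG id is illustrative:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator

def _say_hello():
    print("hello")

# Operators instantiated inside the `with` block are
# automatically assigned to this DAG
with DAG(
    dag_id="context_manager_example",
    start_date=datetime(2024, 1, 1),
    schedule=timedelta(days=1),
    catchup=False,
) as dag:
    hello = PythonOperator(task_id="say_hello", python_callable=_say_hello)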
Task definition and execution, including BaseOperator, task decorators, dynamic task mapping, and task instance management.

class BaseOperator:
    def __init__(
        self,
        task_id: str,
        owner: str = "airflow",
        retries: Optional[int] = None,
        retry_delay: Optional[timedelta] = None,
        **kwargs
    ): ...

@task(
    task_id: Optional[str] = None,
    python_callable: Optional[Callable] = None,
    **kwargs
) -> Callable: ...
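Dynamic task mapping expands one task definition into many parallel task instances at runtime. A minimal sketch using the TaskFlow API; the file names are illustrative:

from airflow.decorators import dag, task

@dag(schedule=None, catchup=False)
def mapping_example():
    @task
    def list_files():
        return ["a.csv", "b.csv", "c.csv"]

    @task
    def process(path: str):
        print(f"processing {path}")

    # expand() creates one `process` task instance per returned file
    process.expand(path=list_files())

mapping_example()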
Asset-driven scheduling system for creating data-aware workflows, including asset definitions, timetables, and dependency management.

class Asset:
    def __init__(
        self,
        uri: str,
        name: Optional[str] = None,
        group: Optional[str] = None,
        extra: Optional[Dict[str, Any]] = None
    ): ...

class AssetAlias:
    def __init__(self, name: str): ...
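Assets connect producer and consumer DAGs through the data they touch rather than through clock-based schedules. A minimal sketch; the URI is illustrative:

from airflow import Asset
from airflow.decorators import dag, task

orders = Asset("s3://my-bucket/orders.parquet")

@dag(schedule=None, catchup=False)
def producer():
    # Listing the asset in outlets marks it as updated whenever
    # this task finishes successfully
    @task(outlets=[orders])
    def write_orders():
        print("writing orders")
    write_orders()

@dag(schedule=[orders], catchup=False)
def consumer():
    # This DAG is triggered whenever `orders` is updated upstream
    @task
    def read_orders():
        print("reading orders")
    read_orders()

producer()
consumer()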
System configuration, variables, parameters, and connection management for workflow orchestration.

class Variable:
    @classmethod
    def get(cls, key: str, default_var: Any = None) -> Any: ...

    @classmethod
    def set(cls, key: str, value: Any) -> None: ...

class Param:
    def __init__(
        self,
        default: Any = None,
        description: Optional[str] = None,
        **kwargs
    ): ...
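Variables hold deployment-level settings, while Params define validated per-run inputs. A minimal sketch; exact import paths vary slightly between Airflow versions, and the key names are illustrative:

from airflow.decorators import dag, task
from airflow.models import Param, Variable

@dag(
    schedule=None,
    catchup=False,
    params={
        # Param adds type information and validation to run-time parameters
        "target_table": Param("orders", type="string", description="Destination table"),
    },
)
def config_example():
    @task
    def use_config(params: dict):
        # Falls back to the default when the Variable is unset
        bucket = Variable.get("data_bucket", default_var="my-default-bucket")
        print(f"loading into {bucket}/{params['target_table']}")
    use_config()

config_example()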
Cross-communication system for sharing data between tasks, including XComArg, custom backends, and serialization.

class XComArg:
    def __init__(
        self,
        operator: BaseOperator,
        key: str = "return_value"
    ): ...

class XCom:
    @classmethod
    def get_one(
        cls,
        task_id: str,
        dag_id: str,
        key: str = None,
        run_id: str = None
    ) -> Any: ...
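With the TaskFlow API, XCom plumbing is implicit: return values are pushed, and passing one task's result into another pulls it at runtime. A minimal sketch:

from airflow.decorators import dag, task

@dag(schedule=None, catchup=False)
def xcom_example():
    @task
    def produce():
        # The return value is pushed to XCom under the "return_value" key
        return {"rows": 42}

    @task
    def consume(payload: dict):
        # Receiving an XComArg argument pulls the value at runtime
        print(payload["rows"])

    consume(produce())

xcom_example()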
Task execution engines including LocalExecutor, CeleryExecutor, KubernetesExecutor, and custom executor development.

class BaseExecutor:
    def __init__(self, parallelism: int = 32): ...

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None
    ) -> None: ...

class LocalExecutor(BaseExecutor):
    def __init__(self, parallelism: int = 0): ...
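Executors are selected via the executor option under [core] in airflow.cfg. A custom executor subclasses BaseExecutor and implements how queued commands actually run; a minimal, non-functional sketch (it only logs and immediately reports success):

from airflow.executors.base_executor import BaseExecutor

class LoggingExecutor(BaseExecutor):  # hypothetical demo executor
    def execute_async(self, key, command, queue=None, executor_config=None):
        # A real executor would hand `command` to its backend
        # (subprocess, Celery queue, Kubernetes pod, ...) here
        self.log.info("would run %s", key)
        self.success(key)

    def end(self):
        # Called on shutdown; flush any buffered state
        self.heartbeat()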
Command-line interface, context utilities, dependency management, and workflow orchestration helpers.

def get_current_context() -> Context: ...

def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]) -> None: ...

def cross_downstream(
    from_tasks: Sequence[BaseOperator],
    to_tasks: Sequence[BaseOperator]
) -> None: ...
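chain and cross_downstream compress repetitive dependency wiring: chain(a, b, c) is shorthand for a >> b >> c, while cross_downstream wires every task on the left to every task on the right. A minimal sketch, assuming the helpers' airflow.models.baseoperator import path; the task bodies are placeholders:

from airflow.decorators import dag, task
from airflow.models.baseoperator import cross_downstream

@dag(schedule=None, catchup=False)
def dependency_helpers():
    @task
    def extract_a(): ...

    @task
    def extract_b(): ...

    @task
    def load_x(): ...

    @task
    def load_y(): ...

    # Every extract runs before every load, i.e.
    # extract_a >> load_x, extract_a >> load_y, extract_b >> load_x, ...
    cross_downstream([extract_a(), extract_b()], [load_x(), load_y()])

dependency_helpers()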
Comprehensive exception hierarchy for error handling, timeout management, and workflow recovery.

class AirflowException(Exception): ...
class AirflowTaskTimeout(AirflowException): ...
class AirflowSensorTimeout(AirflowException): ...
class AirflowRescheduleException(AirflowException): ...
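Raising these exceptions inside a task controls how the scheduler treats the attempt: a plain AirflowException fails the task and triggers the retry policy, while AirflowSkipException, for example, marks it skipped. A minimal sketch:

from airflow.decorators import task
from airflow.exceptions import AirflowException, AirflowSkipException

@task
def validate(records: list):
    if not records:
        # Mark this task instance as skipped rather than failed
        raise AirflowSkipException("no records to validate")
    if any(r.get("amount", 0) < 0 for r in records):
        # Fail the task; the scheduler applies the retry policy
        raise AirflowException("negative amounts found")
    return records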
ORM models for DAGs, tasks, runs, connections, and metadata storage with SQLAlchemy integration.

class DagModel:
    dag_id: str
    is_active: bool
    last_parsed_time: datetime
    next_dagrun: datetime

class TaskInstance:
    task_id: str
    dag_id: str
    run_id: str
    state: str
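Task code rarely queries these models directly; at runtime the relevant TaskInstance and DagRun objects are injected into the task context. A minimal sketch using TaskFlow's keyword injection:

from airflow.decorators import task

@task
def inspect_run(ti=None, dag_run=None):
    # `ti` and `dag_run` are filled in from the task context
    print(ti.task_id, ti.dag_id, ti.try_number)
    print(dag_run.run_id, dag_run.logical_date)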
Plugin system, provider packages, operator links, notifications, and custom component development.

class BaseOperatorLink:
    name: str = None

    def get_link(
        self,
        operator: BaseOperator,
        *,
        ti_key: TaskInstanceKey
    ) -> str: ...

class BaseNotifier:
    def __init__(self, **kwargs): ...

    def notify(self, context: Context) -> None: ...
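A custom notifier subclasses BaseNotifier and implements notify(); it can then be attached to DAG- or task-level callbacks. A minimal sketch, assuming the airflow.notifications.basenotifier import path (Airflow 2.6+); print stands in for a real delivery channel:

from airflow.notifications.basenotifier import BaseNotifier

class StdoutNotifier(BaseNotifier):  # hypothetical notifier
    def __init__(self, prefix: str = "[airflow]"):
        super().__init__()
        self.prefix = prefix

    def notify(self, context):
        # context identifies the DAG run that triggered the callback
        print(f"{self.prefix} {context['dag'].dag_id} needs attention")

# Usage: DAG(..., on_failure_callback=StdoutNotifier(prefix="[prod]"))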