tessl/pypi-apache-airflow

Programmatically author, schedule and monitor data pipelines

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/apache-airflow@3.0.x (PyPI)

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow@3.0.0

Apache Airflow

Apache Airflow is a comprehensive platform for programmatically authoring, scheduling, and monitoring data workflows and pipelines. It lets developers define workflows as directed acyclic graphs (DAGs) of tasks with rich dependency management, and its scheduler executes those tasks on an array of workers while respecting the specified dependencies.

Package Information

  • Package Name: apache-airflow
  • Package Type: pypi
  • Language: Python
  • Installation: pip install apache-airflow
  • Version: 3.0.6

Core Imports

import airflow
from airflow import DAG, Asset, XComArg

Common imports for working with DAGs and tasks:

from airflow import DAG
from airflow.decorators import dag, task
from airflow.models import BaseOperator
from airflow.utils.dates import days_ago

Basic Usage

from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import dag, task

# Define default arguments
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Create DAG using decorator
@dag(
    dag_id='example_workflow',
    default_args=default_args,
    description='A simple example DAG',
    schedule=timedelta(days=1),
    catchup=False,
    tags=['example']
)
def example_workflow():
    
    @task
    def extract_data():
        """Extract data from source"""
        return {'data': [1, 2, 3, 4, 5]}
    
    @task
    def transform_data(data):
        """Transform the extracted data"""
        return {'transformed': [x * 2 for x in data['data']]}
    
    @task
    def load_data(data):
        """Load transformed data"""
        print(f"Loading: {data}")
        return True
    
    # Define task dependencies
    raw_data = extract_data()
    transformed_data = transform_data(raw_data)
    load_data(transformed_data)

# Instantiate the DAG
dag_instance = example_workflow()

Architecture

Apache Airflow follows a distributed architecture with several key components:

  • Scheduler: Orchestrates task execution based on dependencies and schedules
  • Executor: Runs tasks on worker nodes (Local, Celery, Kubernetes, etc.)
  • Web Server: Provides the web UI for monitoring and managing workflows
  • Worker Nodes: Execute individual tasks in the workflow
  • Metadata Database: Stores DAG definitions, task states, and execution history
  • Task SDK: Core definitions and utilities for task execution (new in 3.0)

The platform transforms workflow definitions into versionable, testable, and collaborative code, making it ideal for data engineering teams building complex data processing pipelines, ETL workflows, machine learning orchestration, and automated task scheduling.
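
Because a DAG file is ordinary Python, its structure can be checked with a plain unit test before deployment. A minimal sketch, assuming the example_workflow DAG from Basic Usage lives in a module named example_workflow_dag (a hypothetical name) and pytest is the test runner:

# test_example_workflow.py
from example_workflow_dag import example_workflow  # hypothetical module name


def test_dag_structure():
    dag = example_workflow()  # the @dag-decorated function returns a DAG object

    # The file parsed without errors and contains the three expected tasks.
    assert dag.dag_id == "example_workflow"
    assert set(dag.task_dict) == {"extract_data", "transform_data", "load_data"}

    # transform_data is wired downstream of extract_data.
    assert "transform_data" in dag.task_dict["extract_data"].downstream_task_ids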

Capabilities

DAG Management

Core functionality for defining, scheduling, and managing directed acyclic graphs of tasks including DAG decorators, task groups, and dependency management.

class DAG:
    def __init__(
        self,
        dag_id: str,
        description: Optional[str] = None,
        schedule: Optional[Union[str, datetime.timedelta]] = None,
        start_date: Optional[datetime.datetime] = None,
        end_date: Optional[datetime.datetime] = None,
        **kwargs
    ): ...

@dag(
    dag_id: str,
    description: Optional[str] = None,
    schedule: Optional[Union[str, timedelta]] = None,
    start_date: Optional[datetime] = None,
    **kwargs
) -> Callable: ...
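
The DAG class can also be used directly as a context manager, which binds tasks defined inside the block to that DAG; a minimal sketch:

from datetime import datetime
from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="context_manager_example",
    schedule=None,  # only runs when triggered manually
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    @task
    def say_hello():
        print("hello from inside the DAG context")

    say_hello()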

DAG Management

Task Operators

Task definition and execution including BaseOperator, task decorators, dynamic task mapping, and task instance management.

class BaseOperator:
    def __init__(
        self,
        task_id: str,
        owner: str = "airflow",
        retries: Optional[int] = None,
        retry_delay: Optional[timedelta] = None,
        **kwargs
    ): ...

@task(
    task_id: Optional[str] = None,
    python_callable: Optional[Callable] = None,
    **kwargs
) -> Callable: ...
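
A custom operator subclasses BaseOperator and implements execute(); a minimal sketch:

from airflow.models import BaseOperator


class PrintMessageOperator(BaseOperator):
    """Toy operator that logs a templated message."""

    template_fields = ("message",)  # allow Jinja templating of `message`

    def __init__(self, message: str, **kwargs):
        super().__init__(**kwargs)
        self.message = message

    def execute(self, context):
        # `context` carries runtime information such as the logical date.
        self.log.info("Message: %s", self.message)
        return self.message  # the return value is pushed to XCom


# Inside a DAG: PrintMessageOperator(task_id="print_message", message="run {{ ds }}")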

Task Operators

Assets and Scheduling

Asset-driven scheduling system for creating data-aware workflows, including asset definitions, timetables, and dependency management.

class Asset:
    def __init__(
        self,
        uri: str,
        name: Optional[str] = None,
        group: Optional[str] = None,
        extra: Optional[Dict[str, Any]] = None
    ): ...

class AssetAlias:
    def __init__(self, name: str): ...
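
A producer task declares an asset as an outlet, and a downstream DAG can list that asset as its schedule so it runs whenever the asset is updated. A minimal sketch, with an example URI:

from datetime import datetime
from airflow import DAG, Asset
from airflow.decorators import task

daily_report = Asset("s3://example-bucket/reports/daily.csv")  # example URI

with DAG(dag_id="produce_report", schedule=None, start_date=datetime(2024, 1, 1)) as producer:

    @task(outlets=[daily_report])
    def write_report():
        ...  # write the file; an asset event is recorded when the task succeeds

    write_report()

with DAG(dag_id="consume_report", schedule=[daily_report], start_date=datetime(2024, 1, 1)) as consumer:

    @task
    def read_report():
        ...  # runs after produce_report updates the asset

    read_report()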

Assets and Scheduling

Configuration Management

System configuration, variables, parameters, and connection management for workflow orchestration.

class Variable:
    @classmethod
    def get(cls, key: str, default_var: Any = None) -> Any: ...
    
    @classmethod
    def set(cls, key: str, value: Any) -> None: ...

class Param:
    def __init__(
        self,
        default: Any = None,
        description: Optional[str] = None,
        **kwargs
    ): ...
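
Variables are looked up at runtime with Variable.get, while Param declares typed, documented DAG parameters that can be overridden when triggering a run. A minimal sketch, using a hypothetical Variable named example_api_key:

from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.models.param import Param

with DAG(
    dag_id="config_example",
    schedule=None,
    start_date=datetime(2024, 1, 1),
    params={"target_table": Param("staging.events", type="string", description="Destination table")},
) as dag:

    @task
    def show_config(**context):
        api_key = Variable.get("example_api_key", default_var=None)  # hypothetical Variable
        table = context["params"]["target_table"]
        print(f"table={table}, api_key set={api_key is not None}")

    show_config()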

Configuration

Cross-Communication (XCom)

Cross-communication system for sharing data between tasks including XComArg, custom backends, and serialization.

class XComArg:
    def __init__(
        self,
        operator: BaseOperator,
        key: Optional[str] = None
    ): ...

class XCom:
    @classmethod
    def get_one(
        cls,
        task_id: str,
        dag_id: str,
        key: Optional[str] = None,
        execution_date: Optional[datetime] = None
    ) -> Any: ...
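
With TaskFlow, a task's return value is pushed to XCom automatically, and passing it to another task creates an XComArg dependency; the same value can also be pulled explicitly from the task instance. A minimal sketch:

from datetime import datetime
from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="xcom_example", schedule=None, start_date=datetime(2024, 1, 1)) as dag:

    @task
    def produce():
        return {"rows": 42}  # stored in XCom under the default "return_value" key

    @task
    def consume(payload, **context):
        # `payload` was resolved from the upstream XComArg.
        print("rows:", payload["rows"])

        # Explicit pull via the task instance, equivalent to the argument above.
        assert context["ti"].xcom_pull(task_ids="produce") == payload

    consume(produce())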

Cross-Communication

Executors

Task execution engines including LocalExecutor, CeleryExecutor, KubernetesExecutor, and custom executor development.

class BaseExecutor:
    def __init__(self, parallelism: int = 32): ...
    
    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None
    ) -> None: ...

class LocalExecutor(BaseExecutor):
    def __init__(self, parallelism: int = 0): ...
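
The executor is normally chosen in configuration rather than instantiated in user code; the active setting can be read back through Airflow's configuration API. A minimal sketch:

# In airflow.cfg (or via the AIRFLOW__CORE__EXECUTOR environment variable):
#
#   [core]
#   executor = LocalExecutor
#
# Reading the active setting from Python:
from airflow.configuration import conf

print(conf.get("core", "executor"))  # e.g. "LocalExecutor"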

Executors

CLI and Utilities

Command-line interface, context utilities, dependency management, and workflow orchestration helpers.

def get_current_context() -> Context: ...

def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]) -> None: ...

def cross_downstream(
    from_tasks: Sequence[BaseOperator],
    to_tasks: Sequence[BaseOperator]
) -> None: ...
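
chain sets linear dependencies across a sequence of tasks, while cross_downstream connects every task in one list to every task in another. A minimal sketch using TaskFlow tasks, assuming the airflow.models.baseoperator import path (the helpers also accept classic operators):

from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain

with DAG(dag_id="chaining_example", schedule=None, start_date=datetime(2024, 1, 1)) as dag:

    @task
    def step(name: str):
        print(name)

    extract = step.override(task_id="extract")("extract")
    validate = step.override(task_id="validate")("validate")
    load = step.override(task_id="load")("load")

    # Equivalent to extract >> validate >> load.
    chain(extract, validate, load)

    # cross_downstream([a, b], [c, d]) from the same module would wire
    # a >> c, a >> d, b >> c, b >> d.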

CLI and Utilities

Exception Handling

Comprehensive exception hierarchy for error handling, timeout management, and workflow recovery.

class AirflowException(Exception): ...

class AirflowTaskTimeout(AirflowException): ...

class AirflowSensorTimeout(AirflowException): ...

class AirflowRescheduleException(AirflowException): ...
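
Raising AirflowException (or one of its subclasses) inside a task marks that try as failed and hands control back to the retry logic configured on the task. A minimal sketch:

from airflow.decorators import task
from airflow.exceptions import AirflowException


@task(retries=3)
def validate_batch(record_count: int = 0):
    if record_count == 0:
        # Fails this try; the scheduler retries up to `retries` times.
        raise AirflowException("no records received from upstream")
    return record_count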

Exception Handling

Database Models

ORM models for DAGs, tasks, runs, connections, and metadata storage with SQLAlchemy integration.

class DagModel:
    dag_id: str
    is_active: bool
    last_parsed_time: datetime
    next_dagrun: datetime

class TaskInstance:
    task_id: str
    dag_id: str
    execution_date: datetime
    state: str
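
These models are SQLAlchemy mappings, so code running with metadata database access (scheduler-side plugins, maintenance scripts) can query them through an ORM session. A minimal sketch, assuming direct database access is appropriate in your deployment; in Airflow 3, task code should go through the Task SDK rather than the database:

from airflow.models import DagModel
from airflow.utils.session import create_session

with create_session() as session:
    # List active DAGs and when they are next due, straight from the metadata DB.
    for dag_model in session.query(DagModel).filter(DagModel.is_active).all():
        print(dag_model.dag_id, dag_model.next_dagrun)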

Database Models

Extensions and Providers

Plugin system, provider packages, operator links, notifications, and custom component development.

class BaseOperatorLink:
    name: str = None
    
    def get_link(
        self,
        operator: BaseOperator,
        dttm: datetime
    ) -> str: ...

class BaseNotifier:
    def __init__(self, **kwargs): ...
    
    def notify(self, context: Context) -> None: ...
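
A custom notifier subclasses BaseNotifier and implements notify(); an instance can then be attached to DAG or task callbacks such as on_failure_callback. A minimal sketch, assuming the airflow.notifications.basenotifier import path:

import logging

from airflow.notifications.basenotifier import BaseNotifier

log = logging.getLogger(__name__)


class LogNotifier(BaseNotifier):
    """Toy notifier that logs which task instance triggered it."""

    template_fields = ("message",)

    def __init__(self, message: str = "task failed"):
        super().__init__()
        self.message = message

    def notify(self, context):
        ti = context["ti"]
        log.info("%s: %s.%s", self.message, ti.dag_id, ti.task_id)


# Usage: @dag(..., on_failure_callback=LogNotifier(message="DAG run failed"))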

Extensions and Providers