
tessl/pypi-apache-airflow

Programmatically author, schedule and monitor data pipelines

DAG Management

Core functionality for defining, scheduling, and managing directed acyclic graphs of tasks. DAGs represent workflows as code with explicit dependencies, scheduling, and configuration.

Capabilities

DAG Definition

Create and configure DAGs using the traditional class-based approach or modern decorator patterns.

class DAG:
    def __init__(
        self,
        dag_id: str,
        description: Optional[str] = None,
        schedule: Optional[Union[str, datetime.timedelta]] = None,
        start_date: Optional[datetime.datetime] = None,
        end_date: Optional[datetime.datetime] = None,
        template_searchpath: Optional[Union[str, List[str]]] = None,
        template_undefined: type = jinja2.StrictUndefined,
        user_defined_macros: Optional[Dict] = None,
        user_defined_filters: Optional[Dict] = None,
        default_args: Optional[Dict] = None,
        max_active_tasks: int = 16,
        max_active_runs: int = 16,
        dagrun_timeout: Optional[datetime.timedelta] = None,
        sla_miss_callback: Optional[Callable] = None,
        default_view: str = "tree",
        orientation: str = "LR",
        catchup: bool = True,
        on_success_callback: Optional[Callable] = None,
        on_failure_callback: Optional[Callable] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        access_control: Optional[Dict[str, Dict[str, Collection[str]]]] = None,
        is_paused_upon_creation: Optional[bool] = None,
        jinja_environment_kwargs: Optional[Dict] = None,
        render_template_as_native_obj: bool = False,
        owner_links: Optional[Dict[str, str]] = None,
        auto_register: bool = True,
        fail_fast: bool = False,
        dag_display_name: Optional[str] = None,
        max_consecutive_failed_dag_runs: int = 0,
        **kwargs
    ):
        """
        Create a new DAG instance.
        
        Args:
            dag_id: Unique identifier for the DAG
            description: Description of the DAG's purpose
            schedule: How often to run the DAG (cron, timedelta, or None)
            start_date: When the DAG should start being scheduled
            end_date: When the DAG should stop being scheduled (optional)
            default_args: Default arguments applied to all tasks
            catchup: Whether to backfill missed runs
            tags: List of tags for categorization
        """

DAG Decorator

Modern approach to DAG definition using the @dag decorator for cleaner, more Pythonic workflow definition.

@dag(
    dag_id: Optional[str] = None,
    description: Optional[str] = None,
    schedule: Optional[Union[str, timedelta]] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    template_searchpath: Optional[Union[str, List[str]]] = None,
    user_defined_macros: Optional[Dict] = None,
    user_defined_filters: Optional[Dict] = None,
    default_args: Optional[Dict] = None,
    max_active_tasks: int = 16,
    max_active_runs: int = 16,
    dagrun_timeout: Optional[timedelta] = None,
    catchup: bool = True,
    on_success_callback: Optional[Callable] = None,
    on_failure_callback: Optional[Callable] = None,
    tags: Optional[List[str]] = None,
    **kwargs
) -> Callable:
    """
    Decorator to create a DAG from a function.
    
    Args:
        dag_id: Unique identifier (auto-generated from function name if not provided)
        schedule: How often to run the DAG
        start_date: When the DAG should start being scheduled
        catchup: Whether to backfill missed runs
        tags: List of tags for categorization
        
    Returns:
        Decorated function that returns a DAG instance
    """

Usage example:

from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(
    dag_id='modern_workflow',
    schedule=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['modern', 'example']
)
def modern_workflow():
    @task
    def process_data():
        return "processed"
    
    process_data()

dag_instance = modern_workflow()

Task Groups

Organize related tasks into logical groups for better DAG visualization and organization.

class TaskGroup:
    def __init__(
        self,
        group_id: str,
        tooltip: str = "",
        dag: Optional[DAG] = None,
        default_args: Optional[Dict] = None,
        prefix_group_id: bool = True,
        parent_group: Optional['TaskGroup'] = None,
        ui_color: str = "CornflowerBlue",
        ui_fgcolor: str = "#000",
        add_suffix_on_collision: bool = False,
        group_display_name: Optional[str] = None,
        **kwargs
    ):
        """
        Create a new task group.
        
        Args:
            group_id: Unique identifier for the group
            tooltip: Tooltip text displayed in the UI
            dag: Parent DAG (auto-detected if not provided)
            prefix_group_id: Whether to prefix task IDs with group ID
        """

@task_group(
    group_id: Optional[str] = None,
    tooltip: str = "",
    default_args: Optional[Dict] = None,
    prefix_group_id: bool = True,
    **kwargs
) -> Callable:
    """
    Decorator to create a task group from a function.
    
    Args:
        group_id: Unique identifier (auto-generated from function name if not provided)
        tooltip: Tooltip text displayed in the UI
        prefix_group_id: Whether to prefix task IDs with group ID
        
    Returns:
        Decorated function that returns a TaskGroup instance
    """

Usage example:

from datetime import datetime

from airflow.decorators import dag, task, task_group

@dag(dag_id='grouped_workflow', start_date=datetime(2024, 1, 1))
def grouped_workflow():
    @task_group(group_id='data_processing')
    def data_processing():
        @task
        def extract():
            return "extracted"
        
        @task  
        def transform(data):
            return f"transformed_{data}"
        
        @task
        def load(data):
            print(f"loading {data}")
        
        data = extract()
        transformed = transform(data)
        load(transformed)
        
        return transformed
    
    processed = data_processing()

dag_instance = grouped_workflow()

DAG Model and Metadata

ORM model representing DAG metadata in the database.

class DagModel:
    """
    ORM model for DAG metadata storage.
    
    Attributes:
        dag_id: Unique DAG identifier
        is_active: Whether the DAG is currently active
        is_paused: Whether the DAG is paused
        last_parsed_time: When the DAG was last parsed
        last_pickled: When the DAG was last pickled
        last_expired: When the DAG last expired
        scheduler_lock: Scheduler lock information
        pickle_id: Pickle ID for serialization
        fileloc: File location of the DAG
        owners: DAG owners
        description: DAG description
        default_view: Default view in the UI
        schedule_interval: DAG schedule interval
        tags: List of DAG tags
    """
    dag_id: str
    is_active: bool
    is_paused: bool
    last_parsed_time: Optional[datetime]
    last_pickled: Optional[datetime]
    last_expired: Optional[datetime]
    scheduler_lock: Optional[bool]
    pickle_id: Optional[int]
    fileloc: str
    owners: str
    description: Optional[str]
    default_view: str
    schedule_interval: Optional[str]
    tags: List[str]

DAG Runs

Represents individual executions of a DAG.

class DagRun:
    """
    ORM model for DAG run instances.
    
    Attributes:
        dag_id: DAG identifier
        execution_date: Execution date for this run
        run_id: Unique run identifier
        state: Current state of the run
        run_type: Type of run (scheduled, manual, backfill)
        external_trigger: Whether triggered externally
        start_date: When the run started
        end_date: When the run ended
        creating_job_id: ID of job that created this run
    """
    dag_id: str
    execution_date: datetime
    run_id: str
    state: str
    run_type: str
    external_trigger: bool
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    creating_job_id: Optional[int]

DAG Utilities

Utility functions for DAG management and organization.

class DagBag:
    def __init__(
        self,
        dag_folder: Optional[str] = None,
        executor: Optional[BaseExecutor] = None,
        include_examples: bool = True,
        safe_mode: bool = True,
        read_dags_from_db: bool = False,
        store_serialized_dags: bool = False,
        load_op_links: bool = True
    ):
        """
        Container for loading and managing multiple DAGs.
        
        Args:
            dag_folder: Directory to scan for DAG files
            include_examples: Whether to include example DAGs
            safe_mode: Whether to use safe mode for parsing
            read_dags_from_db: Whether to read DAGs from database
        """

    def get_dag(self, dag_id: str) -> Optional[DAG]:
        """Get a DAG by ID."""

    def process_file(self, filepath: str) -> List[DAG]:
        """Process a single DAG file."""

    def collect_dags(
        self,
        dag_folder: Optional[str] = None,
        only_if_updated: bool = True,
        include_examples: bool = True,
        safe_mode: bool = True
    ) -> None:
        """Collect DAGs from the specified folder."""

Types

from typing import Union, Optional, List, Dict, Callable, Any, Literal
from datetime import datetime, timedelta
import jinja2

DagRunState = Literal["queued", "running", "success", "failed"]
ScheduleInterval = Union[str, timedelta, None]

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow
