CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow

Programmatically author, schedule and monitor data pipelines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

task-operators.mddocs/

Task Operators

Task definition and execution including BaseOperator, task decorators, dynamic task mapping, and task instance management. Tasks represent individual units of work within DAGs.

Capabilities

Base Operator

Foundation class for all Airflow operators, providing core task functionality and lifecycle management.

class BaseOperator:
    """Abstract base for all Airflow operators (documentation stub).

    Collects the task-level configuration every operator accepts: identity,
    retry policy, scheduling window, pools/queues, callbacks, trigger rules,
    and documentation fields. Method bodies are omitted in this reference.
    """
    def __init__(
        self,
        task_id: str,
        owner: str = "airflow",
        email: Optional[Union[str, List[str]]] = None,
        email_on_retry: bool = True,
        email_on_failure: bool = True,
        retries: Optional[int] = None,
        retry_delay: timedelta = timedelta(seconds=300),
        retry_exponential_backoff: bool = False,
        max_retry_delay: Optional[timedelta] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        depends_on_past: bool = False,
        wait_for_downstream: bool = False,
        dag: Optional[DAG] = None,
        params: Optional[Dict[str, Any]] = None,
        default_args: Optional[Dict[str, Any]] = None,
        pool: Optional[str] = None,
        pool_slots: int = 1,
        queue: Optional[str] = None,
        priority_weight: int = 1,
        weight_rule: str = "downstream",
        sla: Optional[timedelta] = None,
        execution_timeout: Optional[timedelta] = None,
        on_execute_callback: Optional[Callable] = None,
        on_failure_callback: Optional[Callable] = None,
        on_success_callback: Optional[Callable] = None,
        on_retry_callback: Optional[Callable] = None,
        trigger_rule: str = "all_success",
        resources: Optional[Dict[str, Any]] = None,
        run_as_user: Optional[str] = None,
        task_concurrency: Optional[int] = None,
        max_active_tis_per_dag: Optional[int] = None,
        executor_config: Optional[Dict[str, Any]] = None,
        do_xcom_push: bool = True,
        inlets: Optional[List[Any]] = None,
        outlets: Optional[List[Any]] = None,
        task_group: Optional[TaskGroup] = None,
        doc: Optional[str] = None,
        doc_md: Optional[str] = None,
        doc_json: Optional[str] = None,
        doc_yaml: Optional[str] = None,
        doc_rst: Optional[str] = None,
        wait_for_past_depends_before_skipping: bool = False,
        max_active_tis_per_dagrun: Optional[int] = None,
        map_index_template: Optional[str] = None,
        multiple_outputs: bool = False,
        task_display_name: Optional[str] = None,
        logger_name: Optional[str] = None,
        allow_nested_operators: bool = True,
        **kwargs
    ):
        """
        Base operator for all Airflow tasks.

        Args:
            task_id: Unique identifier for the task
            owner: Owner of the task
            email: Address or list of addresses for notification emails
            email_on_retry: Send a notification email when the task retries
            email_on_failure: Send a notification email when the task fails
            retries: Number of retries when task fails
            retry_delay: Delay between retries (default 300 seconds)
            retry_exponential_backoff: Grow the retry delay between attempts
            max_retry_delay: Upper bound on the retry delay when backing off
            start_date: When the task should start being scheduled
            end_date: When the task should stop being scheduled
            depends_on_past: Whether task depends on previous run success
            wait_for_downstream: Also wait on the previous run's downstream tasks
            dag: DAG this task is attached to
            params: Parameters made available to the task (e.g. for templating)
            default_args: Default argument values merged into the task
            pool: Resource pool for task execution
            pool_slots: Number of pool slots the task occupies
            queue: Queue the task is routed to for execution
            priority_weight: Task priority for execution order
            weight_rule: Aggregation rule for priority_weight ("downstream" default)
            sla: Service-level agreement window for the task
            execution_timeout: Maximum runtime before task timeout
            on_execute_callback: Called when execution begins
            on_failure_callback: Called when the task fails
            on_success_callback: Called when the task succeeds
            on_retry_callback: Called when the task is retried
            trigger_rule: Rule for task triggering based on upstream tasks
            do_xcom_push: Whether the task's return value is pushed to XCom
            task_concurrency: Per-task concurrency limit
                (NOTE(review): appears to be the older spelling of
                max_active_tis_per_dag — confirm against Airflow docs)
            max_active_tis_per_dag: Max concurrent instances of this task per DAG
            task_display_name: Human-friendly name shown instead of task_id
        """

    def execute(self, context: Context) -> Any:
        """Execute the task logic. Must be implemented by subclasses.

        Args:
            context: Runtime context for the task instance.

        Returns:
            The task result (presumably pushed to XCom when
            ``do_xcom_push`` is True — confirm against Airflow docs).
        """

    def on_kill(self) -> None:
        """Called when task is killed for cleanup."""

    def defer(self, trigger: BaseTrigger, method_name: str, **kwargs) -> None:
        """Defer task execution to a trigger.

        Args:
            trigger: Trigger that decides when execution resumes.
            method_name: Name of the method to invoke on resumption.
            **kwargs: Arguments passed through to that method.
        """

    def resume_execution(self, context: Context, event: Dict[str, Any]) -> Any:
        """Resume execution after deferral.

        Args:
            context: Runtime context for the task instance.
            event: Payload produced by the trigger that fired.
        """

    def render_template_fields(
        self, 
        context: Context, 
        jinja_env: Optional[jinja2.Environment] = None
    ) -> None:
        """Render Jinja templates in task fields.

        Args:
            context: Runtime context supplying template variables.
            jinja_env: Jinja environment to render with (a default is
                presumably used when None — confirm).
        """

Task Decorator

Modern approach to task definition using decorators for cleaner, more Pythonic task creation.

@task(
    task_id: Optional[str] = None,
    python_callable: Optional[Callable] = None,
    op_args: Optional[List[Any]] = None,
    op_kwargs: Optional[Dict[str, Any]] = None,
    templates_dict: Optional[Dict[str, Any]] = None,
    templates_exts: Optional[List[str]] = None,
    show_return_value_in_logs: bool = True,
    **kwargs
) -> Callable:
    """
    Decorator to create a task from a Python function.
    
    Args:
        task_id: Unique identifier (auto-generated from function name if not provided)
        python_callable: The Python function to execute
        op_args: Positional arguments to pass to the function
        op_kwargs: Keyword arguments to pass to the function
        templates_dict: Dictionary of templates to render
        show_return_value_in_logs: Whether to log return value
        
    Returns:
        Decorated function that returns task output
    """

@task.setup(
    task_id: Optional[str] = None,
    **kwargs
) -> Callable:
    """
    Decorator for setup tasks that run before other tasks.
    
    Args:
        task_id: Unique identifier
        **kwargs: Additional task arguments
        
    Returns:
        Decorated function for setup task
    """

@task.teardown(
    task_id: Optional[str] = None,
    **kwargs
) -> Callable:
    """
    Decorator for teardown tasks that run after other tasks.
    
    Args:
        task_id: Unique identifier
        **kwargs: Additional task arguments
        
    Returns:
        Decorated function for teardown task
    """

Usage example:

from airflow.decorators import dag, task
from datetime import datetime

@dag(dag_id='task_decorator_example', start_date=datetime(2024, 1, 1))
def task_decorator_example():
    @task
    def extract_data(source: str) -> dict:
        """Extract data from source."""
        payload = {'data': f'extracted from {source}', 'count': 100}
        return payload
    
    @task
    def transform_data(data: dict) -> dict:
        """Transform the data."""
        upper_text = data['data'].upper()
        doubled_count = data['count'] * 2
        return {'transformed_data': upper_text, 'processed_count': doubled_count}
    
    @task.setup
    def setup_environment():
        """Setup task that runs first."""
        print("Setting up environment")
    
    @task.teardown
    def cleanup():
        """Cleanup task that runs last."""
        print("Cleaning up")
    
    # Wire the pipeline: setup, then extract -> transform, then teardown.
    setup_environment()
    extracted = extract_data('database')
    transform_data(extracted)
    cleanup()

dag_instance = task_decorator_example()

Dynamic Task Mapping

Create tasks dynamically at runtime based on input data or external conditions.

class MappedOperator:
    """
    Operator created through dynamic task mapping.
    
    Attributes:
        task_id: Base task identifier
        operator_class: Original operator class
        mapped_op_kwargs: Mapped operator arguments
        partial_kwargs: Static operator arguments
    """
    # Field annotations mirror the Attributes section above.
    task_id: str
    operator_class: type
    mapped_op_kwargs: Dict[str, Any]
    partial_kwargs: Dict[str, Any]

    def expand(self, **mapped_kwargs) -> 'MappedOperator':
        """
        Expand operator with mapped arguments.
        
        Per the usage example below, one task instance appears to be
        created per element of each mapped argument.

        Args:
            **mapped_kwargs: Arguments to map over
            
        Returns:
            MappedOperator instance
        """

    def partial(self, **partial_kwargs) -> 'MappedOperator':
        """
        Set static arguments for mapped operator.
        
        These arguments are shared by every mapped task instance rather
        than varying per element.

        Args:
            **partial_kwargs: Static arguments
            
        Returns:
            Partially configured MappedOperator
        """

Usage example:

from airflow.decorators import dag, task
# Fix: the example uses datetime(...) and List[str] below but imported neither.
from datetime import datetime
from typing import List

@dag(dag_id='dynamic_mapping_example', start_date=datetime(2024, 1, 1))
def dynamic_mapping_example():
    @task
    def get_file_list() -> List[str]:
        """Get list of files to process."""
        return ['file1.csv', 'file2.csv', 'file3.csv']
    
    @task
    def process_file(filename: str) -> str:
        """Process a single file."""
        # Fix: interpolate the mapped filename; the page showed a broken
        # "(unknown)" placeholder where the template variable belonged.
        return f"processed {filename}"
    
    # Dynamic mapping - creates one task per file
    files = get_file_list()
    process_file.expand(filename=files)

dag_instance = dynamic_mapping_example()

Task Instance

Represents a specific execution of a task within a DAG run.

class TaskInstance:
    """
    ORM model for task instance execution.
    
    Attributes:
        id: Row identifier (UUID)
        task_id: Task identifier
        dag_id: DAG identifier
        run_id: DAG run identifier
        map_index: Index within a dynamically mapped task (-1 presumably
            means unmapped — confirm against Airflow docs)
        execution_date: Execution date
        start_date: When task started
        end_date: When task ended
        duration: Task execution duration
        state: Current task state
        try_number: Current retry attempt
        max_tries: Maximum retry attempts
        hostname: Worker hostname
        unixname: Unix username
        job_id: Job identifier
        pool: Resource pool
        pool_slots: Number of pool slots used
        queue: Execution queue
        priority_weight: Task priority
        operator: Operator class name
        queued_dttm: When task was queued
        pid: Process ID
        executor_config: Executor configuration
        external_executor_id: External executor identifier
        trigger_id: Trigger identifier (for deferred tasks)
        next_method: Next method to call
        next_kwargs: Arguments for next method
    """
    id: Optional[UUID]
    task_id: str
    dag_id: str
    run_id: str
    map_index: int
    execution_date: datetime
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    duration: Optional[float]
    state: Optional[str]
    try_number: int
    max_tries: int
    hostname: str
    unixname: str
    job_id: Optional[int]
    pool: str
    pool_slots: int
    queue: str
    priority_weight: int
    operator: str
    queued_dttm: Optional[datetime]
    pid: Optional[int]
    executor_config: Optional[Dict]
    external_executor_id: Optional[str]
    trigger_id: Optional[int]
    next_method: Optional[str]
    next_kwargs: Optional[Dict]

    def clear_task_instances(
        self,
        tis: List['TaskInstance'],
        session: Session = None,
        dag: Optional[DAG] = None
    ) -> None:
        """Clear task instances for retry.

        Args:
            tis: Task instances to clear.
            session: Database session (NOTE(review): None default suggests
                one is normally injected — confirm against caller).
            dag: DAG the instances belong to.
        """

    def get_task_instance(
        self,
        task_id: str,
        execution_date: datetime,
        session: Session = None
    ) -> Optional['TaskInstance']:
        """Get task instance by ID and execution date.

        Args:
            task_id: Task identifier to look up.
            execution_date: Execution date of the run.
            session: Database session (see note above on the None default).

        Returns:
            The matching TaskInstance, or None if not found.
        """

Task Dependencies

Manage dependencies between tasks using various trigger rules and patterns.

def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]) -> None:
    """
    Chain tasks sequentially.
    
    A sequence element groups tasks: per the usage example below, each
    task in one element appears to become upstream of each task in the
    next element (fan-out/fan-in) — confirm against Airflow docs.

    Args:
        *tasks: Tasks (or sequences of tasks) to chain in order
    """

def chain_linear(*tasks: BaseOperator) -> None:
    """
    Chain tasks in a linear sequence.
    
    Each task is made directly upstream of the task that follows it.

    Args:
        *tasks: Tasks to chain linearly
    """

def cross_downstream(
    from_tasks: Sequence[BaseOperator],
    to_tasks: Sequence[BaseOperator]
) -> None:
    """
    Create dependencies from all tasks in from_tasks to all tasks in to_tasks.
    
    Produces the full cross product: every from_task becomes upstream of
    every to_task.

    Args:
        from_tasks: Source tasks
        to_tasks: Target tasks
    """

Usage example:

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain, cross_downstream
# Fix: datetime(...) is used in the @dag start_date below but was not imported.
from datetime import datetime

@dag(dag_id='dependencies_example', start_date=datetime(2024, 1, 1))
def dependencies_example():
    @task
    def start():
        return "started"
    
    @task
    def process_a():
        return "a"
    
    @task
    def process_b():
        return "b"
    
    @task
    def combine():
        return "combined"
    
    @task
    def end():
        return "ended"
    
    # Instantiate each task once so the chain below can reference them.
    start_task = start()
    process_a_task = process_a()
    process_b_task = process_b()
    combine_task = combine()
    end_task = end()
    
    # start fans out to a and b in parallel, which both feed combine, then end.
    chain(start_task, [process_a_task, process_b_task], combine_task, end_task)

dag_instance = dependencies_example()

Task States and Lifecycle

Task execution states and lifecycle management.

from airflow.utils.state import TaskInstanceState

# Task States
class TaskInstanceState:
    """Task instance states.

    String values as tracked on ``TaskInstance.state``.
    """
    NONE: str = "none"
    SCHEDULED: str = "scheduled"
    QUEUED: str = "queued"
    RUNNING: str = "running"
    SUCCESS: str = "success"
    SHUTDOWN: str = "shutdown"
    RESTARTING: str = "restarting"
    FAILED: str = "failed"
    UP_FOR_RETRY: str = "up_for_retry"
    UP_FOR_RESCHEDULE: str = "up_for_reschedule"
    UPSTREAM_FAILED: str = "upstream_failed"
    SKIPPED: str = "skipped"
    REMOVED: str = "removed"
    DEFERRED: str = "deferred"  # set via BaseOperator.defer()

# Trigger Rules
# Accepted values for BaseOperator's ``trigger_rule`` parameter.
TRIGGER_RULES = [
    "all_success",      # All upstream tasks succeeded
    "all_failed",       # All upstream tasks failed
    "all_done",         # All upstream tasks completed (success or failed)
    "one_success",      # At least one upstream task succeeded
    "one_failed",       # At least one upstream task failed
    "none_failed",      # No upstream tasks failed
    "none_failed_min_one_success",  # No failures and at least one success
    "none_skipped",     # No upstream tasks skipped
    "always",           # Always run regardless of upstream state
]

Types

# Fix: ``Literal`` is used by the aliases below but was missing from the import.
from typing import Union, Optional, List, Dict, Callable, Any, Sequence, Literal
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.models.taskgroup import TaskGroup
from airflow.utils.context import Context

# Closed sets of string values accepted for task state / trigger rule.
TaskState = Literal[
    "none", "scheduled", "queued", "running", "success", 
    "failed", "up_for_retry", "upstream_failed", "skipped"
]

TriggerRule = Literal[
    "all_success", "all_failed", "all_done", "one_success", 
    "one_failed", "none_failed", "always"
]

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow

docs

assets-scheduling.md

cli-utilities.md

configuration.md

dag-management.md

database-models.md

exceptions.md

executors.md

extensions.md

index.md

task-operators.md

xcom.md

tile.json