CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow

Programmatically author, schedule and monitor data pipelines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

cli-utilities.mddocs/

CLI and Utilities

Command-line interface, context utilities, dependency management, and workflow orchestration helpers. Airflow provides extensive CLI tools and utility functions for workflow management.

Capabilities

Context Utilities

Access execution context and runtime information.

def get_current_context() -> Context:
    """
    Get the current task execution context.
    
    Returns:
        Current execution context with task, DAG, and runtime information
    """

def get_parsing_context() -> Context:
    """
    Get the DAG parsing context.
    
    Returns:
        Context available during DAG parsing
    """

class Context:
    """Task execution context."""
    # Core objects
    task_instance: TaskInstance
    task: BaseOperator
    dag: DAG
    dag_run: DagRun
    
    # Execution info
    execution_date: datetime
    logical_date: datetime
    data_interval_start: datetime
    data_interval_end: datetime
    
    # Formatted dates
    ds: str           # YYYY-MM-DD
    ds_nodash: str    # YYYYMMDD
    ts: str           # ISO timestamp
    ts_nodash: str    # Timestamp without separators
    
    # Configuration
    params: Dict[str, Any]
    var: Dict[str, Any]
    conf: Dict[str, Any]
    
    # XCom access
    ti: TaskInstance  # For XCom operations

Dependency Management

Manage task dependencies and execution order.

def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]) -> None:
    """
    Chain tasks in sequence: task1 >> task2 >> task3.
    
    Args:
        *tasks: Tasks to chain in order
    """

def chain_linear(*tasks: BaseOperator) -> None:
    """
    Chain tasks linearly with explicit ordering.
    
    Args:
        *tasks: Tasks to chain linearly
    """

def cross_downstream(
    from_tasks: Sequence[BaseOperator],
    to_tasks: Sequence[BaseOperator]
) -> None:
    """
    Set all tasks in from_tasks as upstream of all tasks in to_tasks.
    
    Args:
        from_tasks: Upstream tasks
        to_tasks: Downstream tasks
    """

Usage example:

from datetime import datetime

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain, cross_downstream

@dag(dag_id='dependency_example', start_date=datetime(2024, 1, 1))
def dependency_example():
    @task
    def start():
        return "started"
    
    @task
    def extract_a():
        return "data_a"
    
    @task
    def extract_b():
        return "data_b"
    
    @task
    def transform_a(data):
        return f"transformed_{data}"
    
    @task
    def transform_b(data):
        return f"transformed_{data}"
    
    @task
    def combine(data_a, data_b):
        return f"combined: {data_a}, {data_b}"
    
    @task
    def end():
        return "finished"
    
    # Set up dependencies
    start_task = start()
    extract_a_task = extract_a()
    extract_b_task = extract_b()
    transform_a_task = transform_a(extract_a_task)
    transform_b_task = transform_b(extract_b_task)
    combine_task = combine(transform_a_task, transform_b_task)
    end_task = end()
    
    # Chain: start >> [extract_a, extract_b] >> [transform_a, transform_b] >> combine >> end
    chain(
        start_task,
        [extract_a_task, extract_b_task],
        [transform_a_task, transform_b_task],
        combine_task,
        end_task
    )

dag_instance = dependency_example()

Template Utilities

Template rendering and macro functions.

def render_template(
    template: str,
    context: Context,
    jinja_env: Optional[Environment] = None
) -> str:
    """
    Render Jinja template with context.
    
    Args:
        template: Template string
        context: Execution context
        jinja_env: Jinja environment
        
    Returns:
        Rendered template
    """

def render_template_from_field(
    operator: BaseOperator,
    field: str,
    context: Context
) -> Any:
    """
    Render template field from operator.
    
    Args:
        operator: Operator instance
        field: Field name to render
        context: Execution context
        
    Returns:
        Rendered field value
    """

# Built-in template functions
def ds_add(ds: str, days: int) -> str:
    """Add days to date string (YYYY-MM-DD format)."""

def ds_format(ds: str, input_format: str, output_format: str) -> str:
    """Format date string from one format to another."""

def macros_datetime(dt: datetime) -> datetime:
    """Access datetime in templates."""

def macros_timedelta(**kwargs) -> timedelta:
    """Create timedelta in templates."""

Date and Time Utilities

Common date/time operations for workflow scheduling. Note: `days_ago` is deprecated in newer Airflow releases — prefer a fixed, timezone-aware `datetime` for `start_date`.

from airflow.utils.dates import days_ago, round_time, infer_time_unit

def days_ago(n: int, hour: int = 0, minute: int = 0, second: int = 0) -> datetime:
    """
    Get datetime n days ago.
    
    Args:
        n: Number of days ago
        hour: Hour of day
        minute: Minute of hour
        second: Second of minute
        
    Returns:
        Datetime n days ago
    """

def round_time(dt: datetime, delta: timedelta) -> datetime:
    """
    Round datetime to nearest delta interval.
    
    Args:
        dt: Datetime to round
        delta: Rounding interval
        
    Returns:
        Rounded datetime
    """

def infer_time_unit(time_seconds_arr: List[float]) -> str:
    """
    Infer appropriate time unit from array of seconds.
    
    Args:
        time_seconds_arr: Array of time values in seconds
        
    Returns:
        Appropriate unit ('seconds', 'minutes', 'hours', 'days')
    """

State Management

Utilities for managing task and DAG states.

from airflow.utils.state import State, DagRunState, TaskInstanceState

class State:
    """Base state management."""
    
    @classmethod
    def task_states(cls) -> Set[str]:
        """Get all task states."""
    
    @classmethod
    def dag_states(cls) -> Set[str]:
        """Get all DAG states."""
    
    @classmethod
    def finished_states(cls) -> Set[str]:
        """Get terminal states."""
    
    @classmethod
    def unfinished_states(cls) -> Set[str]:
        """Get non-terminal states."""

def clear_task_instances(
    tis: List[TaskInstance],
    session: Session,
    dag: Optional[DAG] = None
) -> None:
    """
    Clear task instances for retry.
    
    Args:
        tis: Task instances to clear
        session: Database session
        dag: Optional DAG instance
    """

Logging Utilities

Logging configuration and utilities.

from airflow.utils.log.logging_mixin import LoggingMixin

class LoggingMixin:
    """Mixin for adding logging to classes."""
    
    @property
    def logger(self) -> logging.Logger:
        """Get logger instance."""
    
    def log_info(self, message: str) -> None:
        """Log info message."""
    
    def log_warning(self, message: str) -> None:
        """Log warning message."""
    
    def log_error(self, message: str) -> None:
        """Log error message."""

def configure_logging() -> None:
    """Configure Airflow logging system."""

def setup_logging(filename: Optional[str] = None) -> None:
    """Setup logging configuration."""

Types

from typing import Union, Optional, List, Dict, Any, Sequence, Set
from datetime import datetime, timedelta
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstance import TaskInstance
from airflow.utils.context import Context

TaskLike = Union[BaseOperator, Sequence[BaseOperator]]
StateType = str

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow

docs

assets-scheduling.md

cli-utilities.md

configuration.md

dag-management.md

database-models.md

exceptions.md

executors.md

extensions.md

index.md

task-operators.md

xcom.md

tile.json