Programmatically author, schedule and monitor data pipelines
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Command-line interface, context utilities, dependency management, and workflow orchestration helpers. Airflow provides extensive CLI tools and utility functions for workflow management.
Access execution context and runtime information.
def get_current_context() -> Context:
    """
    Return the execution context of the currently running task.

    Returns:
        Context carrying the task, DAG, and runtime information for the
        active task instance.
    """
def get_parsing_context() -> Context:
    """
    Return the context available while a DAG file is being parsed.

    Returns:
        Context populated with the information accessible at parse time.
    """
class Context:
"""Task execution context."""
# Core runtime objects for the executing task
task_instance: TaskInstance
task: BaseOperator
dag: DAG
dag_run: DagRun
# Execution info
# NOTE(review): execution_date is listed alongside logical_date — presumably the legacy alias; confirm
execution_date: datetime
logical_date: datetime
data_interval_start: datetime
data_interval_end: datetime
# Pre-formatted date strings (presumably derived from the logical date — confirm)
ds: str # YYYY-MM-DD
ds_nodash: str # YYYYMMDD
ts: str # ISO timestamp
ts_nodash: str # Timestamp without separators
# Configuration mappings available to the task
params: Dict[str, Any]
var: Dict[str, Any]
conf: Dict[str, Any]
# XCom access
ti: TaskInstance # For XCom operations

Manage task dependencies and execution order.
def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]) -> None:
    """
    Link the given tasks so each runs after the previous one,
    equivalent to ``task1 >> task2 >> task3``.

    Args:
        *tasks: Tasks (or lists of tasks) to link, in execution order.
    """
def chain_linear(*tasks: BaseOperator) -> None:
    """
    Link tasks one after another with explicit linear ordering.

    Args:
        *tasks: Tasks to link, in execution order.
    """
def cross_downstream(
    from_tasks: Sequence[BaseOperator],
    to_tasks: Sequence[BaseOperator]
) -> None:
    """
    Wire every task in *from_tasks* as an upstream dependency of every
    task in *to_tasks* (a full cross-product of edges).

    Args:
        from_tasks: Tasks placed upstream.
        to_tasks: Tasks placed downstream.
    """

# Usage example:
from airflow.decorators import dag, task
from airflow.models.baseoperator import chain, cross_downstream

@dag(dag_id='dependency_example', start_date=datetime(2024, 1, 1))
def dependency_example():
    """Example DAG: fan-out, transform, then fan-in via chain()."""

    @task
    def start():
        return "started"

    @task
    def extract_a():
        return "data_a"

    @task
    def extract_b():
        return "data_b"

    @task
    def transform_a(data):
        return f"transformed_{data}"

    @task
    def transform_b(data):
        return f"transformed_{data}"

    @task
    def combine(data_a, data_b):
        return f"combined: {data_a}, {data_b}"

    @task
    def end():
        return "finished"

    # Instantiate the tasks; passing XCom results creates data dependencies.
    start_task = start()
    extract_a_task = extract_a()
    extract_b_task = extract_b()
    transform_a_task = transform_a(extract_a_task)
    transform_b_task = transform_b(extract_b_task)
    combine_task = combine(transform_a_task, transform_b_task)
    end_task = end()

    # start >> [extract_a, extract_b] >> [transform_a, transform_b] >> combine >> end
    chain(
        start_task,
        [extract_a_task, extract_b_task],
        [transform_a_task, transform_b_task],
        combine_task,
        end_task
    )

dag_instance = dependency_example()

# Template rendering and macro functions.
def render_template(
    template: str,
    context: Context,
    jinja_env: Optional[Environment] = None
) -> str:
    """
    Render a Jinja template string against an execution context.

    Args:
        template: The Jinja template source.
        context: Execution context supplying the template variables.
        jinja_env: Optional pre-configured Jinja environment to use.

    Returns:
        The rendered template string.
    """
def render_template_from_field(
    operator: BaseOperator,
    field: str,
    context: Context
) -> Any:
    """
    Render one templated field of an operator.

    Args:
        operator: The operator instance owning the field.
        field: Name of the field to render.
        context: Execution context supplying the template variables.

    Returns:
        The rendered value of the field.
    """
# Built-in template functions
def ds_add(ds: str, days: int) -> str:
    """
    Add days to a date string (subtract when *days* is negative).

    Args:
        ds: Date string in YYYY-MM-DD format.
        days: Number of days to add; may be negative.

    Returns:
        The shifted date as a YYYY-MM-DD string.

    Raises:
        ValueError: If *ds* is not in YYYY-MM-DD format.
    """
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")
def ds_format(ds: str, input_format: str, output_format: str) -> str:
    """
    Re-format a date string from one strftime format to another.

    Args:
        ds: The input date string.
        input_format: strptime format describing *ds*.
        output_format: strftime format for the result.

    Returns:
        The same date rendered in *output_format*.

    Raises:
        ValueError: If *ds* does not match *input_format*.
    """
    return datetime.strptime(ds, input_format).strftime(output_format)
def macros_datetime(dt: datetime) -> datetime:
    """Expose a datetime value to template code."""
def macros_timedelta(**kwargs) -> timedelta:
    """Build a timedelta from keyword arguments inside template code."""

# Common date/time operations for workflow scheduling.
from airflow.utils.dates import days_ago, round_time, infer_time_unit
def days_ago(n: int, hour: int = 0, minute: int = 0, second: int = 0) -> datetime:
    """
    Return an aware UTC datetime *n* days before today, at the given
    time of day.

    Args:
        n: Number of days in the past (0 means today).
        hour: Hour of day for the returned datetime.
        minute: Minute of the hour.
        second: Second of the minute.

    Returns:
        A timezone-aware UTC datetime n days ago at hour:minute:second.
    """
    # Anchor on today's date in UTC, then step back whole days.
    today = datetime.now(tz=timezone.utc).replace(
        hour=hour, minute=minute, second=second, microsecond=0
    )
    return today - timedelta(days=n)
def round_time(dt: datetime, delta: timedelta) -> datetime:
    """
    Round a datetime to the nearest whole multiple of *delta*.

    Args:
        dt: Datetime to round (naive or aware).
        delta: Rounding granularity; must be a positive interval.

    Returns:
        The datetime rounded to the nearest multiple of *delta*,
        measured from ``datetime.min`` in dt's timezone.

    Raises:
        ValueError: If *delta* is not a positive interval.
    """
    step = delta.total_seconds()
    if step <= 0:
        raise ValueError("delta must be a positive interval")
    # Measure dt as an offset from a fixed epoch so rounding is stable.
    epoch = datetime.min.replace(tzinfo=dt.tzinfo)
    offset = (dt - epoch).total_seconds()
    rounded = round(offset / step) * step
    return epoch + timedelta(seconds=rounded)
def infer_time_unit(time_seconds_arr: List[float]) -> str:
    """
    Pick a human-friendly display unit for a collection of durations.

    The unit keeps the largest value in a readable range: under 2
    minutes -> 'seconds', under 2 hours -> 'minutes', under 2 days ->
    'hours', otherwise 'days'.

    Args:
        time_seconds_arr: Durations expressed in seconds.

    Returns:
        One of 'seconds', 'minutes', 'hours', or 'days'.  An empty
        input defaults to 'hours'.
    """
    if not time_seconds_arr:
        return 'hours'
    max_time_seconds = max(time_seconds_arr)
    if max_time_seconds < 2 * 60:
        return 'seconds'
    if max_time_seconds < 2 * 60 * 60:
        return 'minutes'
    if max_time_seconds < 2 * 24 * 60 * 60:
        return 'hours'
    return 'days'

# Utilities for managing task and DAG states.
from airflow.utils.state import State, DagRunState, TaskInstanceState
class State:
    """Helpers for querying the known task and DAG state sets."""

    @classmethod
    def task_states(cls) -> Set[str]:
        """Return every state a task instance can be in."""

    @classmethod
    def dag_states(cls) -> Set[str]:
        """Return every state a DAG run can be in."""

    @classmethod
    def finished_states(cls) -> Set[str]:
        """Return the terminal states (no further transitions)."""

    @classmethod
    def unfinished_states(cls) -> Set[str]:
        """Return the states from which a run can still progress."""
def clear_task_instances(
    tis: List[TaskInstance],
    session: Session,
    dag: Optional[DAG] = None
) -> None:
    """
    Reset the given task instances so they can be retried.

    Args:
        tis: Task instances to reset.
        session: Active database session used for the updates.
        dag: DAG the instances belong to, if available.
    """

# Logging configuration and utilities.
from airflow.utils.log.logging_mixin import LoggingMixin
class LoggingMixin:
    """Mixin that gives a class a logger plus level-specific helpers."""

    @property
    def logger(self) -> logging.Logger:
        """The logger attached to this instance."""

    def log_info(self, message: str) -> None:
        """Emit *message* at INFO level."""

    def log_warning(self, message: str) -> None:
        """Emit *message* at WARNING level."""

    def log_error(self, message: str) -> None:
        """Emit *message* at ERROR level."""
def configure_logging() -> None:
    """Configure the Airflow logging system for the running process."""
def setup_logging(filename: Optional[str] = None) -> None:
    """Set up the logging configuration, optionally writing to *filename*."""

from typing import Union, Optional, List, Dict, Any, Sequence, Set
from datetime import datetime, timedelta
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstance import TaskInstance
from airflow.utils.context import Context
TaskLike = Union[BaseOperator, Sequence[BaseOperator]]
StateType = str

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow