Programmatically author, schedule and monitor data pipelines
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Task definition and execution including BaseOperator, task decorators, dynamic task mapping, and task instance management. Tasks represent individual units of work within DAGs.
Foundation class for all Airflow operators, providing core task functionality and lifecycle management.
class BaseOperator:
def __init__(
self,
task_id: str,
owner: str = "airflow",
email: Optional[Union[str, List[str]]] = None,
email_on_retry: bool = True,
email_on_failure: bool = True,
retries: Optional[int] = None,
retry_delay: timedelta = timedelta(seconds=300),
retry_exponential_backoff: bool = False,
max_retry_delay: Optional[timedelta] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
depends_on_past: bool = False,
wait_for_downstream: bool = False,
dag: Optional[DAG] = None,
params: Optional[Dict[str, Any]] = None,
default_args: Optional[Dict[str, Any]] = None,
pool: Optional[str] = None,
pool_slots: int = 1,
queue: Optional[str] = None,
priority_weight: int = 1,
weight_rule: str = "downstream",
sla: Optional[timedelta] = None,
execution_timeout: Optional[timedelta] = None,
on_execute_callback: Optional[Callable] = None,
on_failure_callback: Optional[Callable] = None,
on_success_callback: Optional[Callable] = None,
on_retry_callback: Optional[Callable] = None,
trigger_rule: str = "all_success",
resources: Optional[Dict[str, Any]] = None,
run_as_user: Optional[str] = None,
task_concurrency: Optional[int] = None,
max_active_tis_per_dag: Optional[int] = None,
executor_config: Optional[Dict[str, Any]] = None,
do_xcom_push: bool = True,
inlets: Optional[List[Any]] = None,
outlets: Optional[List[Any]] = None,
task_group: Optional[TaskGroup] = None,
doc: Optional[str] = None,
doc_md: Optional[str] = None,
doc_json: Optional[str] = None,
doc_yaml: Optional[str] = None,
doc_rst: Optional[str] = None,
wait_for_past_depends_before_skipping: bool = False,
max_active_tis_per_dagrun: Optional[int] = None,
map_index_template: Optional[str] = None,
multiple_outputs: bool = False,
task_display_name: Optional[str] = None,
logger_name: Optional[str] = None,
allow_nested_operators: bool = True,
**kwargs
):
"""
Base operator for all Airflow tasks.
Args:
task_id: Unique identifier for the task
owner: Owner of the task
retries: Number of retries when task fails
retry_delay: Delay between retries
start_date: When the task should start being scheduled
end_date: When the task should stop being scheduled
depends_on_past: Whether task depends on previous run success
pool: Resource pool for task execution
priority_weight: Task priority for execution order
trigger_rule: Rule for task triggering based on upstream tasks
execution_timeout: Maximum runtime before task timeout
"""
def execute(self, context: Context) -> Any:
"""Execute the task logic. Must be implemented by subclasses."""
def on_kill(self) -> None:
"""Called when task is killed for cleanup."""
def defer(self, trigger: BaseTrigger, method_name: str, **kwargs) -> None:
"""Defer task execution to a trigger."""
def resume_execution(self, context: Context, event: Dict[str, Any]) -> Any:
"""Resume execution after deferral."""
def render_template_fields(
self,
context: Context,
jinja_env: Optional[jinja2.Environment] = None
) -> None:
"""Render Jinja templates in task fields."""

Modern approach to task definition using decorators for cleaner, more Pythonic task creation.
@task(
task_id: Optional[str] = None,
python_callable: Optional[Callable] = None,
op_args: Optional[List[Any]] = None,
op_kwargs: Optional[Dict[str, Any]] = None,
templates_dict: Optional[Dict[str, Any]] = None,
templates_exts: Optional[List[str]] = None,
show_return_value_in_logs: bool = True,
**kwargs
) -> Callable:
"""
Decorator to create a task from a Python function.
Args:
task_id: Unique identifier (auto-generated from function name if not provided)
python_callable: The Python function to execute
op_args: Positional arguments to pass to the function
op_kwargs: Keyword arguments to pass to the function
templates_dict: Dictionary of templates to render
show_return_value_in_logs: Whether to log return value
Returns:
Decorated function that returns task output
"""
@task.setup(
task_id: Optional[str] = None,
**kwargs
) -> Callable:
"""
Decorator for setup tasks that run before other tasks.
Args:
task_id: Unique identifier
**kwargs: Additional task arguments
Returns:
Decorated function for setup task
"""
@task.teardown(
task_id: Optional[str] = None,
**kwargs
) -> Callable:
"""
Decorator for teardown tasks that run after other tasks.
Args:
task_id: Unique identifier
**kwargs: Additional task arguments
Returns:
Decorated function for teardown task
"""

Usage example:
from airflow.decorators import dag, task
from datetime import datetime
@dag(dag_id='task_decorator_example', start_date=datetime(2024, 1, 1))
def task_decorator_example():
@task
def extract_data(source: str) -> dict:
"""Extract data from source."""
return {'data': f'extracted from {source}', 'count': 100}
@task
def transform_data(data: dict) -> dict:
"""Transform the data."""
return {
'transformed_data': data['data'].upper(),
'processed_count': data['count'] * 2
}
@task.setup
def setup_environment():
"""Setup task that runs first."""
print("Setting up environment")
@task.teardown
def cleanup():
"""Cleanup task that runs last."""
print("Cleaning up")
# Define dependencies
setup_environment()
raw_data = extract_data('database')
processed_data = transform_data(raw_data)
cleanup()
dag_instance = task_decorator_example()

Create tasks dynamically at runtime based on input data or external conditions.
class MappedOperator:
"""
Operator created through dynamic task mapping.
Attributes:
task_id: Base task identifier
operator_class: Original operator class
mapped_op_kwargs: Mapped operator arguments
partial_kwargs: Static operator arguments
"""
task_id: str
operator_class: type
mapped_op_kwargs: Dict[str, Any]
partial_kwargs: Dict[str, Any]
def expand(self, **mapped_kwargs) -> 'MappedOperator':
"""
Expand operator with mapped arguments.
Args:
**mapped_kwargs: Arguments to map over
Returns:
MappedOperator instance
"""
def partial(self, **partial_kwargs) -> 'MappedOperator':
"""
Set static arguments for mapped operator.
Args:
**partial_kwargs: Static arguments
Returns:
Partially configured MappedOperator
"""

Usage example:
from airflow.decorators import dag, task
@dag(dag_id='dynamic_mapping_example', start_date=datetime(2024, 1, 1))
def dynamic_mapping_example():
@task
def get_file_list() -> List[str]:
"""Get list of files to process."""
return ['file1.csv', 'file2.csv', 'file3.csv']
@task
def process_file(filename: str) -> str:
"""Process a single file."""
return f"processed {filename}"
# Dynamic mapping - creates one task per file
files = get_file_list()
process_file.expand(filename=files)
dag_instance = dynamic_mapping_example()

Represents a specific execution of a task within a DAG run.
class TaskInstance:
"""
ORM model for task instance execution.
Attributes:
task_id: Task identifier
dag_id: DAG identifier
run_id: DAG run identifier
execution_date: Execution date
start_date: When task started
end_date: When task ended
duration: Task execution duration
state: Current task state
try_number: Current retry attempt
max_tries: Maximum retry attempts
hostname: Worker hostname
unixname: Unix username
job_id: Job identifier
pool: Resource pool
pool_slots: Number of pool slots used
queue: Execution queue
priority_weight: Task priority
operator: Operator class name
queued_dttm: When task was queued
pid: Process ID
executor_config: Executor configuration
external_executor_id: External executor identifier
trigger_id: Trigger identifier (for deferred tasks)
next_method: Next method to call
next_kwargs: Arguments for next method
"""
id: Optional[UUID]
task_id: str
dag_id: str
run_id: str
map_index: int
execution_date: datetime
start_date: Optional[datetime]
end_date: Optional[datetime]
duration: Optional[float]
state: Optional[str]
try_number: int
max_tries: int
hostname: str
unixname: str
job_id: Optional[int]
pool: str
pool_slots: int
queue: str
priority_weight: int
operator: str
queued_dttm: Optional[datetime]
pid: Optional[int]
executor_config: Optional[Dict]
external_executor_id: Optional[str]
trigger_id: Optional[int]
next_method: Optional[str]
next_kwargs: Optional[Dict]
def clear_task_instances(
self,
tis: List['TaskInstance'],
session: Session = None,
dag: Optional[DAG] = None
) -> None:
"""Clear task instances for retry."""
def get_task_instance(
self,
task_id: str,
execution_date: datetime,
session: Session = None
) -> Optional['TaskInstance']:
"""Get task instance by ID and execution date."""

Manage dependencies between tasks using various trigger rules and patterns.
def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]) -> None:
"""
Chain tasks sequentially.
Args:
*tasks: Tasks to chain in order
"""
def chain_linear(*tasks: BaseOperator) -> None:
"""
Chain tasks in a linear sequence.
Args:
*tasks: Tasks to chain linearly
"""
def cross_downstream(
from_tasks: Sequence[BaseOperator],
to_tasks: Sequence[BaseOperator]
) -> None:
"""
Create dependencies from all tasks in from_tasks to all tasks in to_tasks.
Args:
from_tasks: Source tasks
to_tasks: Target tasks
"""

Usage example:
from airflow.decorators import dag, task
from airflow.models.baseoperator import chain, cross_downstream
@dag(dag_id='dependencies_example', start_date=datetime(2024, 1, 1))
def dependencies_example():
@task
def start():
return "started"
@task
def process_a():
return "a"
@task
def process_b():
return "b"
@task
def combine():
return "combined"
@task
def end():
return "ended"
# Linear chain
start_task = start()
process_a_task = process_a()
process_b_task = process_b()
combine_task = combine()
end_task = end()
# Set up dependencies
chain(start_task, [process_a_task, process_b_task], combine_task, end_task)
dag_instance = dependencies_example()

Task execution states and lifecycle management.
from airflow.utils.state import TaskInstanceState
# Task States
class TaskInstanceState:
"""Task instance states."""
NONE: str = "none"
SCHEDULED: str = "scheduled"
QUEUED: str = "queued"
RUNNING: str = "running"
SUCCESS: str = "success"
SHUTDOWN: str = "shutdown"
RESTARTING: str = "restarting"
FAILED: str = "failed"
UP_FOR_RETRY: str = "up_for_retry"
UP_FOR_RESCHEDULE: str = "up_for_reschedule"
UPSTREAM_FAILED: str = "upstream_failed"
SKIPPED: str = "skipped"
REMOVED: str = "removed"
DEFERRED: str = "deferred"
# Trigger Rules
TRIGGER_RULES = [
"all_success", # All upstream tasks succeeded
"all_failed", # All upstream tasks failed
"all_done", # All upstream tasks completed (success or failed)
"one_success", # At least one upstream task succeeded
"one_failed", # At least one upstream task failed
"none_failed", # No upstream tasks failed
"none_failed_min_one_success", # No failures and at least one success
"none_skipped", # No upstream tasks skipped
"always", # Always run regardless of upstream state
]

from typing import Union, Optional, List, Dict, Callable, Any, Sequence, Literal
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.models.taskgroup import TaskGroup
from airflow.utils.context import Context
TaskState = Literal[
"none", "scheduled", "queued", "running", "success",
"failed", "up_for_retry", "upstream_failed", "skipped"
]
TriggerRule = Literal[
"all_success", "all_failed", "all_done", "one_success",
"one_failed", "none_failed", "always"
]

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow