Programmatically author, schedule and monitor data pipelines
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core functionality for defining, scheduling, and managing directed acyclic graphs of tasks. DAGs represent workflows as code with explicit dependencies, scheduling, and configuration.
Create and configure DAGs using the traditional class-based approach or modern decorator patterns.
class DAG:
def __init__(
self,
dag_id: str,
description: str = None,
schedule: Optional[Union[str, datetime.timedelta]] = None,
start_date: Optional[datetime.datetime] = None,
end_date: Optional[datetime.datetime] = None,
template_searchpath: Optional[Union[str, List[str]]] = None,
template_undefined: type = jinja2.StrictUndefined,
user_defined_macros: Optional[Dict] = None,
user_defined_filters: Optional[Dict] = None,
default_args: Optional[Dict] = None,
max_active_tasks: int = 16,
max_active_runs: int = 16,
dagrun_timeout: Optional[datetime.timedelta] = None,
sla_miss_callback: Optional[Callable] = None,
default_view: str = "tree",
orientation: str = "LR",
catchup: bool = True,
on_success_callback: Optional[Callable] = None,
on_failure_callback: Optional[Callable] = None,
tags: Optional[List[str]] = None,
params: Optional[Dict[str, Any]] = None,
access_control: Optional[Dict[str, Dict[str, Collection[str]]]] = None,
is_paused_upon_creation: Optional[bool] = None,
jinja_environment_kwargs: Optional[Dict] = None,
render_template_as_native_obj: bool = False,
owner_links: Optional[Dict[str, str]] = None,
auto_register: bool = True,
fail_fast: bool = False,
dag_display_name: Optional[str] = None,
max_consecutive_failed_dag_runs: int = 0,
**kwargs
):
"""
Create a new DAG instance.
Args:
dag_id: Unique identifier for the DAG
description: Description of the DAG's purpose
schedule: How often to run the DAG (cron, timedelta, or None)
start_date: When the DAG should start being scheduled
end_date: When the DAG should stop being scheduled (optional)
default_args: Default arguments applied to all tasks
catchup: Whether to backfill missed runs
tags: List of tags for categorization
"""Modern approach to DAG definition using the @dag decorator for cleaner, more Pythonic workflow definition.
@dag(
dag_id: Optional[str] = None,
description: Optional[str] = None,
schedule: Optional[Union[str, timedelta, cron.CronExpression]] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
template_searchpath: Optional[Union[str, List[str]]] = None,
user_defined_macros: Optional[Dict] = None,
user_defined_filters: Optional[Dict] = None,
default_args: Optional[Dict] = None,
max_active_tasks: int = 16,
max_active_runs: int = 16,
dagrun_timeout: Optional[timedelta] = None,
catchup: bool = True,
on_success_callback: Optional[Callable] = None,
on_failure_callback: Optional[Callable] = None,
tags: Optional[List[str]] = None,
**kwargs
) -> Callable:
"""
Decorator to create a DAG from a function.
Args:
dag_id: Unique identifier (auto-generated from function name if not provided)
schedule: How often to run the DAG
start_date: When the DAG should start being scheduled
catchup: Whether to backfill missed runs
tags: List of tags for categorization
Returns:
Decorated function that returns a DAG instance
"""Usage example:
from airflow.decorators import dag, task
from datetime import datetime, timedelta
@dag(
dag_id='modern_workflow',
schedule=timedelta(hours=1),
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['modern', 'example']
)
def modern_workflow():
@task
def process_data():
return "processed"
process_data()
dag_instance = modern_workflow()Organize related tasks into logical groups for better DAG visualization and organization.
class TaskGroup:
    def __init__(
        self,
        group_id: str,
        tooltip: str = "",
        dag: Optional[DAG] = None,
        default_args: Optional[Dict] = None,
        prefix_group_id: bool = True,
        parent_group: Optional['TaskGroup'] = None,
        ui_color: str = "CornflowerBlue",
        ui_fgcolor: str = "#000",
        add_suffix_on_collision: bool = False,
        group_display_name: Optional[str] = None,
        **kwargs
    ):
        """
        Create a new task group.

        Args:
            group_id: Unique identifier for the group
            tooltip: Tooltip text displayed in the UI
            dag: Parent DAG (auto-detected if not provided)
            default_args: Default arguments applied to tasks in the group
            prefix_group_id: Whether to prefix task IDs with group ID
            parent_group: Enclosing TaskGroup when groups are nested (optional)
            ui_color: Group background color in the UI
            ui_fgcolor: Group foreground/text color in the UI
            add_suffix_on_collision: Whether to suffix the group ID on name collision
            group_display_name: Display name shown in the UI (optional)
        """
@task_group(
group_id: Optional[str] = None,
tooltip: str = "",
default_args: Optional[Dict] = None,
prefix_group_id: bool = True,
**kwargs
) -> Callable:
"""
Decorator to create a task group from a function.
Args:
group_id: Unique identifier (auto-generated from function name if not provided)
tooltip: Tooltip text displayed in the UI
prefix_group_id: Whether to prefix task IDs with group ID
Returns:
Decorated function that returns a TaskGroup instance
"""Usage example:
from airflow.decorators import dag, task, task_group
@dag(dag_id='grouped_workflow', start_date=datetime(2024, 1, 1))
def grouped_workflow():
@task_group(group_id='data_processing')
def data_processing():
@task
def extract():
return "extracted"
@task
def transform(data):
return f"transformed_{data}"
@task
def load(data):
print(f"loading {data}")
data = extract()
transformed = transform(data)
load(transformed)
return transformed
processed = data_processing()
dag_instance = grouped_workflow()ORM model representing DAG metadata in the database.
class DagModel:
"""
ORM model for DAG metadata storage.
Attributes:
dag_id: Unique DAG identifier
is_active: Whether the DAG is currently active
is_paused: Whether the DAG is paused
last_parsed_time: When the DAG was last parsed
last_pickled: When the DAG was last pickled
last_expired: When the DAG last expired
scheduler_lock: Scheduler lock information
pickle_id: Pickle ID for serialization
fileloc: File location of the DAG
owners: DAG owners
description: DAG description
default_view: Default view in the UI
schedule_interval: DAG schedule interval
tags: List of DAG tags
"""
dag_id: str
is_active: bool
is_paused: bool
last_parsed_time: Optional[datetime]
last_pickled: Optional[datetime]
last_expired: Optional[datetime]
scheduler_lock: Optional[bool]
pickle_id: Optional[int]
fileloc: str
owners: str
description: Optional[str]
default_view: str
schedule_interval: Optional[str]
tags: List[str]Represents individual executions of a DAG.
class DagRun:
"""
ORM model for DAG run instances.
Attributes:
dag_id: DAG identifier
execution_date: Execution date for this run
run_id: Unique run identifier
state: Current state of the run
run_type: Type of run (scheduled, manual, backfill)
external_trigger: Whether triggered externally
start_date: When the run started
end_date: When the run ended
creating_job_id: ID of job that created this run
"""
dag_id: str
execution_date: datetime
run_id: str
state: str
run_type: str
external_trigger: bool
start_date: Optional[datetime]
end_date: Optional[datetime]
creating_job_id: Optional[int]Utility functions for DAG management and organization.
class DagBag:
    def __init__(
        self,
        dag_folder: Optional[str] = None,
        executor: Optional[BaseExecutor] = None,
        include_examples: bool = True,
        safe_mode: bool = True,
        read_dags_from_db: bool = False,
        store_serialized_dags: bool = False,
        load_op_links: bool = True
    ):
        """
        Container for loading and managing multiple DAGs.

        Args:
            dag_folder: Directory to scan for DAG files
            include_examples: Whether to include example DAGs
            safe_mode: Whether to use safe mode for parsing
            read_dags_from_db: Whether to read DAGs from database
        """

    def get_dag(self, dag_id: str) -> Optional[DAG]:
        """Get a DAG by ID."""

    def process_file(self, filepath: str) -> List[DAG]:
        """Process a single DAG file."""

    def collect_dags(
        self,
        dag_folder: Optional[str] = None,
        only_if_updated: bool = True,
        include_examples: bool = True,
        safe_mode: bool = True
    ) -> None:
        """Collect DAGs from the specified folder."""

from typing import Union, Optional, List, Dict, Callable, Any
from datetime import datetime, timedelta
import jinja2
from crontab import CronTab
DagRunState = Literal["queued", "running", "success", "failed"]
ScheduleInterval = Union[str, timedelta, cron.CronExpression, None]Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow