# DAG Management

Core functionality for defining, scheduling, and managing directed acyclic graphs (DAGs) of tasks. DAGs represent workflows as code, with explicit dependencies, scheduling, and configuration.

## Capabilities

### DAG Definition

Create and configure DAGs using the traditional class-based approach or the modern decorator pattern. A usage example for the class-based approach follows the signature below.

```python { .api }
class DAG:
    def __init__(
        self,
        dag_id: str,
        description: Optional[str] = None,
        schedule: Optional[Union[str, datetime.timedelta]] = None,
        start_date: Optional[datetime.datetime] = None,
        end_date: Optional[datetime.datetime] = None,
        template_searchpath: Optional[Union[str, List[str]]] = None,
        template_undefined: type = jinja2.StrictUndefined,
        user_defined_macros: Optional[Dict] = None,
        user_defined_filters: Optional[Dict] = None,
        default_args: Optional[Dict] = None,
        max_active_tasks: int = 16,
        max_active_runs: int = 16,
        dagrun_timeout: Optional[datetime.timedelta] = None,
        sla_miss_callback: Optional[Callable] = None,
        default_view: str = "tree",
        orientation: str = "LR",
        catchup: bool = True,
        on_success_callback: Optional[Callable] = None,
        on_failure_callback: Optional[Callable] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        access_control: Optional[Dict[str, Dict[str, Collection[str]]]] = None,
        is_paused_upon_creation: Optional[bool] = None,
        jinja_environment_kwargs: Optional[Dict] = None,
        render_template_as_native_obj: bool = False,
        owner_links: Optional[Dict[str, str]] = None,
        auto_register: bool = True,
        fail_fast: bool = False,
        dag_display_name: Optional[str] = None,
        max_consecutive_failed_dag_runs: int = 0,
        **kwargs
    ):
        """
        Create a new DAG instance.

        Args:
            dag_id: Unique identifier for the DAG
            description: Description of the DAG's purpose
            schedule: How often to run the DAG (cron expression, timedelta, or None)
            start_date: When the DAG should start being scheduled
            end_date: When the DAG should stop being scheduled (optional)
            default_args: Default arguments applied to all tasks
            catchup: Whether to backfill missed runs
            tags: List of tags for categorization
        """
```

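For comparison with the decorator style shown next, here is a minimal sketch of the class-based approach, assuming Airflow 2.4+ where `schedule` replaces the older `schedule_interval`; the DAG ID, callable, and task ID are illustrative.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def _extract():
    # Placeholder callable; swap in real extraction logic.
    return "extracted"


# Operators instantiated inside the `with` block are attached to the DAG
# automatically, so no explicit `dag=` argument is needed.
with DAG(
    dag_id="classic_workflow",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={"retries": 1},
    tags=["classic", "example"],
) as classic_dag:
    extract_task = PythonOperator(
        task_id="extract",
        python_callable=_extract,
    )
```
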
### DAG Decorator

Modern approach to DAG definition using the @dag decorator for cleaner, more Pythonic workflow definition.

```python { .api }
def dag(
    dag_id: Optional[str] = None,
    description: Optional[str] = None,
    schedule: Optional[Union[str, timedelta]] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    template_searchpath: Optional[Union[str, List[str]]] = None,
    user_defined_macros: Optional[Dict] = None,
    user_defined_filters: Optional[Dict] = None,
    default_args: Optional[Dict] = None,
    max_active_tasks: int = 16,
    max_active_runs: int = 16,
    dagrun_timeout: Optional[timedelta] = None,
    catchup: bool = True,
    on_success_callback: Optional[Callable] = None,
    on_failure_callback: Optional[Callable] = None,
    tags: Optional[List[str]] = None,
    **kwargs
) -> Callable:
    """
    Decorator to create a DAG from a function.

    Args:
        dag_id: Unique identifier (auto-generated from the function name if not provided)
        schedule: How often to run the DAG
        start_date: When the DAG should start being scheduled
        catchup: Whether to backfill missed runs
        tags: List of tags for categorization

    Returns:
        Decorated function that returns a DAG instance
    """
```

Usage example:

```python
from airflow.decorators import dag, task
from datetime import datetime, timedelta


@dag(
    dag_id='modern_workflow',
    schedule=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['modern', 'example']
)
def modern_workflow():
    @task
    def process_data():
        return "processed"

    process_data()


dag_instance = modern_workflow()
```

### Task Groups

Organize related tasks into logical groups for better DAG visualization and organization.

```python { .api }
class TaskGroup:
    def __init__(
        self,
        group_id: str,
        tooltip: str = "",
        dag: Optional[DAG] = None,
        default_args: Optional[Dict] = None,
        prefix_group_id: bool = True,
        parent_group: Optional['TaskGroup'] = None,
        ui_color: str = "CornflowerBlue",
        ui_fgcolor: str = "#000",
        add_suffix_on_collision: bool = False,
        group_display_name: Optional[str] = None,
        **kwargs
    ):
        """
        Create a new task group.

        Args:
            group_id: Unique identifier for the group
            tooltip: Tooltip text displayed in the UI
            dag: Parent DAG (auto-detected if not provided)
            prefix_group_id: Whether to prefix task IDs with the group ID
        """


def task_group(
    group_id: Optional[str] = None,
    tooltip: str = "",
    default_args: Optional[Dict] = None,
    prefix_group_id: bool = True,
    **kwargs
) -> Callable:
    """
    Decorator to create a task group from a function.

    Args:
        group_id: Unique identifier (auto-generated from the function name if not provided)
        tooltip: Tooltip text displayed in the UI
        prefix_group_id: Whether to prefix task IDs with the group ID

    Returns:
        Decorated function that returns a TaskGroup instance
    """
```

Usage example:

```python
from datetime import datetime

from airflow.decorators import dag, task, task_group


@dag(dag_id='grouped_workflow', start_date=datetime(2024, 1, 1))
def grouped_workflow():
    @task_group(group_id='data_processing')
    def data_processing():
        @task
        def extract():
            return "extracted"

        @task
        def transform(data):
            return f"transformed_{data}"

        @task
        def load(data):
            print(f"loading {data}")

        data = extract()
        transformed = transform(data)
        load(transformed)

        return transformed

    processed = data_processing()


dag_instance = grouped_workflow()
```

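The TaskGroup class can also be used directly as a context manager inside a DAG, which is convenient with classic operators rather than the TaskFlow API. A minimal sketch, assuming the Airflow 2 module paths `airflow.utils.task_group` and `airflow.operators.empty`; the DAG, group, and task IDs are illustrative.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="grouped_classic", start_date=datetime(2024, 1, 1), catchup=False):
    start = EmptyOperator(task_id="start")

    # Tasks created inside the block get the group prefix (e.g. "data_processing.extract")
    # because prefix_group_id defaults to True.
    with TaskGroup(group_id="data_processing", tooltip="ETL steps") as data_processing:
        extract = EmptyOperator(task_id="extract")
        load = EmptyOperator(task_id="load")
        extract >> load

    # The group as a whole can participate in dependency arrows.
    start >> data_processing
```
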
### DAG Model and Metadata

ORM model representing DAG metadata in the database.

```python { .api }
class DagModel:
    """
    ORM model for DAG metadata storage.

    Attributes:
        dag_id: Unique DAG identifier
        is_active: Whether the DAG is currently active
        is_paused: Whether the DAG is paused
        last_parsed_time: When the DAG was last parsed
        last_pickled: When the DAG was last pickled
        last_expired: When the DAG last expired
        scheduler_lock: Scheduler lock information
        pickle_id: Pickle ID for serialization
        fileloc: File location of the DAG
        owners: DAG owners
        description: DAG description
        default_view: Default view in the UI
        schedule_interval: DAG schedule interval
        tags: List of DAG tags
    """
    dag_id: str
    is_active: bool
    is_paused: bool
    last_parsed_time: Optional[datetime]
    last_pickled: Optional[datetime]
    last_expired: Optional[datetime]
    scheduler_lock: Optional[bool]
    pickle_id: Optional[int]
    fileloc: str
    owners: str
    description: Optional[str]
    default_view: str
    schedule_interval: Optional[str]
    tags: List[str]
```

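As an illustration of how this model is typically read, here is a minimal sketch that lists unpaused DAGs through a SQLAlchemy session; it assumes the `create_session` helper from `airflow.utils.session` and a reachable metadata database.

```python
from airflow.models import DagModel
from airflow.utils.session import create_session

# List all DAGs that are registered and not currently paused.
with create_session() as session:
    active_dags = (
        session.query(DagModel)
        .filter(DagModel.is_paused.is_(False))
        .all()
    )
    for dag_model in active_dags:
        print(dag_model.dag_id, dag_model.fileloc)
```
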
### DAG Runs

Represents individual executions of a DAG.

```python { .api }
class DagRun:
    """
    ORM model for DAG run instances.

    Attributes:
        dag_id: DAG identifier
        execution_date: Execution date for this run
        run_id: Unique run identifier
        state: Current state of the run
        run_type: Type of run (scheduled, manual, backfill)
        external_trigger: Whether triggered externally
        start_date: When the run started
        end_date: When the run ended
        creating_job_id: ID of job that created this run
    """
    dag_id: str
    execution_date: datetime
    run_id: str
    state: str
    run_type: str
    external_trigger: bool
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    creating_job_id: Optional[int]
```

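For context, runs are usually looked up rather than constructed by hand. A minimal sketch assuming the `DagRun.find` classmethod and the `DagRunState` enum available in Airflow 2; the DAG ID `example_dag` is hypothetical.

```python
from airflow.models import DagRun
from airflow.utils.state import DagRunState

# Fetch failed runs of a (hypothetical) DAG and show when each one started.
failed_runs = DagRun.find(dag_id="example_dag", state=DagRunState.FAILED)
for run in failed_runs:
    print(run.run_id, run.start_date)
```
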
### DAG Utilities

Utilities for loading, organizing, and managing collections of DAGs.

```python { .api }
class DagBag:
    def __init__(
        self,
        dag_folder: Optional[str] = None,
        executor: Optional[BaseExecutor] = None,
        include_examples: bool = True,
        safe_mode: bool = True,
        read_dags_from_db: bool = False,
        store_serialized_dags: bool = False,
        load_op_links: bool = True
    ):
        """
        Container for loading and managing multiple DAGs.

        Args:
            dag_folder: Directory to scan for DAG files
            include_examples: Whether to include example DAGs
            safe_mode: Whether to use safe mode for parsing
            read_dags_from_db: Whether to read DAGs from the database
        """

    def get_dag(self, dag_id: str) -> Optional[DAG]:
        """Get a DAG by ID."""

    def process_file(self, filepath: str) -> List[DAG]:
        """Process a single DAG file."""

    def collect_dags(
        self,
        dag_folder: Optional[str] = None,
        only_if_updated: bool = True,
        include_examples: bool = True,
        safe_mode: bool = True
    ) -> None:
        """Collect DAGs from the specified folder."""
```

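A minimal sketch of loading DAGs from a local folder and inspecting the result; it assumes a `dags/` directory exists and relies on the `dag_ids` and `import_errors` attributes DagBag exposes in Airflow 2.

```python
from airflow.models import DagBag

# Parse all DAG files under a local folder, skipping the bundled example DAGs.
dag_bag = DagBag(dag_folder="dags/", include_examples=False)

print(f"Loaded {len(dag_bag.dag_ids)} DAGs")

# Files that failed to parse are collected instead of raising at load time.
for filepath, error in dag_bag.import_errors.items():
    print(f"Failed to import {filepath}: {error}")

my_dag = dag_bag.get_dag("modern_workflow")  # returns None if the DAG is not found
```
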
## Types

```python { .api }
from typing import Union, Optional, List, Dict, Callable, Collection, Any, Literal
from datetime import datetime, timedelta
import jinja2

DagRunState = Literal["queued", "running", "success", "failed"]
ScheduleInterval = Union[str, timedelta, None]
```

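As an illustration only, the aliases above can annotate helper code; the function below is hypothetical and not part of the API.

```python
from datetime import timedelta
from typing import Union

ScheduleInterval = Union[str, timedelta, None]  # as defined above


def describe_schedule(schedule: ScheduleInterval) -> str:
    """Return a human-readable description of a schedule value (illustrative helper)."""
    if schedule is None:
        return "manual-only"
    if isinstance(schedule, timedelta):
        return f"every {schedule}"
    return f"cron: {schedule}"


print(describe_schedule(timedelta(hours=1)))  # every 1:00:00
print(describe_schedule("0 0 * * *"))         # cron: 0 0 * * *
```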