# DAG Management

Core functionality for defining, scheduling, and managing directed acyclic graphs of tasks. DAGs represent workflows as code with explicit dependencies, scheduling, and configuration.

## Capabilities

### DAG Definition

Create and configure DAGs using the traditional class-based approach or modern decorator patterns.

```python { .api }
class DAG:
    def __init__(
        self,
        dag_id: str,
        description: Optional[str] = None,
        schedule: Optional[Union[str, datetime.timedelta]] = None,
        start_date: Optional[datetime.datetime] = None,
        end_date: Optional[datetime.datetime] = None,
        template_searchpath: Optional[Union[str, List[str]]] = None,
        template_undefined: type = jinja2.StrictUndefined,
        user_defined_macros: Optional[Dict] = None,
        user_defined_filters: Optional[Dict] = None,
        default_args: Optional[Dict] = None,
        max_active_tasks: int = 16,
        max_active_runs: int = 16,
        dagrun_timeout: Optional[datetime.timedelta] = None,
        sla_miss_callback: Optional[Callable] = None,
        default_view: str = "tree",
        orientation: str = "LR",
        catchup: bool = True,
        on_success_callback: Optional[Callable] = None,
        on_failure_callback: Optional[Callable] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        access_control: Optional[Dict[str, Dict[str, Collection[str]]]] = None,
        is_paused_upon_creation: Optional[bool] = None,
        jinja_environment_kwargs: Optional[Dict] = None,
        render_template_as_native_obj: bool = False,
        owner_links: Optional[Dict[str, str]] = None,
        auto_register: bool = True,
        fail_fast: bool = False,
        dag_display_name: Optional[str] = None,
        max_consecutive_failed_dag_runs: int = 0,
        **kwargs
    ):
        """
        Create a new DAG instance.

        Args:
            dag_id: Unique identifier for the DAG
            description: Description of the DAG's purpose
            schedule: How often to run the DAG (cron expression, timedelta, or None)
            start_date: When the DAG should start being scheduled
            end_date: When the DAG should stop being scheduled (optional)
            default_args: Default arguments applied to all tasks
            catchup: Whether to backfill missed runs
            tags: List of tags for categorization
        """
```
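
Usage example (a minimal sketch of the class-based style; the `BashOperator` import and `echo` commands are illustrative choices, not part of this spec):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

# Class-based definition: operators instantiated inside the `with` block
# attach to this DAG automatically.
with DAG(
    dag_id='classic_workflow',
    description='Class-based DAG definition example',
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={'retries': 2, 'retry_delay': timedelta(minutes=5)},
    tags=['classic', 'example'],
) as dag:
    extract = BashOperator(task_id='extract', bash_command='echo extracting')
    load = BashOperator(task_id='load', bash_command='echo loading')

    # extract must complete before load starts
    extract >> load
```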

### DAG Decorator

Define DAGs with the `@dag` decorator for a cleaner, more Pythonic workflow definition.

```python { .api }
@dag(
    dag_id: Optional[str] = None,
    description: Optional[str] = None,
    schedule: Optional[Union[str, timedelta, cron.CronExpression]] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    template_searchpath: Optional[Union[str, List[str]]] = None,
    user_defined_macros: Optional[Dict] = None,
    user_defined_filters: Optional[Dict] = None,
    default_args: Optional[Dict] = None,
    max_active_tasks: int = 16,
    max_active_runs: int = 16,
    dagrun_timeout: Optional[timedelta] = None,
    catchup: bool = True,
    on_success_callback: Optional[Callable] = None,
    on_failure_callback: Optional[Callable] = None,
    tags: Optional[List[str]] = None,
    **kwargs
) -> Callable:
    """
    Decorator to create a DAG from a function.

    Args:
        dag_id: Unique identifier (auto-generated from function name if not provided)
        schedule: How often to run the DAG
        start_date: When the DAG should start being scheduled
        catchup: Whether to backfill missed runs
        tags: List of tags for categorization

    Returns:
        Decorated function that returns a DAG instance
    """
```

Usage example:

```python
from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(
    dag_id='modern_workflow',
    schedule=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['modern', 'example']
)
def modern_workflow():
    @task
    def process_data():
        return "processed"

    process_data()

dag_instance = modern_workflow()
```

### Task Groups

Organize related tasks into logical groups for better DAG visualization and organization.

```python { .api }
class TaskGroup:
    def __init__(
        self,
        group_id: str,
        tooltip: str = "",
        dag: Optional[DAG] = None,
        default_args: Optional[Dict] = None,
        prefix_group_id: bool = True,
        parent_group: Optional['TaskGroup'] = None,
        ui_color: str = "CornflowerBlue",
        ui_fgcolor: str = "#000",
        add_suffix_on_collision: bool = False,
        group_display_name: Optional[str] = None,
        **kwargs
    ):
        """
        Create a new task group.

        Args:
            group_id: Unique identifier for the group
            tooltip: Tooltip text displayed in the UI
            dag: Parent DAG (auto-detected if not provided)
            prefix_group_id: Whether to prefix task IDs with group ID
        """

@task_group(
    group_id: Optional[str] = None,
    tooltip: str = "",
    default_args: Optional[Dict] = None,
    prefix_group_id: bool = True,
    **kwargs
) -> Callable:
    """
    Decorator to create a task group from a function.

    Args:
        group_id: Unique identifier (auto-generated from function name if not provided)
        tooltip: Tooltip text displayed in the UI
        prefix_group_id: Whether to prefix task IDs with group ID

    Returns:
        Decorated function that returns a TaskGroup instance
    """
```

Usage example:

```python
from datetime import datetime

from airflow.decorators import dag, task, task_group

@dag(dag_id='grouped_workflow', start_date=datetime(2024, 1, 1))
def grouped_workflow():
    @task_group(group_id='data_processing')
    def data_processing():
        @task
        def extract():
            return "extracted"

        @task
        def transform(data):
            return f"transformed_{data}"

        @task
        def load(data):
            print(f"loading {data}")

        data = extract()
        transformed = transform(data)
        load(transformed)

        return transformed

    processed = data_processing()

dag_instance = grouped_workflow()
```

### DAG Model and Metadata

ORM model representing DAG metadata in the database.

```python { .api }
class DagModel:
    """
    ORM model for DAG metadata storage.

    Attributes:
        dag_id: Unique DAG identifier
        is_active: Whether the DAG is currently active
        is_paused: Whether the DAG is paused
        last_parsed_time: When the DAG was last parsed
        last_pickled: When the DAG was last pickled
        last_expired: When the DAG last expired
        scheduler_lock: Scheduler lock information
        pickle_id: Pickle ID for serialization
        fileloc: File location of the DAG
        owners: DAG owners
        description: DAG description
        default_view: Default view in the UI
        schedule_interval: DAG schedule interval
        tags: List of DAG tags
    """
    dag_id: str
    is_active: bool
    is_paused: bool
    last_parsed_time: Optional[datetime]
    last_pickled: Optional[datetime]
    last_expired: Optional[datetime]
    scheduler_lock: Optional[bool]
    pickle_id: Optional[int]
    fileloc: str
    owners: str
    description: Optional[str]
    default_view: str
    schedule_interval: Optional[str]
    tags: List[str]
```
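
For illustration, DAG metadata can be read and updated through this model with an ORM session; a sketch assuming a configured metadata database and `airflow.settings.Session` (the `dag_id` value is a placeholder):

```python
from airflow import settings
from airflow.models import DagModel

session = settings.Session()

# Look up the metadata row for one DAG.
dag_model = (
    session.query(DagModel)
    .filter(DagModel.dag_id == 'modern_workflow')
    .first()
)

if dag_model is not None:
    print(dag_model.dag_id, dag_model.fileloc, dag_model.is_paused)

    # Pause the DAG by flipping the flag and committing the change.
    dag_model.is_paused = True
    session.commit()
```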

### DAG Runs

Represents individual executions of a DAG.

```python { .api }
class DagRun:
    """
    ORM model for DAG run instances.

    Attributes:
        dag_id: DAG identifier
        execution_date: Execution date for this run
        run_id: Unique run identifier
        state: Current state of the run
        run_type: Type of run (scheduled, manual, backfill)
        external_trigger: Whether triggered externally
        start_date: When the run started
        end_date: When the run ended
        creating_job_id: ID of job that created this run
    """
    dag_id: str
    execution_date: datetime
    run_id: str
    state: str
    run_type: str
    external_trigger: bool
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    creating_job_id: Optional[int]
```
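
For illustration, recent runs and their states can be inspected through the same kind of ORM session; a sketch assuming `DagRunState` from `airflow.utils.state` (the `dag_id` value is a placeholder):

```python
from airflow import settings
from airflow.models import DagRun
from airflow.utils.state import DagRunState

session = settings.Session()

# Fetch the five most recent runs for one DAG, newest first.
recent_runs = (
    session.query(DagRun)
    .filter(DagRun.dag_id == 'modern_workflow')
    .order_by(DagRun.execution_date.desc())
    .limit(5)
    .all()
)

for run in recent_runs:
    print(run.run_id, run.run_type, run.state, run.start_date, run.end_date)

# DagRunState is a string enum, so it compares directly against `state`.
failed = [run for run in recent_runs if run.state == DagRunState.FAILED]
print(f"{len(failed)} of the last {len(recent_runs)} runs failed")
```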

### DAG Utilities

Utilities for loading, managing, and organizing DAGs.

```python { .api }
class DagBag:
    def __init__(
        self,
        dag_folder: Optional[str] = None,
        executor: Optional[BaseExecutor] = None,
        include_examples: bool = True,
        safe_mode: bool = True,
        read_dags_from_db: bool = False,
        store_serialized_dags: bool = False,
        load_op_links: bool = True
    ):
        """
        Container for loading and managing multiple DAGs.

        Args:
            dag_folder: Directory to scan for DAG files
            include_examples: Whether to include example DAGs
            safe_mode: Whether to use safe mode for parsing
            read_dags_from_db: Whether to read DAGs from database
        """

    def get_dag(self, dag_id: str) -> Optional[DAG]:
        """Get a DAG by ID."""

    def process_file(self, filepath: str) -> List[DAG]:
        """Process a single DAG file."""

    def collect_dags(
        self,
        dag_folder: Optional[str] = None,
        only_if_updated: bool = True,
        include_examples: bool = True,
        safe_mode: bool = True
    ) -> None:
        """Collect DAGs from the specified folder."""
```
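
Usage example (a sketch; the DAG folder path and `dag_id` are placeholders):

```python
from airflow.models import DagBag

# Parse DAG files from a folder, skipping the bundled example DAGs.
dagbag = DagBag(dag_folder='/path/to/dags', include_examples=False)

# Parse failures are collected per file in `import_errors` rather than raised.
for filepath, error in dagbag.import_errors.items():
    print(f"failed to parse {filepath}: {error}")

# List the DAG IDs that loaded successfully, then fetch one by ID.
print(sorted(dagbag.dags))
dag = dagbag.get_dag('modern_workflow')
if dag is not None:
    print(dag.dag_id, len(dag.tasks))
```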

## Types

```python { .api }
from typing import Union, Optional, List, Dict, Callable, Any, Literal
from datetime import datetime, timedelta
import jinja2
from crontab import CronTab

DagRunState = Literal["queued", "running", "success", "failed"]
ScheduleInterval = Union[str, timedelta, cron.CronExpression, None]
```
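
The `ScheduleInterval` forms map directly onto the `schedule` argument shown above; a short illustration (DAG ids are arbitrary):

```python
from datetime import datetime, timedelta

from airflow import DAG

start = datetime(2024, 1, 1)

# Cron string: run at 06:00 every day.
daily = DAG(dag_id='cron_schedule', schedule='0 6 * * *', start_date=start)

# timedelta: run every six hours.
periodic = DAG(dag_id='interval_schedule', schedule=timedelta(hours=6), start_date=start)

# None: run only when triggered manually.
manual = DAG(dag_id='manual_only', schedule=None, start_date=start)
```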