or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

assets-scheduling.md cli-utilities.md configuration.md dag-management.md database-models.md exceptions.md executors.md extensions.md index.md task-operators.md xcom.md

docs/cli-utilities.md

0

# CLI and Utilities

1

2

This page covers Airflow's command-line interface, context utilities, dependency management, and workflow orchestration helpers: the CLI tools and utility functions Airflow provides for workflow management.

3

4

## Capabilities

5

6

### Context Utilities

7

8

Access execution context and runtime information.

9

10

```python { .api }

11

def get_current_context() -> Context:

12

"""

13

Get the current task execution context.

14

15

Returns:

16

Current execution context with task, DAG, and runtime information

17

"""

18

19

def get_parsing_context() -> Context:

20

"""

21

Get the DAG parsing context.

22

23

Returns:

24

Context available during DAG parsing

25

"""

26

27

class Context:

28

"""Task execution context."""

29

# Core objects

30

task_instance: TaskInstance

31

task: BaseOperator

32

dag: DAG

33

dag_run: DagRun

34

35

# Execution info

36

execution_date: datetime

37

logical_date: datetime

38

data_interval_start: datetime

39

data_interval_end: datetime

40

41

# Formatted dates

42

ds: str # YYYY-MM-DD

43

ds_nodash: str # YYYYMMDD

44

ts: str # ISO timestamp

45

ts_nodash: str # Timestamp without separators

46

47

# Configuration

48

params: Dict[str, Any]

49

var: Dict[str, Any]

50

conf: Dict[str, Any]

51

52

# XCom access

53

ti: TaskInstance # For XCom operations

54

```

55

56

### Dependency Management

57

58

Manage task dependencies and execution order.

59

60

```python { .api }

61

def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]) -> None:

62

"""

63

Chain tasks in sequence: task1 >> task2 >> task3.

64

65

Args:

66

*tasks: Tasks to chain in order

67

"""

68

69

def chain_linear(*tasks: BaseOperator) -> None:

70

"""

71

Chain tasks linearly with explicit ordering.

72

73

Args:

74

*tasks: Tasks to chain linearly

75

"""

76

77

def cross_downstream(

78

from_tasks: Sequence[BaseOperator],

79

to_tasks: Sequence[BaseOperator]

80

) -> None:

81

"""

82

Set all tasks in from_tasks as upstream of all tasks in to_tasks.

83

84

Args:

85

from_tasks: Upstream tasks

86

to_tasks: Downstream tasks

87

"""

88

```

89

90

Usage example:

91

92

```python

93

from airflow.decorators import dag, task

94

from airflow.models.baseoperator import chain, cross_downstream

95

96

@dag(dag_id='dependency_example', start_date=datetime(2024, 1, 1))

97

def dependency_example():

98

@task

99

def start():

100

return "started"

101

102

@task

103

def extract_a():

104

return "data_a"

105

106

@task

107

def extract_b():

108

return "data_b"

109

110

@task

111

def transform_a(data):

112

return f"transformed_{data}"

113

114

@task

115

def transform_b(data):

116

return f"transformed_{data}"

117

118

@task

119

def combine(data_a, data_b):

120

return f"combined: {data_a}, {data_b}"

121

122

@task

123

def end():

124

return "finished"

125

126

# Set up dependencies

127

start_task = start()

128

extract_a_task = extract_a()

129

extract_b_task = extract_b()

130

transform_a_task = transform_a(extract_a_task)

131

transform_b_task = transform_b(extract_b_task)

132

combine_task = combine(transform_a_task, transform_b_task)

133

end_task = end()

134

135

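# Chain: start >> [extract_a, extract_b]; extract_a >> transform_a and extract_b >> transform_b (adjacent equal-length lists are paired element-wise); [transform_a, transform_b] >> combine >> end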

136

chain(

137

start_task,

138

[extract_a_task, extract_b_task],

139

[transform_a_task, transform_b_task],

140

combine_task,

141

end_task

142

)

143

144

dag_instance = dependency_example()

145

```

146

147

### Template Utilities

148

149

Template rendering and macro functions.

150

151

```python { .api }

152

def render_template(

153

template: str,

154

context: Context,

155

jinja_env: Optional[Environment] = None

156

) -> str:

157

"""

158

Render Jinja template with context.

159

160

Args:

161

template: Template string

162

context: Execution context

163

jinja_env: Jinja environment

164

165

Returns:

166

Rendered template

167

"""

168

169

def render_template_from_field(

170

operator: BaseOperator,

171

field: str,

172

context: Context

173

) -> Any:

174

"""

175

Render template field from operator.

176

177

Args:

178

operator: Operator instance

179

field: Field name to render

180

context: Execution context

181

182

Returns:

183

Rendered field value

184

"""

185

186

# Built-in template functions

187

def ds_add(ds: str, days: int) -> str:

188

"""Add days to date string (YYYY-MM-DD format)."""

189

190

def ds_format(ds: str, input_format: str, output_format: str) -> str:

191

"""Format date string from one format to another."""

192

193

def macros_datetime(dt: datetime) -> datetime:

194

"""Access datetime in templates."""

195

196

def macros_timedelta(**kwargs) -> timedelta:

197

"""Create timedelta in templates."""

198

```

199

200

### Date and Time Utilities

201

202

Common date/time operations for workflow scheduling.

203

204

```python { .api }

205

from airflow.utils.dates import days_ago, round_time, infer_time_unit

206

207

def days_ago(n: int, hour: int = 0, minute: int = 0, second: int = 0) -> datetime:

208

"""

209

Get datetime n days ago.

210

211

Args:

212

n: Number of days ago

213

hour: Hour of day

214

minute: Minute of hour

215

second: Second of minute

216

217

Returns:

218

Datetime n days ago

219

"""

220

221

def round_time(dt: datetime, delta: timedelta) -> datetime:

222

"""

223

Round datetime to nearest delta interval.

224

225

Args:

226

dt: Datetime to round

227

delta: Rounding interval

228

229

Returns:

230

Rounded datetime

231

"""

232

233

def infer_time_unit(time_seconds_arr: List[float]) -> str:

234

"""

235

Infer appropriate time unit from array of seconds.

236

237

Args:

238

time_seconds_arr: Array of time values in seconds

239

240

Returns:

241

Appropriate unit ('seconds', 'minutes', 'hours', 'days')

242

"""

243

```

244

245

### State Management

246

247

Utilities for managing task and DAG states.

248

249

```python { .api }

250

from airflow.utils.state import State, DagRunState, TaskInstanceState

251

252

class State:

253

"""Base state management."""

254

255

@classmethod

256

def task_states(cls) -> Set[str]:

257

"""Get all task states."""

258

259

@classmethod

260

def dag_states(cls) -> Set[str]:

261

"""Get all DAG states."""

262

263

@classmethod

264

def finished_states(cls) -> Set[str]:

265

"""Get terminal states."""

266

267

@classmethod

268

def unfinished_states(cls) -> Set[str]:

269

"""Get non-terminal states."""

270

271

def clear_task_instances(

272

tis: List[TaskInstance],

273

session: Session,

274

dag: Optional[DAG] = None

275

) -> None:

276

"""

277

Clear task instances for retry.

278

279

Args:

280

tis: Task instances to clear

281

session: Database session

282

dag: Optional DAG instance

283

"""

284

```

285

286

### Logging Utilities

287

288

Logging configuration and utilities.

289

290

```python { .api }

291

from airflow.utils.log.logging_mixin import LoggingMixin

292

293

class LoggingMixin:

294

"""Mixin for adding logging to classes."""

295

296

@property

297

def logger(self) -> logging.Logger:

298

"""Get logger instance."""

299

300

def log_info(self, message: str) -> None:

301

"""Log info message."""

302

303

def log_warning(self, message: str) -> None:

304

"""Log warning message."""

305

306

def log_error(self, message: str) -> None:

307

"""Log error message."""

308

309

def configure_logging() -> None:

310

"""Configure Airflow logging system."""

311

312

def setup_logging(filename: Optional[str] = None) -> None:

313

"""Setup logging configuration."""

314

```

315

316

## Types

317

318

```python { .api }

319

from typing import Union, Optional, List, Dict, Any, Sequence, Set

320

from datetime import datetime, timedelta

321

from airflow.models.baseoperator import BaseOperator

322

from airflow.models.taskinstance import TaskInstance

323

from airflow.utils.context import Context

324

325

TaskLike = Union[BaseOperator, Sequence[BaseOperator]]

326

StateType = str

327

```