0
# CLI and Utilities
1
2
Command-line interface, context utilities, dependency management, and workflow orchestration helpers. Airflow provides extensive CLI tools and utility functions for workflow management.
3
4
## Capabilities
5
6
### Context Utilities
7
8
Access execution context and runtime information.
9
10
```python { .api }
11
def get_current_context() -> Context:
12
"""
13
Get the current task execution context.
14
15
Returns:
16
Current execution context with task, DAG, and runtime information
17
"""
18
19
def get_parsing_context() -> Context:
20
"""
21
Get the DAG parsing context.
22
23
Returns:
24
Context available during DAG parsing
25
"""
26
27
class Context:
28
"""Task execution context."""
29
# Core objects
30
task_instance: TaskInstance
31
task: BaseOperator
32
dag: DAG
33
dag_run: DagRun
34
35
# Execution info
36
execution_date: datetime
37
logical_date: datetime
38
data_interval_start: datetime
39
data_interval_end: datetime
40
41
# Formatted dates
42
ds: str # YYYY-MM-DD
43
ds_nodash: str # YYYYMMDD
44
ts: str # ISO timestamp
45
ts_nodash: str # Timestamp without separators
46
47
# Configuration
48
params: Dict[str, Any]
49
var: Dict[str, Any]
50
conf: Dict[str, Any]
51
52
# XCom access
53
ti: TaskInstance # For XCom operations
54
```
55
56
### Dependency Management
57
58
Manage task dependencies and execution order.
59
60
```python { .api }
61
def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]) -> None:
62
"""
63
Chain tasks in sequence: task1 >> task2 >> task3.
64
65
Args:
66
*tasks: Tasks to chain in order
67
"""
68
69
def chain_linear(*tasks: BaseOperator) -> None:
70
"""
71
Chain tasks linearly with explicit ordering.
72
73
Args:
74
*tasks: Tasks to chain linearly
75
"""
76
77
def cross_downstream(
78
from_tasks: Sequence[BaseOperator],
79
to_tasks: Sequence[BaseOperator]
80
) -> None:
81
"""
82
Set all tasks in from_tasks as upstream of all tasks in to_tasks.
83
84
Args:
85
from_tasks: Upstream tasks
86
to_tasks: Downstream tasks
87
"""
88
```
89
90
Usage example:
91
92
```python
93
from airflow.decorators import dag, task
94
from airflow.models.baseoperator import chain, cross_downstream
95
96
@dag(dag_id='dependency_example', start_date=datetime(2024, 1, 1))
97
def dependency_example():
98
@task
99
def start():
100
return "started"
101
102
@task
103
def extract_a():
104
return "data_a"
105
106
@task
107
def extract_b():
108
return "data_b"
109
110
@task
111
def transform_a(data):
112
return f"transformed_{data}"
113
114
@task
115
def transform_b(data):
116
return f"transformed_{data}"
117
118
@task
119
def combine(data_a, data_b):
120
return f"combined: {data_a}, {data_b}"
121
122
@task
123
def end():
124
return "finished"
125
126
# Set up dependencies
127
start_task = start()
128
extract_a_task = extract_a()
129
extract_b_task = extract_b()
130
transform_a_task = transform_a(extract_a_task)
131
transform_b_task = transform_b(extract_b_task)
132
combine_task = combine(transform_a_task, transform_b_task)
133
end_task = end()
134
135
# Chain: start >> [extract_a, extract_b] >> [transform_a, transform_b] >> combine >> end
136
chain(
137
start_task,
138
[extract_a_task, extract_b_task],
139
[transform_a_task, transform_b_task],
140
combine_task,
141
end_task
142
)
143
144
dag_instance = dependency_example()
145
```
146
147
### Template Utilities
148
149
Template rendering and macro functions.
150
151
```python { .api }
152
def render_template(
153
template: str,
154
context: Context,
155
jinja_env: Optional[Environment] = None
156
) -> str:
157
"""
158
Render Jinja template with context.
159
160
Args:
161
template: Template string
162
context: Execution context
163
jinja_env: Jinja environment
164
165
Returns:
166
Rendered template
167
"""
168
169
def render_template_from_field(
170
operator: BaseOperator,
171
field: str,
172
context: Context
173
) -> Any:
174
"""
175
Render template field from operator.
176
177
Args:
178
operator: Operator instance
179
field: Field name to render
180
context: Execution context
181
182
Returns:
183
Rendered field value
184
"""
185
186
# Built-in template functions
187
def ds_add(ds: str, days: int) -> str:
188
"""Add days to date string (YYYY-MM-DD format)."""
189
190
def ds_format(ds: str, input_format: str, output_format: str) -> str:
191
"""Format date string from one format to another."""
192
193
def macros_datetime(dt: datetime) -> datetime:
194
"""Access datetime in templates."""
195
196
def macros_timedelta(**kwargs) -> timedelta:
197
"""Create timedelta in templates."""
198
```
199
200
### Date and Time Utilities
201
202
Common date/time operations for workflow scheduling.
203
204
```python { .api }
205
from airflow.utils.dates import days_ago, round_time, infer_time_unit
206
207
def days_ago(n: int, hour: int = 0, minute: int = 0, second: int = 0) -> datetime:
208
"""
209
Get datetime n days ago.
210
211
Args:
212
n: Number of days ago
213
hour: Hour of day
214
minute: Minute of hour
215
second: Second of minute
216
217
Returns:
218
Datetime n days ago
219
"""
220
221
def round_time(dt: datetime, delta: timedelta) -> datetime:
222
"""
223
Round datetime to nearest delta interval.
224
225
Args:
226
dt: Datetime to round
227
delta: Rounding interval
228
229
Returns:
230
Rounded datetime
231
"""
232
233
def infer_time_unit(time_seconds_arr: List[float]) -> str:
234
"""
235
Infer appropriate time unit from array of seconds.
236
237
Args:
238
time_seconds_arr: Array of time values in seconds
239
240
Returns:
241
Appropriate unit ('seconds', 'minutes', 'hours', 'days')
242
"""
243
```
244
245
### State Management
246
247
Utilities for managing task and DAG states.
248
249
```python { .api }
250
from airflow.utils.state import State, DagRunState, TaskInstanceState
251
252
class State:
253
"""Base state management."""
254
255
@classmethod
256
def task_states(cls) -> Set[str]:
257
"""Get all task states."""
258
259
@classmethod
260
def dag_states(cls) -> Set[str]:
261
"""Get all DAG states."""
262
263
@classmethod
264
def finished_states(cls) -> Set[str]:
265
"""Get terminal states."""
266
267
@classmethod
268
def unfinished_states(cls) -> Set[str]:
269
"""Get non-terminal states."""
270
271
def clear_task_instances(
272
tis: List[TaskInstance],
273
session: Session,
274
dag: Optional[DAG] = None
275
) -> None:
276
"""
277
Clear task instances for retry.
278
279
Args:
280
tis: Task instances to clear
281
session: Database session
282
dag: Optional DAG instance
283
"""
284
```
285
286
### Logging Utilities
287
288
Logging configuration and utilities.
289
290
```python { .api }
291
from airflow.utils.log.logging_mixin import LoggingMixin
292
293
class LoggingMixin:
294
"""Mixin for adding logging to classes."""
295
296
@property
297
def logger(self) -> logging.Logger:
298
"""Get logger instance."""
299
300
def log_info(self, message: str) -> None:
301
"""Log info message."""
302
303
def log_warning(self, message: str) -> None:
304
"""Log warning message."""
305
306
def log_error(self, message: str) -> None:
307
"""Log error message."""
308
309
def configure_logging() -> None:
310
"""Configure Airflow logging system."""
311
312
def setup_logging(filename: Optional[str] = None) -> None:
313
"""Setup logging configuration."""
314
```
315
316
## Types
317
318
```python { .api }
319
from typing import Union, Optional, List, Dict, Any, Sequence, Set
320
from datetime import datetime, timedelta
321
from airflow.models.baseoperator import BaseOperator
322
from airflow.models.taskinstance import TaskInstance
323
from airflow.utils.context import Context
324
325
TaskLike = Union[BaseOperator, Sequence[BaseOperator]]
326
StateType = str
327
```