# Task Operators

Task definition and execution including BaseOperator, task decorators, dynamic task mapping, and task instance management. Tasks represent individual units of work within DAGs.

## Capabilities

### Base Operator

Foundation class for all Airflow operators, providing core task functionality and lifecycle management.

```python { .api }
class BaseOperator:
    def __init__(
        self,
        task_id: str,
        owner: str = "airflow",
        email: Optional[Union[str, List[str]]] = None,
        email_on_retry: bool = True,
        email_on_failure: bool = True,
        retries: Optional[int] = None,
        retry_delay: timedelta = timedelta(seconds=300),
        retry_exponential_backoff: bool = False,
        max_retry_delay: Optional[timedelta] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        depends_on_past: bool = False,
        wait_for_downstream: bool = False,
        dag: Optional[DAG] = None,
        params: Optional[Dict[str, Any]] = None,
        default_args: Optional[Dict[str, Any]] = None,
        pool: Optional[str] = None,
        pool_slots: int = 1,
        queue: Optional[str] = None,
        priority_weight: int = 1,
        weight_rule: str = "downstream",
        sla: Optional[timedelta] = None,
        execution_timeout: Optional[timedelta] = None,
        on_execute_callback: Optional[Callable] = None,
        on_failure_callback: Optional[Callable] = None,
        on_success_callback: Optional[Callable] = None,
        on_retry_callback: Optional[Callable] = None,
        trigger_rule: str = "all_success",
        resources: Optional[Dict[str, Any]] = None,
        run_as_user: Optional[str] = None,
        task_concurrency: Optional[int] = None,
        max_active_tis_per_dag: Optional[int] = None,
        executor_config: Optional[Dict[str, Any]] = None,
        do_xcom_push: bool = True,
        inlets: Optional[List[Any]] = None,
        outlets: Optional[List[Any]] = None,
        task_group: Optional[TaskGroup] = None,
        doc: Optional[str] = None,
        doc_md: Optional[str] = None,
        doc_json: Optional[str] = None,
        doc_yaml: Optional[str] = None,
        doc_rst: Optional[str] = None,
        wait_for_past_depends_before_skipping: bool = False,
        max_active_tis_per_dagrun: Optional[int] = None,
        map_index_template: Optional[str] = None,
        multiple_outputs: bool = False,
        task_display_name: Optional[str] = None,
        logger_name: Optional[str] = None,
        allow_nested_operators: bool = True,
        **kwargs
    ):
        """
        Base operator for all Airflow tasks.

        Args:
            task_id: Unique identifier for the task
            owner: Owner of the task
            retries: Number of retries when task fails
            retry_delay: Delay between retries
            start_date: When the task should start being scheduled
            end_date: When the task should stop being scheduled
            depends_on_past: Whether task depends on previous run success
            pool: Resource pool for task execution
            priority_weight: Task priority for execution order
            trigger_rule: Rule for task triggering based on upstream tasks
            execution_timeout: Maximum runtime before task timeout
        """

    def execute(self, context: Context) -> Any:
        """Execute the task logic. Must be implemented by subclasses."""

    def on_kill(self) -> None:
        """Called when task is killed for cleanup."""

    def defer(self, trigger: BaseTrigger, method_name: str, **kwargs) -> None:
        """Defer task execution to a trigger."""

    def resume_execution(self, context: Context, event: Dict[str, Any]) -> Any:
        """Resume execution after deferral."""

    def render_template_fields(
        self,
        context: Context,
        jinja_env: Optional[jinja2.Environment] = None
    ) -> None:
        """Render Jinja templates in task fields."""
```
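
Usage example (a minimal sketch of a custom operator; the `HelloOperator` name, its `name` parameter, and the greeting logic are illustrative, not part of the Airflow API):

```python
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context


class HelloOperator(BaseOperator):
    """Hypothetical operator that logs and returns a greeting."""

    # String fields listed here are rendered as Jinja templates before execute()
    template_fields = ("name",)

    def __init__(self, name: str, **kwargs):
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context: Context) -> str:
        # The return value is pushed to XCom because do_xcom_push defaults to True
        message = f"Hello, {self.name}!"
        self.log.info(message)
        return message
```

Standard `BaseOperator` arguments pass through `**kwargs`, e.g. `HelloOperator(task_id="say_hello", name="Airflow", retries=2)`.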

### Task Decorator

Modern approach to task definition using decorators for cleaner, more Pythonic task creation.

```python { .api }
@task(
    task_id: Optional[str] = None,
    python_callable: Optional[Callable] = None,
    op_args: Optional[List[Any]] = None,
    op_kwargs: Optional[Dict[str, Any]] = None,
    templates_dict: Optional[Dict[str, Any]] = None,
    templates_exts: Optional[List[str]] = None,
    show_return_value_in_logs: bool = True,
    **kwargs
) -> Callable:
    """
    Decorator to create a task from a Python function.

    Args:
        task_id: Unique identifier (auto-generated from function name if not provided)
        python_callable: The Python function to execute
        op_args: Positional arguments to pass to the function
        op_kwargs: Keyword arguments to pass to the function
        templates_dict: Dictionary of templates to render
        show_return_value_in_logs: Whether to log return value

    Returns:
        Decorated function that returns task output
    """

@task.setup(
    task_id: Optional[str] = None,
    **kwargs
) -> Callable:
    """
    Decorator for setup tasks that run before other tasks.

    Args:
        task_id: Unique identifier
        **kwargs: Additional task arguments

    Returns:
        Decorated function for setup task
    """

@task.teardown(
    task_id: Optional[str] = None,
    **kwargs
) -> Callable:
    """
    Decorator for teardown tasks that run after other tasks.

    Args:
        task_id: Unique identifier
        **kwargs: Additional task arguments

    Returns:
        Decorated function for teardown task
    """
```

Usage example:

```python
from airflow.decorators import dag, task
from datetime import datetime


@dag(dag_id='task_decorator_example', start_date=datetime(2024, 1, 1))
def task_decorator_example():
    @task
    def extract_data(source: str) -> dict:
        """Extract data from source."""
        return {'data': f'extracted from {source}', 'count': 100}

    @task
    def transform_data(data: dict) -> dict:
        """Transform the data."""
        return {
            'transformed_data': data['data'].upper(),
            'processed_count': data['count'] * 2
        }

    @task.setup
    def setup_environment():
        """Setup task that runs first."""
        print("Setting up environment")

    @task.teardown
    def cleanup():
        """Cleanup task that runs last."""
        print("Cleaning up")

    # Define dependencies: setup runs first, teardown runs last
    setup_task = setup_environment()
    raw_data = extract_data('database')
    processed_data = transform_data(raw_data)
    cleanup_task = cleanup()

    setup_task >> raw_data
    processed_data >> cleanup_task


dag_instance = task_decorator_example()
```
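
The decorator also accepts most `BaseOperator` arguments, which are applied to the generated task. A brief sketch (the `fetch_metrics` task is hypothetical) showing an explicit `task_id`, retry settings, and `multiple_outputs`, which splits a returned dict into separate XCom entries keyed by the dict's keys:

```python
from datetime import timedelta

from airflow.decorators import task


@task(
    task_id="fetch_metrics",
    retries=3,
    retry_delay=timedelta(minutes=5),
    multiple_outputs=True,
)
def fetch_metrics() -> dict:
    """Hypothetical task: each returned key becomes its own XCom entry."""
    return {"rows": 1200, "errors": 4}
```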

### Dynamic Task Mapping

Create tasks dynamically at runtime based on input data or external conditions.

```python { .api }
class MappedOperator:
    """
    Operator created through dynamic task mapping.

    Attributes:
        task_id: Base task identifier
        operator_class: Original operator class
        mapped_op_kwargs: Mapped operator arguments
        partial_kwargs: Static operator arguments
    """
    task_id: str
    operator_class: type
    mapped_op_kwargs: Dict[str, Any]
    partial_kwargs: Dict[str, Any]

    def expand(self, **mapped_kwargs) -> 'MappedOperator':
        """
        Expand operator with mapped arguments.

        Args:
            **mapped_kwargs: Arguments to map over

        Returns:
            MappedOperator instance
        """

    def partial(self, **partial_kwargs) -> 'MappedOperator':
        """
        Set static arguments for mapped operator.

        Args:
            **partial_kwargs: Static arguments

        Returns:
            Partially configured MappedOperator
        """
```

Usage example:

```python
from datetime import datetime
from typing import List

from airflow.decorators import dag, task


@dag(dag_id='dynamic_mapping_example', start_date=datetime(2024, 1, 1))
def dynamic_mapping_example():
    @task
    def get_file_list() -> List[str]:
        """Get list of files to process."""
        return ['file1.csv', 'file2.csv', 'file3.csv']

    @task
    def process_file(filename: str) -> str:
        """Process a single file."""
        return f"processed {filename}"

    # Dynamic mapping - creates one task per file
    files = get_file_list()
    process_file.expand(filename=files)


dag_instance = dynamic_mapping_example()
```
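
Dynamic mapping is not limited to TaskFlow tasks. Classic operators use `partial()` for static arguments and `expand()` for mapped ones; a minimal sketch with `BashOperator` (the DAG id and commands are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="classic_mapping_example", start_date=datetime(2024, 1, 1)) as dag:
    # partial() pins the static arguments; expand() maps over the dynamic ones,
    # creating one task instance per bash_command at runtime.
    BashOperator.partial(task_id="run_command").expand(
        bash_command=["echo one", "echo two", "echo three"],
    )
```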

### Task Instance

Represents a specific execution of a task within a DAG run.

```python { .api }
class TaskInstance:
    """
    ORM model for task instance execution.

    Attributes:
        task_id: Task identifier
        dag_id: DAG identifier
        run_id: DAG run identifier
        execution_date: Execution date
        start_date: When task started
        end_date: When task ended
        duration: Task execution duration
        state: Current task state
        try_number: Current retry attempt
        max_tries: Maximum retry attempts
        hostname: Worker hostname
        unixname: Unix username
        job_id: Job identifier
        pool: Resource pool
        pool_slots: Number of pool slots used
        queue: Execution queue
        priority_weight: Task priority
        operator: Operator class name
        queued_dttm: When task was queued
        pid: Process ID
        executor_config: Executor configuration
        external_executor_id: External executor identifier
        trigger_id: Trigger identifier (for deferred tasks)
        next_method: Next method to call
        next_kwargs: Arguments for next method
    """
    id: Optional[UUID]
    task_id: str
    dag_id: str
    run_id: str
    map_index: int
    execution_date: datetime
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    duration: Optional[float]
    state: Optional[str]
    try_number: int
    max_tries: int
    hostname: str
    unixname: str
    job_id: Optional[int]
    pool: str
    pool_slots: int
    queue: str
    priority_weight: int
    operator: str
    queued_dttm: Optional[datetime]
    pid: Optional[int]
    executor_config: Optional[Dict]
    external_executor_id: Optional[str]
    trigger_id: Optional[int]
    next_method: Optional[str]
    next_kwargs: Optional[Dict]

    def clear_task_instances(
        self,
        tis: List['TaskInstance'],
        session: Session = None,
        dag: Optional[DAG] = None
    ) -> None:
        """Clear task instances for retry."""

    def get_task_instance(
        self,
        task_id: str,
        execution_date: datetime,
        session: Session = None
    ) -> Optional['TaskInstance']:
        """Get task instance by ID and execution date."""
```
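
Inside a running task, the current `TaskInstance` is available through the execution context, which is the usual way to inspect retry state or exchange XCom values. A short sketch (the `report_attempt` task and the upstream task id are hypothetical):

```python
from airflow.decorators import task


@task
def report_attempt(**context) -> None:
    """Hypothetical task that inspects its own TaskInstance."""
    ti = context["ti"]  # the TaskInstance for this run
    print(f"{ti.dag_id}.{ti.task_id} run {ti.run_id}, attempt {ti.try_number} (max_tries={ti.max_tries})")

    # XCom access also goes through the task instance
    ti.xcom_push(key="attempt", value=ti.try_number)
    upstream_value = ti.xcom_pull(task_ids="some_upstream_task")  # hypothetical upstream task id
    print(f"upstream returned: {upstream_value}")
```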

### Task Dependencies

Manage dependencies between tasks using various trigger rules and patterns.

```python { .api }
def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]) -> None:
    """
    Chain tasks sequentially; adjacent lists of equal length are linked element-wise.

    Args:
        *tasks: Tasks, or lists of tasks, to chain in order
    """

def chain_linear(*tasks: Union[BaseOperator, Sequence[BaseOperator]]) -> None:
    """
    Chain tasks so that every task in one argument is upstream of every task in the next.

    Args:
        *tasks: Tasks, or lists of tasks, to chain group by group
    """

def cross_downstream(
    from_tasks: Sequence[BaseOperator],
    to_tasks: Sequence[BaseOperator]
) -> None:
    """
    Create dependencies from all tasks in from_tasks to all tasks in to_tasks.

    Args:
        from_tasks: Source tasks
        to_tasks: Target tasks
    """
```

Usage example:

```python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain


@dag(dag_id='dependencies_example', start_date=datetime(2024, 1, 1))
def dependencies_example():
    @task
    def start():
        return "started"

    @task
    def process_a():
        return "a"

    @task
    def process_b():
        return "b"

    @task
    def combine():
        return "combined"

    @task
    def end():
        return "ended"

    # Instantiate the tasks
    start_task = start()
    process_a_task = process_a()
    process_b_task = process_b()
    combine_task = combine()
    end_task = end()

    # Set up dependencies: start >> [process_a, process_b] >> combine >> end
    chain(start_task, [process_a_task, process_b_task], combine_task, end_task)


dag_instance = dependencies_example()
```
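
`cross_downstream` wires every task in the first list upstream of every task in the second, similar to what `chain_linear` does between successive groups. A small sketch with hypothetical extract/load tasks:

```python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.models.baseoperator import cross_downstream


@dag(dag_id='cross_downstream_example', start_date=datetime(2024, 1, 1))
def cross_downstream_example():
    @task
    def extract(source: str) -> str:
        return f"extracted {source}"

    @task
    def load(target: str) -> str:
        return f"loaded {target}"

    extracts = [extract.override(task_id=f"extract_{s}")(s) for s in ("orders", "users")]
    loads = [load.override(task_id=f"load_{t}")(t) for t in ("warehouse", "lake")]

    # Every extract task becomes upstream of every load task (4 edges total)
    cross_downstream(extracts, loads)


dag_instance = cross_downstream_example()
```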

### Task States and Lifecycle

Task execution states and lifecycle management.

```python { .api }
from airflow.utils.state import TaskInstanceState

# Task States
class TaskInstanceState:
    """Task instance states."""
    NONE: str = "none"
    SCHEDULED: str = "scheduled"
    QUEUED: str = "queued"
    RUNNING: str = "running"
    SUCCESS: str = "success"
    SHUTDOWN: str = "shutdown"
    RESTARTING: str = "restarting"
    FAILED: str = "failed"
    UP_FOR_RETRY: str = "up_for_retry"
    UP_FOR_RESCHEDULE: str = "up_for_reschedule"
    UPSTREAM_FAILED: str = "upstream_failed"
    SKIPPED: str = "skipped"
    REMOVED: str = "removed"
    DEFERRED: str = "deferred"

# Trigger Rules
TRIGGER_RULES = [
    "all_success",                  # All upstream tasks succeeded
    "all_failed",                   # All upstream tasks failed
    "all_done",                     # All upstream tasks completed, regardless of outcome
    "one_success",                  # At least one upstream task succeeded
    "one_failed",                   # At least one upstream task failed
    "none_failed",                  # No upstream task failed (succeeded or skipped)
    "none_failed_min_one_success",  # No failures and at least one success
    "none_skipped",                 # No upstream tasks skipped
    "always",                       # Always run regardless of upstream state
]
```
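
Trigger rules are set per task via the `trigger_rule` argument; the `TriggerRule` enum in `airflow.utils.trigger_rule` provides the same values as constants. A brief sketch (the cleanup task is hypothetical) of a task that runs once all upstream tasks have finished, whatever their state:

```python
from airflow.decorators import task
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.ALL_DONE)
def cleanup_workspace() -> None:
    """Hypothetical task: runs after all upstream tasks complete, even if some failed."""
    print("Removing temporary files")
```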

## Types

```python { .api }
from typing import Union, Optional, List, Dict, Callable, Any, Sequence, Literal
from datetime import datetime, timedelta
from uuid import UUID

import jinja2
from sqlalchemy.orm import Session

from airflow.models.dag import DAG
from airflow.models.taskgroup import TaskGroup
from airflow.triggers.base import BaseTrigger
from airflow.utils.context import Context

TaskState = Literal[
    "none", "scheduled", "queued", "running", "success",
    "shutdown", "restarting", "failed", "up_for_retry", "up_for_reschedule",
    "upstream_failed", "skipped", "removed", "deferred"
]

TriggerRule = Literal[
    "all_success", "all_failed", "all_done", "one_success",
    "one_failed", "none_failed", "none_failed_min_one_success",
    "none_skipped", "always"
]
```