# Executors

Task execution engines, including LocalExecutor, CeleryExecutor, and KubernetesExecutor, along with custom executor development. Executors determine how and where tasks are executed in Airflow.

## Capabilities

### Base Executor

Foundation for all Airflow executors, providing the core execution interface.

```python { .api }
class BaseExecutor:
    def __init__(self, parallelism: int = 32):
        """
        Base executor implementation.

        Args:
            parallelism: Maximum number of parallel tasks
        """

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None
    ) -> None:
        """
        Execute task asynchronously.

        Args:
            key: Task instance key
            command: Command to execute
            queue: Execution queue
            executor_config: Executor-specific configuration
        """

    def sync(self) -> None:
        """Sync executor state and collect results."""

    def heartbeat(self) -> None:
        """Heartbeat to maintain executor health."""

    def end(self) -> None:
        """Clean shutdown of executor."""

    def terminate(self) -> None:
        """Force terminate executor."""
```
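
Since this section also covers custom executor development, here is a minimal sketch of what a custom executor built on this interface could look like. The class name `InlineSubprocessExecutor` is hypothetical, and the `success()`/`fail()` state helpers are assumed to be inherited from `BaseExecutor`; a production executor would also handle queued-task buffering and adoption of orphaned work.

```python
# Minimal sketch of a custom executor, assuming only the BaseExecutor
# interface documented above plus inherited success()/fail() state helpers.
import subprocess
from typing import Dict, Optional

from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstanceKey


class InlineSubprocessExecutor(BaseExecutor):  # hypothetical example class
    """Runs each task command in a local subprocess and polls for completion."""

    def __init__(self, parallelism: int = 8):
        super().__init__(parallelism=parallelism)
        self._running: Dict[TaskInstanceKey, subprocess.Popen] = {}

    def execute_async(
        self,
        key: TaskInstanceKey,
        command,
        queue: Optional[str] = None,
        executor_config=None,
    ) -> None:
        # Launch the task command without blocking the scheduler loop.
        self._running[key] = subprocess.Popen(command)

    def sync(self) -> None:
        # Poll running subprocesses and report terminal states back to Airflow.
        for key, proc in list(self._running.items()):
            returncode = proc.poll()
            if returncode is None:
                continue  # still running
            if returncode == 0:
                self.success(key)
            else:
                self.fail(key)
            del self._running[key]

    def end(self) -> None:
        # Graceful shutdown: wait for in-flight tasks to finish.
        for proc in self._running.values():
            proc.wait()

    def terminate(self) -> None:
        # Forced shutdown: kill anything still running.
        for proc in self._running.values():
            proc.kill()
```

A custom executor like this can generally be selected by pointing the `[core] executor` setting at its full import path.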

### Local Executor

Execute tasks in separate processes on the same machine.

```python { .api }
class LocalExecutor(BaseExecutor):
    def __init__(self, parallelism: int = 0):
        """
        Local process executor.

        Args:
            parallelism: Max parallel tasks (0 = unlimited)
        """

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None
    ) -> None:
        """Execute task in local subprocess."""

    def sync(self) -> None:
        """Collect completed task results."""
```

Usage example (these settings live in `airflow.cfg` rather than in Python code):

```ini
# airflow.cfg — configuration for LocalExecutor
[core]
executor = LocalExecutor
parallelism = 16
max_active_runs_per_dag = 4
```

### Sequential Executor

Execute tasks one at a time (for testing and development).

```python { .api }
class SequentialExecutor(BaseExecutor):
    def __init__(self):
        """Sequential executor for single-threaded execution."""

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None
    ) -> None:
        """Execute task immediately in current process."""
```
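
SequentialExecutor is the only executor compatible with a SQLite metadata database, which is why it is the usual choice for quick local experiments. Below is a sketch of a test/dev setup that forces it via environment variables; the `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN` name assumes Airflow 2.3+ (older releases read the connection string from the `[core]` section).

```python
# Hypothetical local/dev setup forcing SequentialExecutor with SQLite.
# Environment variables must be set before airflow.configuration is imported.
import os

os.environ["AIRFLOW__CORE__EXECUTOR"] = "SequentialExecutor"
# Section name assumes Airflow 2.3+; earlier releases use [core].
os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = "sqlite:////tmp/airflow.db"

from airflow.configuration import conf

assert conf.get("core", "executor") == "SequentialExecutor"
```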

### Celery Executor

Distribute tasks across multiple worker nodes using Celery.

```python { .api }
class CeleryExecutor(BaseExecutor):
    def __init__(self):
        """Celery-based distributed executor."""

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = 'default',
        executor_config: Optional[Dict] = None
    ) -> None:
        """Submit task to Celery worker queue."""

    def sync(self) -> None:
        """Check Celery task status and collect results."""
```
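
The `queue` argument above is what an operator's `queue` parameter ultimately feeds. The DAG below is an illustrative sketch of routing one task to a dedicated queue: the queue name `heavy` and the worker command in the comment are assumptions, and `schedule=` assumes Airflow 2.4+ (older versions use `schedule_interval=`).

```python
# Illustrative DAG routing one task to a dedicated Celery queue. Assumes a
# worker has been started for that queue, e.g. `airflow celery worker -q heavy`.
import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="celery_queue_example",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,  # Airflow 2.4+; use schedule_interval=None on older versions
) as dag:
    # Runs on the default queue.
    light = BashOperator(task_id="light", bash_command="echo light")

    # The queue argument maps to CeleryExecutor.execute_async(queue="heavy").
    heavy = BashOperator(
        task_id="heavy",
        bash_command="echo heavy",
        queue="heavy",
    )

    light >> heavy
```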

### Kubernetes Executor

Execute tasks as Kubernetes pods.

```python { .api }
class KubernetesExecutor(BaseExecutor):
    def __init__(self):
        """Kubernetes pod executor."""

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None
    ) -> None:
        """Create Kubernetes pod for task execution."""

    def sync(self) -> None:
        """Monitor pod status and collect results."""

    def adopt_launched_task(
        self,
        kube_client: Any,
        pod: Any,
        pods: Dict[TaskInstanceKey, Any]
    ) -> None:
        """Adopt running pod for monitoring."""
```
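
The `executor_config` argument is where per-task pod customization flows through. The sketch below uses the commonly supported `pod_override` key with the Kubernetes Python client; the resource values and DAG/task names are placeholders, and the main container name `base` is an assumption about the executor's default pod template.

```python
# Illustrative per-task pod customization via executor_config["pod_override"].
# Resource values are placeholders; the container name "base" is assumed to
# match the executor's default worker pod template.
import pendulum
from kubernetes.client import models as k8s

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="kubernetes_executor_example",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,  # Airflow 2.4+; use schedule_interval=None on older versions
) as dag:
    resource_heavy = PythonOperator(
        task_id="resource_heavy",
        python_callable=lambda: print("crunching"),
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s.V1ResourceRequirements(
                                requests={"cpu": "1", "memory": "2Gi"},
                                limits={"cpu": "2", "memory": "4Gi"},
                            ),
                        )
                    ]
                )
            )
        },
    )
```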

## Types

```python { .api }
from typing import Optional, Dict, Any, List, Tuple
from airflow.models.taskinstance import TaskInstanceKey

CommandType = List[str]
ExecutorConfigType = Optional[Dict[str, Any]]
QueuedTaskInstanceType = Tuple[TaskInstanceKey, CommandType, Optional[str], ExecutorConfigType]
```
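
For orientation, a value shaped like `QueuedTaskInstanceType` is sketched below. The literal command is only an approximation of the `airflow tasks run ...` invocation that the scheduler actually generates, and the `TaskInstanceKey` field names are assumptions based on its standard definition.

```python
# Illustrative only: a tuple matching QueuedTaskInstanceType. In practice the
# scheduler builds the command; it is approximately an `airflow tasks run`
# CLI invocation for the given task instance.
from airflow.models.taskinstance import TaskInstanceKey

key = TaskInstanceKey(
    dag_id="example_dag",
    task_id="extract",
    run_id="manual__2024-01-01T00:00:00+00:00",
    try_number=1,
)

queued_task = (
    key,
    ["airflow", "tasks", "run", key.dag_id, key.task_id, key.run_id, "--local"],
    None,  # queue
    None,  # executor_config
)
```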