# Executors

Task execution engines, including LocalExecutor, CeleryExecutor, and KubernetesExecutor, plus the base interface for developing custom executors. Executors determine how and where tasks are executed in Airflow.

## Capabilities

### Base Executor

Foundation for all Airflow executors, providing the core execution interface.

```python { .api }
class BaseExecutor:
    def __init__(self, parallelism: int = 32):
        """
        Base executor implementation.

        Args:
            parallelism: Maximum number of parallel tasks
        """

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None
    ) -> None:
        """
        Execute task asynchronously.

        Args:
            key: Task instance key
            command: Command to execute
            queue: Execution queue
            executor_config: Executor-specific configuration
        """

    def sync(self) -> None:
        """Sync executor state and collect results."""

    def heartbeat(self) -> None:
        """Heartbeat to maintain executor health."""

    def end(self) -> None:
        """Clean shutdown of executor."""

    def terminate(self) -> None:
        """Force terminate executor."""
```
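
To make the lifecycle of this interface concrete, here is a minimal sketch of a custom executor. It does not import Airflow: `SketchBaseExecutor`, `InMemoryExecutor`, and the `TaskKey` tuple are hypothetical stand-ins that only mirror the method names documented above, and the toy executor records commands rather than running them.

```python
from typing import Dict, List, Optional, Tuple

# Stand-in types so the sketch runs without Airflow installed (assumptions).
TaskKey = Tuple[str, str, str, int]  # (dag_id, task_id, run_id, try_number)
CommandType = List[str]


class SketchBaseExecutor:
    """Hypothetical stand-in mirroring the interface documented above."""

    def __init__(self, parallelism: int = 32):
        self.parallelism = parallelism

    def execute_async(
        self,
        key: TaskKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None,
    ) -> None:
        raise NotImplementedError

    def sync(self) -> None:
        raise NotImplementedError

    def end(self) -> None:
        raise NotImplementedError


class InMemoryExecutor(SketchBaseExecutor):
    """Toy executor: marks tasks as successful instead of running them."""

    def __init__(self, parallelism: int = 32):
        super().__init__(parallelism)
        self.pending: List[Tuple[TaskKey, CommandType]] = []
        self.results: Dict[TaskKey, str] = {}

    def execute_async(self, key, command, queue=None, executor_config=None) -> None:
        # Accept the work and return immediately; nothing is processed yet.
        self.pending.append((key, command))

    def sync(self) -> None:
        # Process at most `parallelism` pending tasks per call (0 = no limit).
        limit = self.parallelism or len(self.pending)
        batch, self.pending = self.pending[:limit], self.pending[limit:]
        for key, _command in batch:
            self.results[key] = "success"

    def end(self) -> None:
        # Drain any remaining work before shutting down.
        while self.pending:
            self.sync()


executor = InMemoryExecutor(parallelism=2)
for n in range(3):
    executor.execute_async(("example_dag", f"task_{n}", "run_1", 1), ["echo", str(n)])

executor.sync()
done_after_one_sync = len(executor.results)  # limited by parallelism

executor.end()
done_after_end = len(executor.results)  # everything drained
```

The split between `execute_async` (accept work, return quickly) and `sync` (collect results later) is the point of the sketch; a real executor would launch processes, Celery tasks, or pods in place of the in-memory list.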

### Local Executor

Execute tasks in separate processes on the same machine.

```python { .api }
class LocalExecutor(BaseExecutor):
    def __init__(self, parallelism: int = 0):
        """
        Local process executor.

        Args:
            parallelism: Max parallel tasks (0 = unlimited)
        """

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None
    ) -> None:
        """Execute task in local subprocess."""

    def sync(self) -> None:
        """Collect completed task results."""
```

Usage example:

```python
# Configuration for LocalExecutor
EXECUTOR_CONFIG = {
    'core': {
        'executor': 'LocalExecutor',
        'parallelism': 16,
        'max_active_runs_per_dag': 4
    }
}
```
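
The dictionary above describes settings by section and key. Airflow itself reads such settings from `airflow.cfg` or from environment variables named `AIRFLOW__{SECTION}__{KEY}`, so one way to apply them is to flatten the dictionary into that form. The helper `to_airflow_env` below is a hypothetical illustration, not part of Airflow.

```python
from typing import Dict, Mapping


def to_airflow_env(config: Mapping[str, Mapping[str, object]]) -> Dict[str, str]:
    """Flatten {section: {key: value}} into AIRFLOW__SECTION__KEY variables."""
    return {
        f"AIRFLOW__{section.upper()}__{key.upper()}": str(value)
        for section, options in config.items()
        for key, value in options.items()
    }


# Same settings as the example above.
EXECUTOR_CONFIG = {
    "core": {
        "executor": "LocalExecutor",
        "parallelism": 16,
        "max_active_runs_per_dag": 4,
    }
}

env = to_airflow_env(EXECUTOR_CONFIG)
for name, value in sorted(env.items()):
    print(f"{name}={value}")
```

The printed lines can be exported in the shell or passed to a container environment before the scheduler starts.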

### Sequential Executor

Execute tasks one at a time (for testing and development).

```python { .api }
class SequentialExecutor(BaseExecutor):
    def __init__(self):
        """Sequential executor for single-threaded execution."""

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None
    ) -> None:
        """Execute task immediately in current process."""
```

### Celery Executor

Distribute tasks across multiple worker nodes using Celery.

```python { .api }
class CeleryExecutor(BaseExecutor):
    def __init__(self):
        """Celery-based distributed executor."""

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = 'default',
        executor_config: Optional[Dict] = None
    ) -> None:
        """Submit task to Celery worker queue."""

    def sync(self) -> None:
        """Check Celery task status and collect results."""
```
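
The `queue` argument decides which pool of workers picks up a task. The sketch below illustrates only that grouping step, using plain Python instead of Celery; `group_by_queue` and the task names are hypothetical, and in a real deployment the message broker performs the routing while workers subscribe to named queues.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

CommandType = List[str]
DEFAULT_QUEUE = "default"  # mirrors the default in the signature above


def group_by_queue(
    submissions: List[Tuple[str, CommandType, Optional[str]]],
) -> Dict[str, List[str]]:
    """Group task ids by target queue, falling back to the default queue."""
    queues: Dict[str, List[str]] = defaultdict(list)
    for task_id, _command, queue in submissions:
        queues[queue or DEFAULT_QUEUE].append(task_id)
    return dict(queues)


routed = group_by_queue(
    [
        ("extract", ["echo", "extract"], None),       # no queue given
        ("train_model", ["echo", "train"], "gpu"),    # dedicated queue
        ("load", ["echo", "load"], "default"),
    ]
)
```

Here `extract` and `load` end up on the default queue and `train_model` on `gpu`, which is how a task can be pinned to workers with specific hardware.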

### Kubernetes Executor

Execute tasks as Kubernetes pods.

```python { .api }
class KubernetesExecutor(BaseExecutor):
    def __init__(self):
        """Kubernetes pod executor."""

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None
    ) -> None:
        """Create Kubernetes pod for task execution."""

    def sync(self) -> None:
        """Monitor pod status and collect results."""

    def adopt_launched_task(
        self,
        kube_client: Any,
        pod: Any,
        pods: Dict[TaskInstanceKey, Any]
    ) -> None:
        """Adopt running pod for monitoring."""
```
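
As a rough picture of "one pod per task", the sketch below builds a plain Kubernetes Pod manifest as a dictionary from a task command. It is not how the executor constructs pods internally: `build_pod_manifest`, the pod name, and the image value are illustrative assumptions, and only the manifest layout (`apiVersion`, `kind`, `metadata`, `spec.containers`) follows the standard Pod schema.

```python
from typing import Any, Dict, List, Optional


def build_pod_manifest(
    name: str,
    command: List[str],
    image: str = "apache/airflow",  # placeholder image (assumption)
    container_overrides: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return a single-container Pod manifest that runs `command` once."""
    container: Dict[str, Any] = {
        "name": "base",
        "image": image,
        "command": list(command),
    }
    # Per-task tweaks (for example resource requests) are merged into the container.
    container.update(container_overrides or {})
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": name},
        "spec": {"restartPolicy": "Never", "containers": [container]},
    }


manifest = build_pod_manifest(
    name="example-dag-train-model",
    command=["echo", "train"],
    container_overrides={"resources": {"requests": {"memory": "1Gi"}}},
)
container = manifest["spec"]["containers"][0]
```

The `container_overrides` argument plays the role that `executor_config` plays in the signature above: a per-task hook for adjusting the pod without changing the executor.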

## Types

```python { .api }
from typing import Optional, Dict, Any, List, Tuple
from airflow.models.taskinstance import TaskInstanceKey

CommandType = List[str]
ExecutorConfigType = Optional[Dict[str, Any]]
QueuedTaskInstanceType = Tuple[TaskInstanceKey, CommandType, Optional[str], ExecutorConfigType]
```
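
To show how these aliases fit together, the sketch below builds and unpacks one queued entry. So that it runs without Airflow installed, `TaskInstanceKey` is replaced by a stand-in `NamedTuple` whose field names are an assumption, and the command list is only an illustrative value.

```python
from typing import Any, Dict, List, NamedTuple, Optional, Tuple


class TaskInstanceKey(NamedTuple):
    """Stand-in for Airflow's TaskInstanceKey (field names are assumed)."""

    dag_id: str
    task_id: str
    run_id: str
    try_number: int = 1


CommandType = List[str]
ExecutorConfigType = Optional[Dict[str, Any]]
QueuedTaskInstanceType = Tuple[
    TaskInstanceKey, CommandType, Optional[str], ExecutorConfigType
]

# One queued entry: (key, command, queue, executor_config).
queued: QueuedTaskInstanceType = (
    TaskInstanceKey("example_dag", "extract", "manual_run_1"),
    ["airflow", "tasks", "run", "example_dag", "extract", "manual_run_1"],
    None,  # no explicit queue
    None,  # no executor-specific configuration
)

key, command, queue, executor_config = queued
```

The four positions of the tuple line up with the four arguments of `execute_async`, so an entry can be unpacked straight into that call.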