# Thread and Process Executors

Executor classes provide high-level interfaces for asynchronously executing callables using either threads or processes. Both executors inherit from the abstract Executor base class and provide the same interface with different underlying implementations.

## Capabilities

### ThreadPoolExecutor

Executes calls asynchronously using a pool of worker threads. Ideal for I/O-bound tasks and for overlapping I/O with computation, rather than for CPU-bound parallel processing.

```python { .api }
class ThreadPoolExecutor:
    def __init__(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()):
        """
        Initialize ThreadPoolExecutor.

        Parameters:
        - max_workers (int, optional): Maximum number of threads. Default: (cpu_count() or 1) * 5
        - thread_name_prefix (str, optional): Name prefix for worker threads
        - initializer (callable, optional): Function called at the start of each worker thread
        - initargs (tuple, optional): Arguments passed to the initializer function
        """

    def submit(self, fn, *args, **kwargs):
        """
        Submit a callable to be executed asynchronously.

        Parameters:
        - fn (callable): Function to execute
        - *args: Positional arguments for fn
        - **kwargs: Keyword arguments for fn

        Returns:
        Future: Future representing the execution
        """

    def map(self, fn, *iterables, **kwargs):
        """
        Apply a function to iterables in parallel.

        Parameters:
        - fn (callable): Function to apply to each element
        - *iterables: Iterables to process
        - timeout (float, optional): Maximum time to wait for all results

        Returns:
        iterator: Results in the same order as the input

        Raises:
        TimeoutError: If the timeout is exceeded
        """

    def shutdown(self, wait=True):
        """
        Clean up executor resources.

        Parameters:
        - wait (bool): Whether to wait for pending futures to complete
        """
```

#### Usage Examples

**Basic ThreadPoolExecutor Usage:**

```python
from concurrent.futures import ThreadPoolExecutor
import time

def io_task(n):
    time.sleep(0.1)  # Simulate I/O
    return f"Task {n} completed"

# Context manager ensures proper cleanup
with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit individual tasks
    future1 = executor.submit(io_task, 1)
    future2 = executor.submit(io_task, 2)

    # Get results
    print(future1.result())  # "Task 1 completed"
    print(future2.result())  # "Task 2 completed"
```

**Using map() for batch processing:**

```python
with ThreadPoolExecutor(max_workers=3) as executor:
    # Process multiple items in parallel
    results = list(executor.map(io_task, range(5)))
    print(results)  # ['Task 0 completed', 'Task 1 completed', ...]
```
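The `timeout` parameter documented on `map()` can be exercised as below (a sketch with illustrative sleep durations). The deadline is measured from the original `map()` call and is enforced while iterating the results, not at submission time:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def slow_task(delay):
    time.sleep(delay)
    return delay

timed_out = False
with ThreadPoolExecutor(max_workers=2) as executor:
    try:
        # The first result arrives in time; the second misses the deadline.
        results = list(executor.map(slow_task, [0.05, 1.0], timeout=0.2))
    except TimeoutError:
        timed_out = True
        print("map() timed out before all results were ready")
```

Note that exiting the `with` block still waits for the slow task to finish: a timeout stops you waiting for a result, but does not cancel work that is already running.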

**Thread naming and initialization:**

```python
import threading

def init_worker():
    print(f"Worker {threading.current_thread().name} starting")

with ThreadPoolExecutor(
    max_workers=2,
    thread_name_prefix='MyWorker',
    initializer=init_worker
) as executor:
    future = executor.submit(io_task, 1)
    result = future.result()
```

### ProcessPoolExecutor

Executes calls asynchronously using a pool of worker processes. Best for CPU-bound tasks that can benefit from true parallelism, though it has known limitations on Python 2.

```python { .api }
class ProcessPoolExecutor:
    def __init__(self, max_workers=None):
        """
        Initialize ProcessPoolExecutor.

        Parameters:
        - max_workers (int, optional): Maximum number of processes. Default: cpu_count()
        """

    def submit(self, fn, *args, **kwargs):
        """
        Submit a callable to be executed asynchronously.

        Parameters:
        - fn (callable): Function to execute (must be picklable)
        - *args: Positional arguments for fn (must be picklable)
        - **kwargs: Keyword arguments for fn (must be picklable)

        Returns:
        Future: Future representing the execution
        """

    def map(self, fn, *iterables, **kwargs):
        """
        Apply a function to iterables in parallel across processes.

        Parameters:
        - fn (callable): Function to apply (must be picklable)
        - *iterables: Iterables to process (must be picklable)
        - timeout (float, optional): Maximum time to wait for all results

        Returns:
        iterator: Results in the same order as the input

        Raises:
        TimeoutError: If the timeout is exceeded
        """

    def shutdown(self, wait=True):
        """
        Clean up executor resources.

        Parameters:
        - wait (bool): Whether to wait for pending futures to complete
        """
```

#### Usage Examples

**Basic ProcessPoolExecutor Usage:**

```python
from concurrent.futures import ProcessPoolExecutor
import math

def cpu_task(n):
    # CPU-intensive calculation
    return sum(math.sqrt(i) for i in range(n * 1000))

# Only use ProcessPoolExecutor for CPU-bound tasks.
# The __main__ guard is required when the 'spawn' start method is used
# (e.g. on Windows), because worker processes re-import this module.
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as executor:
        future1 = executor.submit(cpu_task, 100)
        future2 = executor.submit(cpu_task, 200)

        result1 = future1.result()
        result2 = future2.result()
        print(f"Results: {result1}, {result2}")
```

**Important ProcessPoolExecutor Considerations:**

```python
# Functions and arguments must be picklable
def process_data(data_list):
    return [x * 2 for x in data_list]

# This works - the function and its arguments are picklable
with ProcessPoolExecutor() as executor:
    data = [1, 2, 3, 4, 5]
    future = executor.submit(process_data, data)
    result = future.result()  # [2, 4, 6, 8, 10]
```
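Because ProcessPoolExecutor serializes work with `pickle`, a quick local `pickle.dumps` check can tell you in advance whether a callable will survive the trip to a worker process. This is a sketch; `is_picklable` and `doubler` are illustrative helpers, not part of the API:

```python
import pickle

def doubler(x):
    return x * 2

def is_picklable(obj):
    """Return True if obj can be pickled (hypothetical helper)."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

print(is_picklable(doubler))      # True - module-level functions pickle by name
print(is_picklable(lambda x: x))  # False - lambdas have no importable name
```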

### Executor Base Class

Both executor classes inherit from this abstract base class:

```python { .api }
class Executor:
    def submit(self, fn, *args, **kwargs):
        """Submit a callable for execution. Returns a Future."""

    def map(self, fn, *iterables, **kwargs):
        """Map a function over iterables in parallel."""

    def shutdown(self, wait=True):
        """Clean up resources."""

    def __enter__(self):
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with cleanup."""
```
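Because both pools implement this shared interface, code can be written against the base class and have the pool type chosen at runtime. A sketch (`run_batch` and `square` are illustrative names, not part of the API):

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

def run_batch(executor, fn, items):
    # Accepts any Executor subclass - submit/map/shutdown are identical
    with executor:
        return list(executor.map(fn, items))

# Swapping in ProcessPoolExecutor() here requires no other changes
results = run_batch(ThreadPoolExecutor(max_workers=2), square, range(4))
print(results)  # [0, 1, 4, 9]
```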

## Error Handling

### BrokenExecutor Exceptions

```python { .api }
class BrokenExecutor(RuntimeError):
    """Raised when an executor becomes non-functional after a severe failure."""

class BrokenThreadPool(BrokenExecutor):
    """Raised when a ThreadPoolExecutor worker thread fails during initialization."""
```

### Common Error Patterns

**Handling executor errors:**

```python
from concurrent.futures import ThreadPoolExecutor, BrokenThreadPool

def failing_initializer():
    raise ValueError("Initialization failed")

try:
    with ThreadPoolExecutor(initializer=failing_initializer) as executor:
        future = executor.submit(lambda: "test")
        result = future.result()
except BrokenThreadPool as e:
    print(f"Thread pool broken: {e}")
```

**Shutdown after exceptions:**

```python
executor = ThreadPoolExecutor(max_workers=2)
try:
    # Submit work
    future = executor.submit(some_function)
    result = future.result()
finally:
    # Always clean up
    executor.shutdown(wait=True)
```
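A related pattern worth knowing: an exception raised inside a submitted callable does not surface immediately. It is stored on the Future and re-raised when `result()` is called, which is where your error handling belongs (a minimal sketch):

```python
from concurrent.futures import ThreadPoolExecutor

def divide(a, b):
    return a / b

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(divide, 1, 0)
    try:
        future.result()  # Re-raises the worker's ZeroDivisionError here
    except ZeroDivisionError as e:
        print(f"Task failed: {e}")
```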

## Performance Considerations

- **ThreadPoolExecutor**: Default worker count is `(cpu_count() or 1) * 5`, optimized for I/O-bound tasks
- **ProcessPoolExecutor**: Default worker count is `cpu_count()`, optimized for CPU-bound tasks
- **Context Managers**: Always use `with` statements or explicit `shutdown()` calls for proper cleanup
- **Task Granularity**: Balance task size - too small increases overhead, too large reduces parallelism
- **Pickling Overhead**: ProcessPoolExecutor requires pickling arguments and results, adding overhead
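The I/O-bound guidance in the first bullet can be verified with a rough timing sketch (absolute timings vary by machine; the sleep stands in for real blocking I/O):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fake_io(_):
    time.sleep(0.1)  # Stand-in for a blocking I/O call

# Serial: four sleeps run back to back
start = time.monotonic()
for i in range(4):
    fake_io(i)
serial = time.monotonic() - start

# Threaded: the four sleeps overlap
start = time.monotonic()
with ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(fake_io, range(4)))
threaded = time.monotonic() - start

print(f"serial: {serial:.2f}s, threaded: {threaded:.2f}s")
# Expect roughly one sleep interval threaded vs. four serial
```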