# Thread and Process Executors

Executor classes provide high-level interfaces for asynchronously executing callables using either threads or processes. Both executors inherit from the abstract `Executor` base class and expose the same interface with different underlying implementations.

## Capabilities

### ThreadPoolExecutor

Executes calls asynchronously using a pool of worker threads. Ideal for I/O-bound tasks and for overlapping I/O with computation, rather than for CPU-bound parallel processing.

```python { .api }
class ThreadPoolExecutor:
    def __init__(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()):
        """
        Initialize ThreadPoolExecutor.

        Parameters:
        - max_workers (int, optional): Maximum number of threads. Default: (cpu_count() or 1) * 5
        - thread_name_prefix (str, optional): Name prefix for worker threads
        - initializer (callable, optional): Function called at the start of each worker thread
        - initargs (tuple, optional): Arguments passed to the initializer function
        """

    def submit(self, fn, *args, **kwargs):
        """
        Submit a callable to be executed asynchronously.

        Parameters:
        - fn (callable): Function to execute
        - *args: Positional arguments for fn
        - **kwargs: Keyword arguments for fn

        Returns:
        Future: Future representing the execution
        """

    def map(self, fn, *iterables, **kwargs):
        """
        Apply a function to iterables in parallel.

        Parameters:
        - fn (callable): Function to apply to each element
        - *iterables: Iterables to process
        - timeout (float, optional): Maximum time to wait for all results

        Returns:
        iterator: Results in the same order as the input

        Raises:
        TimeoutError: If the timeout is exceeded
        """

    def shutdown(self, wait=True):
        """
        Clean up executor resources.

        Parameters:
        - wait (bool): Whether to wait for pending futures to complete
        """
```

#### Usage Examples

**Basic ThreadPoolExecutor Usage:**

```python
from concurrent.futures import ThreadPoolExecutor
import time

def io_task(n):
    time.sleep(0.1)  # Simulate I/O
    return f"Task {n} completed"

# Context manager ensures proper cleanup
with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit individual tasks
    future1 = executor.submit(io_task, 1)
    future2 = executor.submit(io_task, 2)

    # Get results
    print(future1.result())  # "Task 1 completed"
    print(future2.result())  # "Task 2 completed"
```

**Using map() for batch processing:**

```python
with ThreadPoolExecutor(max_workers=3) as executor:
    # Process multiple items in parallel
    results = list(executor.map(io_task, range(5)))
    print(results)  # ['Task 0 completed', 'Task 1 completed', ...]
```
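The `timeout` accepted by `map()` covers the entire iteration over the results, not each element individually, and the clock starts when `map()` is called. A minimal sketch of handling it, using an illustrative `slow_task` that deliberately overruns the deadline:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
import time

def slow_task(n):
    time.sleep(0.5)  # Deliberately slower than the timeout below
    return n

with ThreadPoolExecutor(max_workers=1) as executor:
    results = []
    try:
        # The clock starts when map() is called, not at each next()
        for value in executor.map(slow_task, range(2), timeout=0.1):
            results.append(value)
    except FutureTimeout:
        print(f"Timed out after collecting {len(results)} results")
```

Note that exiting the `with` block still waits for already-submitted tasks to finish; a timeout abandons waiting for results but does not cancel running work.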

**Thread naming and initialization:**

```python
import threading

def init_worker():
    print(f"Worker {threading.current_thread().name} starting")

with ThreadPoolExecutor(
    max_workers=2,
    thread_name_prefix='MyWorker',
    initializer=init_worker
) as executor:
    future = executor.submit(io_task, 1)
    result = future.result()
```
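The `initargs` tuple is unpacked positionally into the initializer. A sketch that uses it to seed per-thread state (names like `thread_local` and `labelled_task` are illustrative, not library API):

```python
from concurrent.futures import ThreadPoolExecutor
import threading

# Per-thread storage seeded once per worker by the initializer
thread_local = threading.local()

def init_worker(prefix):
    thread_local.label = f"{prefix}-{threading.current_thread().name}"

def labelled_task(n):
    return f"{thread_local.label} ran task {n}"

with ThreadPoolExecutor(
    max_workers=2,
    thread_name_prefix='Worker',
    initializer=init_worker,
    initargs=('pool',)  # Called as init_worker('pool')
) as executor:
    results = list(executor.map(labelled_task, range(3)))
```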

### ProcessPoolExecutor

Executes calls asynchronously using a pool of worker processes. Best for CPU-bound tasks that can benefit from true parallelism, though it has known limitations on Python 2.

```python { .api }
class ProcessPoolExecutor:
    def __init__(self, max_workers=None):
        """
        Initialize ProcessPoolExecutor.

        Parameters:
        - max_workers (int, optional): Maximum number of processes. Default: cpu_count()
        """

    def submit(self, fn, *args, **kwargs):
        """
        Submit a callable to be executed asynchronously.

        Parameters:
        - fn (callable): Function to execute (must be picklable)
        - *args: Positional arguments for fn (must be picklable)
        - **kwargs: Keyword arguments for fn (must be picklable)

        Returns:
        Future: Future representing the execution
        """

    def map(self, fn, *iterables, **kwargs):
        """
        Apply function to iterables in parallel across processes.

        Parameters:
        - fn (callable): Function to apply (must be picklable)
        - *iterables: Iterables to process (must be picklable)
        - timeout (float, optional): Maximum time to wait for all results

        Returns:
        iterator: Results in the same order as the input

        Raises:
        TimeoutError: If the timeout is exceeded
        """

    def shutdown(self, wait=True):
        """
        Clean up executor resources.

        Parameters:
        - wait (bool): Whether to wait for pending futures to complete
        """
```

#### Usage Examples

**Basic ProcessPoolExecutor Usage:**

```python
from concurrent.futures import ProcessPoolExecutor
import math

def cpu_task(n):
    # CPU-intensive calculation
    return sum(math.sqrt(i) for i in range(n * 1000))

# Only use ProcessPoolExecutor for CPU-bound tasks. The __main__ guard
# is required when worker processes are started with "spawn" (the
# default on Windows and macOS), which re-imports the main module.
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as executor:
        future1 = executor.submit(cpu_task, 100)
        future2 = executor.submit(cpu_task, 200)

        result1 = future1.result()
        result2 = future2.result()
        print(f"Results: {result1}, {result2}")
```

**Important ProcessPoolExecutor Considerations:**

```python
# Functions and arguments must be picklable; the function must be
# defined at module level (lambdas and nested functions will not work)
def process_data(data_list):
    return [x * 2 for x in data_list]

# This works - function and arguments are picklable
if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        data = [1, 2, 3, 4, 5]
        future = executor.submit(process_data, data)
        result = future.result()  # [2, 4, 6, 8, 10]
```

### Executor Base Class

Both executor classes inherit from this abstract base class:

```python { .api }
class Executor:
    def submit(self, fn, *args, **kwargs):
        """Submit callable for execution. Returns Future."""

    def map(self, fn, *iterables, **kwargs):
        """Map function over iterables in parallel."""

    def shutdown(self, wait=True):
        """Clean up resources."""

    def __enter__(self):
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with cleanup."""
```
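Because both pools implement this interface, code can be written against `Executor` and run unchanged with either pool. A sketch (`run_all` is an illustrative helper, not part of the library):

```python
from concurrent.futures import Executor, ThreadPoolExecutor

def run_all(executor: Executor, fn, items):
    # Works with any Executor implementation
    futures = [executor.submit(fn, item) for item in items]
    return [f.result() for f in futures]

def square(x):
    return x * x

with ThreadPoolExecutor(max_workers=2) as pool:
    results = run_all(pool, square, [1, 2, 3])
print(results)  # [1, 4, 9]
```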

## Error Handling

### BrokenExecutor Exceptions

```python { .api }
class BrokenExecutor(RuntimeError):
    """Raised when executor becomes non-functional after severe failure."""

class BrokenThreadPool(BrokenExecutor):
    """Raised when ThreadPoolExecutor worker thread fails during initialization."""
```

### Common Error Patterns

**Handling executor errors:**

```python
from concurrent.futures import ThreadPoolExecutor, BrokenThreadPool

def failing_initializer():
    raise ValueError("Initialization failed")

try:
    with ThreadPoolExecutor(initializer=failing_initializer) as executor:
        future = executor.submit(lambda: "test")
        result = future.result()
except BrokenThreadPool as e:
    print(f"Thread pool broken: {e}")
```
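A `BrokenThreadPool` means the executor itself has failed; an exception raised inside a submitted callable is a different case: it leaves the pool intact and is re-raised when `result()` is called. A sketch:

```python
from concurrent.futures import ThreadPoolExecutor

def risky_task(n):
    if n == 2:
        raise ValueError(f"bad input: {n}")
    return n * 10

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(risky_task, n) for n in range(3)]
    outcomes = []
    for future in futures:
        try:
            # result() re-raises the exception from inside the task
            outcomes.append(future.result())
        except ValueError as e:
            outcomes.append(f"error: {e}")
```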

**Shutdown after exceptions:**

```python
executor = ThreadPoolExecutor(max_workers=2)
try:
    # Submit work (some_function is a placeholder)
    future = executor.submit(some_function)
    result = future.result()
finally:
    # Always clean up
    executor.shutdown(wait=True)
```
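With `wait=False`, `shutdown()` returns immediately while already-submitted work continues in the background; pending futures still complete. A sketch (timings are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def quick_task():
    time.sleep(0.2)
    return "done"

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(quick_task)

start = time.perf_counter()
executor.shutdown(wait=False)  # Returns without blocking
elapsed = time.perf_counter() - start

# The already-submitted task still runs to completion
result = future.result()
print(f"shutdown returned in {elapsed:.3f}s; task result: {result}")
```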

## Performance Considerations

- **ThreadPoolExecutor**: Default worker count is `(cpu_count() or 1) * 5`, optimized for I/O-bound tasks
- **ProcessPoolExecutor**: Default worker count is `cpu_count()`, optimized for CPU-bound tasks
- **Context Managers**: Always use `with` statements or explicit `shutdown()` calls for proper cleanup
- **Task Granularity**: Balance task size; too small increases overhead, too large reduces parallelism
- **Pickling Overhead**: ProcessPoolExecutor must pickle arguments and results, adding per-task overhead
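One common response to the granularity trade-off is to batch small items into chunks so that each task amortizes the per-submission overhead. A sketch, with an illustrative chunk size:

```python
from concurrent.futures import ThreadPoolExecutor

def process_item(x):
    return x + 1

def process_chunk(chunk):
    # One task handles many items, amortizing per-task overhead
    return [process_item(x) for x in chunk]

items = list(range(10))
chunk_size = 4
chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

with ThreadPoolExecutor(max_workers=2) as executor:
    nested = executor.map(process_chunk, chunks)
    results = [x for chunk in nested for x in chunk]
print(results)  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```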