# Process Pool Executor

The ProcessPoolExecutor class provides a robust implementation of parallel task execution using worker processes. It offers enhanced reliability, consistent spawn behavior, and better error handling compared to the standard library implementation.

## Capabilities
### ProcessPoolExecutor Class
Main executor class for managing a pool of worker processes with configurable parameters and robust error handling.
```python { .api }
class ProcessPoolExecutor(Executor):
    """
    Robust ProcessPoolExecutor with enhanced error handling and consistent spawn behavior.

    Parameters:
    - max_workers (int, optional): Maximum number of worker processes. Defaults to cpu_count()
    - job_reducers (dict, optional): Custom reducers for job serialization
    - result_reducers (dict, optional): Custom reducers for result serialization
    - timeout (int, optional): Worker idle timeout in seconds. Default is None
    - context (multiprocessing context, optional): Multiprocessing context for creating processes
    - initializer (callable, optional): Function called at worker startup
    - initargs (tuple): Arguments passed to the initializer function
    - env (dict, optional): Environment variables to set in worker processes
    """

    def __init__(
        self,
        max_workers=None,
        job_reducers=None,
        result_reducers=None,
        timeout=None,
        context=None,
        initializer=None,
        initargs=(),
        env=None,
    ): ...
```
### Task Submission
Submit individual tasks for execution on worker processes.
```python { .api }
def submit(self, fn, *args, **kwargs):
    """
    Submit a callable to be executed with the given arguments.

    Parameters:
    - fn (callable): Function to execute
    - *args: Positional arguments for the function
    - **kwargs: Keyword arguments for the function

    Returns:
    Future: A Future representing the execution of the callable

    Raises:
    RuntimeError: If the executor has been shut down
    """
```
### Bulk Task Execution
Execute a function over multiple input values in parallel.
```python { .api }
def map(self, fn, *iterables, **kwargs):
    """
    Apply a function to every item of the iterables in parallel.

    Parameters:
    - fn (callable): Function to apply to each item
    - *iterables: One or more iterables to process
    - timeout (float, optional): Maximum time to wait for results
    - chunksize (int, optional): Number of items sent to a worker process in each batch

    Returns:
    Iterator: Iterator over results in the same order as the input

    Raises:
    TimeoutError: If the timeout is reached before completion
    """
```
### Executor Shutdown
Clean shutdown of executor and worker processes.
```python { .api }
def shutdown(self, wait=True, kill_workers=False):
    """
    Shutdown the executor and free associated resources.

    Parameters:
    - wait (bool): Whether to wait for pending tasks to complete. Default True
    - kill_workers (bool): Whether to forcibly terminate workers. Default False

    Returns:
    None
    """
```
## Usage Examples
### Basic ProcessPoolExecutor Usage
```python
from loky import ProcessPoolExecutor

def cpu_bound_task(n):
    """Simulate CPU-intensive work."""
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    # Create an executor with 4 workers
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Submit an individual task
        future = executor.submit(cpu_bound_task, 10000)
        result = future.result()
        print(f"Result: {result}")

        # Process multiple inputs
        inputs = [1000, 2000, 3000, 4000, 5000]
        results = list(executor.map(cpu_bound_task, inputs))
        print(f"Results: {results}")
```
### With Initializer Function
```python
from loky import ProcessPoolExecutor
import logging

def worker_init(level):
    """Initialize logging in each worker process."""
    logging.basicConfig(level=level)
    logging.info("Worker process initialized")

def logged_task(x):
    logging.info(f"Processing {x}")
    return x * 2

if __name__ == "__main__":
    # Executor with worker initialization
    with ProcessPoolExecutor(
        max_workers=2,
        initializer=worker_init,
        initargs=(logging.INFO,),
    ) as executor:
        results = list(executor.map(logged_task, [1, 2, 3, 4]))
        print(f"Results: {results}")
```
### Custom Environment Variables
```python
from loky import ProcessPoolExecutor
import os

def get_env_var(var_name):
    """Read an environment variable in the worker process."""
    return os.environ.get(var_name, "Not set")

if __name__ == "__main__":
    # Set custom environment variables in the workers
    with ProcessPoolExecutor(
        max_workers=2,
        env={"CUSTOM_VAR": "worker_value", "DEBUG": "1"},
    ) as executor:
        results = list(executor.map(get_env_var, ["CUSTOM_VAR", "DEBUG"]))
        print(f"Environment variables: {results}")
```
### Error Handling
```python
# Import TimeoutError from concurrent.futures: before Python 3.11 it is a
# distinct class from the builtin, and Future.result raises that one
from concurrent.futures import TimeoutError
from loky import ProcessPoolExecutor, BrokenProcessPool

def failing_task(x):
    if x == 3:
        raise ValueError(f"Task failed for input {x}")
    return x * 2

if __name__ == "__main__":
    try:
        with ProcessPoolExecutor(max_workers=2) as executor:
            futures = [executor.submit(failing_task, i) for i in range(5)]

            for i, future in enumerate(futures):
                try:
                    result = future.result(timeout=5)
                    print(f"Task {i}: {result}")
                except ValueError as e:
                    print(f"Task {i} failed: {e}")
                except TimeoutError:
                    print(f"Task {i} timed out")
    except BrokenProcessPool as e:
        print(f"Process pool broken: {e}")
```