# Reusable Executor

The reusable executor system provides singleton executor management to avoid the overhead of repeatedly creating and destroying ProcessPoolExecutor instances. This is particularly useful for workflows with frequent parallel processing needs.

## Capabilities

### Get Reusable Executor

Returns a singleton ProcessPoolExecutor instance, creating a new one if needed or reusing an existing one.
```python { .api }
def get_reusable_executor(
    max_workers=None,
    context=None,
    timeout=10,
    kill_workers=False,
    reuse="auto",
    job_reducers=None,
    result_reducers=None,
    initializer=None,
    initargs=(),
    env=None
):
    """
    Return the current reusable executor instance.

    Parameters:
    - max_workers (int, optional): Maximum number of worker processes
    - context (multiprocessing context, optional): Context for creating processes
    - timeout (int): Worker idle timeout in seconds before automatic shutdown
    - kill_workers (bool): Whether to forcibly terminate previous workers
    - reuse ("auto" or bool): Reuse strategy - "auto" reuses the current executor
      when its parameters are compatible, True forces reuse of the current
      executor, False forces creation of a fresh one
    - job_reducers (dict, optional): Custom reducers for job serialization
    - result_reducers (dict, optional): Custom reducers for result serialization
    - initializer (callable, optional): Function called at worker startup
    - initargs (tuple): Arguments passed to initializer function
    - env (dict, optional): Environment variables for worker processes

    Returns:
    ProcessPoolExecutor: Singleton executor instance
    """
```
## Usage Examples

### Basic Reusable Executor

```python
from loky import get_reusable_executor
import time

def compute_square(x):
    """Simple computation task."""
    time.sleep(0.1)  # Simulate work
    return x * x

# Get reusable executor - will create new one if none exists
executor = get_reusable_executor(max_workers=4, timeout=2)

# First batch of tasks
results1 = list(executor.map(compute_square, range(5)))
print(f"First batch: {results1}")

# Second batch reuses same executor
results2 = list(executor.map(compute_square, range(5, 10)))
print(f"Second batch: {results2}")

# Executor will automatically shutdown after 2 seconds of inactivity
```
### Dynamic Resizing

```python
from loky import get_reusable_executor

def cpu_task(x):
    return sum(i * i for i in range(x * 1000))

# Start with 2 workers
executor = get_reusable_executor(max_workers=2)
results1 = list(executor.map(cpu_task, [1, 2, 3]))

# Resize to 4 workers for larger workload
executor = get_reusable_executor(max_workers=4)
results2 = list(executor.map(cpu_task, range(1, 9)))

print(f"Small batch: {results1}")
print(f"Large batch: {results2}")
```
### Forcing Executor Restart

```python
from loky import get_reusable_executor

def worker_state_task(x):
    # This task might modify global state in workers
    import os
    os.environ["WORKER_STATE"] = str(x)
    return os.environ.get("WORKER_STATE")

# Run tasks that modify worker state
executor = get_reusable_executor(max_workers=2)
results1 = list(executor.map(worker_state_task, [1, 2]))

# Force restart of workers to clear state
executor = get_reusable_executor(max_workers=2, kill_workers=True)
results2 = list(executor.map(worker_state_task, [3, 4]))

print(f"First run: {results1}")
print(f"After restart: {results2}")
```
### Timeout Configuration

```python
from loky import get_reusable_executor
import time

def quick_task(x):
    return x * 2

def setup_long_running_executor():
    # Executor with longer timeout for persistent workflows
    return get_reusable_executor(
        max_workers=4,
        timeout=60  # Workers stay alive for 60 seconds
    )

def setup_short_lived_executor():
    # Executor that shuts down quickly to free resources
    return get_reusable_executor(
        max_workers=2,
        timeout=5  # Workers shutdown after 5 seconds
    )

# For workflows with frequent but spaced-out tasks
long_executor = setup_long_running_executor()
results = list(long_executor.map(quick_task, range(10)))

time.sleep(10)  # Simulate gap between task batches

# Executor still available due to longer timeout
more_results = list(long_executor.map(quick_task, range(10, 20)))
```
145
146
### Custom Initializer with Reusable Executor
147
148
```python
149
from loky import get_reusable_executor
150
import logging
151
152
def setup_worker_logging(log_level):
153
"""Initialize logging in each worker process."""
154
logging.basicConfig(
155
level=log_level,
156
format='%(processName)s: %(message)s'
157
)
158
logging.info("Worker initialized")
159
160
def logged_computation(x):
161
"""Task that uses logging."""
162
logging.info(f"Computing square of {x}")
163
result = x * x
164
logging.info(f"Result: {result}")
165
return result
166
167
# Reusable executor with worker initialization
168
executor = get_reusable_executor(
169
max_workers=3,
170
timeout=30,
171
initializer=setup_worker_logging,
172
initargs=(logging.INFO,)
173
)
174
175
# All tasks will run on workers with logging configured
176
results = list(executor.map(logged_computation, [1, 2, 3, 4, 5]))
177
print(f"Results with logging: {results}")
178
```
### Environment Variable Management

```python
from loky import get_reusable_executor
import os

def get_worker_env(var_name):
    """Get environment variable from worker."""
    return f"{var_name}={os.environ.get(var_name, 'UNSET')}"

# Executor with custom environment
executor = get_reusable_executor(
    max_workers=2,
    env={
        "PROCESSING_MODE": "parallel",
        "WORKER_POOL": "loky",
        "DEBUG_LEVEL": "2"
    }
)

# Check environment variables in workers
env_vars = ["PROCESSING_MODE", "WORKER_POOL", "DEBUG_LEVEL", "PATH"]
results = list(executor.map(get_worker_env, env_vars))

for result in results:
    print(result)
```
## Benefits

### Performance Advantages

- **Reduced Startup Overhead**: Workers remain alive between task batches, eliminating process creation costs
- **Memory Efficiency**: Imported modules and initialized state persist across task submissions
- **Dynamic Scaling**: The worker pool is resized in place when `get_reusable_executor` is called again with a different `max_workers`

### Resource Management

- **Automatic Cleanup**: Workers automatically shutdown after configured idle timeout
- **Memory Leak Protection**: Built-in monitoring prevents worker memory accumulation
- **Graceful Shutdown**: Clean termination of workers when timeout expires

### Use Cases

- **Interactive Computing**: Jupyter notebooks and REPL environments
- **Batch Processing**: Multiple rounds of parallel computation
- **Scientific Computing**: Data analysis workflows with repeated parallel operations
- **Web Applications**: Background task processing with consistent resource usage