# Error Handling

Loky provides comprehensive error handling with specialized exception classes for different failure modes in parallel processing. These exceptions help identify and handle specific error conditions that can occur during parallel execution.

## Capabilities

### Core Exception Classes

Exception classes for handling various error conditions in parallel processing.

```python { .api }
class BrokenProcessPool(Exception):
    """
    Raised when the process pool is in a broken state and cannot execute tasks.

    This exception indicates that the executor has encountered a fatal error
    and cannot continue processing tasks. The executor should be shutdown
    and recreated.
    """

class TerminatedWorkerError(BrokenProcessPool):
    """
    Raised when a worker process terminates unexpectedly.

    This is a subclass of BrokenProcessPool that specifically indicates
    worker process failure. The executor may be able to recover by
    restarting workers.
    """

class ShutdownExecutorError(RuntimeError):
    """
    Raised when attempting to use an executor that has been shutdown.

    This exception occurs when trying to submit tasks to an executor
    that has already been shutdown via the shutdown() method.
    """
```
### Standard Exceptions

Re-exported exceptions from concurrent.futures for convenience.

```python { .api }
# Re-exported from concurrent.futures
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError
```

## Usage Examples

### Handling Broken Process Pool

```python
from loky import get_reusable_executor, BrokenProcessPool
import os
import signal

def problematic_task(x):
    """Task that might crash the worker process."""
    if x == 3:
        # Simulate a worker crash
        os._exit(1)  # Force process termination
    return x * 2

try:
    executor = get_reusable_executor(max_workers=2)

    # Submit tasks that include a problematic one
    futures = [executor.submit(problematic_task, i) for i in range(5)]

    results = []
    for i, future in enumerate(futures):
        try:
            result = future.result(timeout=5)
            results.append(result)
            print(f"Task {i}: {result}")
        except Exception as e:
            print(f"Task {i} failed: {e}")

except BrokenProcessPool as e:
    print(f"Process pool broken: {e}")
    print("Creating new executor...")

    # Create new executor after broken pool
    executor = get_reusable_executor(max_workers=2, kill_workers=True)
    print("New executor created successfully")
```

### Handling Terminated Workers

```python
from loky import ProcessPoolExecutor, TerminatedWorkerError
import time

def memory_intensive_task(size):
    """Task that might cause worker termination due to resource limits."""
    try:
        # Allocate large amount of memory
        data = [0] * (size * 1000000)  # size in millions of integers
        return sum(data[:1000])  # Return small result
    except MemoryError:
        raise MemoryError(f"Cannot allocate {size}M integers")

def handle_worker_termination():
    """Demonstrate handling of terminated worker errors."""
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Submit tasks with increasing memory requirements
        sizes = [1, 10, 100, 1000, 10000]  # Progressively larger

        for size in sizes:
            try:
                future = executor.submit(memory_intensive_task, size)
                result = future.result(timeout=10)
                print(f"Size {size}M: Success ({result})")

            except TerminatedWorkerError as e:
                print(f"Size {size}M: Worker terminated ({e})")
                # Executor may recover automatically

            except MemoryError as e:
                print(f"Size {size}M: Memory error ({e})")

            except Exception as e:
                print(f"Size {size}M: Other error ({e})")

handle_worker_termination()
```

### Handling Shutdown Errors

```python
from loky import ProcessPoolExecutor, ShutdownExecutorError

def task(x):
    return x * 2

# Demonstrate shutdown error handling
executor = ProcessPoolExecutor(max_workers=2)

# Submit and process some tasks
future1 = executor.submit(task, 5)
result1 = future1.result()
print(f"Before shutdown: {result1}")

# Shutdown the executor
executor.shutdown(wait=True)

# Attempt to use shutdown executor
try:
    future2 = executor.submit(task, 10)
    result2 = future2.result()
except ShutdownExecutorError as e:
    print(f"Cannot use shutdown executor: {e}")

# Create new executor for continued processing
new_executor = ProcessPoolExecutor(max_workers=2)
future3 = new_executor.submit(task, 10)
result3 = future3.result()
print(f"With new executor: {result3}")
new_executor.shutdown()
```

### Timeout Handling

```python
from loky import get_reusable_executor, TimeoutError
import time

def slow_task(duration):
    """Task that takes specified duration to complete."""
    time.sleep(duration)
    return f"Completed after {duration} seconds"

def handle_timeouts():
    """Demonstrate timeout error handling."""
    executor = get_reusable_executor(max_workers=2)

    tasks = [
        (1, 3),  # 1 second task, 3 second timeout - should succeed
        (5, 2),  # 5 second task, 2 second timeout - should timeout
        (2, 4),  # 2 second task, 4 second timeout - should succeed
    ]

    for duration, timeout in tasks:
        try:
            future = executor.submit(slow_task, duration)
            result = future.result(timeout=timeout)
            print(f"Task ({duration}s, timeout {timeout}s): {result}")

        except TimeoutError:
            print(f"Task ({duration}s, timeout {timeout}s): Timed out")
            # Task continues running in background

        except Exception as e:
            print(f"Task ({duration}s, timeout {timeout}s): Error - {e}")

handle_timeouts()
```

### Cancellation Handling

```python
from loky import get_reusable_executor, CancelledError, TimeoutError
import time

def cancellable_task(task_id, duration):
    """Task that can be cancelled before completion."""
    print(f"Task {task_id} starting (duration: {duration}s)")
    time.sleep(duration)
    print(f"Task {task_id} completed")
    return f"Task {task_id} result"

def handle_cancellation():
    """Demonstrate task cancellation and error handling."""
    executor = get_reusable_executor(max_workers=2)

    # Submit multiple tasks
    futures = []
    for i in range(4):
        future = executor.submit(cancellable_task, i, 3)
        futures.append(future)

    # Cancel some tasks after a short delay
    time.sleep(0.5)
    cancelled_count = 0

    for i, future in enumerate(futures):
        if i % 2 == 1:  # Cancel odd-numbered tasks
            if future.cancel():
                print(f"Successfully cancelled task {i}")
                cancelled_count += 1
            else:
                print(f"Could not cancel task {i} (already running)")

    # Collect results
    for i, future in enumerate(futures):
        try:
            result = future.result(timeout=5)
            print(f"Task {i}: {result}")

        except CancelledError:
            print(f"Task {i}: Was cancelled")

        except TimeoutError:
            print(f"Task {i}: Timed out")

        except Exception as e:
            print(f"Task {i}: Error - {e}")

    print(f"Cancelled {cancelled_count} tasks")

handle_cancellation()
```

### Comprehensive Error Recovery

```python
from loky import get_reusable_executor
from loky import (BrokenProcessPool, TerminatedWorkerError,
                 ShutdownExecutorError, TimeoutError, CancelledError)
import random
import time

def unreliable_task(task_id):
    """Task that randomly fails in different ways."""
    failure_type = random.choice(['success', 'error', 'crash', 'slow'])

    if failure_type == 'success':
        return f"Task {task_id}: Success"
    elif failure_type == 'error':
        raise ValueError(f"Task {task_id}: Intentional error")
    elif failure_type == 'crash':
        import os
        os._exit(1)  # Simulate process crash
    elif failure_type == 'slow':
        time.sleep(10)  # Very slow task
        return f"Task {task_id}: Slow success"

def robust_task_execution(task_ids, max_retries=2):
    """Execute tasks with comprehensive error handling and retries."""
    executor = None
    results = {}

    for task_id in task_ids:
        retries = 0
        success = False

        while not success and retries <= max_retries:
            try:
                # Ensure we have a working executor
                if executor is None:
                    executor = get_reusable_executor(max_workers=2)

                # Submit task with timeout
                future = executor.submit(unreliable_task, task_id)
                result = future.result(timeout=3)

                results[task_id] = result
                success = True
                print(f"✓ {result}")

            except (BrokenProcessPool, TerminatedWorkerError) as e:
                print(f"✗ Task {task_id}: Pool broken ({e})")
                executor = None  # Force new executor creation
                retries += 1

            except TimeoutError:
                print(f"✗ Task {task_id}: Timeout (retry {retries + 1})")
                retries += 1

            except ValueError as e:
                print(f"✗ Task {task_id}: Application error ({e})")
                results[task_id] = f"Error: {e}"
                success = True  # Don't retry application errors

            except Exception as e:
                print(f"✗ Task {task_id}: Unexpected error ({e})")
                retries += 1

        if not success:
            results[task_id] = f"Failed after {max_retries} retries"
            print(f"✗ Task {task_id}: Gave up after {max_retries} retries")

    # Clean up
    if executor:
        executor.shutdown(wait=False)

    return results

# Execute unreliable tasks with error recovery
task_ids = list(range(10))
random.seed(42)  # For reproducible results
results = robust_task_execution(task_ids)

print("\nFinal Results:")
for task_id, result in results.items():
    print(f"Task {task_id}: {result}")
```

## Best Practices

### Error Detection
- Monitor for `BrokenProcessPool` to detect fatal executor errors
- Use `TerminatedWorkerError` to identify worker process failures
- Check for `ShutdownExecutorError` when reusing executor references

### Recovery Strategies
- Recreate executors after `BrokenProcessPool` exceptions
- Implement retry logic for transient failures
- Use timeouts to prevent hanging tasks

### Resource Management
- Always shutdown executors in finally blocks or use context managers
- Monitor system resources to prevent worker termination