0
# Exception Handling
1
2
Exception classes and utilities for handling errors in multiprocessing environments with enhanced traceback formatting. MPIRE provides graceful error propagation and enhanced debugging capabilities for parallel processing scenarios.
3
4
## Capabilities
5
6
### Exception Classes
7
8
Custom exception classes for worker control and error handling.
9
10
```python { .api }
11
class StopWorker(Exception):
12
"""Exception used to kill a worker"""
13
pass
14
15
class InterruptWorker(Exception):
16
"""Exception used to interrupt a worker"""
17
pass
18
19
class CannotPickleExceptionError(Exception):
20
"""Exception used when Pickle has trouble pickling the actual Exception"""
21
pass
22
```
23
24
**StopWorker**: Raise this exception to terminate a worker process immediately.
25
26
**InterruptWorker**: Raise this exception to interrupt a worker's current task.
27
28
**CannotPickleExceptionError**: Raised when an exception cannot be properly serialized for inter-process communication.
29
30
### Traceback Utilities
31
32
Functions for enhancing exception tracebacks and error display.
33
34
```python { .api }
35
def highlight_traceback(traceback_str: str) -> str
36
def remove_highlighting(traceback_str: str) -> str
37
def populate_exception(err_type: type, err_args: Any, err_state: Dict,
38
traceback_str: str) -> Tuple[Exception, Exception]
39
```
40
41
**highlight_traceback**: Add syntax highlighting to traceback strings for better readability in terminals.
42
43
**remove_highlighting**: Remove ANSI escape codes from highlighted traceback strings.
44
45
**populate_exception**: Reconstruct exception objects from serialized data with enhanced traceback information.
46
47
## Usage Examples
48
49
### Basic Exception Handling
50
51
```python
52
from mpire import WorkerPool
53
54
def risky_function(x):
55
if x < 0:
56
raise ValueError(f"Negative value not allowed: {x}")
57
if x == 0:
58
raise ZeroDivisionError("Cannot divide by zero")
59
return 10 / x
60
61
with WorkerPool(n_jobs=2) as pool:
62
test_data = [1, 2, -1, 0, 3, -2]
63
64
try:
65
results = pool.map(risky_function, test_data)
66
except Exception as e:
67
print(f"Map operation failed: {type(e).__name__}: {e}")
68
# MPIRE provides enhanced traceback information
69
import traceback
70
traceback.print_exc()
71
```
72
73
### Worker Control Exceptions
74
75
```python
76
from mpire import WorkerPool
77
from mpire.exception import StopWorker, InterruptWorker
78
79
def controlled_worker_function(x):
80
if x == 999:
81
# Terminate this worker
82
raise StopWorker("Worker received termination signal")
83
84
if x < 0:
85
# Interrupt current task but keep worker alive
86
raise InterruptWorker("Interrupting negative input processing")
87
88
# Normal processing
89
return x * 2
90
91
with WorkerPool(n_jobs=3) as pool:
92
test_data = [1, 2, 3, -1, 4, 999, 5, 6]
93
94
try:
95
results = pool.map(controlled_worker_function, test_data)
96
print("Results:", results)
97
except Exception as e:
98
print(f"Processing interrupted: {e}")
99
```
100
101
### Exception Callbacks
102
103
```python
104
from mpire import WorkerPool
105
import random
106
107
def unreliable_function(x):
108
# Simulate different types of failures
109
failure_type = random.choice(['none', 'value', 'runtime', 'custom'])
110
111
if failure_type == 'value':
112
raise ValueError(f"ValueError for input {x}")
113
elif failure_type == 'runtime':
114
raise RuntimeError(f"RuntimeError for input {x}")
115
elif failure_type == 'custom':
116
raise CustomError(f"CustomError for input {x}")
117
118
return x ** 2
119
120
class CustomError(Exception):
121
"""Custom exception for demonstration"""
122
pass
123
124
def error_handler(exception):
125
"""Handle different types of exceptions"""
126
error_type = type(exception).__name__
127
error_msg = str(exception)
128
129
print(f"Caught {error_type}: {error_msg}")
130
131
# Log or handle specific error types
132
if isinstance(exception, ValueError):
133
print(" → Handling ValueError with retry logic")
134
elif isinstance(exception, RuntimeError):
135
print(" → Handling RuntimeError with fallback")
136
else:
137
print(" → Handling unknown error type")
138
139
with WorkerPool(n_jobs=2) as pool:
140
# Apply async with error callback
141
async_results = []
142
for i in range(10):
143
result = pool.apply_async(
144
unreliable_function,
145
args=(i,),
146
error_callback=error_handler
147
)
148
async_results.append(result)
149
150
# Collect results
151
successful_results = []
152
for async_result in async_results:
153
try:
154
result = async_result.get()
155
successful_results.append(result)
156
except Exception:
157
# Error already handled by callback
158
pass
159
160
print(f"Successful results: {successful_results}")
161
```
162
163
### Enhanced Traceback Information
164
165
```python
166
from mpire import WorkerPool
167
from mpire.exception import highlight_traceback, remove_highlighting
168
169
def nested_function_error(x):
170
"""Function with nested calls to show traceback enhancement"""
171
def level1(val):
172
return level2(val)
173
174
def level2(val):
175
return level3(val)
176
177
def level3(val):
178
if val == 5:
179
raise RuntimeError(f"Deep error at level 3 with value {val}")
180
return val * 10
181
182
return level1(x)
183
184
with WorkerPool(n_jobs=2) as pool:
185
try:
186
results = pool.map(nested_function_error, range(10))
187
except Exception as e:
188
print("Exception caught with enhanced traceback:")
189
190
# Get the traceback as string
191
import traceback
192
tb_str = traceback.format_exc()
193
194
# Highlight the traceback (MPIRE does this automatically)
195
highlighted_tb = highlight_traceback(tb_str)
196
print(highlighted_tb)
197
198
# Or remove highlighting for logging
199
clean_tb = remove_highlighting(highlighted_tb)
200
print("\nClean traceback for logging:")
201
print(clean_tb)
202
```
203
204
### Timeout Exception Handling
205
206
```python
207
from mpire import WorkerPool
208
import time
209
210
def slow_function(duration):
211
time.sleep(duration)
212
return f"Completed after {duration} seconds"
213
214
with WorkerPool(n_jobs=2) as pool:
215
test_data = [0.5, 1.0, 5.0, 0.1, 10.0] # Some will timeout
216
217
try:
218
results = pool.map(
219
slow_function,
220
test_data,
221
task_timeout=2.0 # 2 second timeout
222
)
223
print("All tasks completed:", results)
224
except TimeoutError as e:
225
print(f"Timeout occurred: {e}")
226
print("Some tasks exceeded the 2-second limit")
227
except Exception as e:
228
print(f"Other error occurred: {type(e).__name__}: {e}")
229
```
230
231
### Custom Exception Handling in Workers
232
233
```python
234
from mpire import WorkerPool
235
236
class DataValidationError(Exception):
237
"""Custom exception for data validation"""
238
def __init__(self, message, data_item, validation_rule):
239
super().__init__(message)
240
self.data_item = data_item
241
self.validation_rule = validation_rule
242
243
def validate_and_process(data_item):
244
"""Function with custom validation and error handling"""
245
246
# Validation rules
247
if not isinstance(data_item, (int, float)):
248
raise DataValidationError(
249
f"Data must be numeric, got {type(data_item).__name__}",
250
data_item,
251
"type_check"
252
)
253
254
if data_item < 0:
255
raise DataValidationError(
256
f"Data must be non-negative, got {data_item}",
257
data_item,
258
"range_check"
259
)
260
261
if data_item > 1000:
262
raise DataValidationError(
263
f"Data must be <= 1000, got {data_item}",
264
data_item,
265
"upper_bound_check"
266
)
267
268
# Process valid data
269
return data_item ** 0.5
270
271
def custom_error_callback(exception):
272
"""Handle custom validation errors"""
273
if isinstance(exception, DataValidationError):
274
print(f"Validation failed: {exception}")
275
print(f" Data item: {exception.data_item}")
276
print(f" Rule: {exception.validation_rule}")
277
else:
278
print(f"Unexpected error: {type(exception).__name__}: {exception}")
279
280
# Test data with various validation issues
281
test_data = [4, 9, 16, -1, "invalid", 25, 1001, 36, None, 49]
282
283
with WorkerPool(n_jobs=2) as pool:
284
successful_results = []
285
286
for i, item in enumerate(test_data):
287
try:
288
result = pool.apply(
289
validate_and_process,
290
args=(item,),
291
error_callback=custom_error_callback
292
)
293
successful_results.append(result)
294
print(f"✓ Item {i}: {item} → {result}")
295
except DataValidationError as e:
296
print(f"✗ Item {i}: Validation failed - {e.validation_rule}")
297
except Exception as e:
298
print(f"✗ Item {i}: Unexpected error - {type(e).__name__}")
299
300
print(f"\nProcessed {len(successful_results)} items successfully")
301
print(f"Valid results: {successful_results}")
302
```
303
304
### Exception Propagation in Worker State
305
306
```python
307
from mpire import WorkerPool
308
309
def init_worker_with_validation(worker_state):
310
"""Initialize worker with validation"""
311
try:
312
# Simulate resource initialization that might fail
313
worker_state['resource'] = initialize_resource()
314
worker_state['initialized'] = True
315
except Exception as e:
316
worker_state['initialized'] = False
317
worker_state['init_error'] = str(e)
318
raise RuntimeError(f"Worker initialization failed: {e}")
319
320
def initialize_resource():
321
"""Simulate resource initialization"""
322
import random
323
if random.random() < 0.2: # 20% chance of failure
324
raise ConnectionError("Failed to connect to resource")
325
return "resource_handle"
326
327
def process_with_state_validation(worker_state, item):
328
"""Process item with state validation"""
329
if not worker_state.get('initialized', False):
330
raise RuntimeError("Worker not properly initialized")
331
332
# Use the initialized resource
333
resource = worker_state['resource']
334
return f"Processed {item} with {resource}"
335
336
with WorkerPool(n_jobs=3, use_worker_state=True) as pool:
337
try:
338
results = pool.map(
339
process_with_state_validation,
340
range(20),
341
worker_init=init_worker_with_validation,
342
chunk_size=5
343
)
344
print("All items processed successfully")
345
print(f"Results: {results[:5]}...") # Show first 5 results
346
347
except Exception as e:
348
print(f"Processing failed: {type(e).__name__}: {e}")
349
print("This could be due to worker initialization failure")
350
```