A Python package for easy multiprocessing, but faster than multiprocessing with advanced features including worker state management, progress bars, and performance insights.
—
Exception classes and utilities for handling errors in multiprocessing environments with enhanced traceback formatting. MPIRE provides graceful error propagation and enhanced debugging capabilities for parallel processing scenarios.
Custom exception classes for worker control and error handling.
class StopWorker(Exception):
    """Exception used to kill a worker"""
    pass


class InterruptWorker(Exception):
    """Exception used to interrupt a worker"""
    pass


class CannotPickleExceptionError(Exception):
    """Exception used when Pickle has trouble pickling the actual Exception"""
    pass

StopWorker: Raise this exception to terminate a worker process immediately.
InterruptWorker: Raise this exception to interrupt a worker's current task.
CannotPickleExceptionError: Raised when an exception cannot be properly serialized for inter-process communication.
Functions for enhancing exception tracebacks and error display.
def highlight_traceback(traceback_str: str) -> str
def remove_highlighting(traceback_str: str) -> str
def populate_exception(err_type: type, err_args: Any, err_state: Dict,
                       traceback_str: str) -> Tuple[Exception, Exception]

highlight_traceback: Add syntax highlighting to traceback strings for better readability in terminals.
remove_highlighting: Remove ANSI escape codes from highlighted traceback strings.
populate_exception: Reconstruct exception objects from serialized data with enhanced traceback information.
from mpire import WorkerPool
def risky_function(x):
    """Return 10 / x, rejecting zero and negative input with explicit errors."""
    if x == 0:
        raise ZeroDivisionError("Cannot divide by zero")
    if x < 0:
        raise ValueError(f"Negative value not allowed: {x}")
    return 10 / x
# Inputs chosen so some items raise ValueError / ZeroDivisionError in the workers.
test_data = [1, 2, -1, 0, 3, -2]
with WorkerPool(n_jobs=2) as pool:
    try:
        results = pool.map(risky_function, test_data)
    except Exception as e:
        print(f"Map operation failed: {type(e).__name__}: {e}")
        # MPIRE provides enhanced traceback information
        import traceback
        traceback.print_exc()

from mpire import WorkerPool
from mpire.exception import StopWorker, InterruptWorker
def controlled_worker_function(x):
    """Double x; sentinel inputs raise MPIRE worker-control exceptions."""
    if x >= 0 and x != 999:
        # Normal processing
        return x * 2
    if x == 999:
        # Terminate this worker
        raise StopWorker("Worker received termination signal")
    # Interrupt current task but keep worker alive (x < 0)
    raise InterruptWorker("Interrupting negative input processing")
# 999 triggers StopWorker; negative values trigger InterruptWorker.
test_data = [1, 2, 3, -1, 4, 999, 5, 6]
with WorkerPool(n_jobs=3) as pool:
    try:
        results = pool.map(controlled_worker_function, test_data)
        print("Results:", results)
    except Exception as e:
        print(f"Processing interrupted: {e}")

from mpire import WorkerPool
import random
def unreliable_function(x):
# Simulate different types of failures
failure_type = random.choice(['none', 'value', 'runtime', 'custom'])
if failure_type == 'value':
raise ValueError(f"ValueError for input {x}")
elif failure_type == 'runtime':
raise RuntimeError(f"RuntimeError for input {x}")
elif failure_type == 'custom':
raise CustomError(f"CustomError for input {x}")
return x ** 2
class CustomError(Exception):
    """Custom exception for demonstration"""
def error_handler(exception):
    """Handle different types of exceptions"""
    print(f"Caught {type(exception).__name__}: {exception}")
    # Pick a handling strategy based on the concrete exception type.
    strategies = (
        (ValueError, " → Handling ValueError with retry logic"),
        (RuntimeError, " → Handling RuntimeError with fallback"),
    )
    for exc_type, message in strategies:
        if isinstance(exception, exc_type):
            print(message)
            break
    else:
        print(" → Handling unknown error type")
with WorkerPool(n_jobs=2) as pool:
    # Apply async with error callback: failures are routed to error_handler.
    async_results = [
        pool.apply_async(unreliable_function, args=(i,), error_callback=error_handler)
        for i in range(10)
    ]
    # Collect whatever completed successfully.
    successful_results = []
    for async_result in async_results:
        try:
            successful_results.append(async_result.get())
        except Exception:
            # Error already handled by callback
            pass
    print(f"Successful results: {successful_results}")

from mpire import WorkerPool
from mpire.exception import highlight_traceback, remove_highlighting
def nested_function_error(x):
    """Function with nested calls to show traceback enhancement"""
    def level3(val):
        # Deepest frame: the actual failure site for val == 5
        if val == 5:
            raise RuntimeError(f"Deep error at level 3 with value {val}")
        return val * 10

    def level2(val):
        return level3(val)

    def level1(val):
        return level2(val)

    return level1(x)
with WorkerPool(n_jobs=2) as pool:
    try:
        results = pool.map(nested_function_error, range(10))
    except Exception as e:
        print("Exception caught with enhanced traceback:")
        # Get the traceback as string
        import traceback
        tb_str = traceback.format_exc()
        # Highlight the traceback (MPIRE does this automatically)
        highlighted_tb = highlight_traceback(tb_str)
        print(highlighted_tb)
        # Or remove highlighting for logging
        clean_tb = remove_highlighting(highlighted_tb)
        print("\nClean traceback for logging:")
        print(clean_tb)

from mpire import WorkerPool
import time
def slow_function(duration):
    """Block for the requested number of seconds, then report completion."""
    time.sleep(duration)
    message = f"Completed after {duration} seconds"
    return message
test_data = [0.5, 1.0, 5.0, 0.1, 10.0]  # Some will timeout
with WorkerPool(n_jobs=2) as pool:
    try:
        # task_timeout aborts any single task exceeding 2 seconds.
        results = pool.map(slow_function, test_data, task_timeout=2.0)
        print("All tasks completed:", results)
    except TimeoutError as e:
        print(f"Timeout occurred: {e}")
        print("Some tasks exceeded the 2-second limit")
    except Exception as e:
        print(f"Other error occurred: {type(e).__name__}: {e}")

from mpire import WorkerPool
class DataValidationError(Exception):
    """Custom exception for data validation"""

    def __init__(self, message, data_item, validation_rule):
        super().__init__(message)
        # Keep the offending item and the violated rule for the caller.
        self.data_item = data_item
        self.validation_rule = validation_rule
def validate_and_process(data_item):
    """Function with custom validation and error handling"""
    # Type must be checked first; the comparisons below assume a number.
    if not isinstance(data_item, (int, float)):
        raise DataValidationError(
            f"Data must be numeric, got {type(data_item).__name__}",
            data_item,
            "type_check",
        )
    # Each entry: (rule violated?, message, rule name)
    range_rules = (
        (data_item < 0, f"Data must be non-negative, got {data_item}", "range_check"),
        (data_item > 1000, f"Data must be <= 1000, got {data_item}", "upper_bound_check"),
    )
    for violated, message, rule in range_rules:
        if violated:
            raise DataValidationError(message, data_item, rule)
    # Process valid data
    return data_item ** 0.5
def custom_error_callback(exception):
    """Handle custom validation errors"""
    # Anything other than a validation failure is reported generically.
    if not isinstance(exception, DataValidationError):
        print(f"Unexpected error: {type(exception).__name__}: {exception}")
        return
    print(f"Validation failed: {exception}")
    print(f" Data item: {exception.data_item}")
    print(f" Rule: {exception.validation_rule}")
# Test data with various validation issues
test_data = [4, 9, 16, -1, "invalid", 25, 1001, 36, None, 49]
with WorkerPool(n_jobs=2) as pool:
    successful_results = []
    for i, item in enumerate(test_data):
        try:
            result = pool.apply(
                validate_and_process,
                args=(item,),
                error_callback=custom_error_callback,
            )
        except DataValidationError as e:
            print(f"✗ Item {i}: Validation failed - {e.validation_rule}")
        except Exception as e:
            print(f"✗ Item {i}: Unexpected error - {type(e).__name__}")
        else:
            successful_results.append(result)
            print(f"✓ Item {i}: {item} → {result}")
    print(f"\nProcessed {len(successful_results)} items successfully")
    print(f"Valid results: {successful_results}")

from mpire import WorkerPool
def init_worker_with_validation(worker_state):
    """Initialize worker with validation"""
    try:
        # Simulate resource initialization that might fail
        worker_state['resource'] = initialize_resource()
    except Exception as e:
        # Mark this worker unusable and surface the original cause.
        worker_state['initialized'] = False
        worker_state['init_error'] = str(e)
        raise RuntimeError(f"Worker initialization failed: {e}")
    else:
        worker_state['initialized'] = True
def initialize_resource():
    """Simulate resource initialization"""
    import random
    # Fail roughly 20% of the time to mimic a flaky external resource.
    failed = random.random() < 0.2
    if failed:
        raise ConnectionError("Failed to connect to resource")
    return "resource_handle"
def process_with_state_validation(worker_state, item):
    """Process item with state validation"""
    # Refuse to run on a worker whose init did not complete.
    if not worker_state.get('initialized', False):
        raise RuntimeError("Worker not properly initialized")
    return f"Processed {item} with {worker_state['resource']}"
with WorkerPool(n_jobs=3, use_worker_state=True) as pool:
    try:
        results = pool.map(
            process_with_state_validation,
            range(20),
            worker_init=init_worker_with_validation,
            chunk_size=5,
        )
        print("All items processed successfully")
        # Show first 5 results
        print(f"Results: {results[:5]}...")
    except Exception as e:
        print(f"Processing failed: {type(e).__name__}: {e}")
        print("This could be due to worker initialization failure")

Install with Tessl CLI
npx tessl i tessl/pypi-mpire