CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-mpire

A Python package for easy multiprocessing, but faster than multiprocessing with advanced features including worker state management, progress bars, and performance insights.

Pending
Overview
Eval results
Files

exception-handling.mddocs/

Exception Handling

Exception classes and utilities for handling errors in multiprocessing environments with enhanced traceback formatting. MPIRE provides graceful error propagation and enhanced debugging capabilities for parallel processing scenarios.

Capabilities

Exception Classes

Custom exception classes for worker control and error handling.

class StopWorker(Exception):
    """Exception used to kill a worker"""
    pass

class InterruptWorker(Exception):
    """Exception used to interrupt a worker"""
    pass

class CannotPickleExceptionError(Exception):
    """Exception used when Pickle has trouble pickling the actual Exception"""
    pass

StopWorker: Raise this exception to terminate a worker process immediately.

InterruptWorker: Raise this exception to interrupt a worker's current task.

CannotPickleExceptionError: Raised when an exception cannot be properly serialized for inter-process communication.

Traceback Utilities

Functions for enhancing exception tracebacks and error display.

def highlight_traceback(traceback_str: str) -> str
def remove_highlighting(traceback_str: str) -> str
def populate_exception(err_type: type, err_args: Any, err_state: Dict, 
                      traceback_str: str) -> Tuple[Exception, Exception]

highlight_traceback: Add syntax highlighting to traceback strings for better readability in terminals.

remove_highlighting: Remove ANSI escape codes from highlighted traceback strings.

populate_exception: Reconstruct exception objects from serialized data with enhanced traceback information.

Usage Examples

Basic Exception Handling

from mpire import WorkerPool

def risky_function(x):
    if x < 0:
        raise ValueError(f"Negative value not allowed: {x}")
    if x == 0:
        raise ZeroDivisionError("Cannot divide by zero")
    return 10 / x

with WorkerPool(n_jobs=2) as pool:
    test_data = [1, 2, -1, 0, 3, -2]
    
    try:
        results = pool.map(risky_function, test_data)
    except Exception as e:
        print(f"Map operation failed: {type(e).__name__}: {e}")
        # MPIRE provides enhanced traceback information
        import traceback
        traceback.print_exc()

Worker Control Exceptions

from mpire import WorkerPool
from mpire.exception import StopWorker, InterruptWorker

def controlled_worker_function(x):
    if x == 999:
        # Terminate this worker
        raise StopWorker("Worker received termination signal")
    
    if x < 0:
        # Interrupt current task but keep worker alive
        raise InterruptWorker("Interrupting negative input processing")
    
    # Normal processing
    return x * 2

with WorkerPool(n_jobs=3) as pool:
    test_data = [1, 2, 3, -1, 4, 999, 5, 6]
    
    try:
        results = pool.map(controlled_worker_function, test_data)
        print("Results:", results)
    except Exception as e:
        print(f"Processing interrupted: {e}")

Exception Callbacks

from mpire import WorkerPool
import random

def unreliable_function(x):
    # Simulate different types of failures
    failure_type = random.choice(['none', 'value', 'runtime', 'custom'])
    
    if failure_type == 'value':
        raise ValueError(f"ValueError for input {x}")
    elif failure_type == 'runtime':
        raise RuntimeError(f"RuntimeError for input {x}")
    elif failure_type == 'custom':
        raise CustomError(f"CustomError for input {x}")
    
    return x ** 2

class CustomError(Exception):
    """Custom exception for demonstration"""
    pass

def error_handler(exception):
    """Handle different types of exceptions"""
    error_type = type(exception).__name__
    error_msg = str(exception)
    
    print(f"Caught {error_type}: {error_msg}")
    
    # Log or handle specific error types
    if isinstance(exception, ValueError):
        print("  → Handling ValueError with retry logic")
    elif isinstance(exception, RuntimeError):
        print("  → Handling RuntimeError with fallback")
    else:
        print("  → Handling unknown error type")

with WorkerPool(n_jobs=2) as pool:
    # Apply async with error callback
    async_results = []
    for i in range(10):
        result = pool.apply_async(
            unreliable_function,
            args=(i,),
            error_callback=error_handler
        )
        async_results.append(result)
    
    # Collect results
    successful_results = []
    for async_result in async_results:
        try:
            result = async_result.get()
            successful_results.append(result)
        except Exception:
            # Error already handled by callback
            pass
    
    print(f"Successful results: {successful_results}")

Enhanced Traceback Information

from mpire import WorkerPool
from mpire.exception import highlight_traceback, remove_highlighting

def nested_function_error(x):
    """Function with nested calls to show traceback enhancement"""
    def level1(val):
        return level2(val)
    
    def level2(val):
        return level3(val)
    
    def level3(val):
        if val == 5:
            raise RuntimeError(f"Deep error at level 3 with value {val}")
        return val * 10
    
    return level1(x)

with WorkerPool(n_jobs=2) as pool:
    try:
        results = pool.map(nested_function_error, range(10))
    except Exception as e:
        print("Exception caught with enhanced traceback:")
        
        # Get the traceback as string
        import traceback
        tb_str = traceback.format_exc()
        
        # Highlight the traceback (MPIRE does this automatically)
        highlighted_tb = highlight_traceback(tb_str)
        print(highlighted_tb)
        
        # Or remove highlighting for logging
        clean_tb = remove_highlighting(highlighted_tb)
        print("\nClean traceback for logging:")
        print(clean_tb)

Timeout Exception Handling

from mpire import WorkerPool
import time

def slow_function(duration):
    time.sleep(duration)
    return f"Completed after {duration} seconds"

with WorkerPool(n_jobs=2) as pool:
    test_data = [0.5, 1.0, 5.0, 0.1, 10.0]  # Some will timeout
    
    try:
        results = pool.map(
            slow_function, 
            test_data, 
            task_timeout=2.0  # 2 second timeout
        )
        print("All tasks completed:", results)
    except TimeoutError as e:
        print(f"Timeout occurred: {e}")
        print("Some tasks exceeded the 2-second limit")
    except Exception as e:
        print(f"Other error occurred: {type(e).__name__}: {e}")

Custom Exception Handling in Workers

from mpire import WorkerPool

class DataValidationError(Exception):
    """Custom exception for data validation"""
    def __init__(self, message, data_item, validation_rule):
        super().__init__(message)
        self.data_item = data_item
        self.validation_rule = validation_rule

def validate_and_process(data_item):
    """Function with custom validation and error handling"""
    
    # Validation rules
    if not isinstance(data_item, (int, float)):
        raise DataValidationError(
            f"Data must be numeric, got {type(data_item).__name__}",
            data_item,
            "type_check"
        )
    
    if data_item < 0:
        raise DataValidationError(
            f"Data must be non-negative, got {data_item}",
            data_item,
            "range_check"
        )
    
    if data_item > 1000:
        raise DataValidationError(
            f"Data must be <= 1000, got {data_item}",
            data_item,
            "upper_bound_check"
        )
    
    # Process valid data
    return data_item ** 0.5

def custom_error_callback(exception):
    """Handle custom validation errors"""
    if isinstance(exception, DataValidationError):
        print(f"Validation failed: {exception}")
        print(f"  Data item: {exception.data_item}")
        print(f"  Rule: {exception.validation_rule}")
    else:
        print(f"Unexpected error: {type(exception).__name__}: {exception}")

# Test data with various validation issues
test_data = [4, 9, 16, -1, "invalid", 25, 1001, 36, None, 49]

with WorkerPool(n_jobs=2) as pool:
    successful_results = []
    
    for i, item in enumerate(test_data):
        try:
            result = pool.apply(
                validate_and_process,
                args=(item,),
                error_callback=custom_error_callback
            )
            successful_results.append(result)
            print(f"✓ Item {i}: {item} → {result}")
        except DataValidationError as e:
            print(f"✗ Item {i}: Validation failed - {e.validation_rule}")
        except Exception as e:
            print(f"✗ Item {i}: Unexpected error - {type(e).__name__}")
    
    print(f"\nProcessed {len(successful_results)} items successfully")
    print(f"Valid results: {successful_results}")

Exception Propagation in Worker State

from mpire import WorkerPool

def init_worker_with_validation(worker_state):
    """Initialize worker with validation"""
    try:
        # Simulate resource initialization that might fail
        worker_state['resource'] = initialize_resource()
        worker_state['initialized'] = True
    except Exception as e:
        worker_state['initialized'] = False
        worker_state['init_error'] = str(e)
        raise RuntimeError(f"Worker initialization failed: {e}")

def initialize_resource():
    """Simulate resource initialization"""
    import random
    if random.random() < 0.2:  # 20% chance of failure
        raise ConnectionError("Failed to connect to resource")
    return "resource_handle"

def process_with_state_validation(worker_state, item):
    """Process item with state validation"""
    if not worker_state.get('initialized', False):
        raise RuntimeError("Worker not properly initialized")
    
    # Use the initialized resource
    resource = worker_state['resource']
    return f"Processed {item} with {resource}"

with WorkerPool(n_jobs=3, use_worker_state=True) as pool:
    try:
        results = pool.map(
            process_with_state_validation,
            range(20),
            worker_init=init_worker_with_validation,
            chunk_size=5
        )
        print("All items processed successfully")
        print(f"Results: {results[:5]}...")  # Show first 5 results
    
    except Exception as e:
        print(f"Processing failed: {type(e).__name__}: {e}")
        print("This could be due to worker initialization failure")

Install with Tessl CLI

npx tessl i tessl/pypi-mpire

docs

apply-functions.md

async-results.md

dashboard-integration.md

exception-handling.md

index.md

parallel-map.md

performance-insights.md

utility-functions.md

worker-configuration.md

workerpool-management.md

tile.json