CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-loky

A robust implementation of concurrent.futures.ProcessPoolExecutor with reusable executors and transparent cloudpickle integration

Pending
Overview
Eval results
Files

docs/error-handling.md

Error Handling

Loky provides comprehensive error handling with specialized exception classes for different failure modes in parallel processing. These exceptions help identify and handle specific error conditions that can occur during parallel execution.

Capabilities

Core Exception Classes

Exception classes for handling various error conditions in parallel processing.

class BrokenProcessPool(Exception):
    """Signal that the process pool can no longer run tasks.

    Once this is raised the executor is in a fatal, unrecoverable
    state: shut it down and create a replacement before submitting
    any further work.
    """

class TerminatedWorkerError(BrokenProcessPool):
    """Signal that a worker process died unexpectedly.

    A BrokenProcessPool specialization raised for abrupt worker exits
    (crash, kill, resource limit). The executor may be able to recover
    by restarting workers.
    """

class ShutdownExecutorError(RuntimeError):
    """Signal that a closed executor was asked to do work.

    Raised when a task is submitted to an executor whose shutdown()
    method has already been called.
    """

Standard Exceptions

Re-exported exceptions from concurrent.futures for convenience.

# Re-exported from concurrent.futures so callers can catch these
# without importing concurrent.futures themselves.
import concurrent.futures

CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError

Usage Examples

Handling Broken Process Pool

from loky import get_reusable_executor, BrokenProcessPool
import os
import signal

def problematic_task(x):
    """Double *x*; deliberately kill the worker process when x == 3."""
    if x != 3:
        return x * 2
    # Hard exit bypasses all cleanup handlers, mimicking a real crash.
    os._exit(1)  # Force process termination

try:
    executor = get_reusable_executor(max_workers=2)

    # One of these submissions (x == 3) will kill its worker.
    futures = [executor.submit(problematic_task, i) for i in range(5)]

    results = []
    for i, future in enumerate(futures):
        try:
            result = future.result(timeout=5)
        except Exception as e:
            print(f"Task {i} failed: {e}")
        else:
            results.append(result)
            print(f"Task {i}: {result}")

except BrokenProcessPool as e:
    print(f"Process pool broken: {e}")
    print("Creating new executor...")

    # A fresh executor replaces the broken pool; kill_workers=True
    # forcibly terminates any workers left over from the old one.
    executor = get_reusable_executor(max_workers=2, kill_workers=True)
    print("New executor created successfully")

Handling Terminated Workers

from loky import ProcessPoolExecutor, TerminatedWorkerError
import time

def memory_intensive_task(size):
    """Allocate *size* million integers; return the sum of the first 1000.

    Raises MemoryError with a descriptive message when the allocation
    fails — large sizes may instead get the worker killed by the OS.
    """
    try:
        block = [0] * (size * 1000000)  # size in millions of integers
        return sum(block[:1000])  # Return small result
    except MemoryError:
        raise MemoryError(f"Cannot allocate {size}M integers")

def handle_worker_termination():
    """Show how TerminatedWorkerError surfaces when workers get killed."""
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Progressively larger memory demands, in millions of ints.
        for size in [1, 10, 100, 1000, 10000]:
            try:
                result = executor.submit(memory_intensive_task, size).result(timeout=10)

            except TerminatedWorkerError as e:
                # Worker was killed (e.g. OOM); executor may recover automatically.
                print(f"Size {size}M: Worker terminated ({e})")

            except MemoryError as e:
                print(f"Size {size}M: Memory error ({e})")

            except Exception as e:
                print(f"Size {size}M: Other error ({e})")

            else:
                print(f"Size {size}M: Success ({result})")

handle_worker_termination()

Handling Shutdown Errors

from loky import ProcessPoolExecutor, ShutdownExecutorError

def task(x):
    """Return the doubled input."""
    return 2 * x

# Demonstrate shutdown error handling
executor = ProcessPoolExecutor(max_workers=2)

# Run one task while the executor is still alive.
result1 = executor.submit(task, 5).result()
print(f"Before shutdown: {result1}")

# Shutdown the executor
executor.shutdown(wait=True)

# Submitting to a shutdown executor raises ShutdownExecutorError.
try:
    result2 = executor.submit(task, 10).result()
except ShutdownExecutorError as e:
    print(f"Cannot use shutdown executor: {e}")

    # A fresh executor takes over for the remaining work.
    new_executor = ProcessPoolExecutor(max_workers=2)
    result3 = new_executor.submit(task, 10).result()
    print(f"With new executor: {result3}")
    new_executor.shutdown()

Timeout Handling

from loky import get_reusable_executor, TimeoutError
import time

def slow_task(duration):
    """Block for *duration* seconds, then report completion."""
    time.sleep(duration)
    return f"Completed after {duration} seconds"

def handle_timeouts():
    """Demonstrate timeout error handling."""
    executor = get_reusable_executor(max_workers=2)

    # (task duration, result timeout) pairs; the middle one should time out.
    scenarios = [
        (1, 3),   # 1 second task, 3 second timeout - should succeed
        (5, 2),   # 5 second task, 2 second timeout - should timeout
        (2, 4),   # 2 second task, 4 second timeout - should succeed
    ]

    for duration, timeout in scenarios:
        label = f"Task ({duration}s, timeout {timeout}s)"
        try:
            outcome = executor.submit(slow_task, duration).result(timeout=timeout)

        except TimeoutError:
            # The worker keeps running the task even after we stop waiting.
            print(f"{label}: Timed out")

        except Exception as e:
            print(f"{label}: Error - {e}")

        else:
            print(f"{label}: {outcome}")

handle_timeouts()

Cancellation Handling

from loky import get_reusable_executor, CancelledError
import time

def cancellable_task(task_id, duration):
    """Simulate a long-running task, announcing start and completion."""
    print(f"Task {task_id} starting (duration: {duration}s)")
    time.sleep(duration)
    print(f"Task {task_id} completed")
    return f"Task {task_id} result"

def handle_cancellation():
    """Demonstrate task cancellation and error handling."""
    executor = get_reusable_executor(max_workers=2)

    # Queue four 3-second tasks on two workers, so some stay pending.
    futures = [executor.submit(cancellable_task, idx, 3) for idx in range(4)]

    # Give the pool a moment to start work, then try to cancel the
    # odd-numbered tasks; only still-pending futures can be cancelled.
    time.sleep(0.5)
    cancelled_count = 0

    for idx, fut in enumerate(futures):
        if idx % 2 == 1:  # Cancel odd-numbered tasks
            if fut.cancel():
                print(f"Successfully cancelled task {idx}")
                cancelled_count += 1
            else:
                print(f"Could not cancel task {idx} (already running)")

    # Collect results
    for idx, fut in enumerate(futures):
        try:
            print(f"Task {idx}: {fut.result(timeout=5)}")

        except CancelledError:
            print(f"Task {idx}: Was cancelled")

        except TimeoutError:
            print(f"Task {idx}: Timed out")

        except Exception as e:
            print(f"Task {idx}: Error - {e}")

    print(f"Cancelled {cancelled_count} tasks")

handle_cancellation()

Comprehensive Error Recovery

from loky import get_reusable_executor
from loky import (BrokenProcessPool, TerminatedWorkerError, 
                  ShutdownExecutorError, TimeoutError, CancelledError)
import random
import time

def unreliable_task(task_id):
    """Randomly succeed, raise, crash the worker, or stall for 10 seconds."""
    outcome = random.choice(['success', 'error', 'crash', 'slow'])

    if outcome == 'error':
        raise ValueError(f"Task {task_id}: Intentional error")
    if outcome == 'crash':
        import os
        os._exit(1)  # Simulate process crash
    if outcome == 'slow':
        time.sleep(10)  # Very slow task
        return f"Task {task_id}: Slow success"
    return f"Task {task_id}: Success"

def robust_task_execution(task_ids, max_retries=2):
    """Execute tasks with comprehensive error handling and retries.

    Returns a dict mapping each task id to its result string, an
    application-error string, or a gave-up message.
    """
    executor = None
    results = {}

    for task_id in task_ids:
        attempts = 0
        done = False

        while not done and attempts <= max_retries:
            try:
                # (Re)build the pool lazily, including after breakage.
                if executor is None:
                    executor = get_reusable_executor(max_workers=2)

                # Submit task with timeout
                outcome = executor.submit(unreliable_task, task_id).result(timeout=3)

            except (BrokenProcessPool, TerminatedWorkerError) as e:
                print(f"✗ Task {task_id}: Pool broken ({e})")
                executor = None  # Force new executor creation
                attempts += 1

            except TimeoutError:
                print(f"✗ Task {task_id}: Timeout (retry {attempts + 1})")
                attempts += 1

            except ValueError as e:
                # Application-level failures are recorded, never retried.
                print(f"✗ Task {task_id}: Application error ({e})")
                results[task_id] = f"Error: {e}"
                done = True

            except Exception as e:
                print(f"✗ Task {task_id}: Unexpected error ({e})")
                attempts += 1

            else:
                results[task_id] = outcome
                done = True
                print(f"✓ {outcome}")

        if not done:
            results[task_id] = f"Failed after {max_retries} retries"
            print(f"✗ Task {task_id}: Gave up after {max_retries} retries")

    # Clean up
    if executor:
        executor.shutdown(wait=False)

    return results

# Execute unreliable tasks with error recovery
task_ids = list(range(10))
random.seed(42)  # For reproducible results
results = robust_task_execution(task_ids)

print("\nFinal Results:")
for tid, outcome in results.items():
    print(f"Task {tid}: {outcome}")

Best Practices

Error Detection

  • Monitor for BrokenProcessPool to detect fatal executor errors
  • Use TerminatedWorkerError to identify worker process failures
  • Check for ShutdownExecutorError when reusing executor references

Recovery Strategies

  • Recreate executors after BrokenProcessPool exceptions
  • Implement retry logic for transient failures
  • Use timeouts to prevent hanging tasks

Resource Management

  • Always shutdown executors in finally blocks or use context managers
  • Monitor system resources to prevent worker termination

Install with Tessl CLI

npx tessl i tessl/pypi-loky

docs

backend-context.md

cloudpickle-integration.md

error-handling.md

index.md

process-pool-executor.md

reusable-executor.md

tile.json