A robust implementation of concurrent.futures.ProcessPoolExecutor with reusable executors and transparent cloudpickle integration.

Loky provides comprehensive error handling through specialized exception classes for the different failure modes of parallel processing. These exceptions help identify and handle the specific error conditions that can occur during parallel execution; the classes below cover the various error conditions that can arise.
class BrokenProcessPool(Exception):
    """Signal that the pool has entered an unrecoverable state.

    Once raised, the executor has hit a fatal error and can no longer
    run tasks: callers should shut it down and create a fresh one.
    """
class TerminatedWorkerError(BrokenProcessPool):
    """Signal that a worker process died unexpectedly.

    Specialization of BrokenProcessPool for worker-level failures; the
    executor may be able to recover by restarting its workers.
    """
class ShutdownExecutorError(RuntimeError):
    """
    Raised when attempting to use an executor that has been shutdown.

    This exception occurs when trying to submit tasks to an executor
    that has already been shutdown via the shutdown() method.
    """


# Re-exported exceptions from concurrent.futures for convenience.
# Re-exported from concurrent.futures so callers can catch these without
# importing concurrent.futures themselves.
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError

from loky import get_reusable_executor, BrokenProcessPool
import os
import signal


def problematic_task(x):
    """Task that might crash the worker process."""
    if x == 3:
        # Simulate a hard crash: os._exit skips all cleanup and kills
        # the worker process immediately.
        os._exit(1)
    return x * 2


try:
    executor = get_reusable_executor(max_workers=2)
    # Submit tasks that include a problematic one.
    futures = [executor.submit(problematic_task, i) for i in range(5)]
    results = []
    for i, future in enumerate(futures):
        try:
            result = future.result(timeout=5)
            results.append(result)
            print(f"Task {i}: {result}")
        except Exception as e:
            # A crashed worker surfaces here as the future's exception.
            print(f"Task {i} failed: {e}")
except BrokenProcessPool as e:
    print(f"Process pool broken: {e}")
    print("Creating new executor...")
    # A broken pool cannot be reused: create a replacement executor.
    executor = get_reusable_executor(max_workers=2, kill_workers=True)
    print("New executor created successfully")

from loky import ProcessPoolExecutor, TerminatedWorkerError
import time


def memory_intensive_task(size):
    """Task that might cause worker termination due to resource limits.

    `size` is interpreted as millions of integers to allocate.
    """
    try:
        data = [0] * (size * 1000000)
        return sum(data[:1000])  # Return a small result.
    except MemoryError:
        raise MemoryError(f"Cannot allocate {size}M integers")


def handle_worker_termination():
    """Demonstrate handling of terminated worker errors."""
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Submit tasks with progressively larger memory requirements.
        sizes = [1, 10, 100, 1000, 10000]
        for size in sizes:
            try:
                future = executor.submit(memory_intensive_task, size)
                result = future.result(timeout=10)
                print(f"Size {size}M: Success ({result})")
            except TerminatedWorkerError as e:
                # The OS killed the worker (e.g. OOM); the executor may
                # recover automatically for subsequent tasks.
                print(f"Size {size}M: Worker terminated ({e})")
            except MemoryError as e:
                print(f"Size {size}M: Memory error ({e})")
            except Exception as e:
                print(f"Size {size}M: Other error ({e})")


handle_worker_termination()

from loky import ProcessPoolExecutor, ShutdownExecutorError
def task(x):
    """Double the input; trivial task used to probe the executor."""
    return x * 2


# Demonstrate shutdown error handling.
executor = ProcessPoolExecutor(max_workers=2)

# Submit and process a task while the executor is still live.
future1 = executor.submit(task, 5)
result1 = future1.result()
print(f"Before shutdown: {result1}")

# Shutdown the executor.
executor.shutdown(wait=True)

# Submitting to a shut-down executor raises ShutdownExecutorError.
try:
    future2 = executor.submit(task, 10)
    result2 = future2.result()
except ShutdownExecutorError as e:
    print(f"Cannot use shutdown executor: {e}")

# Create a new executor for continued processing.
new_executor = ProcessPoolExecutor(max_workers=2)
future3 = new_executor.submit(task, 10)
result3 = future3.result()
print(f"With new executor: {result3}")
new_executor.shutdown()

from loky import get_reusable_executor, TimeoutError
import time


def slow_task(duration):
    """Task that takes `duration` seconds to complete."""
    time.sleep(duration)
    return f"Completed after {duration} seconds"


def handle_timeouts():
    """Demonstrate timeout error handling."""
    executor = get_reusable_executor(max_workers=2)
    # (task duration, result timeout) pairs.
    tasks = [
        (1, 3),  # 1 second task, 3 second timeout - should succeed
        (5, 2),  # 5 second task, 2 second timeout - should timeout
        (2, 4),  # 2 second task, 4 second timeout - should succeed
    ]
    for duration, timeout in tasks:
        try:
            future = executor.submit(slow_task, duration)
            result = future.result(timeout=timeout)
            print(f"Task ({duration}s, timeout {timeout}s): {result}")
        except TimeoutError:
            # Only result() gave up; the task keeps running in its worker.
            print(f"Task ({duration}s, timeout {timeout}s): Timed out")
        except Exception as e:
            print(f"Task ({duration}s, timeout {timeout}s): Error - {e}")


handle_timeouts()

from loky import get_reusable_executor, CancelledError
import time


def cancellable_task(task_id, duration):
    """Task that can be cancelled before completion."""
    print(f"Task {task_id} starting (duration: {duration}s)")
    time.sleep(duration)
    print(f"Task {task_id} completed")
    return f"Task {task_id} result"


def handle_cancellation():
    """Demonstrate task cancellation and error handling."""
    executor = get_reusable_executor(max_workers=2)

    # Submit multiple tasks.
    futures = [executor.submit(cancellable_task, i, 3) for i in range(4)]

    # Give some tasks a chance to start, then cancel the odd-numbered
    # ones; cancel() fails for futures that are already running.
    time.sleep(0.5)
    cancelled_count = 0
    for i, future in enumerate(futures):
        if i % 2 == 1:
            if future.cancel():
                print(f"Successfully cancelled task {i}")
                cancelled_count += 1
            else:
                print(f"Could not cancel task {i} (already running)")

    # Collect results.
    for i, future in enumerate(futures):
        try:
            result = future.result(timeout=5)
            print(f"Task {i}: {result}")
        except CancelledError:
            print(f"Task {i}: Was cancelled")
        except TimeoutError:
            print(f"Task {i}: Timed out")
        except Exception as e:
            print(f"Task {i}: Error - {e}")

    print(f"Cancelled {cancelled_count} tasks")


handle_cancellation()

from loky import get_reusable_executor
from loky import (BrokenProcessPool, TerminatedWorkerError,
                  ShutdownExecutorError, TimeoutError, CancelledError)
import random
import time


def unreliable_task(task_id):
    """Task that randomly fails in different ways."""
    failure_type = random.choice(['success', 'error', 'crash', 'slow'])
    if failure_type == 'success':
        return f"Task {task_id}: Success"
    elif failure_type == 'error':
        raise ValueError(f"Task {task_id}: Intentional error")
    elif failure_type == 'crash':
        import os
        os._exit(1)  # Simulate process crash.
    elif failure_type == 'slow':
        time.sleep(10)  # Very slow task; trips the 3-second result timeout.
        return f"Task {task_id}: Slow success"


def robust_task_execution(task_ids, max_retries=2):
    """Execute tasks with comprehensive error handling and retries.

    Returns a dict mapping each task id to its result string, or to a
    failure description once retries are exhausted.
    """
    executor = None
    results = {}
    for task_id in task_ids:
        retries = 0
        success = False
        while not success and retries <= max_retries:
            try:
                # Lazily (re)create the executor after a broken pool.
                if executor is None:
                    executor = get_reusable_executor(max_workers=2)
                # Submit the task and wait with a timeout.
                future = executor.submit(unreliable_task, task_id)
                result = future.result(timeout=3)
                results[task_id] = result
                success = True
                print(f"✓ {result}")
            except (BrokenProcessPool, TerminatedWorkerError) as e:
                print(f"✗ Task {task_id}: Pool broken ({e})")
                executor = None  # Force new executor creation.
                retries += 1
            except TimeoutError:
                print(f"✗ Task {task_id}: Timeout (retry {retries + 1})")
                retries += 1
            except ValueError as e:
                print(f"✗ Task {task_id}: Application error ({e})")
                results[task_id] = f"Error: {e}"
                success = True  # Don't retry application errors.
            except Exception as e:
                print(f"✗ Task {task_id}: Unexpected error ({e})")
                retries += 1
        if not success:
            results[task_id] = f"Failed after {max_retries} retries"
            print(f"✗ Task {task_id}: Gave up after {max_retries} retries")
    # Clean up without waiting for any straggling tasks.
    if executor:
        executor.shutdown(wait=False)
    return results


# Execute unreliable tasks with error recovery.
task_ids = list(range(10))
random.seed(42)  # For reproducible results.
results = robust_task_execution(task_ids)
print("\nFinal Results:")
for task_id, result in results.items():
    print(f"Task {task_id}: {result}")

# Summary of the exception types demonstrated above:
# - BrokenProcessPool to detect fatal executor errors
# - TerminatedWorkerError to identify worker process failures
# - ShutdownExecutorError when reusing executor references
# - TimeoutError / CancelledError alongside BrokenProcessPool exceptions
# Install with Tessl CLI:
npx tessl i tessl/pypi-loky