A robust implementation of concurrent.futures.ProcessPoolExecutor with reusable executors and transparent cloudpickle integration.

The ProcessPoolExecutor class provides a robust implementation of parallel task execution using worker processes. It offers enhanced reliability, consistent spawn behavior, and better error handling compared to the standard-library implementation.

Main executor class for managing a pool of worker processes with configurable parameters and robust error handling.
class ProcessPoolExecutor(Executor):
    """
    Robust ProcessPoolExecutor with enhanced error handling and consistent
    spawn behavior.

    Parameters:
    - max_workers (int, optional): Maximum number of worker processes.
      Defaults to cpu_count()
    - job_reducers (dict, optional): Custom reducers for job serialization
    - result_reducers (dict, optional): Custom reducers for result
      serialization
    - timeout (int, optional): Worker idle timeout in seconds. Default is None
    - context (multiprocessing context, optional): Multiprocessing context
      for creating processes
    - initializer (callable, optional): Function called at worker startup
    - initargs (tuple): Arguments passed to initializer function
    - env (dict, optional): Environment variables to set in worker processes
    """

    def __init__(
        self,
        max_workers=None,
        job_reducers=None,
        result_reducers=None,
        timeout=None,
        context=None,
        initializer=None,
        initargs=(),
        env=None,
    ): ...

    # Submit individual tasks for execution on worker processes.
    def submit(self, fn, *args, **kwargs):
        """
        Submit a callable to be executed with given arguments.

        Parameters:
        - fn (callable): Function to execute
        - *args: Positional arguments for the function
        - **kwargs: Keyword arguments for the function

        Returns:
        Future: Future representing the execution of the callable

        Raises:
        RuntimeError: If executor is shutdown
        """

    # Execute a function over multiple input values in parallel.
    def map(self, fn, *iterables, **kwargs):
        """
        Apply function to every item of iterables in parallel.

        Parameters:
        - fn (callable): Function to apply to each item
        - *iterables: One or more iterables to process
        - timeout (float, optional): Maximum time to wait for results
        - chunksize (int, optional): Size of chunks sent to worker processes

        Returns:
        Iterator: Iterator over results in same order as input

        Raises:
        TimeoutError: If timeout is reached before completion
        """

    # Clean shutdown of executor and worker processes.
    def shutdown(self, wait=True, kill_workers=False):
        """
        Shutdown the executor and free associated resources.

        Parameters:
        - wait (bool): Whether to wait for pending tasks to complete.
          Default True
        - kill_workers (bool): Whether to forcibly terminate workers.
          Default False

        Returns:
        None
        """


from loky import ProcessPoolExecutor
import time
def cpu_bound_task(n):
    """Simulate CPU-intensive work: sum of squares of 0..n-1."""
    total = 0
    for value in range(n):
        total += value * value
    return total
# Create executor with 4 workers; the context manager shuts the pool down
# cleanly on exit.
with ProcessPoolExecutor(max_workers=4) as executor:
    # Submit individual tasks
    future = executor.submit(cpu_bound_task, 10000)
    result = future.result()
    print(f"Result: {result}")
    # Process multiple inputs
    inputs = [1000, 2000, 3000, 4000, 5000]
    results = list(executor.map(cpu_bound_task, inputs))
    print(f"Results: {results}")

from loky import ProcessPoolExecutor
import logging
def worker_init(level):
    """Set up logging inside a freshly started worker process."""
    # Configure the root logger first so the startup message below is
    # emitted at the requested verbosity.
    logging.basicConfig(level=level)
    logging.info("Worker process initialized")
def logged_task(x):
    """Log that *x* is being processed, then return twice its value."""
    logging.info(f"Processing {x}")
    doubled = x * 2
    return doubled
# Executor with worker initialization: each worker runs worker_init(logging.INFO)
# once at startup before processing tasks.
with ProcessPoolExecutor(
    max_workers=2,
    initializer=worker_init,
    initargs=(logging.INFO,)
) as executor:
    results = list(executor.map(logged_task, [1, 2, 3, 4]))
    print(f"Results: {results}")

from loky import ProcessPoolExecutor
import os
def get_env_var(var_name):
    """Read *var_name* from the worker's environment, or "Not set" if absent."""
    value = os.environ.get(var_name)
    return "Not set" if value is None else value
# Set custom environment in workers: the env mapping is applied to each
# worker process before it starts handling tasks.
with ProcessPoolExecutor(
    max_workers=2,
    env={"CUSTOM_VAR": "worker_value", "DEBUG": "1"}
) as executor:
    results = list(executor.map(get_env_var, ["CUSTOM_VAR", "DEBUG"]))
    print(f"Environment variables: {results}")

from loky import ProcessPoolExecutor, BrokenProcessPool
import time
def failing_task(x):
    """Return x doubled; deliberately fail for the sentinel input 3."""
    if x != 3:
        return x * 2
    raise ValueError(f"Task failed for input {x}")
# Demonstrate per-task error handling (ValueError / TimeoutError on each
# future) and pool-level failure handling (BrokenProcessPool).
try:
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(failing_task, i) for i in range(5)]
        for i, future in enumerate(futures):
            try:
                result = future.result(timeout=5)
                print(f"Task {i}: {result}")
            except ValueError as e:
                print(f"Task {i} failed: {e}")
            except TimeoutError:
                print(f"Task {i} timed out")
except BrokenProcessPool as e:
    print(f"Process pool broken: {e}")

# Install with Tessl CLI
npx tessl i tessl/pypi-loky