A robust implementation of concurrent.futures.ProcessPoolExecutor with reusable executors and transparent cloudpickle integration
—
A robust, cross-platform and cross-version implementation of the ProcessPoolExecutor class from concurrent.futures. Loky provides reusable executors, transparent cloudpickle integration, and deadlock-free process management for parallel Python computing.
pip install lokyimport loky
from loky import get_reusable_executor, ProcessPoolExecutorFor specific functionality:
from loky import (
ProcessPoolExecutor,
get_reusable_executor,
cpu_count,
wrap_non_picklable_objects,
set_loky_pickler,
BrokenProcessPool,
Future
)import os
from time import sleep
from loky import get_reusable_executor
def say_hello(k):
pid = os.getpid()
print(f"Hello from {pid} with arg {k}")
sleep(.01)
return pid
# Create an executor with 4 worker processes
# that will automatically shutdown after idling for 2s
executor = get_reusable_executor(max_workers=4, timeout=2)
# Submit a single task
res = executor.submit(say_hello, 1)
print("Got results:", res.result())
# Submit multiple tasks using map
results = executor.map(say_hello, range(10))
n_workers = len(set(results))
print("Number of used processes:", n_workers)Loky provides a robust parallel processing architecture built around three core components:
concurrent.futures.ProcessPoolExecutor with enhanced robustness, consistent spawn behavior using fork+exec on POSIX systems, and better error handlingThe library is designed for maximum reliability in parallel processing scenarios, particularly for scientific computing and data processing workflows where robust process management is critical.
Core ProcessPoolExecutor implementation providing robust parallel task execution with configurable worker processes, timeout management, and enhanced error handling.
class ProcessPoolExecutor(Executor):
def __init__(
self,
max_workers=None,
job_reducers=None,
result_reducers=None,
timeout=None,
context=None,
initializer=None,
initargs=(),
env=None
): ...
def submit(self, fn, *args, **kwargs): ...
def map(self, fn, *iterables, **kwargs): ...
def shutdown(self, wait=True, kill_workers=False): ...Singleton executor management for efficient resource usage across multiple parallel processing sessions.
def get_reusable_executor(
max_workers=None,
context=None,
timeout=10,
kill_workers=False,
reuse="auto",
job_reducers=None,
result_reducers=None,
initializer=None,
initargs=(),
env=None
): ...Functions for handling non-picklable objects and customizing serialization behavior.
def wrap_non_picklable_objects(obj, keep_wrapper=True): ...
def set_loky_pickler(loky_pickler=None): ...Context and system information functions for multiprocessing configuration.
def cpu_count(only_physical_cores=False): ...Exception classes and error management for robust parallel processing.
class BrokenProcessPool(Exception): ...
class TerminatedWorkerError(BrokenProcessPool): ...
class ShutdownExecutorError(RuntimeError): ...class Future:
"""Enhanced Future implementation with improved callback handling."""
def __init__(self): ...
def result(self, timeout=None): ...
def exception(self, timeout=None): ...
def add_done_callback(self, fn): ...
def cancel(self): ...
def cancelled(self): ...
def running(self): ...
def done(self): ...
# Re-exported from concurrent.futures for convenience
def wait(fs, timeout=None, return_when=ALL_COMPLETED): ...
def as_completed(fs, timeout=None): ...
Executor = concurrent.futures.Executor
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTIONInstall with Tessl CLI
npx tessl i tessl/pypi-loky