A robust implementation of concurrent.futures.ProcessPoolExecutor with reusable executors and transparent cloudpickle integration
---
Loky provides transparent integration with cloudpickle to handle serialization of functions and objects that cannot be pickled with the standard library. This enables parallel execution of interactively defined functions, lambda expressions, and complex objects.
Wrap objects that cannot be pickled with the standard `pickle` module, using cloudpickle for serialization instead.
def wrap_non_picklable_objects(obj, keep_wrapper=True):
    """
    Wrap a non-picklable object so it is serialized with cloudpickle.

    Parameters
    ----------
    obj : object
        The object to wrap (function, class, or instance).
    keep_wrapper : bool, default True
        Whether to keep the wrapper after deserialization.

    Returns
    -------
    CloudpickledObjectWrapper or CloudpickledClassWrapper
        The wrapped object.

    Notes
    -----
    This wrapper tends to slow down serialization, as cloudpickle is
    typically slower than pickle. The proper solution is to avoid
    defining functions in main scripts and to implement ``__reduce__``
    methods for complex classes.
    """

# Configure a custom pickler for loky's inter-process communication.
def set_loky_pickler(loky_pickler=None):
    """
    Set the pickler used by loky for inter-process communication.

    Parameters
    ----------
    loky_pickler : optional
        Custom pickler class. If None, resets loky to its default pickler.

    Returns
    -------
    None
    """
def register(type_, reduce_function):
    """
    Register a reduce function for objects of the given type.

    Parameters
    ----------
    type_ : type
        The type to register a reducer for.
    reduce_function : callable
        Function that reduces objects of the given type.

    Returns
    -------
    None
    """
def dump(obj, file, reducers=None, protocol=None):
    """
    Pickle an object to a file using loky's pickling system.

    Parameters
    ----------
    obj : object
        Object to pickle.
    file : file-like
        File-like object to write the pickled bytes to.
    reducers : dict, optional
        Custom reducers to use during serialization.
    protocol : int, optional
        Pickle protocol version.

    Returns
    -------
    None
    """
def dumps(obj, reducers=None, protocol=None):
    """
    Pickle an object to bytes using loky's pickling system.

    Parameters
    ----------
    obj : object
        Object to pickle.
    reducers : dict, optional
        Custom reducers to use during serialization.
    protocol : int, optional
        Pickle protocol version.

    Returns
    -------
    bytes
        The pickled object as bytes.
    """

# Internal wrapper classes for handling non-picklable objects.
class CloudpickledObjectWrapper:
    """Base wrapper for objects that need cloudpickle serialization."""

    def __init__(self, obj, keep_wrapper=False): ...

    def __reduce__(self): ...

    def __getattr__(self, attr): ...


class CallableObjectWrapper(CloudpickledObjectWrapper):
    """Wrapper that preserves the callable property of wrapped objects."""

    def __call__(self, *args, **kwargs): ...


from loky import get_reusable_executor, wrap_non_picklable_objects
# Lambda functions cannot be pickled with standard pickle
lambda_func = lambda x: x * x + 1

# Wrap the lambda for use with loky
wrapped_lambda = wrap_non_picklable_objects(lambda_func)

executor = get_reusable_executor(max_workers=2)

# Use wrapped lambda in parallel execution
results = list(executor.map(wrapped_lambda, range(10)))
print(f"Lambda results: {results}")

from loky import get_reusable_executor, wrap_non_picklable_objects
# Function defined in interactive session or __main__ module
def interactive_function(x, multiplier=2):
    """Return ``(x * multiplier) ** 2`` — defined interactively, so it needs wrapping."""
    import math
    return math.pow(x * multiplier, 2)
# Wrap for parallel execution
wrapped_func = wrap_non_picklable_objects(interactive_function)

executor = get_reusable_executor(max_workers=3)
inputs = [1, 2, 3, 4, 5]
results = list(executor.map(wrapped_func, inputs))
print(f"Interactive function results: {results}")

from loky import get_reusable_executor, wrap_non_picklable_objects
# Class defined in main module
class DataProcessor:
    """Scale numeric sequences by a fixed factor."""

    def __init__(self, scale_factor):
        self.scale_factor = scale_factor

    def process(self, data):
        """Return *data* with every element multiplied by the scale factor."""
        return [x * self.scale_factor for x in data]
# Wrap the class so instances can be created inside worker processes
WrappedProcessor = wrap_non_picklable_objects(DataProcessor)


def process_with_wrapped_class(data_chunk):
    """Instantiate the wrapped processor and process one data chunk."""
    processor = WrappedProcessor(2.5)
    return processor.process(data_chunk)
executor = get_reusable_executor(max_workers=2)
data_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
results = list(executor.map(process_with_wrapped_class, data_chunks))
print(f"Class processing results: {results}")

# Loky automatically detects and wraps certain types of non-picklable objects:
from loky import get_reusable_executor
from functools import partial
def multiply_add(x, multiplier, addend):
    """Return ``x * multiplier + addend``."""
    return x * multiplier + addend
# partial objects are automatically wrapped
partial_func = partial(multiply_add, multiplier=3, addend=10)

executor = get_reusable_executor(max_workers=2)

# No explicit wrapping needed - loky handles it automatically
results = list(executor.map(partial_func, range(5)))
print(f"Partial function results: {results}")

from loky import set_loky_pickler, get_reusable_executor
import pickle
class CustomPickler(pickle.Pickler):
    """Custom pickler with special handling for global objects.

    NOTE(review): the C-accelerated ``pickle.Pickler`` does not expose a
    ``save_global`` method to override — only the pure-Python
    ``pickle._Pickler`` does. Confirm which implementation loky uses
    before relying on this hook.
    """

    def save_global(self, obj, name=None):
        # Log each global object before delegating to the base pickler.
        print(f"Pickling global object: {obj}")
        super().save_global(obj, name)
# Set custom pickler
set_loky_pickler(CustomPickler)


def test_function(x):
    """Return x squared."""
    return x ** 2


executor = get_reusable_executor(max_workers=2)
results = list(executor.map(test_function, [1, 2, 3]))

# Reset to default pickler
set_loky_pickler(None)

from loky.backend.reduction import register, dump, dumps
from loky import get_reusable_executor
import io
class CustomData:
    """Custom class that needs special serialization."""

    def __init__(self, value):
        self.value = value
        self._computed_result = None  # lazily filled cache, not worth serializing

    def compute_expensive_result(self):
        """Simulate an expensive computation that we don't want to serialize."""
        if self._computed_result is None:
            self._computed_result = self.value * 1000
        return self._computed_result
def reduce_custom_data(obj):
    """Custom reducer that serializes only the essential ``value`` field."""
    return rebuild_custom_data, (obj.value,)


def rebuild_custom_data(value):
    """Rebuild a CustomData instance from its reduced form."""
    return CustomData(value)
# Register the custom reducer so loky serializes CustomData via reduce_custom_data
register(CustomData, reduce_custom_data)
def process_custom_data(data):
    """Worker task: force the expensive computation and report the result."""
    result = data.compute_expensive_result()
    return f"Processed {data.value} -> {result}"
# Use with custom serialization
executor = get_reusable_executor(max_workers=2)

# Create objects that will use custom serialization
custom_objects = [CustomData(i) for i in range(5)]
results = list(executor.map(process_custom_data, custom_objects))

for i, result in enumerate(results):
    print(f"Object {i}: {result}")

from loky.backend.reduction import dump, dumps
import io
class ComplexObject:
    """Simple container whose payload is a list of numbers."""

    def __init__(self, data):
        self.data = data

    def process(self):
        """Return the sum of the stored data."""
        return sum(self.data)
# Create test object
obj = ComplexObject([1, 2, 3, 4, 5])

# Serialize to bytes
serialized_bytes = dumps(obj)
print(f"Serialized size: {len(serialized_bytes)} bytes")

# Serialize to a file-like object; read the size before the buffer closes
with io.BytesIO() as buffer:
    dump(obj, buffer)
    file_size = buffer.tell()
print(f"File serialization size: {file_size} bytes")

# Use with custom reducers
custom_reducers = {
    ComplexObject: lambda obj: (ComplexObject, (obj.data,))
}
custom_serialized = dumps(obj, reducers=custom_reducers)
print(f"Custom serialized size: {len(custom_serialized)} bytes")

from loky import get_reusable_executor, wrap_non_picklable_objects
def create_closure_function(base_value):
    """Create a closure that captures *base_value*."""
    def closure_func(x):
        # References base_value from the enclosing scope
        return x + base_value
    return closure_func
# Closures need wrapping due to captured variables
closure = create_closure_function(100)
wrapped_closure = wrap_non_picklable_objects(closure)

executor = get_reusable_executor(max_workers=2)
results = list(executor.map(wrapped_closure, range(5)))
print(f"Closure results: {results}")

from loky import get_reusable_executor
def outer_function():
    """Outer function that runs a nested worker in a loky executor."""
    def nested_worker(data):
        # Nested functions are automatically wrapped
        return [x * 2 for x in data]

    executor = get_reusable_executor(max_workers=2)

    # Loky automatically detects and wraps nested functions
    data_chunks = [[1, 2], [3, 4], [5, 6]]
    results = list(executor.map(nested_worker, data_chunks))
    return results
# Call function with nested parallel processing
results = outer_function()
print(f"Nested function results: {results}")

# Tip: pass keep_wrapper=False when the wrapper is not needed after deserialization.
from loky import get_reusable_executor, wrap_non_picklable_objects
import pickle
def problematic_function(x):
    """Identity function standing in for one that might fail to pickle."""
    return x
# Create the executor before the try block so it is still bound
# in the fallback branch even if executor.map raises.
executor = get_reusable_executor(max_workers=2)
try:
    # Attempt standard execution
    results = list(executor.map(problematic_function, [1, 2, 3]))
except (pickle.PicklingError, AttributeError) as e:
    print(f"Pickling failed: {e}")
    # Fall back to wrapped version
    wrapped_func = wrap_non_picklable_objects(problematic_function)
    results = list(executor.map(wrapped_func, [1, 2, 3]))
    print(f"Wrapped execution succeeded: {results}")

# Install with Tessl CLI
npx tessl i tessl/pypi-loky