CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-loky

A robust implementation of concurrent.futures.ProcessPoolExecutor with reusable executors and transparent cloudpickle integration

Pending
Overview
Eval results
Files

cloudpickle-integration.mddocs/

Cloudpickle Integration

Loky provides transparent integration with cloudpickle to handle serialization of functions and objects that cannot be pickled with the standard library. This enables parallel execution of interactively defined functions, lambda expressions, and complex objects.

Capabilities

Wrap Non-Picklable Objects

Wraps objects that cannot be pickled with standard pickle using cloudpickle for serialization.

def wrap_non_picklable_objects(obj, keep_wrapper=True):
    """Wrap an object so that it is serialized with cloudpickle.

    Intended for objects that the standard ``pickle`` module cannot
    serialize.

    Parameters:
    - obj: The object to wrap (function, class, or instance).
    - keep_wrapper (bool): Whether to keep the wrapper after
      deserialization. Defaults to True.

    Returns:
    CloudpickledObjectWrapper or CloudpickledClassWrapper: The wrapped
    object.

    Note:
    Wrapping tends to slow down serialization, as cloudpickle is typically
    slower than pickle. The preferred solution is to avoid defining
    functions in main scripts and to implement ``__reduce__`` methods for
    complex classes.
    """

Set Custom Pickler

Configure a custom pickler for loky's inter-process communication.

def set_loky_pickler(loky_pickler=None):
    """Select the pickler loky uses for inter-process communication.

    Parameters:
    - loky_pickler (optional): The pickler to use. If None, resets to the
      default pickler.
      NOTE(review): upstream loky appears to expect the *name* of an
      importable module that exposes a ``Pickler`` class (for example
      "pickle" or "cloudpickle") rather than a class object - confirm
      against ``loky.backend.reduction`` before relying on this.

    Returns:
    None
    """

def register(type_, reduce_function):
    """Register a reduce function for objects of the given type.

    Parameters:
    - type_: The type to register a reducer for.
    - reduce_function: Callable that receives an instance of ``type_`` and
      reduces it, e.g. by returning a ``(rebuild_callable, args)`` tuple.

    Returns:
    None
    """

def dump(obj, file, reducers=None, protocol=None):
    """Pickle an object to a file using loky's pickling system.

    Parameters:
    - obj: Object to pickle.
    - file: File-like object to write to (for example an ``io.BytesIO``
      buffer).
    - reducers (dict, optional): Custom reducers to use, mapping a type to
      its reduce function.
    - protocol (int, optional): Pickle protocol version.

    Returns:
    None
    """

def dumps(obj, reducers=None, protocol=None):
    """Pickle an object to bytes using loky's pickling system.

    Parameters:
    - obj: Object to pickle.
    - reducers (dict, optional): Custom reducers to use, mapping a type to
      its reduce function.
    - protocol (int, optional): Pickle protocol version.

    Returns:
    bytes: The pickled object.
    """

Wrapper Classes

Internal wrapper classes for handling non-picklable objects.

class CloudpickledObjectWrapper:
    """Base wrapper for objects that need cloudpickle serialization.

    Method bodies are elided (``...``) in this API summary.
    """
    # obj is the object to wrap. Note that keep_wrapper defaults to False
    # here, whereas wrap_non_picklable_objects() defaults it to True.
    def __init__(self, obj, keep_wrapper=False): ...
    # Pickle protocol hook. Presumably serializes the wrapped object with
    # cloudpickle - TODO confirm against the loky source.
    def __reduce__(self): ...
    # Presumably forwards attribute lookups to the wrapped object - TODO confirm.
    def __getattr__(self, attr): ...

class CallableObjectWrapper(CloudpickledObjectWrapper):
    """Wrapper that preserves the callable property of wrapped objects."""
    # Makes wrapper instances callable. Presumably forwards the call and its
    # arguments to the wrapped object - TODO confirm against the loky source.
    def __call__(self, *args, **kwargs): ...

Usage Examples

Wrapping Lambda Functions

from loky import get_reusable_executor, wrap_non_picklable_objects

# The standard pickle module cannot serialize lambda expressions
lambda_func = lambda x: x * x + 1

executor = get_reusable_executor(max_workers=2)

# Route the lambda through cloudpickle so it can be sent to the workers
wrapped_lambda = wrap_non_picklable_objects(lambda_func)

# Evaluate the wrapped lambda on every input in parallel
mapped = executor.map(wrapped_lambda, range(10))
results = list(mapped)
print(f"Lambda results: {results}")

Wrapping Interactively Defined Functions

from loky import get_reusable_executor, wrap_non_picklable_objects

# Defined in an interactive session or in the __main__ module
def interactive_function(x, multiplier=2):
    """Return ``(x * multiplier) ** 2`` as a float; needs wrapping for loky."""
    import math
    scaled = x * multiplier
    return math.pow(scaled, 2)

executor = get_reusable_executor(max_workers=3)

# Wrap the function so it can be shipped to the worker processes
wrapped_func = wrap_non_picklable_objects(interactive_function)

inputs = [1, 2, 3, 4, 5]
mapped = executor.map(wrapped_func, inputs)
results = list(mapped)
print(f"Interactive function results: {results}")

Wrapping Classes

from loky import get_reusable_executor, wrap_non_picklable_objects

# Defined in the main module, so it is wrapped before being used in workers
class DataProcessor:
    """Scale every element of a sequence by a fixed factor."""

    def __init__(self, scale_factor):
        self.scale_factor = scale_factor

    def process(self, data):
        """Return a new list with each item multiplied by the scale factor."""
        factor = self.scale_factor
        return [item * factor for item in data]

# Wrap the class
WrappedProcessor = wrap_non_picklable_objects(DataProcessor)

def process_with_wrapped_class(data_chunk):
    """Process one chunk with a freshly built wrapped processor."""
    return WrappedProcessor(2.5).process(data_chunk)

data_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
executor = get_reusable_executor(max_workers=2)
results = list(executor.map(process_with_wrapped_class, data_chunks))
print(f"Class processing results: {results}")

Automatic Wrapping

Loky automatically detects and wraps certain types of non-picklable objects:

from loky import get_reusable_executor
from functools import partial

def multiply_add(x, multiplier, addend):
    """Return ``x * multiplier + addend``."""
    product = x * multiplier
    return product + addend

# Bind the keyword arguments up front; only x varies from task to task
partial_func = partial(multiply_add, multiplier=3, addend=10)

executor = get_reusable_executor(max_workers=2)

# The partial is submitted as-is, without calling wrap_non_picklable_objects
mapped = executor.map(partial_func, range(5))
results = list(mapped)
print(f"Partial function results: {results}")

Custom Pickler Configuration

from loky import (
    get_reusable_executor,
    set_loky_pickler,
    wrap_non_picklable_objects,
)

# set_loky_pickler() takes the *name* of an importable module that exposes a
# ``Pickler`` class (for example "pickle" or "cloudpickle"), not a class
# object: loky imports the module and uses ``module.Pickler``.
# To plug in your own pickler, define a ``Pickler`` subclass in an importable
# module and pass that module's name instead. Customize it through
# ``reducer_override`` or ``dispatch_table``; overriding ``save_global`` has
# no effect on the C-accelerated ``pickle.Pickler``.
set_loky_pickler("pickle")

# With the standard pickle module selected, functions defined in __main__ are
# no longer serialized by value, so wrap them explicitly with cloudpickle.
@wrap_non_picklable_objects
def test_function(x):
    return x ** 2

executor = get_reusable_executor(max_workers=2)
results = list(executor.map(test_function, [1, 2, 3]))

# Reset to default pickler
set_loky_pickler(None)

Custom Type Reduction

from loky.backend.reduction import register, dump, dumps
from loky import get_reusable_executor
import io

class CustomData:
    """Value holder whose derived result is computed lazily and cached."""

    def __init__(self, value):
        self.value = value
        self._computed_result = None

    def compute_expensive_result(self):
        """Return the cached result, computing it on first use."""
        # Stand-in for an expensive computation that we don't want to serialize
        if self._computed_result is not None:
            return self._computed_result
        self._computed_result = self.value * 1000
        return self._computed_result

def rebuild_custom_data(value):
    """Recreate a CustomData instance from its reduced form."""
    return CustomData(value)

def reduce_custom_data(obj):
    """Reduce a CustomData to its constructor argument, dropping the cache."""
    return rebuild_custom_data, (obj.value,)

# Associate CustomData with its reducer
register(CustomData, reduce_custom_data)

def process_custom_data(data):
    """Compute the result for one CustomData object and describe it."""
    result = data.compute_expensive_result()
    return f"Processed {data.value} -> {result}"

# Objects that will go through the custom reducer when sent to workers
custom_objects = [CustomData(i) for i in range(5)]

executor = get_reusable_executor(max_workers=2)
results = list(executor.map(process_custom_data, custom_objects))

for i, result in enumerate(results):
    print(f"Object {i}: {result}")

Manual Pickling Operations

from loky.backend.reduction import dump, dumps
import io

class ComplexObject:
    """Simple container of numbers used to demonstrate manual pickling."""

    def __init__(self, data):
        self.data = data

    def process(self):
        """Return the sum of the stored data."""
        return sum(self.data)

# Create test object
obj = ComplexObject([1, 2, 3, 4, 5])

# In-memory serialization: dumps() returns the pickled bytes
serialized_bytes = dumps(obj)
print(f"Serialized size: {len(serialized_bytes)} bytes")

# Stream serialization: dump() writes into a file-like object
with io.BytesIO() as buffer:
    dump(obj, buffer)
    file_size = buffer.tell()
    print(f"File serialization size: {file_size} bytes")

# Serialization with a per-call reducer for ComplexObject
def _reduce_complex_object(instance):
    return ComplexObject, (instance.data,)

custom_reducers = {ComplexObject: _reduce_complex_object}

custom_serialized = dumps(obj, reducers=custom_reducers)
print(f"Custom serialized size: {len(custom_serialized)} bytes")

Handling Closures

from loky import get_reusable_executor, wrap_non_picklable_objects

def create_closure_function(base_value):
    """Build a function that adds the captured base_value to its argument."""
    def closure_func(x):
        # base_value is looked up in the enclosing scope
        return x + base_value
    return closure_func

executor = get_reusable_executor(max_workers=2)

# The closure captures a variable, so wrap it before sending it to workers
closure = create_closure_function(100)
wrapped_closure = wrap_non_picklable_objects(closure)

mapped = executor.map(wrapped_closure, range(5))
results = list(mapped)
print(f"Closure results: {results}")

Nested Function Handling

from loky import get_reusable_executor

def outer_function():
    """Run a locally defined worker over several chunks in parallel."""

    def nested_worker(data):
        # Double every element of the chunk
        return [item * 2 for item in data]

    data_chunks = [[1, 2], [3, 4], [5, 6]]
    executor = get_reusable_executor(max_workers=2)

    # The nested function is passed to the executor without explicit wrapping
    return list(executor.map(nested_worker, data_chunks))

# Call function with nested parallel processing
results = outer_function()
print(f"Nested function results: {results}")

Best Practices

Performance Considerations

  • Minimize Cloudpickle Usage: While convenient, cloudpickle is slower than standard pickle
  • Define Functions at Module Level: Avoid interactive definitions when possible
  • Implement __reduce__ Methods: For custom classes, implement proper serialization

Memory Management

  • Control Wrapper Retention: Use keep_wrapper=False when wrapper is not needed after deserialization
  • Cache Wrapped Objects: Reuse wrapped objects instead of wrapping repeatedly

Error Handling

import pickle

from loky import get_reusable_executor, wrap_non_picklable_objects

def problematic_function(x):
    # Function that might have serialization issues
    return x

# Create the executor outside the try block: the fallback path below needs
# it, and a failure to create it is not a pickling error.
executor = get_reusable_executor(max_workers=2)

try:
    # Attempt standard execution
    results = list(executor.map(problematic_function, [1, 2, 3]))
except (pickle.PicklingError, AttributeError) as e:
    print(f"Pickling failed: {e}")
    # Fall back to wrapped version
    wrapped_func = wrap_non_picklable_objects(problematic_function)
    results = list(executor.map(wrapped_func, [1, 2, 3]))
    print(f"Wrapped execution succeeded: {results}")

Install with Tessl CLI

npx tessl i tessl/pypi-loky

docs

backend-context.md

cloudpickle-integration.md

error-handling.md

index.md

process-pool-executor.md

reusable-executor.md

tile.json