tessl/pypi-loky

A robust implementation of concurrent.futures.ProcessPoolExecutor with reusable executors and transparent cloudpickle integration

Loky

A robust, cross-platform and cross-version implementation of the ProcessPoolExecutor class from concurrent.futures. Loky provides reusable executors, transparent cloudpickle integration, and deadlock-free process management for parallel Python computing.

Package Information

  • Package Name: loky
  • Language: Python
  • Installation: pip install loky
  • Requirements: Python 3.9+, cloudpickle

Core Imports

import loky
from loky import get_reusable_executor, ProcessPoolExecutor

For specific functionality:

from loky import (
    ProcessPoolExecutor,
    get_reusable_executor,
    cpu_count,
    wrap_non_picklable_objects,
    set_loky_pickler,
    BrokenProcessPool,
    Future
)

Basic Usage

import os
from time import sleep
from loky import get_reusable_executor

def say_hello(k):
    pid = os.getpid()
    print(f"Hello from {pid} with arg {k}")
    sleep(.01)
    return pid

# Create an executor with 4 worker processes
# that will automatically shutdown after idling for 2s
executor = get_reusable_executor(max_workers=4, timeout=2)

# Submit a single task
res = executor.submit(say_hello, 1)
print("Got results:", res.result())

# Submit multiple tasks using map
results = executor.map(say_hello, range(10))
n_workers = len(set(results))
print("Number of used processes:", n_workers)

Architecture

Loky provides a robust parallel processing architecture built around three core components:

  • ProcessPoolExecutor: Drop-in replacement for concurrent.futures.ProcessPoolExecutor with enhanced robustness, consistent spawn behavior using fork+exec on POSIX systems, and better error handling
  • Reusable Executor: Singleton pattern executor that can be reused across consecutive calls to reduce spawning overhead, with configurable automatic shutdown after idling
  • Cloudpickle Integration: Transparent serialization for interactively defined functions and lambda expressions, with customizable pickling strategies

The library is designed for maximum reliability in parallel processing scenarios, particularly for scientific computing and data processing workflows where robust process management is critical.

Capabilities

Process Pool Executor

Core ProcessPoolExecutor implementation providing robust parallel task execution with configurable worker processes, timeout management, and enhanced error handling.

class ProcessPoolExecutor(Executor):
    def __init__(
        self,
        max_workers=None,
        job_reducers=None,
        result_reducers=None,
        timeout=None,
        context=None,
        initializer=None,
        initargs=(),
        env=None
    ): ...
    
    def submit(self, fn, *args, **kwargs): ...
    def map(self, fn, *iterables, **kwargs): ...
    def shutdown(self, wait=True, kill_workers=False): ...

Reusable Executor Management

Singleton executor management for efficient resource usage across multiple parallel processing sessions.

def get_reusable_executor(
    max_workers=None,
    context=None,
    timeout=10,
    kill_workers=False,
    reuse="auto",
    job_reducers=None,
    result_reducers=None,
    initializer=None,
    initargs=(),
    env=None
): ...

Cloudpickle Integration

Functions for handling non-picklable objects and customizing serialization behavior.

def wrap_non_picklable_objects(obj, keep_wrapper=True): ...
def set_loky_pickler(loky_pickler=None): ...

Backend Context Management

Context and system information functions for multiprocessing configuration.

def cpu_count(only_physical_cores=False): ...

Error Handling

Exception classes and error management for robust parallel processing.

class BrokenProcessPool(Exception): ...
class TerminatedWorkerError(BrokenProcessPool): ...
class ShutdownExecutorError(RuntimeError): ...

Types

class Future:
    """Enhanced Future implementation with improved callback handling."""
    def __init__(self): ...
    def result(self, timeout=None): ...
    def exception(self, timeout=None): ...
    def add_done_callback(self, fn): ...
    def cancel(self): ...
    def cancelled(self): ...
    def running(self): ...
    def done(self): ...

# Re-exported from concurrent.futures for convenience
def wait(fs, timeout=None, return_when=ALL_COMPLETED): ...
def as_completed(fs, timeout=None): ...

Executor = concurrent.futures.Executor
CancelledError = concurrent.futures.CancelledError  
TimeoutError = concurrent.futures.TimeoutError
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION

Install with Tessl CLI

npx tessl i tessl/pypi-loky
