CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-loky

A robust implementation of concurrent.futures.ProcessPoolExecutor with reusable executors and transparent cloudpickle integration

Pending
Overview
Eval results
Files

docs/reusable-executor.md

Reusable Executor

The reusable executor system provides singleton executor management to avoid the overhead of repeatedly creating and destroying ProcessPoolExecutor instances. This is particularly useful for workflows with frequent parallel processing needs.

Capabilities

Get Reusable Executor

Returns a singleton ProcessPoolExecutor instance, creating a new one if needed or reusing an existing one.

def get_reusable_executor(
    max_workers=None,
    context=None,
    timeout=10,
    kill_workers=False,
    reuse="auto",
    job_reducers=None,
    result_reducers=None,
    initializer=None,
    initargs=(),
    env=None
):
    """
    Return the singleton reusable ProcessPoolExecutor, creating one on
    first use and reusing the existing instance on subsequent calls.

    Calling again with a different ``max_workers`` resizes the pool;
    passing ``kill_workers=True`` terminates the previous workers and
    starts a fresh pool (see the usage examples below).

    Parameters:
    - max_workers (int, optional): Maximum number of worker processes.
      If None, an implementation-chosen default is used.
    - context (multiprocessing context, optional): Context used to start
      the worker processes.
    - timeout (int): Idle time in seconds after which workers shut down
      automatically (default 10)
    - kill_workers (bool): If True, forcibly terminate the workers of a
      previous executor instead of letting them finish
    - reuse (str): Reuse strategy - "auto" lets the library decide
      whether the existing executor is compatible (other accepted values
      may vary by loky version - confirm against the installed release)
    - job_reducers (dict, optional): Custom reducers for job serialization
    - result_reducers (dict, optional): Custom reducers for result serialization
    - initializer (callable, optional): Function called once in each
      worker process at startup
    - initargs (tuple): Positional arguments passed to the initializer
    - env (dict, optional): Environment variables set in the worker
      processes before they start

    Returns:
    ProcessPoolExecutor: The shared singleton executor instance
    """

Usage Examples

Basic Reusable Executor

from loky import get_reusable_executor
import time

def compute_square(x):
    """Return ``x`` squared after a short artificial delay."""
    # Pause briefly so the task resembles real work in the demo.
    time.sleep(0.1)
    squared = x * x
    return squared

# Get the singleton reusable executor - a new pool is created only if
# none exists yet (4 workers, 2-second idle timeout).
executor = get_reusable_executor(max_workers=4, timeout=2)

# First batch of tasks: squares of 0..4.
results1 = list(executor.map(compute_square, range(5)))
print(f"First batch: {results1}")

# Second batch reuses the same worker processes - no startup cost.
results2 = list(executor.map(compute_square, range(5, 10)))
print(f"Second batch: {results2}")

# Executor will automatically shutdown after 2 seconds of inactivity (timeout=2)
Dynamic Resizing

from loky import get_reusable_executor

def cpu_task(x):
    """Return the sum of squares 0**2 .. (1000*x - 1)**2 (CPU-bound demo)."""
    upper = x * 1000
    total = 0
    for i in range(upper):
        total += i * i
    return total

# Start with a small pool of 2 workers.
executor = get_reusable_executor(max_workers=2)
results1 = list(executor.map(cpu_task, [1, 2, 3]))

# Calling again with a larger max_workers resizes the existing pool
# rather than creating a brand-new executor.
executor = get_reusable_executor(max_workers=4)
results2 = list(executor.map(cpu_task, range(1, 9)))

print(f"Small batch: {results1}")
print(f"Large batch: {results2}")

Forcing Executor Restart

from loky import get_reusable_executor

def worker_state_task(x):
    """Write ``x`` into this process's environment and read it back.

    Demonstrates a task that mutates per-process global state, which
    persists between tasks on a reused worker.
    """
    import os

    value = str(x)
    os.environ["WORKER_STATE"] = value
    return os.environ.get("WORKER_STATE")

# Run tasks that mutate per-worker global state (os.environ here).
executor = get_reusable_executor(max_workers=2)
results1 = list(executor.map(worker_state_task, [1, 2]))

# kill_workers=True terminates the previous workers, so the next batch
# starts in fresh processes with no leftover state.
executor = get_reusable_executor(max_workers=2, kill_workers=True)
results2 = list(executor.map(worker_state_task, [3, 4]))

print(f"First run: {results1}")
print(f"After restart: {results2}")

Timeout Configuration

from loky import get_reusable_executor
import time

def quick_task(x):
    """Return ``x`` doubled -- a near-instant task for timeout demos."""
    doubled = x * 2
    return doubled

def setup_long_running_executor():
    """Get a reusable executor tuned for persistent workflows.

    Workers stay alive through 60 seconds of inactivity, so widely
    spaced task batches still reuse the same pool.
    """
    executor = get_reusable_executor(max_workers=4, timeout=60)
    return executor

def setup_short_lived_executor():
    """Get a reusable executor that frees its resources quickly.

    Workers shut down after only 5 seconds of inactivity.
    """
    executor = get_reusable_executor(max_workers=2, timeout=5)
    return executor

# For workflows with frequent but spaced-out task batches, pick the
# executor whose idle timeout outlives the gaps between batches.
long_executor = setup_long_running_executor()
results = list(long_executor.map(quick_task, range(10)))

time.sleep(10)  # Simulate a gap between task batches (shorter than timeout=60)

# The same pool is still alive because the 10s gap is under the 60s timeout.
more_results = list(long_executor.map(quick_task, range(10, 20)))

Custom Initializer with Reusable Executor

from loky import get_reusable_executor
import logging

def setup_worker_logging(log_level):
    """Configure the root logger inside a freshly started worker.

    Intended to run once per worker process when passed to the
    executor as ``initializer``.
    """
    fmt = '%(processName)s: %(message)s'
    logging.basicConfig(level=log_level, format=fmt)
    logging.info("Worker initialized")

def logged_computation(x):
    """Square ``x``, logging both the input and the computed value."""
    # Log before and after so worker log output brackets the work.
    logging.info(f"Computing square of {x}")
    result = x * x
    logging.info(f"Result: {result}")
    return result

# Reusable executor whose workers run setup_worker_logging(logging.INFO)
# once at startup, before accepting any tasks.
executor = get_reusable_executor(
    max_workers=3,
    timeout=30,
    initializer=setup_worker_logging,
    initargs=(logging.INFO,)
)

# Every task runs in a worker that already has logging configured.
results = list(executor.map(logged_computation, [1, 2, 3, 4, 5]))
print(f"Results with logging: {results}")

Environment Variable Management

from loky import get_reusable_executor
import os

def get_worker_env(var_name):
    """Return ``"NAME=value"`` for *var_name* in this process ('UNSET' if absent)."""
    value = os.environ.get(var_name, 'UNSET')
    return f"{var_name}={value}"

# Executor whose worker processes start with these extra environment
# variables set (merged into the inherited environment).
executor = get_reusable_executor(
    max_workers=2,
    env={
        "PROCESSING_MODE": "parallel",
        "WORKER_POOL": "loky",
        "DEBUG_LEVEL": "2"
    }
)

# Read the variables back from inside the workers; PATH is inherited,
# the other three come from the env= mapping above.
env_vars = ["PROCESSING_MODE", "WORKER_POOL", "DEBUG_LEVEL", "PATH"]
results = list(executor.map(get_worker_env, env_vars))

for result in results:
    print(result)

Benefits

Performance Advantages

  • Reduced Startup Overhead: Workers remain alive between task batches, eliminating process creation costs
  • Memory Efficiency: Imported modules and initialized state persist across task submissions
  • Dynamic Scaling: calling get_reusable_executor again with a different max_workers resizes the existing pool instead of spawning a new executor

Resource Management

  • Automatic Cleanup: Workers automatically shutdown after configured idle timeout
  • Memory Leak Protection: Built-in monitoring prevents worker memory accumulation
  • Graceful Shutdown: Clean termination of workers when timeout expires

Use Cases

  • Interactive Computing: Jupyter notebooks and REPL environments
  • Batch Processing: Multiple rounds of parallel computation
  • Scientific Computing: Data analysis workflows with repeated parallel operations
  • Web Applications: Background task processing with consistent resource usage

Install with Tessl CLI

npx tessl i tessl/pypi-loky

docs

backend-context.md

cloudpickle-integration.md

error-handling.md

index.md

process-pool-executor.md

reusable-executor.md

tile.json