A robust implementation of concurrent.futures.ProcessPoolExecutor with reusable executors and transparent cloudpickle integration
—
The reusable executor system provides singleton executor management to avoid the overhead of repeatedly creating and destroying ProcessPoolExecutor instances. This is particularly useful for workflows with frequent parallel processing needs.
get_reusable_executor returns a singleton ProcessPoolExecutor instance, creating a new one if needed or reusing the existing one when its configuration still matches.
def get_reusable_executor(
    max_workers=None,
    context=None,
    timeout=10,
    kill_workers=False,
    reuse="auto",
    job_reducers=None,
    result_reducers=None,
    initializer=None,
    initargs=(),
    env=None,
):
    """
    Return the current reusable executor instance.

    Creates a new singleton ProcessPoolExecutor if none exists, or reuses
    the existing one when possible.

    Parameters:
    - max_workers (int, optional): Maximum number of worker processes
    - context (multiprocessing context, optional): Context for creating processes
    - timeout (int): Worker idle timeout in seconds before automatic shutdown
    - kill_workers (bool): Whether to forcibly terminate previous workers
    - reuse (str): Reuse strategy - "auto" or other values
    - job_reducers (dict, optional): Custom reducers for job serialization
    - result_reducers (dict, optional): Custom reducers for result serialization
    - initializer (callable, optional): Function called at worker startup
    - initargs (tuple): Arguments passed to initializer function
    - env (dict, optional): Environment variables for worker processes

    Returns:
    ProcessPoolExecutor: Singleton executor instance
    """
import time
def compute_square(x):
    """Return the square of ``x`` after a short artificial delay."""
    time.sleep(0.1)  # stand-in for real work
    return x * x
# Get reusable executor - will create new one if none exists
executor = get_reusable_executor(max_workers=4, timeout=2)

# First batch of tasks
results1 = list(executor.map(compute_square, range(5)))
print(f"First batch: {results1}")

# Second batch reuses same executor
results2 = list(executor.map(compute_square, range(5, 10)))
print(f"Second batch: {results2}")

# Executor will automatically shut down after 2 seconds of inactivity
def cpu_task(x):
    """Return the sum of squares of 0..(x * 1000 - 1) — a CPU-bound workload."""
    total = 0
    for value in range(x * 1000):
        total += value * value
    return total
# Start with 2 workers
executor = get_reusable_executor(max_workers=2)
results1 = list(executor.map(cpu_task, [1, 2, 3]))

# Resize to 4 workers for larger workload
executor = get_reusable_executor(max_workers=4)
results2 = list(executor.map(cpu_task, range(1, 9)))

print(f"Small batch: {results1}")
print(f"Large batch: {results2}")
def worker_state_task(x):
    """Store ``x`` in this process's environment and return the stored value."""
    # Imported locally so the example task is self-contained in each worker.
    import os
    value = str(x)
    os.environ["WORKER_STATE"] = value
    return os.environ.get("WORKER_STATE")
# Run tasks that modify worker state
executor = get_reusable_executor(max_workers=2)
results1 = list(executor.map(worker_state_task, [1, 2]))

# Force restart of workers to clear state
executor = get_reusable_executor(max_workers=2, kill_workers=True)
results2 = list(executor.map(worker_state_task, [3, 4]))

print(f"First run: {results1}")
print(f"After restart: {results2}")
import time
def quick_task(x):
    """Double the input; cheap enough to finish almost instantly."""
    doubled = x * 2
    return doubled
def setup_long_running_executor():
    """Build an executor whose idle workers linger for a full minute."""
    # A longer timeout suits persistent workflows with gaps between batches.
    return get_reusable_executor(max_workers=4, timeout=60)
def setup_short_lived_executor():
    """Build an executor that frees its workers shortly after going idle."""
    # A short timeout releases resources quickly once the batch is done.
    return get_reusable_executor(max_workers=2, timeout=5)
# For workflows with frequent but spaced-out tasks
long_executor = setup_long_running_executor()
results = list(long_executor.map(quick_task, range(10)))

time.sleep(10)  # Simulate gap between task batches

# Executor still available due to longer timeout
more_results = list(long_executor.map(quick_task, range(10, 20)))
import logging
def setup_worker_logging(log_level):
    """Configure root logging inside a freshly started worker process."""
    logging.basicConfig(
        level=log_level,
        format='%(processName)s: %(message)s',
    )
    logging.info("Worker initialized")
def logged_computation(x):
    """Square ``x``, logging both the input and the computed result."""
    logging.info(f"Computing square of {x}")
    squared = x * x
    logging.info(f"Result: {squared}")
    return squared
# Reusable executor with worker initialization
executor = get_reusable_executor(
    max_workers=3,
    timeout=30,
    initializer=setup_worker_logging,
    initargs=(logging.INFO,),
)

# All tasks will run on workers with logging configured
results = list(executor.map(logged_computation, [1, 2, 3, 4, 5]))
print(f"Results with logging: {results}")
import os
def get_worker_env(var_name):
    """Return ``"NAME=value"`` for *var_name* as seen by this process.

    Falls back to the literal value ``UNSET`` when the variable is absent.
    """
    value = os.environ.get(var_name, 'UNSET')
    return f"{var_name}={value}"
# Executor with custom environment
executor = get_reusable_executor(
    max_workers=2,
    env={
        "PROCESSING_MODE": "parallel",
        "WORKER_POOL": "loky",
        "DEBUG_LEVEL": "2",
    },
)

# Check environment variables in workers
env_vars = ["PROCESSING_MODE", "WORKER_POOL", "DEBUG_LEVEL", "PATH"]
results = list(executor.map(get_worker_env, env_vars))
for result in results:
    print(result)
Install with the Tessl CLI: `npx tessl i tessl/pypi-loky`