A robust implementation of concurrent.futures.ProcessPoolExecutor with reusable executors and transparent cloudpickle integration
—
Loky's backend context management provides functions for configuring multiprocessing contexts, determining system resources, and managing process creation methods. These functions offer enhanced CPU detection and cross-platform compatibility.
Enhanced CPU count function with support for physical cores and container awareness.
def cpu_count(only_physical_cores=False):
"""
Return the number of CPUs available to the current process.
This implementation accounts for Linux cgroup (CFS scheduler) CPU quotas and
can distinguish between logical and physical cores.
Parameters:
- only_physical_cores (bool): If True, return only physical cores.
If False, return logical cores. Default False.
Returns:
int: Number of available CPU cores
Note:
This function respects container limits, CPU affinity settings, and
cgroup constraints on Linux systems.
"""

Functions for managing multiprocessing contexts and start methods.
def get_context(method=None):
"""
Get a multiprocessing context for the specified start method.
Parameters:
- method (str, optional): Start method name ('loky', 'loky_init_main', 'spawn', 'fork', 'forkserver').
Defaults to 'loky' if not specified.
Returns:
LokyContext: Multiprocessing context for the specified method
Available methods:
- 'loky': Loky's enhanced context (default)
- 'loky_init_main': Special context for main process initialization
- 'spawn': Pure spawn context
- 'fork': Fork context (POSIX only, not recommended)
- 'forkserver': Fork server context (POSIX only)
"""
def set_start_method(method, force=False):
"""
Set the method for starting worker processes.
Parameters:
- method (str): Start method to use
- force (bool): Whether to force setting even if already set. Default False.
Returns:
None
Raises:
RuntimeError: If start method is already set and force=False
"""
def get_start_method():
"""
Get the current start method.
Returns:
str: Current start method name
"""

Specialized context classes for different process creation strategies.
class LokyContext:
"""Enhanced multiprocessing context with loky-specific features."""
def Process(self, *args, **kwargs): ...
def Queue(self, maxsize=0): ...
def SimpleQueue(self): ...
def Lock(self): ...
def RLock(self): ...
def Semaphore(self, value=1): ...
def BoundedSemaphore(self, value=1): ...
def Condition(self, lock=None): ...
def Event(self): ...
class LokyInitMainContext(LokyContext):
"""Context for main process initialization scenarios."""

from loky import cpu_count
# Get logical CPU count (includes hyperthreading)
logical_cpus = cpu_count()
print(f"Logical CPUs: {logical_cpus}")
# Get physical CPU count (excludes hyperthreading)
physical_cpus = cpu_count(only_physical_cores=True)
print(f"Physical CPUs: {physical_cpus}")
# Use for executor sizing
from loky import get_reusable_executor
executor = get_reusable_executor(max_workers=physical_cpus)

from loky import cpu_count
# In containerized environments, cpu_count respects limits
available_cpus = cpu_count()
print(f"CPUs available to container: {available_cpus}")
# This is particularly useful in Docker containers with CPU limits
# where os.cpu_count() might return the host's CPU count
import os
host_cpus = os.cpu_count()
container_cpus = cpu_count()
print(f"Host CPUs: {host_cpus}")
print(f"Container CPUs: {container_cpus}")

from loky.backend import get_context
from loky.backend.context import set_start_method, get_start_method
# Set the start method globally
set_start_method('loky')
# Get current start method
current_method = get_start_method()
print(f"Current start method: {current_method}")
# Get context for specific method
loky_context = get_context('loky')
spawn_context = get_context('spawn')
print(f"Loky context: {loky_context}")
print(f"Spawn context: {spawn_context}")
# Use context to create process-related objects
queue = loky_context.Queue()
lock = loky_context.Lock()
event = loky_context.Event()

from loky import ProcessPoolExecutor
from loky.backend.context import get_context
def worker_function(x):
    """Return a string tagging the doubled input with the worker's process id.

    Parameters:
    - x: Value to double; anything supporting ``x * 2``.

    Returns:
    str: ``"Process <pid>: <x * 2>"`` where ``<pid>`` is the id of the
    process that executed the call.
    """
    # Imported inside the function so the name is available in the worker
    # process that runs this call.
    import os

    return f"Process {os.getpid()}: {x * 2}"
# Use specific context
loky_context = get_context('loky')

# Create executor with custom context
executor = ProcessPoolExecutor(
    max_workers=2,
    context=loky_context
)

# Materialize the results first, then print one line per input
results = list(executor.map(worker_function, range(5)))
for result in results:
    print(result)
executor.shutdown()

import sys
from loky.backend.context import set_start_method, get_start_method
def configure_optimal_start_method():
    """Configure the best start method for the current platform.

    Picks 'spawn' on Windows and 'loky' on every other platform, then
    installs it with ``set_start_method(method, force=True)`` so the choice
    is applied even if a start method was already set.

    Returns:
    str: Name of the start method that was configured.
    """
    # Windows uses spawn; Unix-like systems can use loky (recommended)
    method = 'spawn' if sys.platform == 'win32' else 'loky'
    set_start_method(method, force=True)
    return method
# Configure and display start method
method = configure_optimal_start_method()
current_method = get_start_method()
print(f"Configured start method: {method}")
print(f"Current start method: {current_method}")

from loky.backend.context import get_context
# Get loky context with enhanced features
ctx = get_context('loky')
# Create synchronization primitives
lock = ctx.Lock()
rlock = ctx.RLock()
semaphore = ctx.Semaphore(2)
condition = ctx.Condition()
event = ctx.Event()
# Create queues
queue = ctx.Queue(maxsize=10)
simple_queue = ctx.SimpleQueue()
print("Created loky context objects:")
print(f"Lock: {lock}")
print(f"RLock: {rlock}")
print(f"Semaphore: {semaphore}")
print(f"Queue: {queue}")
print(f"SimpleQueue: {simple_queue}")

from loky import get_reusable_executor, cpu_count
def get_optimal_worker_count():
    """Determine optimal worker count based on system resources.

    Uses the physical core count as an upper bound and, when psutil is
    installed, additionally limits the count to one worker per 2 GB of
    total RAM. The computed values are printed for inspection.

    Returns:
    int: Number of workers to use (always at least 1 when memory
    information is available). Falls back to the physical core count
    if psutil is not installed.
    """
    # Get CPU information
    physical_cores = cpu_count(only_physical_cores=True)
    logical_cores = cpu_count(only_physical_cores=False)

    # psutil is optional. It is imported here, not at module level, so that a
    # missing package actually reaches the fallback instead of failing at
    # import time.
    try:
        import psutil
    except ImportError:
        # Fallback if psutil not available
        return physical_cores

    # Get memory information (requires psutil)
    memory_gb = psutil.virtual_memory().total / (1024**3)

    # Rule of thumb: 1 worker per 2GB RAM, but not more than physical cores
    memory_workers = int(memory_gb / 2)
    # Clamp to 1 so hosts with less than 2 GB do not yield max_workers=0,
    # which an executor rejects.
    optimal_workers = max(1, min(physical_cores, memory_workers))

    print(f"Physical cores: {physical_cores}")
    print(f"Logical cores: {logical_cores}")
    print(f"Memory: {memory_gb:.1f} GB")
    print(f"Memory-based workers: {memory_workers}")
    print(f"Optimal workers: {optimal_workers}")
    return optimal_workers
# Configure executor with optimal worker count
optimal_workers = get_optimal_worker_count()
executor = get_reusable_executor(max_workers=optimal_workers)

import sys
import time

from loky.backend.context import get_context
def timing_test(context_method, iterations=1000):
    """Test process creation speed for different context methods.

    Only the construction of ``Process`` objects is timed; the processes
    are never started.

    Parameters:
    - context_method (str): Start method name passed to ``get_context``.
    - iterations (int): Number of ``Process`` objects to create. Default 1000.

    Returns:
    float: Elapsed time in seconds spent creating the process objects.
    """
    ctx = get_context(context_method)

    # perf_counter is monotonic and high-resolution, unlike the wall clock,
    # so the measured interval cannot be skewed by system clock adjustments.
    start_time = time.perf_counter()
    # Create but don't start processes for timing
    processes = [
        ctx.Process(target=lambda: None) for _ in range(iterations)
    ]
    creation_time = time.perf_counter() - start_time

    # Clean up: drop the only reference to the unstarted process objects
    del processes
    return creation_time
# Compare different context methods (on supported platforms)
methods = ['loky', 'spawn']
# fork-based start methods are only offered on non-Windows platforms
if sys.platform != 'win32':
    methods.extend(['fork', 'forkserver'])

print("Process creation timing comparison:")
for method in methods:
try:
elapsed = timing_test(method, 100)
print(f"{method}: {elapsed:.4f} seconds")
except Exception as e:
print(f"{method}: Not available ({e})")

Install with Tessl CLI
npx tessl i tessl/pypi-loky