A Python package for easy multiprocessing, but faster than multiprocessing with advanced features including worker state management, progress bars, and performance insights.
—
Core WorkerPool class providing the main interface for creating, configuring, and managing parallel worker processes or threads. The WorkerPool acts as a replacement for multiprocessing.Pool with enhanced features and better performance.
The main class for managing a pool of worker processes or threads.
class WorkerPool:
def __init__(self, n_jobs: Optional[int] = None, daemon: bool = True,
cpu_ids: CPUList = None, shared_objects: Any = None,
pass_worker_id: bool = False, use_worker_state: bool = False,
start_method: str = DEFAULT_START_METHOD, keep_alive: bool = False,
use_dill: bool = False, enable_insights: bool = False,
order_tasks: bool = False) -> None

Parameters:
n_jobs (Optional[int]): Number of workers to spawn. If None, uses cpu_count()
daemon (bool): Whether to start child processes as daemon processes. Default: True
cpu_ids (CPUList): List of CPU IDs for pinning processes to specific CPUs. None disables CPU pinning
shared_objects (Any): Objects passed as shared objects to workers (copy-on-write with fork)
pass_worker_id (bool): Whether to pass worker ID as first argument to functions. Default: False
use_worker_state (bool): Whether workers maintain state between tasks. Default: False
start_method (str): Process start method ('fork', 'spawn', 'forkserver', 'threading')
keep_alive (bool): Keep workers alive after map calls for reuse. Default: False
use_dill (bool): Use dill for serialization instead of pickle. Default: False
enable_insights (bool): Enable worker performance insights. Default: False
order_tasks (bool): Provide tasks to workers in order (worker 0 gets chunk 0, etc.). Default: False

WorkerPool supports context manager protocol for automatic resource cleanup.
def __enter__(self) -> 'WorkerPool'
def __exit__(self, *_: Any) -> None

Functions for controlling worker lifecycle and cleanup.
def stop_and_join(self, keep_alive: bool = False) -> None
def terminate(self) -> None

stop_and_join: Gracefully stop all workers and wait for them to finish current tasks.
keep_alive (bool): If True, keep workers alive for future reuse

terminate: Immediately terminate all workers without waiting for task completion.
from mpire import WorkerPool
# Simple worker pool with default settings
with WorkerPool() as pool:
results = pool.map(lambda x: x * 2, range(10))
# Custom number of workers
with WorkerPool(n_jobs=8) as pool:
results = pool.map(some_function, data)

from mpire import WorkerPool
# Worker pool with custom configuration
with WorkerPool(
n_jobs=4,
daemon=False,
cpu_ids=[0, 1, 2, 3], # Pin to specific CPUs
use_worker_state=True,
enable_insights=True,
start_method='spawn'
) as pool:
results = pool.map(
process_function,
data,
worker_init=init_function,
worker_exit=cleanup_function
)

# Keep workers alive for multiple map operations
pool = WorkerPool(n_jobs=4, keep_alive=True)
# First batch
results1 = pool.map(function1, data1)
# Second batch reuses same workers
results2 = pool.map(function2, data2)
# Cleanup when done
pool.stop_and_join()

# Manual lifecycle management
pool = WorkerPool(n_jobs=4)
try:
results = pool.map(some_function, data)
except Exception:
# Force termination on error
pool.terminate()
else:
# Graceful shutdown
pool.stop_and_join()

Install with Tessl CLI
npx tessl i tessl/pypi-mpire