A Python package for easy multiprocessing that is faster than the standard multiprocessing module, with advanced features including worker state management, progress bars, and performance insights.
npx @tessl/cli install tessl/pypi-mpire@2.10.0

MPIRE (MultiProcessing Is Really Easy) is a Python multiprocessing library that provides faster execution than the standard multiprocessing package through optimized task distribution and copy-on-write shared objects. It offers intuitive map/apply functions with advanced features including worker state management, progress bars with tqdm integration, worker insights for performance monitoring, graceful exception handling, configurable timeouts, automatic task chunking, memory management through worker recycling, and support for nested pools and CPU pinning.
pip install mpire

Optional dependencies:

pip install mpire[dashboard] (requires flask)
pip install mpire[dill] (requires multiprocess)

from mpire import WorkerPool, cpu_count

from mpire import WorkerPool
import time
def time_consuming_function(x):
    time.sleep(0.1)  # Simulate work
    return x * x
# Simple parallel map
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(time_consuming_function, range(10))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# With progress bar
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(time_consuming_function, range(100),
                       progress_bar=True)
# With worker state
def init_worker(worker_state):
    worker_state['database'] = connect_to_database()

def process_with_state(worker_state, item):
    return worker_state['database'].query(item)
with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
    results = pool.map(process_with_state, items,
                       worker_init=init_worker)

MPIRE's architecture centers around the WorkerPool class, which manages a pool of worker processes or threads. Key components include the core WorkerPool, map- and apply-style execution functions, worker configuration, performance insights, asynchronous results, exception handling utilities, an optional dashboard, and helper functions. The design supports multiple start methods (fork, spawn, forkserver, threading), automatic task chunking, worker state management, and copy-on-write shared objects for maximum performance.
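A minimal sketch of copy-on-write shared objects combined with an explicit start method; the reference data and its use here are illustrative placeholders:

from mpire import WorkerPool

# Illustrative placeholder for a large, read-only structure
reference = {"lookup": list(range(1_000_000))}

def match(shared, item):
    # shared_objects are passed as the first argument to the task function
    return shared["lookup"][item]

# With the fork start method (not available on every platform, e.g. Windows),
# workers inherit shared_objects copy-on-write instead of copying them per task
with WorkerPool(n_jobs=4, shared_objects=reference, start_method='fork') as pool:
    results = pool.map(match, range(10))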
Core WorkerPool class with initialization, configuration, and lifecycle management. Provides the main interface for creating and managing parallel worker processes or threads.
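A brief sketch of manual lifecycle management as an alternative to the context-manager form shown above; it assumes the usual pattern of calling stop_and_join when the pool is no longer needed:

from mpire import WorkerPool

def square(x):
    return x * x

pool = WorkerPool(n_jobs=2, keep_alive=True)
try:
    first = pool.map(square, range(5))
    # keep_alive=True reuses the same workers for the next map call
    second = pool.map(square, range(5, 10))
finally:
    # Stop the workers and wait for them to shut down cleanly;
    # terminate() would force-stop them instead
    pool.stop_and_join()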
class WorkerPool:
    def __init__(self, n_jobs: Optional[int] = None, daemon: bool = True,
                 cpu_ids: CPUList = None, shared_objects: Any = None,
                 pass_worker_id: bool = False, use_worker_state: bool = False,
                 start_method: str = DEFAULT_START_METHOD, keep_alive: bool = False,
                 use_dill: bool = False, enable_insights: bool = False,
                 order_tasks: bool = False) -> None
    def __enter__(self) -> 'WorkerPool'
    def __exit__(self, *_: Any) -> None
    def stop_and_join(self, keep_alive: bool = False) -> None
    def terminate(self) -> None

Map-style parallel execution functions including ordered and unordered variants, with iterator versions for memory-efficient processing of large datasets.
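For example, the iterator variants can process results as they arrive instead of collecting them all in memory; a small sketch:

from mpire import WorkerPool

def square(x):
    return x * x

with WorkerPool(n_jobs=4) as pool:
    # imap_unordered yields results as soon as any worker finishes,
    # which is memory-efficient and fastest when order does not matter
    for result in pool.imap_unordered(square, range(1000), chunk_size=10):
        pass  # process each result incrementally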
def map(self, func: Callable, iterable_of_args: Union[Sized, Iterable],
        iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
        chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
        worker_lifespan: Optional[int] = None, progress_bar: bool = False,
        concatenate_numpy_output: bool = True,
        progress_bar_options: Optional[Dict[str, Any]] = None,
        progress_bar_style: Optional[str] = None, enable_insights: bool = False) -> Any
def map_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable], **kwargs) -> Any
def imap(self, func: Callable, iterable_of_args: Union[Sized, Iterable], **kwargs) -> Generator
def imap_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable], **kwargs) -> Generator

Apply-style parallel execution for single function calls and asynchronous operations with callback support.
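A short sketch of the blocking and asynchronous variants; the add function and on_done callback are illustrative placeholders:

from mpire import WorkerPool

def add(a, b):
    return a + b

def on_done(result):
    print('finished:', result)

with WorkerPool(n_jobs=2) as pool:
    # apply blocks until the single call completes
    total = pool.apply(add, args=(1, 2))
    # apply_async returns immediately; the callback fires once the result is ready
    async_result = pool.apply_async(add, args=(3, 4), callback=on_done)
    print(total, async_result.get())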
def apply(self, func: Callable, args: Any = (), kwargs: Dict = None,
          callback: Optional[Callable] = None, error_callback: Optional[Callable] = None) -> Any
def apply_async(self, func: Callable, args: Any = (), kwargs: Dict = None,
                callback: Optional[Callable] = None, error_callback: Optional[Callable] = None) -> AsyncResult

Advanced worker configuration including CPU pinning, shared objects, worker state management, and initialization/exit functions.
def pass_on_worker_id(self, pass_on: bool = True) -> None
def set_shared_objects(self, shared_objects: Any = None) -> None
def set_use_worker_state(self, use_worker_state: bool = True) -> None
def set_keep_alive(self, keep_alive: bool = True) -> None
def set_order_tasks(self, order_tasks: bool = True) -> None

Worker performance monitoring and insights for analyzing multiprocessing efficiency, including timing data and task completion statistics.
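A minimal sketch of collecting insights for a pool created with enable_insights=True:

from mpire import WorkerPool

def work(x):
    return sum(range(x))

with WorkerPool(n_jobs=4, enable_insights=True) as pool:
    pool.map(work, range(100))
    # Prints per-worker timing statistics (e.g. start-up, waiting, working time)
    pool.print_insights()
    insights = pool.get_insights()  # the same data as a dictionary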
def print_insights(self) -> None
def get_insights(self) -> Dict
def get_exit_results(self) -> List

Asynchronous result handling with support for callbacks, timeouts, and iterators for processing results as they become available.
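A short sketch of working with an AsyncResult returned by apply_async:

import time
from mpire import WorkerPool

def slow_add(a, b):
    time.sleep(0.5)
    return a + b

with WorkerPool(n_jobs=2) as pool:
    result = pool.apply_async(slow_add, args=(1, 2))
    print(result.ready())    # most likely False right after submission
    result.wait()            # block until the task has finished
    if result.successful():  # True, since slow_add did not raise
        print(result.get())  # 3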
class AsyncResult:
    def ready(self) -> bool
    def successful(self) -> bool
    def get(self, timeout: Optional[float] = None) -> Any
    def wait(self, timeout: Optional[float] = None) -> None

Exception classes and utilities for handling errors in multiprocessing environments with enhanced traceback formatting.
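An exception raised inside a worker is re-raised in the parent process with a formatted traceback, so it can be handled with an ordinary try/except; a small sketch:

from mpire import WorkerPool

def might_fail(x):
    if x == 3:
        raise ValueError(f"bad input: {x}")
    return x * 2

with WorkerPool(n_jobs=2) as pool:
    try:
        pool.map(might_fail, range(5))
    except ValueError as exc:
        # The worker's exception is re-raised here, with a traceback
        # pointing into might_fail
        print("task failed:", exc)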
class StopWorker(Exception): ...
class InterruptWorker(Exception): ...
class CannotPickleExceptionError(Exception): ...
def highlight_traceback(traceback_str: str) -> str
def remove_highlighting(traceback_str: str) -> str
def populate_exception(err_type: type, err_args: Any, err_state: Dict, traceback_str: str) -> Tuple[Exception, Exception]

Optional web dashboard for monitoring multiprocessing jobs with real-time progress tracking and performance visualization.
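A sketch of starting the dashboard alongside a pool; this assumes the dashboard extra is installed and that the helpers are importable from mpire.dashboard:

from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard

# Starts a local web server on the first free port in the given range
dashboard_details = start_dashboard()
print(dashboard_details)  # e.g. contains the port the dashboard is served on

def work(x):
    return x * x

# Progress bars of subsequent map calls show up in the dashboard
with WorkerPool(n_jobs=4) as pool:
    pool.map(work, range(1000), progress_bar=True)

shutdown_dashboard()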
def start_dashboard(port_range: Sequence = range(8080, 8100)) -> Dict[str, Union[int, str]]
def shutdown_dashboard() -> None
def connect_to_dashboard(manager_port_nr: int, manager_host: Optional[Union[bytes, str]] = None) -> None

Utility functions for task chunking, CPU affinity management, timing operations, and other helper functionality.
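A small sketch of the helpers, assuming chunk_tasks and format_seconds are importable from mpire.utils as in recent MPIRE releases:

from mpire import cpu_count
from mpire.utils import chunk_tasks, format_seconds

print(cpu_count())  # number of CPUs available to MPIRE

# Split 10 tasks into chunks of 4, 4 and 2
chunks = list(chunk_tasks(range(10), iterable_len=10, chunk_size=4))

print(format_seconds(3.5, with_milliseconds=True))  # human-readable duration string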
def cpu_count() -> int
def set_cpu_affinity(pid: int, mask: List[int]) -> None
def chunk_tasks(iterable_of_args: Iterable, iterable_len: Optional[int] = None, **kwargs) -> Generator
def format_seconds(seconds: Optional[Union[int, float]], with_milliseconds: bool) -> str

from typing import Union, List, Optional, Any, Callable, Dict, Sized, Iterable, Generator, Tuple
# Type aliases
CPUList = Union[int, List[int], List[List[int]]]
# Context constants
DEFAULT_START_METHOD: str
FORK_AVAILABLE: bool
RUNNING_WINDOWS: bool
RUNNING_MACOS: bool
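A brief sketch using the context constants to pick a start method, assuming they are importable from mpire.context:

from mpire import WorkerPool
from mpire.context import DEFAULT_START_METHOD, FORK_AVAILABLE

# Prefer fork (copy-on-write) when available, otherwise fall back to the default
start_method = 'fork' if FORK_AVAILABLE else DEFAULT_START_METHOD

with WorkerPool(n_jobs=2, start_method=start_method) as pool:
    results = pool.map(abs, [-1, -2, -3])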