tessl/pypi-mpire

A Python package for easy multiprocessing that runs faster than the standard multiprocessing module, with advanced features including worker state management, progress bars, and performance insights.

  • Workspace: tessl
  • Visibility: Public
  • Describes: pkg:pypi/mpire@2.10.x

To install, run

npx @tessl/cli install tessl/pypi-mpire@2.10.0


MPIRE

MPIRE (MultiProcessing Is Really Easy) is a Python multiprocessing library that provides faster execution than the standard multiprocessing package through optimized task distribution and copy-on-write shared objects. It offers intuitive map/apply functions with advanced features including worker state management, progress bars with tqdm integration, worker insights for performance monitoring, graceful exception handling, configurable timeouts, automatic task chunking, memory management through worker recycling, and support for nested pools and CPU pinning.

Package Information

  • Package Name: mpire
  • Language: Python
  • Installation: pip install mpire

Optional dependencies:

  • Dashboard support: pip install mpire[dashboard] (requires flask)
  • Dill serialization: pip install mpire[dill] (requires multiprocess)

Core Imports

from mpire import WorkerPool, cpu_count

Basic Usage

from mpire import WorkerPool
import time

def time_consuming_function(x):
    time.sleep(0.1)  # Simulate work
    return x * x

# Simple parallel map
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(time_consuming_function, range(10))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# With progress bar
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(time_consuming_function, range(100), 
                      progress_bar=True)

# With worker state (illustrative: connect_to_database() and items are
# placeholders for your own connection factory and input data)
def init_worker(worker_state):
    worker_state['database'] = connect_to_database()

def process_with_state(worker_state, item):
    return worker_state['database'].query(item)

with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
    results = pool.map(process_with_state, items,
                       worker_init=init_worker)

Architecture

MPIRE's architecture centers around the WorkerPool class, which manages a pool of worker processes or threads. Key components include:

  • WorkerPool: Main interface for parallel execution with configurable worker processes
  • Worker Classes: AbstractWorker, SpawnWorker, ThreadingWorker for different execution contexts
  • Communication Layer: WorkerComms for inter-process communication via queues and events
  • Result Management: AsyncResult classes for handling asynchronous results and iterators
  • Insights System: WorkerInsights for performance monitoring and profiling
  • Progress Tracking: Integration with tqdm for progress bars and optional web dashboard
  • Exception Handling: Graceful error propagation with highlighted tracebacks

The design supports multiple start methods (fork, spawn, forkserver, threading), automatic task chunking, worker state management, and copy-on-write shared objects for maximum performance.

Capabilities

WorkerPool Management

Core WorkerPool class with initialization, configuration, and lifecycle management. Provides the main interface for creating and managing parallel worker processes or threads.

class WorkerPool:
    def __init__(self, n_jobs: Optional[int] = None, daemon: bool = True, 
                 cpu_ids: CPUList = None, shared_objects: Any = None, 
                 pass_worker_id: bool = False, use_worker_state: bool = False,
                 start_method: str = DEFAULT_START_METHOD, keep_alive: bool = False,
                 use_dill: bool = False, enable_insights: bool = False, 
                 order_tasks: bool = False) -> None
    def __enter__(self) -> 'WorkerPool'
    def __exit__(self, *_: Any) -> None
    def stop_and_join(self, keep_alive: bool = False) -> None
    def terminate(self) -> None

Parallel Map Functions

Map-style parallel execution functions including ordered and unordered variants, with iterator versions for memory-efficient processing of large datasets.

def map(self, func: Callable, iterable_of_args: Union[Sized, Iterable], 
        iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
        chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
        worker_lifespan: Optional[int] = None, progress_bar: bool = False,
        concatenate_numpy_output: bool = True,
        progress_bar_options: Optional[Dict[str, Any]] = None,
        progress_bar_style: Optional[str] = None, enable_insights: bool = False) -> Any
def map_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable], **kwargs) -> Any
def imap(self, func: Callable, iterable_of_args: Union[Sized, Iterable], **kwargs) -> Generator
def imap_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable], **kwargs) -> Generator

Apply Functions

Apply-style parallel execution for single function calls and asynchronous operations with callback support.

def apply(self, func: Callable, args: Any = (), kwargs: Dict = None, 
          callback: Optional[Callable] = None, error_callback: Optional[Callable] = None) -> Any
def apply_async(self, func: Callable, args: Any = (), kwargs: Dict = None,
                callback: Optional[Callable] = None, error_callback: Optional[Callable] = None) -> AsyncResult

Worker Configuration

Advanced worker configuration including CPU pinning, shared objects, worker state management, and initialization/exit functions.

def pass_on_worker_id(self, pass_on: bool = True) -> None
def set_shared_objects(self, shared_objects: Any = None) -> None
def set_use_worker_state(self, use_worker_state: bool = True) -> None
def set_keep_alive(self, keep_alive: bool = True) -> None
def set_order_tasks(self, order_tasks: bool = True) -> None

Performance Insights

Worker performance monitoring and insights for analyzing multiprocessing efficiency, including timing data and task completion statistics.

def print_insights(self) -> None
def get_insights(self) -> Dict
def get_exit_results(self) -> List

Async Results

Asynchronous result handling with support for callbacks, timeouts, and iterators for processing results as they become available.

class AsyncResult:
    def ready(self) -> bool
    def successful(self) -> bool
    def get(self, timeout: Optional[float] = None) -> Any
    def wait(self, timeout: Optional[float] = None) -> None

Exception Handling

Exception classes and utilities for handling errors in multiprocessing environments with enhanced traceback formatting.

class StopWorker(Exception): ...
class InterruptWorker(Exception): ...
class CannotPickleExceptionError(Exception): ...

def highlight_traceback(traceback_str: str) -> str
def remove_highlighting(traceback_str: str) -> str
def populate_exception(err_type: type, err_args: Any, err_state: Dict, traceback_str: str) -> Tuple[Exception, Exception]

Dashboard Integration

Optional web dashboard for monitoring multiprocessing jobs with real-time progress tracking and performance visualization.

def start_dashboard(port_range: Sequence = range(8080, 8100)) -> Dict[str, Union[int, str]]
def shutdown_dashboard() -> None
def connect_to_dashboard(manager_port_nr: int, manager_host: Optional[Union[bytes, str]] = None) -> None

Utility Functions

Utility functions for task chunking, CPU affinity management, timing operations, and other helper functionality.

def cpu_count() -> int
def set_cpu_affinity(pid: int, mask: List[int]) -> None
def chunk_tasks(iterable_of_args: Iterable, iterable_len: Optional[int] = None, **kwargs) -> Generator
def format_seconds(seconds: Optional[Union[int, float]], with_milliseconds: bool) -> str

Types

from typing import Union, List, Optional, Any, Callable, Dict, Sized, Iterable, Generator, Tuple

# Type aliases
CPUList = Union[int, List[int], List[List[int]]]

# Context constants
DEFAULT_START_METHOD: str
FORK_AVAILABLE: bool
RUNNING_WINDOWS: bool
RUNNING_MACOS: bool