# MPIRE

MPIRE (MultiProcessing Is Really Easy) is a Python multiprocessing library designed to run faster than the standard `multiprocessing` package by optimizing task distribution and using copy-on-write shared objects. It offers intuitive map and apply functions, plus worker state management, progress bars via tqdm, worker insights for performance monitoring, graceful exception handling, configurable timeouts, automatic task chunking, memory management through worker recycling, nested pools, and CPU pinning.

## Package Information

- **Package Name**: mpire
- **Language**: Python
- **Installation**: `pip install mpire`

Optional dependencies:

- Dashboard support: `pip install mpire[dashboard]` (requires flask)
- Dill serialization: `pip install mpire[dill]` (requires multiprocess)

## Core Imports

```python
from mpire import WorkerPool, cpu_count
```

## Basic Usage

```python
from mpire import WorkerPool
import time

def time_consuming_function(x):
    time.sleep(0.1)  # Simulate work
    return x * x

# Simple parallel map
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(time_consuming_function, range(10))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# With progress bar
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(time_consuming_function, range(100),
                       progress_bar=True)

# With worker state
# Note: connect_to_database() and items are placeholders for your own code.
def init_worker(worker_state):
    # Runs once per worker; store per-worker resources in worker_state
    worker_state['database'] = connect_to_database()

def process_with_state(worker_state, item):
    # worker_state is passed as the first argument to every task
    return worker_state['database'].query(item)

with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
    results = pool.map(process_with_state, items,
                       worker_init=init_worker)
```

## Architecture

MPIRE's architecture centers around the WorkerPool class, which manages a pool of worker processes or threads. Key components include:

- **WorkerPool**: Main interface for parallel execution with configurable worker processes
- **Worker Classes**: AbstractWorker, SpawnWorker, ThreadingWorker for different execution contexts
- **Communication Layer**: WorkerComms for inter-process communication via queues and events
- **Result Management**: AsyncResult classes for handling asynchronous results and iterators
- **Insights System**: WorkerInsights for performance monitoring and profiling
- **Progress Tracking**: Integration with tqdm for progress bars and optional web dashboard
- **Exception Handling**: Graceful error propagation with highlighted tracebacks

The design supports multiple start methods (fork, spawn, forkserver, threading), automatic task chunking, worker state management, and copy-on-write shared objects for maximum performance.

## Capabilities

### WorkerPool Management

Core WorkerPool class with initialization, configuration, and lifecycle management. Provides the main interface for creating and managing parallel worker processes or threads.

```python { .api }
class WorkerPool:
    def __init__(self, n_jobs: Optional[int] = None, daemon: bool = True,
                 cpu_ids: CPUList = None, shared_objects: Any = None,
                 pass_worker_id: bool = False, use_worker_state: bool = False,
                 start_method: str = DEFAULT_START_METHOD, keep_alive: bool = False,
                 use_dill: bool = False, enable_insights: bool = False,
                 order_tasks: bool = False) -> None
    def __enter__(self) -> 'WorkerPool'
    def __exit__(self, *_: Any) -> None
    def stop_and_join(self, keep_alive: bool = False) -> None
    def terminate(self) -> None
```

[WorkerPool Management](./workerpool-management.md)

### Parallel Map Functions

Map-style parallel execution functions including ordered and unordered variants, with iterator versions for memory-efficient processing of large datasets.

```python { .api }
def map(self, func: Callable, iterable_of_args: Union[Sized, Iterable],
        iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
        chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
        worker_lifespan: Optional[int] = None, progress_bar: bool = False,
        concatenate_numpy_output: bool = True,
        progress_bar_options: Optional[Dict[str, Any]] = None,
        progress_bar_style: Optional[str] = None, enable_insights: bool = False) -> Any
def map_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable], **kwargs) -> Any
def imap(self, func: Callable, iterable_of_args: Union[Sized, Iterable], **kwargs) -> Generator
def imap_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable], **kwargs) -> Generator
```

[Parallel Map Functions](./parallel-map.md)

### Apply Functions

Apply-style parallel execution for single function calls and asynchronous operations with callback support.

```python { .api }
def apply(self, func: Callable, args: Any = (), kwargs: Dict = None,
          callback: Optional[Callable] = None, error_callback: Optional[Callable] = None) -> Any
def apply_async(self, func: Callable, args: Any = (), kwargs: Dict = None,
                callback: Optional[Callable] = None, error_callback: Optional[Callable] = None) -> AsyncResult
```

[Apply Functions](./apply-functions.md)

### Worker Configuration

Advanced worker configuration including CPU pinning, shared objects, worker state management, and initialization/exit functions.

```python { .api }
def pass_on_worker_id(self, pass_on: bool = True) -> None
def set_shared_objects(self, shared_objects: Any = None) -> None
def set_use_worker_state(self, use_worker_state: bool = True) -> None
def set_keep_alive(self, keep_alive: bool = True) -> None
def set_order_tasks(self, order_tasks: bool = True) -> None
```

[Worker Configuration](./worker-configuration.md)

### Performance Insights

Worker performance monitoring and insights for analyzing multiprocessing efficiency, including timing data and task completion statistics.

```python { .api }
def print_insights(self) -> None
def get_insights(self) -> Dict
def get_exit_results(self) -> List
```

[Performance Insights](./performance-insights.md)

### Async Results

Asynchronous result handling with support for callbacks, timeouts, and iterators for processing results as they become available.

```python { .api }
class AsyncResult:
    def ready(self) -> bool
    def successful(self) -> bool
    def get(self, timeout: Optional[float] = None) -> Any
    def wait(self, timeout: Optional[float] = None) -> None
```

[Async Results](./async-results.md)

### Exception Handling

Exception classes and utilities for handling errors in multiprocessing environments with enhanced traceback formatting.

```python { .api }
class StopWorker(Exception): ...
class InterruptWorker(Exception): ...
class CannotPickleExceptionError(Exception): ...

def highlight_traceback(traceback_str: str) -> str
def remove_highlighting(traceback_str: str) -> str
def populate_exception(err_type: type, err_args: Any, err_state: Dict, traceback_str: str) -> Tuple[Exception, Exception]
```

[Exception Handling](./exception-handling.md)

### Dashboard Integration

Optional web dashboard for monitoring multiprocessing jobs with real-time progress tracking and performance visualization.

```python { .api }
def start_dashboard(port_range: Sequence = range(8080, 8100)) -> Dict[str, Union[int, str]]
def shutdown_dashboard() -> None
def connect_to_dashboard(manager_port_nr: int, manager_host: Optional[Union[bytes, str]] = None) -> None
```

[Dashboard Integration](./dashboard-integration.md)

### Utility Functions

Utility functions for task chunking, CPU affinity management, timing operations, and other helper functionality.

```python { .api }
def cpu_count() -> int
def set_cpu_affinity(pid: int, mask: List[int]) -> None
def chunk_tasks(iterable_of_args: Iterable, iterable_len: Optional[int] = None, **kwargs) -> Generator
def format_seconds(seconds: Optional[Union[int, float]], with_milliseconds: bool) -> str
```

[Utility Functions](./utility-functions.md)

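To make the idea of task chunking concrete, here is a standalone sketch using only the standard library. It is not MPIRE's `chunk_tasks` implementation, and `chunk_evenly` is a hypothetical helper. It only shows how an iterable can be grouped into fixed-size batches so that each worker receives several tasks per message.

```python
from itertools import islice
from typing import Iterable, Iterator, Tuple


def chunk_evenly(items: Iterable, chunk_size: int) -> Iterator[Tuple]:
    """Yield consecutive tuples of at most chunk_size items."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    iterator = iter(items)
    while True:
        chunk = tuple(islice(iterator, chunk_size))
        if not chunk:
            return  # input exhausted
        yield chunk


if __name__ == "__main__":
    print(list(chunk_evenly(range(7), 3)))
    # [(0, 1, 2), (3, 4, 5), (6,)]
```

Larger chunks reduce communication overhead per task, while smaller chunks balance load better when task durations vary.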
## Types

```python { .api }
from typing import Union, List, Optional, Any, Callable, Dict, Sized, Iterable, Generator, Tuple

# Type aliases
CPUList = Union[int, List[int], List[List[int]]]

# Context constants
DEFAULT_START_METHOD: str
FORK_AVAILABLE: bool
RUNNING_WINDOWS: bool
RUNNING_MACOS: bool
```