tessl/pypi-mpire

A Python package for easy multiprocessing that is faster than the standard multiprocessing module, with advanced features including worker state management, progress bars, and performance insights.

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/mpire@2.10.x

To install, run

npx @tessl/cli install tessl/pypi-mpire@2.10.0

# MPIRE

MPIRE (MultiProcessing Is Really Easy) is a Python multiprocessing library that provides faster execution than the standard multiprocessing package through optimized task distribution and copy-on-write shared objects. It offers intuitive map/apply functions with advanced features including worker state management, progress bars with tqdm integration, worker insights for performance monitoring, graceful exception handling, configurable timeouts, automatic task chunking, memory management through worker recycling, and support for nested pools and CPU pinning.

## Package Information

- **Package Name**: mpire
- **Language**: Python
- **Installation**: `pip install mpire`

Optional dependencies:

- Dashboard support: `pip install mpire[dashboard]` (requires flask)
- Dill serialization: `pip install mpire[dill]` (requires multiprocess)

## Core Imports

```python
from mpire import WorkerPool, cpu_count
```

## Basic Usage

```python
from mpire import WorkerPool
import time

def time_consuming_function(x):
    time.sleep(0.1)  # Simulate work
    return x * x

# Simple parallel map
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(time_consuming_function, range(10))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# With progress bar
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(time_consuming_function, range(100),
                       progress_bar=True)

# With worker state
def init_worker(worker_state):
    worker_state['database'] = connect_to_database()

def process_with_state(worker_state, item):
    return worker_state['database'].query(item)

with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
    results = pool.map(process_with_state, items,
                       worker_init=init_worker)
```

## Architecture

MPIRE's architecture centers on the WorkerPool class, which manages a pool of worker processes or threads. Key components include:

- **WorkerPool**: Main interface for parallel execution with configurable worker processes
- **Worker Classes**: AbstractWorker, SpawnWorker, ThreadingWorker for different execution contexts
- **Communication Layer**: WorkerComms for inter-process communication via queues and events
- **Result Management**: AsyncResult classes for handling asynchronous results and iterators
- **Insights System**: WorkerInsights for performance monitoring and profiling
- **Progress Tracking**: Integration with tqdm for progress bars and an optional web dashboard
- **Exception Handling**: Graceful error propagation with highlighted tracebacks

The design supports multiple start methods (fork, spawn, forkserver, threading), automatic task chunking, worker state management, and copy-on-write shared objects for maximum performance.

## Capabilities

### WorkerPool Management

Core WorkerPool class with initialization, configuration, and lifecycle management. Provides the main interface for creating and managing parallel worker processes or threads.

```python { .api }
class WorkerPool:
    def __init__(self, n_jobs: Optional[int] = None, daemon: bool = True,
                 cpu_ids: CPUList = None, shared_objects: Any = None,
                 pass_worker_id: bool = False, use_worker_state: bool = False,
                 start_method: str = DEFAULT_START_METHOD, keep_alive: bool = False,
                 use_dill: bool = False, enable_insights: bool = False,
                 order_tasks: bool = False) -> None
    def __enter__(self) -> 'WorkerPool'
    def __exit__(self, *_: Any) -> None
    def stop_and_join(self, keep_alive: bool = False) -> None
    def terminate(self) -> None
```

[WorkerPool Management](./workerpool-management.md)

### Parallel Map Functions

Map-style parallel execution functions including ordered and unordered variants, with iterator versions for memory-efficient processing of large datasets.

```python { .api }
def map(self, func: Callable, iterable_of_args: Union[Sized, Iterable],
        iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
        chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
        worker_lifespan: Optional[int] = None, progress_bar: bool = False,
        concatenate_numpy_output: bool = True,
        progress_bar_options: Optional[Dict[str, Any]] = None,
        progress_bar_style: Optional[str] = None, enable_insights: bool = False) -> Any
def map_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable], **kwargs) -> Any
def imap(self, func: Callable, iterable_of_args: Union[Sized, Iterable], **kwargs) -> Generator
def imap_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable], **kwargs) -> Generator
```

[Parallel Map Functions](./parallel-map.md)

### Apply Functions

Apply-style parallel execution for single function calls and asynchronous operations with callback support.

```python { .api }
def apply(self, func: Callable, args: Any = (), kwargs: Dict = None,
          callback: Optional[Callable] = None, error_callback: Optional[Callable] = None) -> Any
def apply_async(self, func: Callable, args: Any = (), kwargs: Dict = None,
                callback: Optional[Callable] = None, error_callback: Optional[Callable] = None) -> AsyncResult
```

[Apply Functions](./apply-functions.md)

### Worker Configuration

Advanced worker configuration including CPU pinning, shared objects, worker state management, and initialization/exit functions.

```python { .api }
def pass_on_worker_id(self, pass_on: bool = True) -> None
def set_shared_objects(self, shared_objects: Any = None) -> None
def set_use_worker_state(self, use_worker_state: bool = True) -> None
def set_keep_alive(self, keep_alive: bool = True) -> None
def set_order_tasks(self, order_tasks: bool = True) -> None
```

[Worker Configuration](./worker-configuration.md)

### Performance Insights

Worker performance monitoring and insights for analyzing multiprocessing efficiency, including timing data and task completion statistics.

```python { .api }
def print_insights(self) -> None
def get_insights(self) -> Dict
def get_exit_results(self) -> List
```

[Performance Insights](./performance-insights.md)

### Async Results

Asynchronous result handling with support for callbacks, timeouts, and iterators for processing results as they become available.

```python { .api }
class AsyncResult:
    def ready(self) -> bool
    def successful(self) -> bool
    def get(self, timeout: Optional[float] = None) -> Any
    def wait(self, timeout: Optional[float] = None) -> None
```

[Async Results](./async-results.md)

### Exception Handling

Exception classes and utilities for handling errors in multiprocessing environments with enhanced traceback formatting.

```python { .api }
class StopWorker(Exception): ...
class InterruptWorker(Exception): ...
class CannotPickleExceptionError(Exception): ...

def highlight_traceback(traceback_str: str) -> str
def remove_highlighting(traceback_str: str) -> str
def populate_exception(err_type: type, err_args: Any, err_state: Dict, traceback_str: str) -> Tuple[Exception, Exception]
```

[Exception Handling](./exception-handling.md)

### Dashboard Integration

Optional web dashboard for monitoring multiprocessing jobs with real-time progress tracking and performance visualization.

```python { .api }
def start_dashboard(port_range: Sequence = range(8080, 8100)) -> Dict[str, Union[int, str]]
def shutdown_dashboard() -> None
def connect_to_dashboard(manager_port_nr: int, manager_host: Optional[Union[bytes, str]] = None) -> None
```

[Dashboard Integration](./dashboard-integration.md)

### Utility Functions

Utility functions for task chunking, CPU affinity management, timing operations, and other helper functionality.

```python { .api }
def cpu_count() -> int
def set_cpu_affinity(pid: int, mask: List[int]) -> None
def chunk_tasks(iterable_of_args: Iterable, iterable_len: Optional[int] = None, **kwargs) -> Generator
def format_seconds(seconds: Optional[Union[int, float]], with_milliseconds: bool) -> str
```

[Utility Functions](./utility-functions.md)

## Types

```python { .api }
from typing import Union, List, Optional, Any, Callable, Dict, Sized, Iterable, Generator, Tuple

# Type aliases
CPUList = Union[int, List[int], List[List[int]]]

# Context constants
DEFAULT_START_METHOD: str
FORK_AVAILABLE: bool
RUNNING_WINDOWS: bool
RUNNING_MACOS: bool
```