# Parallel Map Functions

Map-style parallel execution functions for processing iterables across multiple workers. Includes ordered and unordered variants, plus iterator versions for memory-efficient processing of large datasets.

## Capabilities

### Ordered Map Functions

Process iterables in parallel while maintaining result order.

```python { .api }
def map(self, func: Callable, iterable_of_args: Union[Sized, Iterable],
        iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
        chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
        worker_lifespan: Optional[int] = None, progress_bar: bool = False,
        concatenate_numpy_output: bool = True, worker_init: Optional[Callable] = None,
        worker_exit: Optional[Callable] = None, task_timeout: Optional[float] = None,
        worker_init_timeout: Optional[float] = None, worker_exit_timeout: Optional[float] = None,
        progress_bar_options: Optional[Dict[str, Any]] = None,
        progress_bar_style: Optional[str] = None) -> Any
```
21
22
**Parameters:**
23
- `func` (Callable): Function to apply to each item
24
- `iterable_of_args` (Union[Sized, Iterable]): Arguments to process
25
- `iterable_len` (Optional[int]): Length of iterable if not sized
26
- `max_tasks_active` (Optional[int]): Maximum number of active tasks to prevent memory issues
27
- `chunk_size` (Optional[int]): Number of tasks per chunk for worker processing
28
- `n_splits` (Optional[int]): Number of splits for automatic chunking
29
- `worker_lifespan` (Optional[int]): Number of tasks before worker restart
30
- `progress_bar` (bool): Show progress bar during execution
31
- `concatenate_numpy_output` (bool): Whether to concatenate numpy array outputs
32
- `progress_bar_options` (Optional[Dict]): Custom tqdm progress bar options
33
- `progress_bar_style` (Optional[str]): Progress bar style ('std', 'notebook', 'dashboard')
34
- `enable_insights` (bool): Enable worker performance insights
35
- `worker_init` (Optional[Callable]): Function called when worker starts
36
- `worker_exit` (Optional[Callable]): Function called when worker exits
37
- `task_timeout` (Optional[float]): Timeout in seconds for individual tasks
38
- `worker_init_timeout` (Optional[float]): Timeout for worker initialization
39
- `worker_exit_timeout` (Optional[float]): Timeout for worker exit
40
41
### Unordered Map Functions

Process iterables in parallel without maintaining result order for better performance.

```python { .api }
def map_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable],
                  iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
                  chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
                  worker_lifespan: Optional[int] = None, progress_bar: bool = False,
                  progress_bar_options: Optional[Dict[str, Any]] = None,
                  progress_bar_style: Optional[str] = None, enable_insights: bool = False,
                  worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
                  task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
                  worker_exit_timeout: Optional[float] = None) -> List
```

### Iterator Map Functions

Memory-efficient iterator versions that yield results as they become available.

```python { .api }
def imap(self, func: Callable, iterable_of_args: Union[Sized, Iterable],
         iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
         chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
         worker_lifespan: Optional[int] = None, progress_bar: bool = False,
         progress_bar_options: Optional[Dict[str, Any]] = None,
         progress_bar_style: Optional[str] = None, enable_insights: bool = False,
         worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
         task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
         worker_exit_timeout: Optional[float] = None) -> Iterator

def imap_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable],
                   iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
                   chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
                   worker_lifespan: Optional[int] = None, progress_bar: bool = False,
                   progress_bar_options: Optional[Dict[str, Any]] = None,
                   progress_bar_style: Optional[str] = None, enable_insights: bool = False,
                   worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
                   task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
                   worker_exit_timeout: Optional[float] = None) -> Iterator
```

## Usage Examples

### Basic Map Operations

```python
from mpire import WorkerPool

def square(x):
    return x * x

with WorkerPool(n_jobs=4) as pool:
    # Ordered results
    results = pool.map(square, range(10))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

    # Unordered results (potentially faster)
    results = pool.map_unordered(square, range(10))
    print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

### Iterator Processing

```python
# Memory-efficient processing of large datasets
with WorkerPool(n_jobs=4) as pool:
    # Process results as they become available
    for result in pool.imap(expensive_function, large_dataset):
        process_result(result)

    # Unordered iterator for maximum performance
    for result in pool.imap_unordered(expensive_function, large_dataset):
        process_result(result)
```

### Progress Tracking

```python
# Basic progress bar
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(slow_function, range(100), progress_bar=True)

# Custom progress bar options
progress_options = {
    'desc': 'Processing items',
    'unit': 'items',
    'disable': False
}

with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        slow_function,
        range(100),
        progress_bar=True,
        progress_bar_options=progress_options
    )
```

### Task Chunking and Performance Tuning

```python
# Manual chunk size control
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        quick_function,
        range(10000),
        chunk_size=50  # Process 50 items per chunk
    )

# Automatic chunking with splits
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        function,
        data,
        n_splits=20  # Split into 20 chunks automatically
    )

# Memory management with active task limit
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        memory_intensive_function,
        large_dataset,
        max_tasks_active=8  # Limit active tasks to prevent memory issues
    )
```

### Worker Lifecycle Management

```python
def init_worker(worker_state):
    """Initialize worker with expensive resources"""
    worker_state['model'] = load_machine_learning_model()
    worker_state['database'] = connect_to_database()

def exit_worker(worker_state):
    """Clean up worker resources"""
    worker_state['database'].close()

def process_item(worker_state, item):
    """Process item using worker state"""
    prediction = worker_state['model'].predict(item)
    worker_state['database'].save_result(item, prediction)
    return prediction

with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
    results = pool.map(
        process_item,
        items,
        worker_init=init_worker,
        worker_exit=exit_worker,
        worker_lifespan=100  # Restart workers every 100 tasks
    )
```

### Timeout Management

```python
# Function that might hang
def unreliable_function(x):
    import random, time
    if random.random() < 0.1:  # 10% chance of hanging
        time.sleep(1000)
    return x * 2

with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        unreliable_function,
        range(100),
        task_timeout=5.0,          # 5 second timeout per task
        worker_init_timeout=10.0,  # 10 second worker init timeout
        worker_exit_timeout=5.0    # 5 second worker exit timeout
    )
```