# Utility Functions

Utility functions for task chunking, CPU affinity management, timing operations, and other helper functionality. These utilities provide fine-grained control over multiprocessing performance and resource management.

## Capabilities

### CPU and System Utilities

Functions for managing CPU resources and system information.

```python { .api }
def cpu_count() -> int
def set_cpu_affinity(pid: int, mask: List[int]) -> None
```

**cpu_count**: Get the number of available CPU cores (imported from multiprocessing).

**set_cpu_affinity**: Set CPU affinity for a process to specific CPU cores.
- `pid` (int): Process ID to set affinity for
- `mask` (List[int]): List of CPU core IDs to bind the process to

### Task Chunking Utilities

Functions for optimizing task distribution and chunking strategies.

```python { .api }
def chunk_tasks(iterable_of_args: Iterable, iterable_len: Optional[int] = None,
                n_jobs: Optional[int] = None, n_tasks_per_job: Optional[int] = None,
                chunk_size: Optional[int] = None, n_splits: Optional[int] = None) -> Generator

def apply_numpy_chunking(iterable_of_args: Iterable, iterable_len: Optional[int] = None,
                         n_jobs: Optional[int] = None, n_tasks_per_job: Optional[int] = None,
                         chunk_size: Optional[int] = None, n_splits: Optional[int] = None) -> Generator

def get_n_chunks(iterable_of_args: Iterable, iterable_len: Optional[int] = None,
                 chunk_size: Optional[int] = None, n_jobs: Optional[int] = None,
                 n_tasks_per_job: Optional[int] = None, n_splits: Optional[int] = None) -> int
```

**chunk_tasks**: Split an iterable into optimally-sized chunks for parallel processing.

**apply_numpy_chunking**: Apply numpy-specific chunking optimizations for array processing.

**get_n_chunks**: Calculate the optimal number of chunks for given parameters.

### Argument Processing

Functions for preparing arguments for parallel processing.

```python { .api }
def make_single_arguments(iterable_of_args: Iterable, generator: bool = True) -> Union[List, Generator]
```

**make_single_arguments**: Convert multi-argument tuples to single arguments for functions expecting individual parameters.

### Time and Formatting Utilities
56
57
Functions for timing operations and formatting output.
58
59
```python { .api }
60
def format_seconds(seconds: Optional[Union[int, float]], with_milliseconds: bool) -> str
61
62
class TimeIt:
63
def __init__(self, label: str = "Operation") -> None
64
def __enter__(self) -> 'TimeIt'
65
def __exit__(self, exc_type, exc_val, exc_tb) -> None
66
```
67
68
**format_seconds**: Format seconds into human-readable time strings.
69
70
**TimeIt**: Context manager for timing code blocks with automatic reporting.
71
72
### Manager and Communication Utilities

Functions for creating multiprocessing managers and communication objects.

```python { .api }
def create_sync_manager(use_dill: bool) -> SyncManager

class NonPickledSyncManager:
    """Synchronization manager that doesn't require pickling"""
    pass
```

**create_sync_manager**: Create a multiprocessing SyncManager with optional dill support.

**NonPickledSyncManager**: Alternative sync manager for scenarios where pickling is problematic.

## Usage Examples

### CPU Affinity Management

```python
from mpire import WorkerPool
from mpire.utils import set_cpu_affinity
import os
import time

def cpu_intensive_task(x):
    """CPU-bound task that benefits from CPU pinning"""
    # Show which CPU the process is running on
    pid = os.getpid()
    print(f"Process {pid} processing {x}")

    # CPU-intensive computation
    result = 0
    for i in range(x * 100000):
        result += i
    return result

# Pin workers to specific CPUs
cpu_assignments = [0, 1, 2, 3]  # Use first 4 CPUs

with WorkerPool(n_jobs=4, cpu_ids=cpu_assignments) as pool:
    results = pool.map(cpu_intensive_task, range(8))
    print(f"Results: {results}")

# Manual CPU affinity setting
def set_process_affinity():
    pid = os.getpid()
    set_cpu_affinity(pid, [0, 2])  # Pin to CPUs 0 and 2
    print(f"Process {pid} pinned to CPUs 0 and 2")

set_process_affinity()
```

### Task Chunking Optimization

```python
from mpire import WorkerPool
from mpire.utils import chunk_tasks, get_n_chunks
import time

def quick_task(x):
    """Fast task that benefits from larger chunks"""
    return x * 2

def slow_task(x):
    """Slow task that benefits from smaller chunks"""
    time.sleep(0.01)
    return x ** 2

# Analyze chunking for different scenarios
data = range(1000)

print("=== Chunking Analysis ===")

# Quick tasks - use larger chunks to reduce overhead
quick_chunks = list(chunk_tasks(data, n_jobs=4, n_tasks_per_job=50))
print(f"Quick task chunks: {len(quick_chunks)} chunks")
print(f"Chunk sizes: {[len(chunk) for chunk in quick_chunks[:5]]}...")

# Slow tasks - use smaller chunks for better load balancing
slow_chunks = list(chunk_tasks(data, n_jobs=4, n_tasks_per_job=10))
print(f"Slow task chunks: {len(slow_chunks)} chunks")
print(f"Chunk sizes: {[len(chunk) for chunk in slow_chunks[:5]]}...")

# Calculate optimal chunk count
optimal_chunks = get_n_chunks(data, n_jobs=4, n_tasks_per_job=25)
print(f"Optimal chunk count: {optimal_chunks}")

# Test with WorkerPool
with WorkerPool(n_jobs=4) as pool:
    # Quick tasks with large chunks
    start_time = time.time()
    results1 = pool.map(quick_task, data, chunk_size=50)
    quick_time = time.time() - start_time

    # Slow tasks with small chunks
    start_time = time.time()
    results2 = pool.map(slow_task, range(100), chunk_size=5)
    slow_time = time.time() - start_time

    print(f"Quick tasks time: {quick_time:.2f}s")
    print(f"Slow tasks time: {slow_time:.2f}s")
```

### Numpy Array Chunking

```python
import numpy as np
from mpire import WorkerPool
from mpire.utils import apply_numpy_chunking

def process_array_chunk(chunk):
    """Process a numpy array chunk"""
    # Simulate array processing
    return np.sum(chunk ** 2)

# Create large numpy array
large_array = np.random.rand(10000)

print("=== Numpy Chunking ===")

# Apply numpy-specific chunking
chunks = list(apply_numpy_chunking(large_array, n_jobs=4, chunk_size=1000))
print(f"Created {len(chunks)} chunks")
print(f"Chunk shapes: {[chunk.shape for chunk in chunks[:3]]}...")

# Process with WorkerPool
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(process_array_chunk, chunks)
    total_result = sum(results)
    print(f"Total processing result: {total_result:.2f}")

# Compare with direct numpy processing
direct_result = np.sum(large_array ** 2)
print(f"Direct numpy result: {direct_result:.2f}")
print(f"Results match: {abs(total_result - direct_result) < 1e-10}")
```

### Argument Processing

```python
from mpire import WorkerPool
from mpire.utils import make_single_arguments

def multi_arg_function(a, b, c):
    """Function that expects multiple arguments"""
    return a + b * c

def single_arg_function(args):
    """Function that expects a single tuple argument"""
    a, b, c = args
    return a + b * c

# Original data as tuples
multi_arg_data = [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12)]

with WorkerPool(n_jobs=2) as pool:
    # Method 1: Use starmap-like functionality (MPIRE handles this automatically)
    results1 = pool.map(multi_arg_function, multi_arg_data)
    print(f"Multi-arg results: {results1}")

    # Method 2: Convert to single arguments if needed
    single_args = make_single_arguments(multi_arg_data, generator=False)
    results2 = pool.map(single_arg_function, single_args)
    print(f"Single-arg results: {results2}")

    # Verify results are the same
    print(f"Results match: {results1 == results2}")
```

### Timing Operations

```python
from mpire import WorkerPool
from mpire.utils import TimeIt, format_seconds
import time

def timed_operation(duration):
    """Operation with known duration"""
    time.sleep(duration)
    return f"Slept for {duration} seconds"

# Time individual operations
with TimeIt("Single operation"):
    result = timed_operation(0.5)

# Time parallel operations
with TimeIt("Parallel operations"):
    with WorkerPool(n_jobs=3) as pool:
        results = pool.map(timed_operation, [0.2, 0.3, 0.4, 0.1, 0.2])

# Manual timing with formatting
start_time = time.time()
with WorkerPool(n_jobs=2) as pool:
    results = pool.map(timed_operation, [0.1] * 10)
elapsed_time = time.time() - start_time

formatted_time = format_seconds(elapsed_time, with_milliseconds=True)
print(f"Manual timing: {formatted_time}")

# Timing with different formatting options
test_times = [0.001, 0.1, 1.5, 65.3, 3661.7]
for t in test_times:
    with_ms = format_seconds(t, with_milliseconds=True)
    without_ms = format_seconds(t, with_milliseconds=False)
    print(f"{t:8.3f}s -> With MS: {with_ms:>15} | Without MS: {without_ms:>10}")
```

### Custom Sync Manager Usage

```python
from mpire import WorkerPool
from mpire.utils import create_sync_manager, NonPickledSyncManager
import multiprocessing

def worker_with_shared_dict(shared_dict, worker_id, items):
    """Worker that updates a shared dictionary"""
    for item in items:
        shared_dict[f"worker_{worker_id}_item_{item}"] = item ** 2
    return len(items)

# Example 1: Standard sync manager
print("=== Standard Sync Manager ===")
with create_sync_manager(use_dill=False) as manager:
    shared_dict = manager.dict()

    with WorkerPool(n_jobs=3, shared_objects=shared_dict) as pool:
        results = pool.map(
            worker_with_shared_dict,
            [(0, [1, 2, 3]), (1, [4, 5, 6]), (2, [7, 8, 9])],
            pass_worker_id=False  # Pass worker_id manually in args
        )

    print(f"Processed items: {sum(results)}")
    print(f"Shared dict contents: {dict(shared_dict)}")

# Example 2: Dill-enabled sync manager (for complex objects)
print("\n=== Dill Sync Manager ===")
try:
    with create_sync_manager(use_dill=True) as manager:
        # Create shared objects that might need dill
        shared_list = manager.list()
        shared_dict = manager.dict()

        def complex_worker(shared_objects, data):
            shared_list, shared_dict = shared_objects
            # Process complex data types
            shared_list.append(len(data))
            shared_dict[f"len_{len(data)}"] = data
            return sum(data)

        test_data = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]

        with WorkerPool(n_jobs=2, shared_objects=(shared_list, shared_dict)) as pool:
            results = pool.map(complex_worker, test_data)

        print(f"Results: {results}")
        print(f"Shared list: {list(shared_list)}")
        print(f"Shared dict keys: {list(shared_dict.keys())}")

except ImportError:
    print("Dill not available, skipping dill manager example")

# Example 3: Non-pickled manager for special cases
print("\n=== Non-Pickled Manager ===")
def simple_shared_counter():
    """Example using a simple shared counter"""
    counter = multiprocessing.Value('i', 0)

    def increment_counter(shared_counter, increment):
        with shared_counter.get_lock():
            shared_counter.value += increment
            return shared_counter.value

    with WorkerPool(n_jobs=2, shared_objects=counter) as pool:
        results = pool.map(increment_counter, [1, 2, 3, 4, 5])

    print(f"Final counter value: {counter.value}")
    print(f"Worker results: {results}")

simple_shared_counter()
```

### Combined Utility Usage

```python
from mpire import WorkerPool, cpu_count
from mpire.utils import TimeIt, format_seconds, chunk_tasks, set_cpu_affinity
import numpy as np
import time

def comprehensive_utility_example():
    """Example combining multiple utilities"""

    print(f"=== System Information ===")
    print(f"Available CPUs: {cpu_count()}")

    # Generate test data
    data_size = 10000
    test_array = np.random.rand(data_size)

    def array_processing_task(chunk):
        """Process array chunk with timing"""
        start_time = time.time()
        result = np.sum(chunk ** 2) + np.mean(chunk)
        processing_time = time.time() - start_time
        return result, processing_time

    # Optimize chunking strategy
    n_workers = min(4, cpu_count())
    optimal_chunks = list(chunk_tasks(
        test_array,
        n_jobs=n_workers,
        n_tasks_per_job=data_size // (n_workers * 4)
    ))

    print(f"Created {len(optimal_chunks)} chunks for {n_workers} workers")
    print(f"Chunk sizes: {[len(chunk) for chunk in optimal_chunks[:3]]}...")

    # Process with timing and CPU pinning
    with TimeIt("Complete parallel processing"):
        cpu_ids = list(range(min(n_workers, cpu_count())))

        with WorkerPool(n_jobs=n_workers, cpu_ids=cpu_ids, enable_insights=True) as pool:
            results = pool.map(array_processing_task, optimal_chunks)

            # Extract results and timings
            values, timings = zip(*results)
            total_value = sum(values)
            avg_chunk_time = np.mean(timings)

            print(f"Total processing value: {total_value:.4f}")
            print(f"Average chunk processing time: {format_seconds(avg_chunk_time, True)}")

            # Show insights
            pool.print_insights()

    # Compare with serial processing
    with TimeIt("Serial processing"):
        serial_result, serial_time = array_processing_task(test_array)

    print(f"\nComparison:")
    print(f"Parallel result: {total_value:.4f}")
    print(f"Serial result: {serial_result:.4f}")
    print(f"Results match: {abs(total_value - serial_result) < 1e-10}")
    print(f"Serial processing time: {format_seconds(serial_time, True)}")

comprehensive_utility_example()
```