# Process Pools

Parallel task execution using worker process pools. The Pool class provides a convenient way to distribute tasks across multiple processes with various execution patterns and result handling options.

## Capabilities

### Pool Class

Main class for managing a pool of worker processes for parallel task execution.

```python { .api }
class Pool:
    """
    A pool of worker processes for parallel task execution.

    Args:
        processes: number of worker processes (default: cpu_count())
        initializer: callable to run on each worker process startup
        initargs: arguments for the initializer function
        maxtasksperchild: maximum tasks per worker before restart (default: None)
        context: multiprocess context to use for creating processes
    """
    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None): ...

    def map(self, func, iterable, chunksize=None):
        """
        Apply function to every item of iterable and return a list of results.

        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes

        Returns:
            list: results in same order as input
        """

    def map_async(self, func, iterable, chunksize=None, callback=None,
                  error_callback=None):
        """
        Asynchronous version of map() method.

        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes
            callback: function to call with results when complete
            error_callback: function to call if an error occurs

        Returns:
            AsyncResult: result object for async operation
        """

    def imap(self, func, iterable, chunksize=1):
        """
        Lazy version of map() that returns an iterator.

        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes

        Returns:
            iterator: iterator over results
        """

    def imap_unordered(self, func, iterable, chunksize=1):
        """
        Like imap() but results can be returned in any order.

        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes

        Returns:
            iterator: iterator over results in arbitrary order
        """

    def starmap(self, func, iterable, chunksize=None):
        """
        Like map() but arguments are unpacked from tuples.

        Args:
            func: function to apply (called with *args from each tuple)
            iterable: sequence of tuples containing arguments
            chunksize: items per task sent to worker processes

        Returns:
            list: results in same order as input
        """

    def starmap_async(self, func, iterable, chunksize=None, callback=None,
                      error_callback=None):
        """
        Asynchronous version of starmap() method.

        Args:
            func: function to apply (called with *args from each tuple)
            iterable: sequence of tuples containing arguments
            chunksize: items per task sent to worker processes
            callback: function to call with results when complete
            error_callback: function to call if an error occurs

        Returns:
            AsyncResult: result object for async operation
        """

    def apply(self, func, args=(), kwds={}):
        """
        Apply function with arguments and return the result.

        Args:
            func: function to call
            args: positional arguments for func
            kwds: keyword arguments for func

        Returns:
            object: result of function call
        """

    def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
        """
        Asynchronous version of apply() method.

        Args:
            func: function to call
            args: positional arguments for func
            kwds: keyword arguments for func
            callback: function to call with result when complete
            error_callback: function to call if an error occurs

        Returns:
            AsyncResult: result object for async operation
        """

    def close(self):
        """
        Prevent any more tasks from being submitted to the pool.
        Once closed, no new tasks can be submitted.
        """

    def terminate(self):
        """
        Stop the worker processes immediately without completing work.
        """

    def join(self):
        """
        Wait for the worker processes to exit.
        Must call close() or terminate() before using join().
        """

    def __enter__(self):
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - closes pool and joins workers."""
```

### AsyncResult Class

Object representing the result of an asynchronous operation.

```python { .api }
class AsyncResult:
    """
    Result object for asynchronous pool operations.
    """
    def get(self, timeout=None):
        """
        Return the result when it arrives.

        Args:
            timeout: maximum time to wait (seconds)

        Returns:
            object: result of the operation

        Raises:
            TimeoutError: if timeout exceeded
        """

    def wait(self, timeout=None):
        """
        Wait until the result is available.

        Args:
            timeout: maximum time to wait (seconds)

        Returns:
            bool: True if result is available, False if timeout
        """

    def ready(self):
        """
        Return True if the operation is complete.

        Returns:
            bool: True if operation is complete
        """

    def successful(self):
        """
        Return True if the operation completed without error.
        Must call ready() first to ensure operation is complete.

        Returns:
            bool: True if successful

        Raises:
            ValueError: if operation is not yet complete
        """
```

## Usage Examples

### Basic Pool Map

```python
from multiprocess import Pool

def square(x):
    """Return the square of *x*."""
    squared = x * x
    return squared

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        results = pool.map(square, numbers)
        print(f"Results: {results}")
        # Output: Results: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```

### Asynchronous Processing

```python
from multiprocess import Pool
import time

def slow_function(x):
    """Square *x* after a one-second pause that simulates slow work."""
    time.sleep(1)  # Simulate slow work
    return x * x


def result_callback(result):
    """Success callback: invoked with the full result list on completion."""
    print(f"Got result: {result}")


def error_callback(error):
    """Error callback: invoked with the exception if any task fails."""
    print(f"Got error: {error}")

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # Submit async job
        async_result = pool.map_async(
            slow_function,
            [1, 2, 3, 4],
            callback=result_callback,
            error_callback=error_callback
        )

        # Do other work while waiting
        print("Doing other work...")
        time.sleep(0.5)
        print("Still working...")

        # Get results (blocks until complete)
        results = async_result.get(timeout=10)
        print(f"Final results: {results}")
```

### Starmap for Multiple Arguments

```python
from multiprocess import Pool

def multiply(x, y):
    """Return the product of *x* and *y*."""
    product = x * y
    return product


def power(base, exponent):
    """Return *base* raised to *exponent*."""
    return base ** exponent
281
282
if __name__ == '__main__':
283
with Pool(processes=3) as pool:
284
# Each tuple contains arguments for the function
285
multiply_args = [(2, 3), (4, 5), (6, 7)]
286
multiply_results = pool.starmap(multiply, multiply_args)
287
print(f"Multiply results: {multiply_results}")
288
# Output: Multiply results: [6, 20, 42]
289
290
power_args = [(2, 3), (3, 2), (4, 2), (5, 2)]
291
power_results = pool.starmap(power, power_args)
292
print(f"Power results: {power_results}")
293
# Output: Power results: [8, 9, 16, 25]
294
```
295
296
### Iterator-based Processing
297
298
```python
299
from multiprocess import Pool
300
import time
301
302
def process_item(x):
    """Square *x* after a delay proportional to its magnitude."""
    time.sleep(x * 0.1)  # Simulate variable processing time
    return x * x

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        items = range(1, 11)

        # Ordered iterator (results in input order)
        print("Ordered results:")
        for result in pool.imap(process_item, items, chunksize=2):
            print(f"Got result: {result}")

        print("\nUnordered results:")
        # Unordered iterator (results as they complete)
        for result in pool.imap_unordered(process_item, items, chunksize=2):
            print(f"Got result: {result}")
```

### Pool with Initializer

```python
from multiprocess import Pool
import os

# Global variable in worker processes
worker_state = None

def init_worker(initial_value):
    """Run once in each worker at startup: seed the per-process state."""
    global worker_state
    worker_state = initial_value
    print(f"Worker {os.getpid()} initialized with {initial_value}")


def worker_task(x):
    """Add the worker's initialized state to *x* and report the work done."""
    # Reading a module-level global needs no `global` declaration.
    pid = os.getpid()
    result = x + worker_state
    print(f"Worker {pid} processed {x} with state {worker_state} = {result}")
    return result

if __name__ == '__main__':
    # Each worker will be initialized with value 100
    with Pool(processes=2, initializer=init_worker, initargs=(100,)) as pool:
        tasks = [1, 2, 3, 4, 5]
        results = pool.map(worker_task, tasks)
        print(f"Results: {results}")
```

### Error Handling

```python
from multiprocess import Pool
import random

def unreliable_function(x):
    """Square *x*, but fail randomly roughly 30% of the time."""
    if random.random() < 0.3:  # 30% chance of error
        raise ValueError(f"Error processing {x}")
    return x * x


def handle_result(result):
    """Success callback: report a completed result."""
    print(f"Success: {result}")


def handle_error(error):
    """Error callback: report a task failure."""
    print(f"Error occurred: {error}")

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # Submit multiple async tasks
        async_results = []
        for i in range(10):
            result = pool.apply_async(
                unreliable_function,
                (i,),
                callback=handle_result,
                error_callback=handle_error
            )
            async_results.append(result)

        # Wait for all tasks and handle individual results
        for i, async_result in enumerate(async_results):
            try:
                result = async_result.get(timeout=5)
                print(f"Task {i} completed successfully: {result}")
            except Exception as e:
                print(f"Task {i} failed: {e}")
```

### Pool with Context Manager and Resource Cleanup

```python
from multiprocess import Pool
import time
import os

def cpu_intensive_task(n):
    """Simulate CPU-intensive work; return a per-task report dict."""
    pid = os.getpid()
    start_time = time.time()

    # Simulate computation: sum of squares over n million integers
    total = sum(i * i for i in range(n * 1000000))

    duration = time.time() - start_time

    return {
        'pid': pid,
        'input': n,
        'result': total,
        'duration': duration,
    }

if __name__ == '__main__':
    tasks = [10, 20, 30, 40, 50]

    # Using context manager ensures proper cleanup
    with Pool(processes=3) as pool:
        print("Starting parallel processing...")
        start_time = time.time()

        # Process tasks in parallel
        results = pool.map(cpu_intensive_task, tasks)

        end_time = time.time()
        total_time = end_time - start_time

        print(f"\nAll tasks completed in {total_time:.2f} seconds")
        print("\nResults:")
        for result in results:
            print(f"PID {result['pid']}: input={result['input']}, "
                  f"duration={result['duration']:.3f}s")
```
437
438
### Chunking for Performance
439
440
```python
441
from multiprocess import Pool
442
import time
443
444
def simple_task(x):
    """Return the square of *x* (trivially cheap on purpose)."""
    return x * x


def benchmark_chunking(items, pool_size, chunk_sizes):
    """Benchmark different chunk sizes"""
    for chunk_size in chunk_sizes:
        with Pool(processes=pool_size) as pool:
            start_time = time.time()
            results = pool.map(simple_task, items, chunksize=chunk_size)
            duration = time.time() - start_time

            print(f"Chunk size {chunk_size}: {duration:.3f} seconds")
457
458
if __name__ == '__main__':
459
# Large dataset
460
items = list(range(10000))
461
pool_size = 4
462
463
# Test different chunk sizes
464
chunk_sizes = [1, 10, 50, 100, 500, 1000]
465
466
print("Benchmarking chunk sizes:")
467
benchmark_chunking(items, pool_size, chunk_sizes)
468
```
469
470
### Advanced: Custom Result Processing
471
472
```python
473
from multiprocess import Pool
474
import json
475
import time
476
477
def fetch_and_process_data(item_id):
    """Simulate fetching and processing data"""
    # Simulate network delay
    time.sleep(0.1)

    # Simulate data processing
    return {
        'id': item_id,
        'value': item_id * 10,
        'processed_at': time.time(),
        'status': 'completed',
    }


def save_result(result):
    """Callback to save each result as it completes"""
    with open(f"result_{result['id']}.json", 'w') as f:
        json.dump(result, f)
    print(f"Saved result for item {result['id']}")

if __name__ == '__main__':
    item_ids = list(range(1, 21))  # Process 20 items

    with Pool(processes=4) as pool:
        # Submit all tasks asynchronously with callback
        async_results = []
        for item_id in item_ids:
            result = pool.apply_async(
                fetch_and_process_data,
                (item_id,),
                callback=save_result
            )
            async_results.append(result)

        # Monitor progress
        completed = 0
        while completed < len(async_results):
            ready_count = sum(1 for r in async_results if r.ready())
            if ready_count > completed:
                completed = ready_count
                print(f"Progress: {completed}/{len(async_results)} tasks completed")
            time.sleep(0.5)

        print("All tasks completed!")
```