# Process Pools

Managed pools of worker processes for executing CPU-intensive tasks. Because each task runs in a separate worker process, process pools bypass Python's Global Interpreter Lock (GIL) and provide true parallelism, along with timeouts, automatic worker restart, process isolation, and detailed error handling.
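
To make the parallelism claim concrete, here is a minimal, illustrative sketch (not taken from the library's documentation; `busy_sum` and the workload sizes are arbitrary) that times the same CPU-bound function sequentially and through a `ProcessPool`. Timings will vary by machine and core count.

```python
import time
from pebble import ProcessPool

def busy_sum(n):
    # Pure-Python CPU-bound loop; threads would serialize on the GIL here
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    workloads = [5_000_000] * 4

    start = time.time()
    sequential = [busy_sum(n) for n in workloads]
    print(f"Sequential: {time.time() - start:.2f}s")

    start = time.time()
    with ProcessPool(max_workers=4) as pool:
        futures = [pool.schedule(busy_sum, args=(n,)) for n in workloads]
        parallel = [f.result() for f in futures]
    print(f"ProcessPool: {time.time() - start:.2f}s")

    assert sequential == parallel
```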
## Capabilities

### ProcessPool Class

A managed pool of worker processes that can execute multiple tasks concurrently with true parallelism, automatic process lifecycle management, and advanced error handling.
```python { .api }
class ProcessPool:
    def __init__(
        self,
        max_workers: int = multiprocessing.cpu_count(),
        max_tasks: int = 0,
        initializer: Callable = None,
        initargs: list = (),
        context: multiprocessing.context.BaseContext = multiprocessing
    ):
        """
        Create a process pool for CPU-intensive concurrent task execution.

        Parameters:
        - max_workers: Maximum number of worker processes (defaults to CPU count)
        - max_tasks: Maximum tasks per worker before restart (0 = no limit)
        - initializer: Function called when each worker process starts
        - initargs: Arguments passed to initializer function
        - context: Multiprocessing context (spawn, fork, forkserver)
        """
```
#### Basic Usage

```python
from pebble import ProcessPool
import time

# Create pool with default settings
pool = ProcessPool()

# Create pool with custom configuration
pool = ProcessPool(max_workers=4, max_tasks=50)

def cpu_intensive_task(n, multiplier=1):
    # CPU-intensive computation that benefits from true parallelism
    total = 0
    for i in range(n):
        total += (i ** 2) * multiplier
    return total

# Schedule tasks
future1 = pool.schedule(cpu_intensive_task, args=(100000,), kwargs={"multiplier": 2})
future2 = pool.schedule(cpu_intensive_task, args=(200000,))

# Get results
result1 = future1.result()
result2 = future2.result()

print(f"Results: {result1}, {result2}")

# Always clean up
pool.close()
pool.join()
```
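A hedged aside on cleanup: `ProcessPool` also supports the context manager protocol (used throughout the examples below), and `stop()` is the abrupt counterpart to `close()` when pending work should be discarded. A minimal sketch; the `double` function is just a placeholder:

```python
from pebble import ProcessPool

def double(x):
    return x * 2

# Equivalent cleanup via context manager: close() and join() run on exit
with ProcessPool(max_workers=2) as pool:
    future = pool.schedule(double, args=(21,))
    print(future.result())

# Explicit lifecycle control
pool = ProcessPool(max_workers=2)
try:
    pool.schedule(double, args=(1,))
finally:
    pool.stop()   # discard pending tasks; use pool.close() to let them finish
    pool.join()
```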
### Task Scheduling with Timeouts

Schedule individual tasks with timeout protection to prevent runaway processes:
```python { .api }
def schedule(
    self,
    function: Callable,
    args: list = (),
    kwargs: dict = {},
    timeout: float = None
) -> ProcessFuture:
    """
    Schedule a function for execution in the process pool.

    Parameters:
    - function: The function to execute
    - args: Positional arguments to pass to function
    - kwargs: Keyword arguments to pass to function
    - timeout: Maximum execution time in seconds (raises TimeoutError if exceeded)

    Returns:
    ProcessFuture object for retrieving the result
    """

def submit(
    self,
    function: Callable,
    timeout: Optional[float],
    /,
    *args,
    **kwargs
) -> ProcessFuture:
    """
    Submit a function for execution (compatibility with concurrent.futures).

    Parameters:
    - function: The function to execute
    - timeout: Maximum execution time in seconds (positional-only parameter)
    - args: Positional arguments to pass to function
    - kwargs: Keyword arguments to pass to function

    Returns:
    ProcessFuture object for retrieving the result
    """
```
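The examples below use `schedule`; this short sketch (with a placeholder `scale` function) shows the two call forms side by side, following the signatures above: `submit` takes the timeout as its second positional argument and then receives the call arguments directly.

```python
from pebble import ProcessPool

def scale(value, factor=1):
    return value * factor

with ProcessPool(max_workers=2) as pool:
    # schedule(): arguments packed into args/kwargs, timeout as a keyword
    f1 = pool.schedule(scale, args=(10,), kwargs={"factor": 3}, timeout=5)

    # submit(): function and timeout first, then the call arguments directly
    f2 = pool.submit(scale, 5, 10, factor=3)

    print(f1.result(), f2.result())  # 30 30
```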
#### Scheduling Examples

```python
from pebble import ProcessPool, ProcessExpired
from concurrent.futures import TimeoutError
import time
import math

def prime_factorization(n):
    """CPU-intensive task: find prime factors"""
    factors = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            factors.append(d)
            n //= d
        d += 1
    if n > 1:
        factors.append(n)
    return factors

def monte_carlo_pi(iterations):
    """CPU-intensive task: estimate Pi using Monte Carlo method"""
    import random
    inside_circle = 0
    for _ in range(iterations):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            inside_circle += 1
    return 4.0 * inside_circle / iterations

def potentially_slow_task(delay):
    """Task that might run too long"""
    time.sleep(delay)
    return f"Completed after {delay} seconds"

# CPU-intensive work with timeouts
with ProcessPool(max_workers=4) as pool:
    # Schedule CPU-intensive tasks
    numbers = [982451653, 982451654, 982451655, 982451656]
    factor_futures = []

    for num in numbers:
        future = pool.schedule(
            prime_factorization,
            args=(num,),
            timeout=30.0  # 30 second timeout
        )
        factor_futures.append(future)

    # Monte Carlo Pi estimation
    pi_futures = []
    for iterations in [1000000, 2000000, 3000000]:
        future = pool.schedule(
            monte_carlo_pi,
            args=(iterations,),
            timeout=60.0  # 60 second timeout
        )
        pi_futures.append(future)

    # Schedule tasks that might timeout
    timeout_futures = [
        pool.schedule(potentially_slow_task, args=(1,), timeout=5.0),   # Should complete
        pool.schedule(potentially_slow_task, args=(10,), timeout=5.0)   # Should timeout
    ]

    # Collect results with error handling
    print("Prime factorizations:")
    for i, future in enumerate(factor_futures):
        try:
            result = future.result()
            print(f" {numbers[i]} = {' × '.join(map(str, result))}")
        except TimeoutError:
            print(f" {numbers[i]} = TIMEOUT")
        except Exception as e:
            print(f" {numbers[i]} = ERROR: {e}")

    print("\nPi estimations:")
    for i, future in enumerate(pi_futures):
        try:
            pi_estimate = future.result()
            iterations = [1000000, 2000000, 3000000][i]
            error = abs(pi_estimate - math.pi)
            print(f" {iterations:,} iterations: π ≈ {pi_estimate:.6f} (error: {error:.6f})")
        except Exception as e:
            print(f" ERROR: {e}")

    print("\nTimeout examples:")
    for i, future in enumerate(timeout_futures):
        try:
            result = future.result()
            print(f" Task {i+1}: {result}")
        except TimeoutError:
            print(f" Task {i+1}: TIMEOUT")
        except ProcessExpired as e:
            print(f" Task {i+1}: PROCESS DIED: {e}")
```
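Pebble futures follow the `concurrent.futures.Future` interface, so the standard waiting helpers should work with them as well. The sketch below (with an illustrative `slow_square` function) collects results in completion order using `as_completed` rather than in submission order:

```python
from concurrent.futures import as_completed
from pebble import ProcessPool
import time

def slow_square(x):
    time.sleep(x * 0.1)
    return x * x

with ProcessPool(max_workers=4) as pool:
    futures = {pool.schedule(slow_square, args=(n,)): n for n in (4, 1, 3, 2)}

    # Handle whichever task finishes first instead of waiting in submission order
    for future in as_completed(futures):
        print(f"slow_square({futures[future]}) = {future.result()}")
```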
### Bulk Operations with Map

Execute a function across multiple inputs efficiently using process pools:
```python { .api }
def map(
    self,
    function: Callable,
    *iterables,
    chunksize: int = 1,
    timeout: float = None
) -> ProcessMapFuture:
    """
    Apply function to every item of iterables in parallel using processes.

    Parameters:
    - function: Function to apply to each item
    - iterables: One or more iterables to process
    - chunksize: Number of items per chunk sent to each process
    - timeout: Maximum computation time per chunk; results of chunks that exceed it raise TimeoutError when retrieved

    Returns:
    ProcessMapFuture object; its result() returns an iterator over the results in input order
    """
```
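Note that the returned future does not yield results directly: calling `result()` on it returns an iterator, and a chunk that exceeded the timeout raises `TimeoutError` when its results are retrieved. A minimal retrieval-loop sketch (`slow_double` and the values are illustrative):

```python
from concurrent.futures import TimeoutError
from pebble import ProcessPool
import time

def slow_double(x):
    time.sleep(x)
    return x * 2

with ProcessPool(max_workers=2) as pool:
    future = pool.map(slow_double, [0.1, 0.2, 5.0], timeout=1.0)

    iterator = future.result()
    while True:
        try:
            print(next(iterator))
        except StopIteration:
            break
        except TimeoutError:
            print("one element took longer than the 1 second timeout")
```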
#### Map Usage Examples

```python
from pebble import ProcessPool
import math
import time

def cpu_bound_function(x):
    """Simulate CPU-intensive work"""
    result = 0
    for i in range(x * 1000):
        result += math.sin(i) * math.cos(i)
    return result

def data_processing_pipeline(data_chunk):
    """Process a chunk of data"""
    processed = []
    for item in data_chunk:
        # Simulate complex processing
        processed_item = {
            'original': item,
            'squared': item ** 2,
            'sqrt': math.sqrt(abs(item)),
            'factorial': math.factorial(min(abs(item), 10))  # Limit to prevent huge numbers
        }
        processed.append(processed_item)
    return processed

def matrix_operation(matrix_row):
    """Perform operations on matrix row"""
    return [x ** 2 + math.sin(x) for x in matrix_row]

# Efficient parallel processing with map
with ProcessPool(max_workers=6) as pool:
    # Process large dataset
    large_dataset = list(range(1, 101))

    print("Processing large dataset...")
    start_time = time.time()

    # Use map with optimal chunk size
    results = pool.map(
        cpu_bound_function,
        large_dataset,
        chunksize=10,  # Process 10 items per chunk
        timeout=120    # 2 minute timeout per chunk
    )

    # map() returns a future; result() gives an iterator over all results
    processed_results = list(results.result())
    end_time = time.time()

    print(f"Processed {len(processed_results)} items in {end_time - start_time:.2f} seconds")
    print(f"Average result: {sum(processed_results) / len(processed_results):.2f}")

    # Data processing pipeline
    raw_data = [list(range(i*10, (i+1)*10)) for i in range(20)]  # 20 chunks of 10 items each

    print("\nRunning data processing pipeline...")
    pipeline_results = pool.map(
        data_processing_pipeline,
        raw_data,
        chunksize=2,  # 2 data chunks per process
        timeout=60
    )

    # Flatten results
    all_processed = []
    for chunk_result in pipeline_results.result():
        all_processed.extend(chunk_result)

    print(f"Processed {len(all_processed)} data items through pipeline")

    # Matrix operations
    matrix = [[i+j for j in range(100)] for i in range(50)]  # 50x100 matrix

    print("\nPerforming matrix operations...")
    matrix_results = pool.map(
        matrix_operation,
        matrix,
        chunksize=5,  # 5 rows per process
        timeout=30
    )

    processed_matrix = list(matrix_results.result())
    print(f"Processed matrix with {len(processed_matrix)} rows")
```
### Multiprocessing Context Configuration

Configure the multiprocessing context for different process creation methods:
```python
import multiprocessing
from pebble import ProcessPool

def worker_task(data, worker_id=None):
    import os
    return {
        'data': data,
        'worker_pid': os.getpid(),
        'worker_id': worker_id
    }

# Different multiprocessing contexts
def context_examples():
    # Spawn context (creates fresh Python interpreter)
    spawn_ctx = multiprocessing.get_context('spawn')
    spawn_pool = ProcessPool(max_workers=2, context=spawn_ctx)

    # Fork context (copies current process) - Unix only
    try:
        fork_ctx = multiprocessing.get_context('fork')
        fork_pool = ProcessPool(max_workers=2, context=fork_ctx)
    except ValueError:
        print("Fork context not available on this platform")
        fork_pool = None

    # Forkserver context (hybrid approach) - Unix only
    try:
        forkserver_ctx = multiprocessing.get_context('forkserver')
        forkserver_pool = ProcessPool(max_workers=2, context=forkserver_ctx)
    except ValueError:
        print("Forkserver context not available on this platform")
        forkserver_pool = None

    # Test different contexts
    test_data = list(range(10))

    print("Testing spawn context:")
    with spawn_pool:
        spawn_results = [
            spawn_pool.schedule(worker_task, args=(data, f"spawn-{data}"))
            for data in test_data
        ]
        for future in spawn_results:
            print(f" {future.result()}")

    if fork_pool:
        print("\nTesting fork context:")
        with fork_pool:
            fork_results = [
                fork_pool.schedule(worker_task, args=(data, f"fork-{data}"))
                for data in test_data
            ]
            for future in fork_results:
                print(f" {future.result()}")

    if forkserver_pool:
        print("\nTesting forkserver context:")
        with forkserver_pool:
            forkserver_results = [
                forkserver_pool.schedule(worker_task, args=(data, f"forkserver-{data}"))
                for data in test_data
            ]
            for future in forkserver_results:
                print(f" {future.result()}")

# Run context examples (the __main__ guard is required with spawn/forkserver start methods)
if __name__ == '__main__':
    context_examples()
```
### Process Initialization and Cleanup

Initialize worker processes with shared resources and handle cleanup:
```python
from pebble import ProcessPool
import multiprocessing
import logging
import os

# Global state for worker processes
worker_state = {}

def init_worker_process(config, log_level):
    """Initialize each worker process"""
    global worker_state

    # Setup logging for this process
    logging.basicConfig(
        level=log_level,
        format=f'PID-{os.getpid()}: %(levelname)s - %(message)s'
    )
    logger = logging.getLogger(__name__)

    # Initialize worker state
    worker_state = {
        'config': config,
        'logger': logger,
        'task_count': 0,
        'process_id': os.getpid()
    }

    logger.info(f"Worker process {os.getpid()} initialized with config: {config}")

def worker_task_with_state(task_data):
    """Task that uses initialized worker state"""
    global worker_state

    worker_state['task_count'] += 1
    logger = worker_state['logger']

    logger.info(f"Processing task {worker_state['task_count']}: {task_data}")

    # Simulate work using config
    multiplier = worker_state['config'].get('multiplier', 1)
    result = task_data * multiplier

    # Simulate some processing time
    import time
    time.sleep(0.1)

    logger.info(f"Task completed. Result: {result}")

    return {
        'input': task_data,
        'result': result,
        'task_number': worker_state['task_count'],
        'process_id': worker_state['process_id']
    }

# Create pool with worker initialization
config = {'multiplier': 3, 'timeout': 30}

pool = ProcessPool(
    max_workers=3,
    max_tasks=5,  # Restart workers every 5 tasks
    initializer=init_worker_process,
    initargs=(config, logging.INFO)
)

try:
    # Schedule multiple tasks to see worker behavior
    tasks = list(range(1, 16))  # 15 tasks
    futures = []

    for task in tasks:
        future = pool.schedule(worker_task_with_state, args=(task,))
        futures.append(future)

    # Collect results
    results = []
    for future in futures:
        try:
            result = future.result(timeout=10)
            results.append(result)
        except Exception as e:
            print(f"Task failed: {e}")

    # Print results showing worker process recycling
    print(f"\nProcessed {len(results)} tasks:")
    for result in results:
        print(f" Task {result['task_number']} in PID {result['process_id']}: "
              f"{result['input']} -> {result['result']}")

    # Group by process ID to see worker recycling
    by_process = {}
    for result in results:
        pid = result['process_id']
        if pid not in by_process:
            by_process[pid] = []
        by_process[pid].append(result['task_number'])

    print(f"\nTasks by worker process:")
    for pid, task_numbers in by_process.items():
        print(f" PID {pid}: tasks {task_numbers}")

finally:
    pool.close()
    pool.join()
```
### Error Handling and Recovery

Handle various error conditions specific to process-based execution:
```python
from pebble import ProcessPool, ProcessExpired
from concurrent.futures import TimeoutError
import signal
import time
import os

def normal_task(x):
    return x * 2

def crashing_task():
    # This will cause the process to crash
    os._exit(1)  # Immediate process termination

def hanging_task():
    # This task will hang indefinitely
    while True:
        time.sleep(1)

def memory_intensive_task(size):
    # This might run out of memory
    big_list = [0] * size
    return len(big_list)

def signal_task():
    # This task will receive a signal
    os.kill(os.getpid(), signal.SIGTERM)

# Comprehensive error handling
with ProcessPool(max_workers=4) as pool:
    # Schedule various types of tasks
    futures = {
        'normal': pool.schedule(normal_task, args=(42,)),
        'crashing': pool.schedule(crashing_task),
        'hanging': pool.schedule(hanging_task),
        'memory': pool.schedule(memory_intensive_task, args=(10**9,)),  # Huge allocation
        'timeout': pool.schedule(hanging_task, timeout=2.0),  # Will timeout
        'signal': pool.schedule(signal_task)
    }

    # Handle each type of error
    for task_name, future in futures.items():
        try:
            if task_name == 'hanging':
                # Don't wait for hanging task
                result = future.result(timeout=1.0)
            else:
                result = future.result(timeout=10.0)
            print(f"{task_name}: SUCCESS - {result}")

        except TimeoutError:
            print(f"{task_name}: TIMEOUT - Task exceeded time limit")

        except ProcessExpired as e:
            print(f"{task_name}: PROCESS DIED - PID: {e.pid}, Exit code: {e.exitcode}")

        except MemoryError:
            print(f"{task_name}: MEMORY ERROR - Not enough memory")

        except OSError as e:
            print(f"{task_name}: OS ERROR - {e}")

        except Exception as e:
            print(f"{task_name}: UNEXPECTED ERROR - {type(e).__name__}: {e}")

    # The hanging task has no pool-level timeout, so cancel it (terminating its
    # worker) before leaving the with-block; otherwise join() would block forever
    futures['hanging'].cancel()

    print("\nAll error handling completed")
```
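Errors can also be handled asynchronously. Assuming the standard `concurrent.futures.Future` interface that Pebble futures expose, `add_done_callback()` can route outcomes to a handler, and `cancel()` will terminate a task that is already running. A brief sketch (`long_running` and `report_outcome` are illustrative names):

```python
from concurrent.futures import CancelledError, TimeoutError
from pebble import ProcessPool, ProcessExpired
import time

def long_running(seconds):
    time.sleep(seconds)
    return seconds

def report_outcome(future):
    # Invoked once the future settles (success, failure, or cancellation)
    try:
        print(f"done: {future.result()}")
    except CancelledError:
        print("cancelled before completion")
    except (TimeoutError, ProcessExpired) as error:
        print(f"failed: {error}")

with ProcessPool(max_workers=2) as pool:
    future = pool.schedule(long_running, args=(60,))
    future.add_done_callback(report_outcome)

    time.sleep(1)
    future.cancel()  # terminates the worker process running the task
```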
### Advanced Pool Configuration

Configure pools for specific performance and reliability requirements:
```python
from pebble import ProcessPool
import multiprocessing
import time

# High-performance pool for CPU-intensive work
def create_high_performance_pool():
    return ProcessPool(
        max_workers=multiprocessing.cpu_count() * 2,  # Oversubscribe for mixed workloads
        max_tasks=0,  # No worker recycling for maximum performance
        context=multiprocessing.get_context('spawn')  # Clean process creation
    )

# Reliable pool with frequent worker recycling
def create_reliable_pool():
    return ProcessPool(
        max_workers=multiprocessing.cpu_count(),
        max_tasks=10,  # Recycle workers frequently to prevent memory leaks
        context=multiprocessing.get_context('spawn')
    )

# Memory-conscious pool
def create_memory_conscious_pool():
    return ProcessPool(
        max_workers=max(1, multiprocessing.cpu_count() // 2),  # Fewer workers
        max_tasks=5,  # Frequent recycling to free memory
        context=multiprocessing.get_context('spawn')
    )

def benchmark_task(iterations):
    """Benchmark task for testing pool performance"""
    import math
    total = 0
    for i in range(iterations):
        total += math.sin(i) * math.cos(i)
    return total

# Benchmark different pool configurations
# (guarded so spawn-based workers do not re-run this block on import)
if __name__ == '__main__':
    configurations = {
        'high_performance': create_high_performance_pool(),
        'reliable': create_reliable_pool(),
        'memory_conscious': create_memory_conscious_pool()
    }

    for config_name, pool in configurations.items():
        print(f"\nTesting {config_name} configuration:")

        start_time = time.time()

        with pool:
            # Submit benchmark tasks
            futures = [
                pool.schedule(benchmark_task, args=(100000,))
                for _ in range(20)
            ]

            # Wait for completion
            results = [f.result() for f in futures]

        end_time = time.time()

        print(f" Completed {len(results)} tasks in {end_time - start_time:.2f} seconds")
        print(f" Average result: {sum(results) / len(results):.2f}")
```