# Process Pools

Advanced process pool implementation for parallel execution with timeout support, worker management, and enhanced error handling. Billiard's Pool class extends Python's standard multiprocessing.Pool with additional features for production environments.

## Capabilities

### Pool Creation and Configuration

Create process pools with extensive configuration options for timeout handling, worker management, and restart policies.

```python { .api }
class Pool:
    """
    A process pool object which controls worker processes to execute tasks.
    """
    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, timeout=None, soft_timeout=None,
                 lost_worker_timeout=None, max_restarts=None, max_restart_freq=1,
                 on_process_up=None, on_process_down=None, on_timeout_set=None,
                 on_timeout_cancel=None, threads=True, semaphore=None, putlocks=False,
                 allow_restart=False, synack=False, on_process_exit=None,
                 context=None, max_memory_per_child=None, enable_timeouts=False):
        """
        Create a process pool.

        Parameters:
        - processes: number of worker processes (default: cpu_count())
        - initializer: callable to run on worker startup
        - initargs: arguments for initializer
        - maxtasksperchild: tasks per worker before restart
        - timeout: hard timeout for tasks (seconds)
        - soft_timeout: soft timeout allowing cleanup (seconds)
        - lost_worker_timeout: timeout for detecting lost workers
        - max_restarts: maximum worker restarts
        - max_restart_freq: restart frequency limit
        - on_process_up: callback when worker starts
        - on_process_down: callback when worker stops
        - on_timeout_set: callback when timeout is set (job, soft_timeout, hard_timeout)
        - on_timeout_cancel: callback when timeout is cancelled (job)
        - threads: use threads for result handling
        - semaphore: custom semaphore for task limiting
        - putlocks: use locks for putting tasks
        - allow_restart: allow pool restarts
        - synack: enable synchronous acknowledgment mode for task cancellation
        - on_process_exit: callback when process exits (pid, exitcode)
        - context: multiprocessing context to use (default: None)
        - max_memory_per_child: memory limit per child process in kilobytes
        - enable_timeouts: explicitly enable timeout handling (default: False)
        """
```

Usage example:

```python
from billiard import Pool
import os
import time
import signal

def init_worker():
    """Initialize worker process"""
    print(f"Worker {os.getpid()} initialized")
    # Ignore interrupt signals in worker
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def long_task(x):
    """Task that might take a while"""
    time.sleep(x * 0.1)  # Simulate work
    return x * x

def worker_up_callback(pid):
    print(f"Worker {pid} started")

def worker_down_callback(pid, exitcode):
    print(f"Worker {pid} stopped with exit code {exitcode}")

# Create pool with advanced configuration
with Pool(
    processes=4,
    initializer=init_worker,
    timeout=30,            # Hard timeout: 30 seconds
    soft_timeout=25,       # Soft timeout: 25 seconds
    maxtasksperchild=100,  # Restart workers after 100 tasks
    max_restarts=5,        # Allow up to 5 worker restarts
    on_process_up=worker_up_callback,
    on_process_down=worker_down_callback,
    allow_restart=True
) as pool:

    # Submit tasks
    numbers = list(range(20))
    results = pool.map(long_task, numbers)
    print(f"Results: {results}")
```
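
Worker recycling can also be driven by memory use rather than timeouts. Below is a minimal sketch, assuming a pool that replaces each worker after a fixed number of tasks or once it exceeds a memory ceiling; the task function and the limit values are illustrative:

```python
from billiard import Pool

def process_record(record):
    # Illustrative workload; replace with real per-record processing.
    return len(str(record))

if __name__ == '__main__':
    # maxtasksperchild recycles a worker after 50 tasks;
    # max_memory_per_child is expressed in kilobytes (here roughly 200 MB).
    with Pool(processes=2,
              maxtasksperchild=50,
              max_memory_per_child=200 * 1024) as pool:
        print(pool.map(process_record, range(100)))
```

Recycling bounds the footprint of long-lived pools when tasks leak memory or hold large caches.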

### Synchronous Task Execution

Execute tasks synchronously with blocking calls that return results once the work completes.

```python { .api }
def apply(self, func, args=(), kwds={}):
    """
    Call func with arguments args and keyword arguments kwds.
    Blocks until result is ready.

    Parameters:
    - func: callable to execute
    - args: positional arguments
    - kwds: keyword arguments

    Returns:
    Result of func(*args, **kwds)
    """

def map(self, func, iterable, chunksize=None):
    """
    Apply func to each element of iterable, collecting results in a list.

    Parameters:
    - func: callable to apply
    - iterable: sequence of arguments
    - chunksize: size of chunks sent to workers

    Returns:
    List of results
    """

def starmap(self, func, iterable, chunksize=None):
    """
    Like map() but arguments are unpacked from tuples.

    Parameters:
    - func: callable to apply
    - iterable: sequence of argument tuples
    - chunksize: size of chunks sent to workers

    Returns:
    List of results
    """
```

Usage example:

```python
from billiard import Pool

def add(a, b):
    return a + b

def square(x):
    return x ** 2

def multiply(x, y):
    return x * y

with Pool(processes=2) as pool:
    # Apply single function call
    result = pool.apply(add, (5, 3))
    print(f"5 + 3 = {result}")

    # Map function over sequence (the function must be picklable,
    # so use a module-level function rather than a lambda)
    numbers = [1, 2, 3, 4, 5]
    squares = pool.map(square, numbers)
    print(f"Squares: {squares}")

    # Starmap unpacks each argument tuple into separate parameters
    pairs = [(2, 3), (4, 5), (6, 7)]
    products = pool.starmap(multiply, pairs)
    print(f"Products: {products}")
```
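
For large iterables of cheap tasks, the chunksize argument controls how many items are shipped to a worker in one message. A minimal sketch, with an illustrative function and chunk size:

```python
from billiard import Pool

def double(x):
    return x * 2

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # Batching 50 items per chunk cuts down on inter-process
        # messaging overhead compared with very small chunks.
        results = pool.map(double, range(1000), chunksize=50)
        print(results[:5])
```

When chunksize is None, map() chooses a chunk size automatically from the length of the iterable and the number of workers.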

### Asynchronous Task Execution

Execute tasks asynchronously with non-blocking calls that return result objects for later retrieval.

```python { .api }
def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None,
                accept_callback=None, timeout_callback=None, waitforslot=None,
                soft_timeout=None, timeout=None, lost_worker_timeout=None,
                callbacks_propagate=None, correlation_id=None):
    """
    Asynchronous version of apply() method.

    Parameters:
    - func: callable to execute
    - args: positional arguments
    - kwds: keyword arguments
    - callback: callable for successful results
    - error_callback: callable for exceptions
    - accept_callback: callable for task acceptance
    - timeout_callback: callable for task timeout
    - waitforslot: wait for available slot before submitting
    - soft_timeout: task-specific soft timeout (seconds)
    - timeout: task-specific hard timeout (seconds)
    - lost_worker_timeout: worker loss timeout for this task
    - callbacks_propagate: control error propagation through callbacks
    - correlation_id: identifier for task correlation

    Returns:
    ApplyResult object
    """

def map_async(self, func, iterable, chunksize=None, callback=None, error_callback=None):
    """
    Asynchronous version of map() method.

    Parameters:
    - func: callable to apply
    - iterable: sequence of arguments
    - chunksize: size of chunks sent to workers
    - callback: callable for successful results
    - error_callback: callable for exceptions

    Returns:
    MapResult object
    """

def starmap_async(self, func, iterable, chunksize=None, callback=None, error_callback=None):
    """
    Asynchronous version of starmap() method.

    Parameters:
    - func: callable to apply
    - iterable: sequence of argument tuples
    - chunksize: size of chunks sent to workers
    - callback: callable for successful results
    - error_callback: callable for exceptions

    Returns:
    MapResult object
    """

def imap(self, func, iterable, chunksize=1):
    """
    Lazy equivalent of map() returning an iterator.

    Parameters:
    - func: callable to apply
    - iterable: sequence of arguments
    - chunksize: size of chunks sent to workers

    Returns:
    Iterator yielding results
    """

def imap_unordered(self, func, iterable, chunksize=1):
    """
    Like imap() but results may be returned in arbitrary order.

    Parameters:
    - func: callable to apply
    - iterable: sequence of arguments
    - chunksize: size of chunks sent to workers

    Returns:
    Iterator yielding results in arbitrary order
    """
```

Usage example:

```python
from billiard import Pool
import time

def slow_task(x):
    time.sleep(0.1)
    return x * x

def success_callback(result):
    print(f"Task completed with result: {result}")

def error_callback(error):
    print(f"Task failed with error: {error}")

with Pool(processes=4) as pool:
    # Async apply
    result = pool.apply_async(
        slow_task,
        (5,),
        callback=success_callback,
        error_callback=error_callback
    )

    # Continue other work while task runs
    print("Task submitted, doing other work...")
    time.sleep(0.05)

    # Get result when ready
    value = result.get(timeout=1)
    print(f"Got result: {value}")

    # Async map: submit the whole batch, then collect it later
    numbers = list(range(10))
    map_result = pool.map_async(slow_task, numbers)
    print(f"Batch results: {map_result.get(timeout=5)}")

    # Use an iterator for streaming results
    for i, result in enumerate(pool.imap(slow_task, range(5))):
        print(f"Streaming result {i}: {result}")
```
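
When ordering does not matter, imap_unordered() yields each result as soon as its worker finishes, so slow items do not block fast ones. A minimal sketch; the jittered task is illustrative:

```python
from billiard import Pool
import random
import time

def jittered_square(x):
    # Simulate tasks that complete in unpredictable order
    time.sleep(random.uniform(0, 0.2))
    return x, x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # Results arrive in completion order, not submission order.
        for original, squared in pool.imap_unordered(jittered_square, range(10)):
            print(f"{original}^2 = {squared}")
```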

### Result Objects

Objects returned by asynchronous operations for result retrieval and status checking.

```python { .api }
class ApplyResult:
    """
    Result object returned by Pool.apply_async().
    """
    def get(self, timeout=None):
        """
        Return result when available.

        Parameters:
        - timeout: timeout in seconds (None for no timeout)

        Returns:
        Result value

        Raises:
        - TimeoutError: if timeout exceeded
        - Exception: if task raised exception
        """

    def wait(self, timeout=None):
        """
        Wait until result is available.

        Parameters:
        - timeout: timeout in seconds (None for no timeout)
        """

    def ready(self) -> bool:
        """Return whether result is ready."""

    def successful(self) -> bool:
        """Return whether task completed successfully (only valid if ready())."""

    def terminate(self, signum):
        """
        Terminate the job.

        Parameters:
        - signum: signal number for termination
        """

class MapResult(ApplyResult):
    """
    Result object returned by Pool.map_async() and related methods.
    Extends ApplyResult with additional functionality for map operations.
    """
```

Usage example:

```python
from billiard import Pool, TimeoutError

def risky_task(x):
    if x == 5:
        raise ValueError("Five is not allowed!")
    return x * 2

with Pool(processes=2) as pool:
    # Submit multiple async tasks
    results = []
    for i in range(8):
        result = pool.apply_async(risky_task, (i,))
        results.append(result)

    # Check results
    for i, result in enumerate(results):
        try:
            if result.ready():
                print(f"Task {i} ready: {result.successful()}")
                value = result.get(timeout=0.1)
                print(f"Task {i} result: {value}")
            else:
                print(f"Task {i} still running...")
                result.wait(timeout=1)
                value = result.get()
                print(f"Task {i} completed: {value}")
        except ValueError as e:
            print(f"Task {i} failed: {e}")
        except TimeoutError:
            print(f"Task {i} timed out")
```
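
A MapResult supports the same readiness checks as ApplyResult, so a whole batch can be polled while other work continues. A minimal sketch; the task and polling interval are illustrative:

```python
from billiard import Pool
import time

def cube(x):
    time.sleep(0.05)
    return x ** 3

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        map_result = pool.map_async(cube, range(20))
        # Poll the whole batch, doing other work between checks.
        while not map_result.ready():
            print("batch still running...")
            time.sleep(0.1)
        if map_result.successful():
            print(map_result.get())
```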

### Pool Management

Methods for controlling pool lifecycle, worker management, and resource cleanup.

```python { .api }
def close(self):
    """
    Prevent any more tasks from being submitted to the pool.
    Outstanding work will complete before workers exit.
    """

def terminate(self):
    """
    Stop worker processes immediately without completing outstanding work.
    """

def join(self):
    """
    Wait for worker processes to exit. Must call close() or terminate() first.
    """

def restart(self):
    """
    Restart the pool (requires allow_restart=True).
    """

def grow(self, n=1):
    """
    Add n worker processes to the pool.

    Parameters:
    - n: number of workers to add
    """

def shrink(self, n=1):
    """
    Remove n worker processes from the pool.

    Parameters:
    - n: number of workers to remove
    """

def terminate_job(self, pid, sig=None):
    """
    Terminate a specific job by process ID.

    Parameters:
    - pid: process ID of worker to terminate
    - sig: signal to send (default: SIGTERM)
    """

def maintain_pool(self):
    """
    Maintain the pool by replacing dead workers.
    """

def send_ack(self, response, job, i, fd):
    """
    Send acknowledgment response for a task (used with synack mode).

    Parameters:
    - response: acknowledgment response
    - job: job being acknowledged
    - i: job index
    - fd: file descriptor for communication
    """

def did_start_ok(self) -> bool:
    """
    Check if the pool started successfully by verifying no workers have exited.

    Returns:
    True if pool started successfully, False otherwise
    """

def on_job_ready(self, job, i, obj, inqW_fd):
    """
    Hook method called when a job becomes ready for execution.

    Parameters:
    - job: the job object
    - i: job index
    - obj: job object data
    - inqW_fd: input queue write file descriptor
    """

def handle_result_event(self, *args):
    """
    Handle result events from the result handler.

    Parameters:
    - args: event arguments
    """

def cpu_count(self) -> int:
    """
    Return the number of CPUs with fallback logic.

    Returns:
    Number of available CPUs
    """

@property
def process_sentinels(self) -> list:
    """
    Return a list of process sentinel objects for monitoring worker processes.

    Returns:
    List of sentinel objects
    """
```

Usage example:

```python
from billiard import Pool
import time
import signal

def long_running_task(x):
    time.sleep(10)  # Very long task
    return x

# Create pool with restart capability
pool = Pool(processes=4, allow_restart=True)

try:
    # Submit some tasks
    results = []
    for i in range(8):
        result = pool.apply_async(long_running_task, (i,))
        results.append(result)

    # Let some tasks start
    time.sleep(1)

    # Dynamically manage pool size
    print("Growing pool...")
    pool.grow(2)    # Add 2 more workers

    time.sleep(2)

    print("Shrinking pool...")
    pool.shrink(1)  # Remove 1 worker

    # Terminate specific job if needed
    # pool.terminate_job(worker_pid, signal.SIGTERM)

    # Option 1: Graceful shutdown
    pool.close()
    pool.join()

except KeyboardInterrupt:
    # Option 2: Immediate shutdown
    print("Terminating pool...")
    pool.terminate()
    pool.join()

# Option 3: Restart pool
# pool.restart()
```
```