Python multiprocessing fork with improvements and bugfixes for distributed task processing
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Advanced process pool implementation for parallel execution with timeout support, worker management, and enhanced error handling. Billiard's Pool class extends Python's standard multiprocessing.Pool with additional features for production environments.
Create process pools with extensive configuration options for timeout handling, worker management, and restart policies.
class Pool:
"""
A process pool object which controls worker processes to execute tasks.
"""
def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None, timeout=None, soft_timeout=None,
lost_worker_timeout=None, max_restarts=None, max_restart_freq=1,
on_process_up=None, on_process_down=None, on_timeout_set=None,
on_timeout_cancel=None, threads=True, semaphore=None, putlocks=False,
allow_restart=False, synack=False, on_process_exit=None,
context=None, max_memory_per_child=None, enable_timeouts=False):
"""
Create a process pool.
Parameters:
- processes: number of worker processes (default: cpu_count())
- initializer: callable to run on worker startup
- initargs: arguments for initializer
- maxtasksperchild: tasks per worker before restart
- timeout: hard timeout for tasks (seconds)
- soft_timeout: soft timeout allowing cleanup (seconds)
- lost_worker_timeout: timeout for detecting lost workers
- max_restarts: maximum worker restarts
- max_restart_freq: restart frequency limit
- on_process_up: callback when worker starts
- on_process_down: callback when worker stops
- on_timeout_set: callback when timeout is set (job, soft_timeout, hard_timeout)
- on_timeout_cancel: callback when timeout is cancelled (job)
- threads: use threads for result handling
- semaphore: custom semaphore for task limiting
- putlocks: use locks for putting tasks
- allow_restart: allow pool restarts
- synack: enable synchronous acknowledgment mode for task cancellation
- on_process_exit: callback when process exits (pid, exitcode)
- context: multiprocessing context to use (default: None)
- max_memory_per_child: memory limit per child process in kilobytes
- enable_timeouts: explicitly enable timeout handling (default: False)
"""Usage example:
from billiard import Pool
import os
import time
import signal
def init_worker():
"""Initialize worker process"""
print(f"Worker {os.getpid()} initialized")
# Ignore interrupt signals in worker
signal.signal(signal.SIGINT, signal.SIG_IGN)
def long_task(x):
"""Task that might take a while"""
time.sleep(x * 0.1) # Simulate work
return x * x
def worker_up_callback(pid):
print(f"Worker {pid} started")
def worker_down_callback(pid, exitcode):
print(f"Worker {pid} stopped with exit code {exitcode}")
# Create pool with advanced configuration
with Pool(
processes=4,
initializer=init_worker,
timeout=30, # Hard timeout: 30 seconds
soft_timeout=25, # Soft timeout: 25 seconds
maxtasksperchild=100, # Restart workers after 100 tasks
max_restarts=5, # Allow up to 5 worker restarts
on_process_up=worker_up_callback,
on_process_down=worker_down_callback,
allow_restart=True
) as pool:
# Submit tasks
numbers = list(range(20))
results = pool.map(long_task, numbers)
print(f"Results: {results}")Execute tasks synchronously with blocking calls that return results immediately.
def apply(self, func, args=(), kwds={}):
"""
Call func with arguments args and keyword arguments kwds.
Blocks until result is ready.
Parameters:
- func: callable to execute
- args: positional arguments
- kwds: keyword arguments
Returns:
Result of func(*args, **kwds)
"""
def map(self, func, iterable, chunksize=None):
"""
Apply func to each element of iterable, collecting results in a list.
Parameters:
- func: callable to apply
- iterable: sequence of arguments
- chunksize: size of chunks sent to workers
Returns:
List of results
"""
def starmap(self, func, iterable, chunksize=None):
"""
Like map() but arguments are unpacked from tuples.
Parameters:
- func: callable to apply
- iterable: sequence of argument tuples
- chunksize: size of chunks sent to workers
Returns:
List of results
"""Usage example:
from billiard import Pool
def add(a, b):
return a + b
def square(x):
return x ** 2
def multiply(args):
x, y = args
return x * y
with Pool(processes=2) as pool:
# Apply single function call
result = pool.apply(add, (5, 3))
print(f"5 + 3 = {result}")
# Map function over sequence
numbers = [1, 2, 3, 4, 5]
# Note: a named function is used here because lambdas cannot be
# pickled and sent to worker processes
squares = pool.map(square, numbers)
print(f"Squares: {squares}")
# Starmap with argument tuples
pairs = [(2, 3), (4, 5), (6, 7)]
products = pool.starmap(multiply, pairs)
print(f"Products: {products}")Execute tasks asynchronously with non-blocking calls that return result objects for later retrieval.
def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None,
accept_callback=None, timeout_callback=None, waitforslot=None,
soft_timeout=None, timeout=None, lost_worker_timeout=None,
callbacks_propagate=None, correlation_id=None):
"""
Asynchronous version of apply() method.
Parameters:
- func: callable to execute
- args: positional arguments
- kwds: keyword arguments
- callback: callable for successful results
- error_callback: callable for exceptions
- accept_callback: callable for task acceptance
- timeout_callback: callable for task timeout
- waitforslot: wait for available slot before submitting
- soft_timeout: task-specific soft timeout (seconds)
- timeout: task-specific hard timeout (seconds)
- lost_worker_timeout: worker loss timeout for this task
- callbacks_propagate: control error propagation through callbacks
- correlation_id: identifier for task correlation
Returns:
ApplyResult object
"""
def map_async(self, func, iterable, chunksize=None, callback=None, error_callback=None):
"""
Asynchronous version of map() method.
Parameters:
- func: callable to apply
- iterable: sequence of arguments
- chunksize: size of chunks sent to workers
- callback: callable for successful results
- error_callback: callable for exceptions
Returns:
MapResult object
"""
def starmap_async(self, func, iterable, chunksize=None, callback=None, error_callback=None):
"""
Asynchronous version of starmap() method.
Parameters:
- func: callable to apply
- iterable: sequence of argument tuples
- chunksize: size of chunks sent to workers
- callback: callable for successful results
- error_callback: callable for exceptions
Returns:
MapResult object
"""
def imap(self, func, iterable, chunksize=1):
"""
Lazy equivalent of map() returning an iterator.
Parameters:
- func: callable to apply
- iterable: sequence of arguments
- chunksize: size of chunks sent to workers
Returns:
Iterator yielding results
"""
def imap_unordered(self, func, iterable, chunksize=1):
"""
Like imap() but results may be returned in arbitrary order.
Parameters:
- func: callable to apply
- iterable: sequence of arguments
- chunksize: size of chunks sent to workers
Returns:
Iterator yielding results in arbitrary order
"""Usage example:
from billiard import Pool
import time
def slow_task(x):
time.sleep(0.1)
return x * x
def success_callback(result):
print(f"Task completed with result: {result}")
def error_callback(error):
print(f"Task failed with error: {error}")
with Pool(processes=4) as pool:
# Async apply
result = pool.apply_async(
slow_task,
(5,),
callback=success_callback,
error_callback=error_callback
)
# Continue other work while task runs
print("Task submitted, doing other work...")
time.sleep(0.05)
# Get result when ready
value = result.get(timeout=1)
print(f"Got result: {value}")
# Async map
numbers = list(range(10))
map_result = pool.map_async(slow_task, numbers)
# Use iterator for streaming results
for i, result in enumerate(pool.imap(slow_task, range(5))):
print(f"Streaming result {i}: {result}")Objects returned by asynchronous operations for result retrieval and status checking.
class ApplyResult:
"""
Result object returned by Pool.apply_async().
"""
def get(self, timeout=None):
"""
Return result when available.
Parameters:
- timeout: timeout in seconds (None for no timeout)
Returns:
Result value
Raises:
- TimeoutError: if timeout exceeded
- Exception: if task raised exception
"""
def wait(self, timeout=None):
"""
Wait until result is available.
Parameters:
- timeout: timeout in seconds (None for no timeout)
"""
def ready(self) -> bool:
"""Return whether result is ready."""
def successful(self) -> bool:
"""Return whether task completed successfully (only valid if ready())."""
def terminate(self, signum):
"""
Terminate the job.
Parameters:
- signum: signal number for termination
"""
class MapResult(ApplyResult):
"""
Result object returned by Pool.map_async() and related methods.
Extends ApplyResult with additional functionality for map operations.
"""Usage example:
from billiard import Pool, TimeoutError
def risky_task(x):
if x == 5:
raise ValueError("Five is not allowed!")
return x * 2
with Pool(processes=2) as pool:
# Submit multiple async tasks
results = []
for i in range(8):
result = pool.apply_async(risky_task, (i,))
results.append(result)
# Check results
for i, result in enumerate(results):
try:
if result.ready():
print(f"Task {i} ready: {result.successful()}")
value = result.get(timeout=0.1)
print(f"Task {i} result: {value}")
else:
print(f"Task {i} still running...")
result.wait(timeout=1)
value = result.get()
print(f"Task {i} completed: {value}")
except ValueError as e:
print(f"Task {i} failed: {e}")
except TimeoutError:
print(f"Task {i} timed out")Methods for controlling pool lifecycle, worker management, and resource cleanup.
def close(self):
"""
Prevent any more tasks being submitted to pool.
Outstanding work will complete before workers exit.
"""
def terminate(self):
"""
Stop worker processes immediately without completing outstanding work.
"""
def join(self):
"""
Wait for worker processes to exit. Must call close() or terminate() first.
"""
def restart(self):
"""
Restart the pool (requires allow_restart=True).
"""
def grow(self, n=1):
"""
Add n worker processes to the pool.
Parameters:
- n: number of workers to add
"""
def shrink(self, n=1):
"""
Remove n worker processes from the pool.
Parameters:
- n: number of workers to remove
"""
def terminate_job(self, pid, sig=None):
"""
Terminate a specific job by process ID.
Parameters:
- pid: process ID of worker to terminate
- sig: signal to send (default: SIGTERM)
"""
def maintain_pool(self):
"""
Maintain the pool by replacing dead workers.
"""
def send_ack(self, response, job, i, fd):
"""
Send acknowledgment response for a task (used with synack mode).
Parameters:
- response: acknowledgment response
- job: job being acknowledged
- i: job index
- fd: file descriptor for communication
"""
def did_start_ok(self) -> bool:
"""
Check if the pool started successfully by verifying no workers have exited.
Returns:
True if pool started successfully, False otherwise
"""
def on_job_ready(self, job, i, obj, inqW_fd):
"""
Hook method called when a job becomes ready for execution.
Parameters:
- job: the job object
- i: job index
- obj: job object data
- inqW_fd: input queue write file descriptor
"""
def handle_result_event(self, *args):
"""
Handle result events from the result handler.
Parameters:
- args: event arguments
"""
def cpu_count(self) -> int:
"""
Return the number of CPUs with fallback logic.
Returns:
Number of available CPUs
"""
@property
def process_sentinels(self) -> list:
"""
Return a list of process sentinel objects for monitoring worker processes.
Returns:
List of sentinel objects
"""Usage example:
from billiard import Pool
import time
import signal
def long_running_task(x):
time.sleep(10) # Very long task
return x
# Create pool with restart capability
pool = Pool(processes=4, allow_restart=True)
try:
# Submit some tasks
results = []
for i in range(8):
result = pool.apply_async(long_running_task, (i,))
results.append(result)
# Let some tasks start
time.sleep(1)
# Dynamically manage pool size
print("Growing pool...")
pool.grow(2) # Add 2 more workers
time.sleep(2)
print("Shrinking pool...")
pool.shrink(1) # Remove 1 worker
# Terminate specific job if needed
# pool.terminate_job(worker_pid, signal.SIGTERM)
# Option 1: Graceful shutdown
pool.close()
pool.join()
except KeyboardInterrupt:
# Option 2: Immediate shutdown
print("Terminating pool...")
pool.terminate()
pool.join()
# Option 3: Restart pool
# pool.restart()

Install with Tessl CLI
npx tessl i tessl/pypi-billiard