CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-billiard

Python multiprocessing fork with improvements and bugfixes for distributed task processing

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

process-pools.mddocs/

Process Pools

Advanced process pool implementation for parallel execution with timeout support, worker management, and enhanced error handling. Billiard's Pool class extends Python's standard multiprocessing.Pool with additional features for production environments.

Capabilities

Pool Creation and Configuration

Create process pools with extensive configuration options for timeout handling, worker management, and restart policies.

class Pool:
    """
    A process pool object which controls worker processes to execute tasks.
    """
    def __init__(self, processes=None, initializer=None, initargs=(), 
                 maxtasksperchild=None, timeout=None, soft_timeout=None,
                 lost_worker_timeout=None, max_restarts=None, max_restart_freq=1,
                 on_process_up=None, on_process_down=None, on_timeout_set=None,
                 on_timeout_cancel=None, threads=True, semaphore=None, putlocks=False,
                 allow_restart=False, synack=False, on_process_exit=None, 
                 context=None, max_memory_per_child=None, enable_timeouts=False):
        """
        Create a process pool.
        
        Parameters:
        - processes: number of worker processes (default: cpu_count())
        - initializer: callable to run on worker startup
        - initargs: arguments for initializer
        - maxtasksperchild: tasks per worker before restart
        - timeout: hard timeout for tasks (seconds)
        - soft_timeout: soft timeout allowing cleanup (seconds)
        - lost_worker_timeout: timeout for detecting lost workers
        - max_restarts: maximum worker restarts
        - max_restart_freq: restart frequency limit
        - on_process_up: callback invoked with the worker process object when a worker starts
        - on_process_down: callback invoked with the worker process object when a worker stops
        - on_timeout_set: callback when timeout is set (job, soft_timeout, hard_timeout)
        - on_timeout_cancel: callback when timeout is cancelled (job)
        - threads: use threads for result handling
        - semaphore: custom semaphore for task limiting
        - putlocks: use locks for putting tasks
        - allow_restart: allow pool restarts
        - synack: enable synchronous acknowledgment mode for task cancellation
        - on_process_exit: callback when process exits (pid, exitcode)
        - context: multiprocessing context to use (default: None)
        - max_memory_per_child: memory limit per child process in kilobytes
        - enable_timeouts: explicitly enable timeout handling (default: False)
        """

Usage example:

from billiard import Pool
import os
import time
import signal

def init_worker():
    """Initialize worker process"""
    print(f"Worker {os.getpid()} initialized")
    # Ignore interrupt signals in worker
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def long_task(x):
    """Task that might take a while"""
    time.sleep(x * 0.1)  # Simulate work
    return x * x

def worker_up_callback(proc):
    # billiard passes the worker process object to this callback
    print(f"Worker {proc.pid} started")

def worker_down_callback(proc):
    # the worker process object carries both pid and exitcode
    print(f"Worker {proc.pid} stopped with exit code {proc.exitcode}")

# Create pool with advanced configuration
with Pool(
    processes=4,
    initializer=init_worker,
    timeout=30,              # Hard timeout: 30 seconds
    soft_timeout=25,         # Soft timeout: 25 seconds  
    maxtasksperchild=100,    # Restart workers after 100 tasks
    max_restarts=5,          # Allow up to 5 worker restarts
    on_process_up=worker_up_callback,
    on_process_down=worker_down_callback,
    allow_restart=True
) as pool:
    
    # Submit tasks
    numbers = list(range(20))
    results = pool.map(long_task, numbers)
    print(f"Results: {results}")

Synchronous Task Execution

Execute tasks synchronously with blocking calls that return results immediately.

def apply(self, func, args=(), kwds={}):
    """
    Call func with arguments args and keyword arguments kwds.
    Blocks until result is ready.
    
    Parameters:
    - func: callable to execute
    - args: positional arguments
    - kwds: keyword arguments
    
    Returns:
    Result of func(*args, **kwds)
    """

def map(self, func, iterable, chunksize=None):
    """
    Apply func to each element of iterable, collecting results in a list.
    
    Parameters:
    - func: callable to apply
    - iterable: sequence of arguments
    - chunksize: size of chunks sent to workers
    
    Returns:
    List of results
    """

def starmap(self, func, iterable, chunksize=None):
    """
    Like map() but arguments are unpacked from tuples.
    
    Parameters:
    - func: callable to apply
    - iterable: sequence of argument tuples
    - chunksize: size of chunks sent to workers
    
    Returns:
    List of results
    """

Usage example:

from billiard import Pool

def add(a, b):
    return a + b

def square(x):
    return x ** 2

def multiply(args):
    x, y = args
    return x * y

with Pool(processes=2) as pool:
    # Apply single function call
    result = pool.apply(add, (5, 3))
    print(f"5 + 3 = {result}")
    
    # Map function over sequence. Use a module-level function here:
    # lambdas cannot be pickled, so they cannot be sent to worker processes.
    numbers = [1, 2, 3, 4, 5]
    squares = pool.map(square, numbers)
    print(f"Squares: {squares}")
    
    # Starmap with argument tuples
    pairs = [(2, 3), (4, 5), (6, 7)]
    products = pool.starmap(multiply, pairs)
    print(f"Products: {products}")

Asynchronous Task Execution

Execute tasks asynchronously with non-blocking calls that return result objects for later retrieval.

def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None,
                accept_callback=None, timeout_callback=None, waitforslot=None,
                soft_timeout=None, timeout=None, lost_worker_timeout=None,
                callbacks_propagate=None, correlation_id=None):
    """
    Asynchronous version of apply() method.
    
    Parameters:
    - func: callable to execute
    - args: positional arguments
    - kwds: keyword arguments
    - callback: callable for successful results
    - error_callback: callable for exceptions
    - accept_callback: callable for task acceptance
    - timeout_callback: callable for task timeout
    - waitforslot: wait for available slot before submitting
    - soft_timeout: task-specific soft timeout (seconds)
    - timeout: task-specific hard timeout (seconds)
    - lost_worker_timeout: worker loss timeout for this task
    - callbacks_propagate: control error propagation through callbacks
    - correlation_id: identifier for task correlation
    
    Returns:
    ApplyResult object
    """

def map_async(self, func, iterable, chunksize=None, callback=None, error_callback=None):
    """
    Asynchronous version of map() method.
    
    Parameters:
    - func: callable to apply
    - iterable: sequence of arguments
    - chunksize: size of chunks sent to workers
    - callback: callable for successful results
    - error_callback: callable for exceptions
    
    Returns:
    MapResult object
    """

def starmap_async(self, func, iterable, chunksize=None, callback=None, error_callback=None):
    """
    Asynchronous version of starmap() method.
    
    Parameters:
    - func: callable to apply
    - iterable: sequence of argument tuples
    - chunksize: size of chunks sent to workers
    - callback: callable for successful results
    - error_callback: callable for exceptions
    
    Returns:
    MapResult object
    """

def imap(self, func, iterable, chunksize=1):
    """
    Lazy equivalent of map() returning an iterator.
    
    Parameters:
    - func: callable to apply
    - iterable: sequence of arguments
    - chunksize: size of chunks sent to workers
    
    Returns:
    Iterator yielding results
    """

def imap_unordered(self, func, iterable, chunksize=1):
    """
    Like imap() but results may be returned in arbitrary order.
    
    Parameters:
    - func: callable to apply
    - iterable: sequence of arguments
    - chunksize: size of chunks sent to workers
    
    Returns:
    Iterator yielding results in arbitrary order
    """

Usage example:

from billiard import Pool
import time

def slow_task(x):
    time.sleep(0.1)
    return x * x

def success_callback(result):
    print(f"Task completed with result: {result}")

def error_callback(error):
    print(f"Task failed with error: {error}")

with Pool(processes=4) as pool:
    # Async apply
    result = pool.apply_async(
        slow_task, 
        (5,), 
        callback=success_callback,
        error_callback=error_callback
    )
    
    # Continue other work while task runs
    print("Task submitted, doing other work...")
    time.sleep(0.05)
    
    # Get result when ready
    value = result.get(timeout=1)
    print(f"Got result: {value}")
    
    # Async map
    numbers = list(range(10))
    map_result = pool.map_async(slow_task, numbers)
    
    # Use iterator for streaming results
    for i, result in enumerate(pool.imap(slow_task, range(5))):
        print(f"Streaming result {i}: {result}")

Result Objects

Objects returned by asynchronous operations for result retrieval and status checking.

class ApplyResult:
    """
    Result object returned by Pool.apply_async().
    """
    def get(self, timeout=None):
        """
        Return result when available.
        
        Parameters:
        - timeout: timeout in seconds (None for no timeout)
        
        Returns:
        Result value
        
        Raises:
        - TimeoutError: if timeout exceeded
        - Exception: if task raised exception
        """
    
    def wait(self, timeout=None):
        """
        Wait until result is available.
        
        Parameters:
        - timeout: timeout in seconds (None for no timeout)
        """
    
    def ready(self) -> bool:
        """Return whether result is ready."""
    
    def successful(self) -> bool:
        """Return whether task completed successfully (only valid if ready())."""
    
    def terminate(self, signum):
        """
        Terminate the job.
        
        Parameters:
        - signum: signal number for termination
        """

class MapResult(ApplyResult):
    """
    Result object returned by Pool.map_async() and related methods.
    Extends ApplyResult with additional functionality for map operations.
    """

Usage example:

from billiard import Pool, TimeoutError

def risky_task(x):
    if x == 5:
        raise ValueError("Five is not allowed!")
    return x * 2

with Pool(processes=2) as pool:
    # Submit multiple async tasks
    results = []
    for i in range(8):
        result = pool.apply_async(risky_task, (i,))
        results.append(result)
    
    # Check results
    for i, result in enumerate(results):
        try:
            if result.ready():
                print(f"Task {i} ready: {result.successful()}")
                value = result.get(timeout=0.1)
                print(f"Task {i} result: {value}")
            else:
                print(f"Task {i} still running...")
                result.wait(timeout=1)
                value = result.get()
                print(f"Task {i} completed: {value}")
        except ValueError as e:
            print(f"Task {i} failed: {e}")
        except TimeoutError:
            print(f"Task {i} timed out")

Pool Management

Methods for controlling pool lifecycle, worker management, and resource cleanup.

def close(self):
    """
    Prevent any more tasks being submitted to pool.
    Outstanding work will complete before workers exit.
    """

def terminate(self):
    """
    Stop worker processes immediately without completing outstanding work.
    """

def join(self):
    """
    Wait for worker processes to exit. Must call close() or terminate() first.
    """

def restart(self):
    """
    Restart the pool (requires allow_restart=True).
    """

def grow(self, n=1):
    """
    Add n worker processes to the pool.
    
    Parameters:
    - n: number of workers to add
    """

def shrink(self, n=1):
    """
    Remove n worker processes from the pool.
    
    Parameters:
    - n: number of workers to remove
    """

def terminate_job(self, pid, sig=None):
    """
    Terminate a specific job by process ID.
    
    Parameters:
    - pid: process ID of worker to terminate
    - sig: signal to send (default: SIGTERM)
    """

def maintain_pool(self):
    """
    Maintain the pool by replacing dead workers.
    """

def send_ack(self, response, job, i, fd):
    """
    Send acknowledgment response for a task (used with synack mode).
    
    Parameters:
    - response: acknowledgment response
    - job: job being acknowledged
    - i: job index
    - fd: file descriptor for communication
    """

def did_start_ok(self) -> bool:
    """
    Check if the pool started successfully by verifying no workers have exited.
    
    Returns:
    True if pool started successfully, False otherwise
    """

def on_job_ready(self, job, i, obj, inqW_fd):
    """
    Hook method called when a job becomes ready for execution.
    
    Parameters:
    - job: the job object
    - i: job index
    - obj: job object data
    - inqW_fd: input queue write file descriptor
    """

def handle_result_event(self, *args):
    """
    Handle result events from the result handler.
    
    Parameters:
    - args: event arguments
    """

def cpu_count(self) -> int:
    """
    Return the number of CPUs with fallback logic.
    
    Returns:
    Number of available CPUs
    """

@property
def process_sentinels(self) -> list:
    """
    Return a list of process sentinel objects for monitoring worker processes.
    
    Returns:
    List of sentinel objects
    """

Usage example:

from billiard import Pool
import time
import signal

def long_running_task(x):
    time.sleep(10)  # Very long task
    return x

# Create pool with restart capability
pool = Pool(processes=4, allow_restart=True)

try:
    # Submit some tasks
    results = []
    for i in range(8):
        result = pool.apply_async(long_running_task, (i,))
        results.append(result)
    
    # Let some tasks start
    time.sleep(1)
    
    # Dynamically manage pool size
    print("Growing pool...")
    pool.grow(2)  # Add 2 more workers
    
    time.sleep(2)
    
    print("Shrinking pool...")
    pool.shrink(1)  # Remove 1 worker
    
    # Terminate specific job if needed
    # pool.terminate_job(worker_pid, signal.SIGTERM)
    
    # Option 1: Graceful shutdown
    pool.close()
    pool.join()
    
except KeyboardInterrupt:
    # Option 2: Immediate shutdown
    print("Terminating pool...")
    pool.terminate()
    pool.join()

# Option 3: Restart pool
# pool.restart()

Install with Tessl CLI

npx tessl i tessl/pypi-billiard

docs

communication.md

context-management.md

index.md

managers.md

process-management.md

process-pools.md

queues.md

shared-memory.md

synchronization.md

tile.json