tessl/pypi-billiard

Python multiprocessing fork with improvements and bugfixes for distributed task processing

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/billiard@4.2.x

To install, run

npx @tessl/cli install tessl/pypi-billiard@4.2.0


Billiard

A fork of Python's multiprocessing package providing enhanced functionality for distributed task processing. Billiard offers improved process pools with timeouts, enhanced error handling, worker management, and specialized features for task queue systems like Celery.

Package Information

  • Package Name: billiard
  • Language: Python
  • Installation: pip install billiard
  • Version: 4.2.1

Core Imports

import billiard

Common imports for different components:

from billiard import Process, Pool, Queue, Lock, Event
from billiard import current_process, active_children, cpu_count

Basic Usage

import billiard as mp
from billiard import Process, Pool, Queue

# Worker function: runs in a child process and reports its result through the queue
def worker_task(name, result_queue):
    result = f"Hello from {name}"
    result_queue.put(result)

if __name__ == '__main__':
    # Create a queue for results
    queue = Queue()
    
    # Create and start processes
    processes = []
    for i in range(3):
        p = Process(target=worker_task, args=(f"worker-{i}", queue))
        p.start()
        processes.append(p)
    
    # Collect results
    results = []
    for _ in processes:
        results.append(queue.get())
    
    # Wait for completion
    for p in processes:
        p.join()
    
    print("Results:", results)

# Use process pool for parallel execution
def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        numbers = [1, 2, 3, 4, 5]
        squared = pool.map(square, numbers)
        print("Squared:", squared)

Architecture

Billiard extends Python's multiprocessing architecture with several key enhancements:

  • Enhanced Process Pool: Advanced pool implementation with timeouts, restart capabilities, and worker monitoring
  • Robust Error Handling: Comprehensive exception hierarchy including worker loss detection and time limit management
  • Multiple Start Methods: Support for fork, spawn, and forkserver process creation methods
  • Context Management: Configurable process contexts for different execution environments
  • Celery Integration: Specialized features optimized for distributed task processing systems

The package maintains API compatibility with Python's standard multiprocessing module, so code written against multiprocessing can in most cases switch by changing its import. On top of that it adds the pool timeouts, worker-restart controls, and exception types described in the sections that follow.

Capabilities

Process Management

Core process creation, lifecycle management, and process introspection functionality.

class Process:
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None): ...
    def start(self): ...
    def join(self, timeout=None): ...
    def terminate(self): ...
    def is_alive(self) -> bool: ...

def current_process() -> Process: ...
def active_children() -> list[Process]: ...
def cpu_count() -> int: ...

Full reference: process-management.md
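As a minimal sketch of the lifecycle methods listed above (start, is_alive, join, terminate) together with the introspection helpers: the names describe_current and child_task are illustrative, exitcode is an attribute that the listing does not show, and the import falls back to the standard multiprocessing module (same API) so the sketch runs where billiard is not installed.

```python
try:
    import billiard as mp  # the package documented here
except ImportError:
    import multiprocessing as mp  # stdlib fallback with the same API


def describe_current():
    """Return (name, pid) of whichever process calls this function."""
    proc = mp.current_process()
    return proc.name, proc.pid


def child_task():
    # Runs in the child process and reports its own identity.
    print("child:", describe_current())


if __name__ == "__main__":
    print("parent:", describe_current(), "cpus:", mp.cpu_count())

    p = mp.Process(target=child_task, name="demo-child")
    p.start()
    print("alive right after start:", p.is_alive())
    print("active children:", [c.name for c in mp.active_children()])

    p.join(timeout=10)      # wait up to 10 seconds for a clean exit
    if p.is_alive():        # still running: stop it forcibly
        p.terminate()
        p.join()
    print("exit code:", p.exitcode)
```

Joining with a timeout and only then calling terminate() gives the child a chance to exit cleanly before it is stopped by force.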

Process Pools

Advanced process pool for parallel execution with timeout support, worker management, and enhanced error handling.

class Pool:
    def __init__(self, processes=None, initializer=None, initargs=(), 
                 maxtasksperchild=None, timeout=None, soft_timeout=None,
                 lost_worker_timeout=None, max_restarts=None, max_restart_freq=1,
                 on_process_up=None, on_process_down=None, on_timeout_set=None,
                 on_timeout_cancel=None, threads=True, semaphore=None, putlocks=False,
                 allow_restart=False, synack=False, on_process_exit=None, 
                 context=None, max_memory_per_child=None, enable_timeouts=False): ...
    def apply(self, func, args=(), kwds={}): ...
    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None): ...
    def map(self, func, iterable, chunksize=None): ...
    def close(self): ...
    def terminate(self): ...
    def join(self): ...

Full reference: process-pools.md
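The following sketch exercises the pool methods listed above (apply, apply_async, map, close, join). It deliberately uses only the parameters that billiard shares with the standard library so that the fallback import works; the billiard-only constructor options such as timeout and soft_timeout are not demonstrated here. The function cube is a hypothetical workload.

```python
try:
    import billiard as mp
except ImportError:
    import multiprocessing as mp  # stdlib fallback with the same API


def cube(x):
    """Hypothetical CPU-bound task."""
    return x ** 3


if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    try:
        # Blocking call: runs cube(3) in a worker and returns its result.
        print(pool.apply(cube, (3,)))

        # Non-blocking call: returns a result handle immediately.
        pending = pool.apply_async(cube, (4,))
        print(pending.get(timeout=30))   # wait at most 30 seconds

        # Map over an iterable, sending two items per task to the workers.
        print(pool.map(cube, range(6), chunksize=2))
    finally:
        pool.close()   # no further tasks will be submitted
        pool.join()    # wait for the workers to exit
```

Calling close() before join() lets queued work finish; terminate() would instead stop the workers without waiting for outstanding tasks.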

Queues

Process- and thread-safe FIFO queues for inter-process communication, built on pipes. JoinableQueue adds task-completion tracking through task_done() and join().

class Queue:
    def __init__(self, maxsize=0): ...
    def put(self, obj, block=True, timeout=None): ...
    def get(self, block=True, timeout=None): ...
    def qsize(self) -> int: ...
    def empty(self) -> bool: ...

class JoinableQueue(Queue):
    def task_done(self): ...
    def join(self): ...

Full reference: queues.md
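A small producer/consumer sketch shows how task_done() and join() on a JoinableQueue fit together. The SENTINEL value and the consume function are illustrative choices, not part of billiard, and the import falls back to the standard library when billiard is unavailable.

```python
try:
    import billiard as mp
except ImportError:
    import multiprocessing as mp  # stdlib fallback with the same API

SENTINEL = None  # hypothetical marker that tells the consumer to stop


def consume(tasks, results):
    """Double every item from tasks until the sentinel arrives."""
    while True:
        item = tasks.get()
        try:
            if item is SENTINEL:
                break
            results.put(item * 2)
        finally:
            tasks.task_done()   # acknowledge every get(), sentinel included


if __name__ == "__main__":
    tasks = mp.JoinableQueue()
    results = mp.Queue()
    worker = mp.Process(target=consume, args=(tasks, results))
    worker.start()

    for n in (1, 2, 3):
        tasks.put(n)
    tasks.put(SENTINEL)

    tasks.join()   # returns once every put() has a matching task_done()
    doubled = [results.get(timeout=10) for _ in range(3)]
    worker.join()
    print(doubled)
```

The results are drained before worker.join() so the child is never left blocked while flushing data into a queue nobody is reading.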

Synchronization

Synchronization primitives including locks, semaphores, events, conditions, and barriers for coordinating processes.

class Lock:
    def acquire(self, block=True, timeout=None) -> bool: ...
    def release(self): ...

class Event:
    def set(self): ...
    def clear(self): ...
    def is_set(self) -> bool: ...
    def wait(self, timeout=None) -> bool: ...

class Semaphore:
    def __init__(self, value=1): ...
    def acquire(self, block=True, timeout=None) -> bool: ...
    def release(self): ...

Full reference: synchronization.md
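To illustrate the Lock and Event methods above, the sketch below holds two workers back with an Event and serializes their output with a Lock. The helper names wait_then_report and try_lock are hypothetical; arguments are passed positionally, and the standard library is used as a fallback.

```python
try:
    import billiard as mp
except ImportError:
    import multiprocessing as mp  # stdlib fallback with the same API


def wait_then_report(ready, lock, label):
    """Block until the event is set (or 10 s pass), then print under the lock."""
    ready.wait(10)
    with lock:   # only one process prints at a time
        print(f"{label}: go")


def try_lock(lock):
    """Non-blocking acquire: True if the lock was free and is now held."""
    return lock.acquire(False)


if __name__ == "__main__":
    ready = mp.Event()
    lock = mp.Lock()
    workers = [
        mp.Process(target=wait_then_report, args=(ready, lock, f"worker-{i}"))
        for i in range(2)
    ]
    for w in workers:
        w.start()
    ready.set()   # release both workers at once
    for w in workers:
        w.join()
```

A plain Lock is not re-entrant, so a second non-blocking acquire from the same process fails until release() is called.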

Communication

Inter-process communication through pipes and connections with support for both object and byte-level messaging.

def Pipe(duplex=True, rnonblock=False, wnonblock=False) -> tuple[Connection, Connection]: ...

class Connection:
    def send(self, obj): ...
    def recv(self): ...
    def send_bytes(self, buf, offset=0, size=None): ...
    def recv_bytes(self, maxlength=None) -> bytes: ...
    def poll(self, timeout=None) -> bool: ...
    def close(self): ...

Full reference: communication.md
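The difference between object messaging (send/recv) and byte-level messaging (send_bytes/recv_bytes) can be sketched with a duplex pipe. The echo_upper function is a made-up example, Pipe() is called with its defaults only, and the import falls back to the standard library.

```python
try:
    import billiard as mp
except ImportError:
    import multiprocessing as mp  # stdlib fallback with the same API


def echo_upper(conn):
    """Answer one pickled object and one raw bytes message, then close."""
    text = conn.recv()              # object-level: unpickled for us
    conn.send(text.upper())
    raw = conn.recv_bytes()         # byte-level: no pickling involved
    conn.send_bytes(raw[::-1])
    conn.close()


if __name__ == "__main__":
    parent_end, child_end = mp.Pipe()   # duplex by default
    p = mp.Process(target=echo_upper, args=(child_end,))
    p.start()

    parent_end.send("hello")
    print(parent_end.recv())

    parent_end.send_bytes(b"abc")
    if parent_end.poll(10):             # wait up to 10 s for the reply
        print(parent_end.recv_bytes())

    p.join()
```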

Shared Memory

Synchronized and unsynchronized shared memory objects for efficient data sharing between processes.

def Value(typecode_or_type, *args, lock=True) -> SynchronizedBase: ...
def Array(typecode_or_type, size_or_initializer, lock=True) -> SynchronizedArray: ...
def RawValue(typecode_or_type, *args): ...
def RawArray(typecode_or_type, size_or_initializer): ...

Full reference: shared-memory.md
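A shared counter is a common reason to reach for Value. In the sketch below, add_many is a hypothetical worker, and get_lock() is the accessor for the lock that a synchronized Value carries when created with lock=True. The standard library serves as a fallback if billiard is missing.

```python
try:
    import billiard as mp
except ImportError:
    import multiprocessing as mp  # stdlib fallback with the same API


def add_many(counter, n):
    """Increment the shared counter n times."""
    for _ in range(n):
        with counter.get_lock():   # "+=" is a read-modify-write, not atomic
            counter.value += 1


if __name__ == "__main__":
    counter = mp.Value("i", 0)                  # shared C int
    readings = mp.Array("d", [1.0, 4.0, 9.0])   # shared array of C doubles

    workers = [mp.Process(target=add_many, args=(counter, 1000))
               for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    print(counter.value)   # 4 workers x 1000 increments = 4000
    print(readings[:])     # slicing copies the contents into a plain list
```

RawValue and RawArray skip the lock entirely, which is only safe when a single process writes or when an external lock guards access.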

Managers

Shared object managers for creating and managing shared objects across multiple processes.

def Manager() -> SyncManager: ...

class SyncManager:
    def start(self): ...
    def shutdown(self): ...
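    def dict(self) -> dict: ...      # returns a proxy to a dict held by the manager process
    def list(self) -> list: ...      # returns a proxy to a list held by the manager process
    def Queue(self) -> Queue: ...    # returns a proxy to a queue held by the manager process
    def Lock(self) -> Lock: ...      # returns a proxy to a lock held by the manager process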

Full reference: managers.md
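The sketch below shares a dict and a list between processes through a manager. The record function is illustrative; the objects returned by manager.dict() and manager.list() are proxies, so the example copies them into plain containers before printing. The import falls back to the standard library when billiard is unavailable.

```python
try:
    import billiard as mp
except ImportError:
    import multiprocessing as mp  # stdlib fallback with the same API


def record(shared_dict, shared_list, n):
    """Store n squared under key n and note that n was processed."""
    shared_dict[n] = n * n
    shared_list.append(n)


if __name__ == "__main__":
    manager = mp.Manager()   # starts the manager's server process
    try:
        squares = manager.dict()
        seen = manager.list()

        workers = [mp.Process(target=record, args=(squares, seen, n))
                   for n in range(3)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()

        print(squares.copy())     # plain dict copied out of the proxy
        print(sorted(seen[:]))    # plain list copied out of the proxy
    finally:
        manager.shutdown()        # stop the server process
```

Every proxy operation is a round trip to the manager process, so managers trade speed for flexibility compared with Value and Array.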

Context Management

Process context configuration for controlling process start methods and execution environments.

def get_context(method=None) -> BaseContext: ...
def set_start_method(method, force=False): ...
def get_start_method(allow_none=False) -> str: ...
def get_all_start_methods() -> list[str]: ...

Full reference: context-management.md
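One way to use these functions without changing the process-wide default is to request a context object and create primitives from it. The helper pick_context and its preference order are assumptions made for illustration, and the standard library serves as a fallback.

```python
try:
    import billiard as mp
except ImportError:
    import multiprocessing as mp  # stdlib fallback with the same API


def pick_context(preferred=("forkserver", "spawn")):
    """Return a context for the first preferred start method available here."""
    available = mp.get_all_start_methods()
    for method in preferred:
        if method in available:
            return mp.get_context(method)
    return mp.get_context()   # fall back to the platform default


if __name__ == "__main__":
    print("available:", mp.get_all_start_methods())

    ctx = pick_context()
    print("chosen:", ctx.get_start_method())

    # Objects created from ctx use its start method; the global default
    # is left untouched, unlike with set_start_method().
    queue = ctx.Queue()
    queue.put("created from a context")
    print(queue.get(timeout=5))
```

A context is usually the safer choice in library code, because set_start_method() affects the whole program and is meant to be called once, inside the main guard.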

Utility Functions

Additional utility functions for platform support and configuration.

def freeze_support(): ...
def get_logger(): ...
def log_to_stderr(level=None): ...
def allow_connection_pickling(): ...
def set_executable(executable): ...
def set_forkserver_preload(module_names): ...
def soft_timeout_sighandler(signum, frame): ...

What each function does:

  • freeze_support(): Windows frozen executable support
  • get_logger(): Get billiard logger
  • log_to_stderr(): Enable stderr logging
  • allow_connection_pickling(): Enable connection pickling
  • set_executable(): Set Python executable path
  • set_forkserver_preload(): Set forkserver preload modules
  • soft_timeout_sighandler(): Signal handler that raises SoftTimeLimitExceeded
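As a rough sketch of how the logging helpers and freeze_support() are typically combined at program start: enable_worker_logging and noisy_task are hypothetical names, and the sketch assumes log_to_stderr() returns the logger it configures, as it does in the standard library, which is also the fallback import.

```python
import logging

try:
    import billiard as mp
except ImportError:
    import multiprocessing as mp  # stdlib fallback with the same API


def enable_worker_logging(level=logging.INFO):
    """Attach a stderr handler to the package logger and return the logger."""
    return mp.log_to_stderr(level)


def noisy_task():
    # Logs through the package logger from inside the child process.
    mp.get_logger().info("running in %s", mp.current_process().name)


if __name__ == "__main__":
    mp.freeze_support()       # relevant only for frozen Windows executables
    enable_worker_logging()   # call once: each call adds another handler
    p = mp.Process(target=noisy_task)
    p.start()
    p.join()
```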

Exception Types

class ProcessError(Exception): ...
class TimeoutError(ProcessError): ...
class AuthenticationError(ProcessError): ...
class BufferTooShort(ProcessError): ...
class TimeLimitExceeded(Exception): ...
class SoftTimeLimitExceeded(Exception): ...
class WorkerLostError(Exception): ...
class Terminated(Exception): ...
class RestartFreqExceeded(Exception): ...
class CoroStop(Exception): ...

Common exceptions:

  • ProcessError: Base exception for process-related errors
  • TimeoutError: Operation exceeded timeout limit (subclass of ProcessError)
  • AuthenticationError: Authentication failed (subclass of ProcessError)
  • BufferTooShort: Buffer too short for message (subclass of ProcessError)
  • TimeLimitExceeded: Hard time limit exceeded (immediate termination)
  • SoftTimeLimitExceeded: Soft time limit exceeded (allows cleanup)
  • WorkerLostError: Worker process died unexpectedly
  • Terminated: Worker processing job terminated by user request
  • RestartFreqExceeded: Worker restarts happening too frequently
  • CoroStop: Coroutine exit (distinct from StopIteration)
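The difference between the two time-limit exceptions is that SoftTimeLimitExceeded is raised inside the running task, which can catch it and clean up. The sketch below shows that handling pattern in isolation, without a pool: run_with_cleanup and slow_step are hypothetical, slow_step merely simulates the limit firing, and a local stand-in class is defined if billiard cannot be imported.

```python
try:
    from billiard.exceptions import SoftTimeLimitExceeded
except ImportError:
    class SoftTimeLimitExceeded(Exception):
        """Local stand-in (assumption) so the sketch runs without billiard."""


def run_with_cleanup(step, cleanup):
    """Run step(); if the soft limit fires, call cleanup() and report it."""
    try:
        return ("done", step())
    except SoftTimeLimitExceeded:
        cleanup()   # release files, connections, partial output, ...
        return ("aborted", None)


def slow_step():
    # Simulates the moment the soft time limit is raised inside the task.
    raise SoftTimeLimitExceeded()


if __name__ == "__main__":
    released = []
    print(run_with_cleanup(lambda: 42, lambda: released.append("tmp")))
    print(run_with_cleanup(slow_step, lambda: released.append("tmp")))
    print(released)
```

TimeLimitExceeded cannot be handled this way from inside the task, since the hard limit terminates the worker immediately.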