Python multiprocessing fork with improvements and bugfixes for distributed task processing
```
npx @tessl/cli install tessl/pypi-billiard@4.2.0
```

A fork of Python's multiprocessing package providing enhanced functionality for distributed task processing. Billiard offers improved process pools with timeouts, enhanced error handling, worker management, and specialized features for task queue systems like Celery.

Install the package from PyPI:

```
pip install billiard
```

Import the top-level package:

```python
import billiard
```

Common imports for different components:

```python
from billiard import Process, Pool, Queue, Lock, Event
from billiard import current_process, active_children, cpu_count
import billiard as mp
```
Basic usage, spawning worker processes and collecting their results through a queue:

```python
from billiard import Process, Queue


def worker_task(name, result_queue):
    result = f"Hello from {name}"
    result_queue.put(result)


if __name__ == '__main__':
    # Create a queue for results
    queue = Queue()

    # Create and start processes
    processes = []
    for i in range(3):
        p = Process(target=worker_task, args=(f"worker-{i}", queue))
        p.start()
        processes.append(p)

    # Collect results
    results = []
    for _ in processes:
        results.append(queue.get())

    # Wait for completion
    for p in processes:
        p.join()

    print("Results:", results)
```
# Use process pool for parallel execution
def square(x):
return x * x
if __name__ == '__main__':
with Pool(processes=4) as pool:
numbers = [1, 2, 3, 4, 5]
squared = pool.map(square, numbers)
print("Squared:", squared)Billiard extends Python's multiprocessing architecture with several key enhancements:
The package maintains API compatibility with Python's standard multiprocessing module while providing additional stability, performance optimizations, and features needed for production-scale distributed computing applications.
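Because billiard mirrors the standard library's public API, existing multiprocessing code can often be switched over by changing only the import. A minimal sketch of that drop-in pattern (the alias name and the worker function are illustrative):

```python
# A minimal sketch of the drop-in pattern: alias billiard under the name the
# existing code already uses, so the rest of the module stays unchanged.
import billiard as multiprocessing


def double(x):
    return 2 * x


if __name__ == '__main__':
    with multiprocessing.Pool(processes=2) as pool:
        print(pool.map(double, range(5)))  # [0, 2, 4, 6, 8]
```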
Core process creation, lifecycle management, and process introspection functionality.
```python
class Process:
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None): ...
    def start(self): ...
    def join(self, timeout=None): ...
    def terminate(self): ...
    def is_alive(self) -> bool: ...

def current_process() -> Process: ...
def active_children() -> list[Process]: ...
def cpu_count() -> int: ...
```
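A short sketch of the introspection helpers listed above; the process names and counts are illustrative:

```python
from billiard import Process, current_process, active_children, cpu_count


def report():
    # current_process() also works inside the child process itself
    print("running in:", current_process().name)


if __name__ == '__main__':
    print("CPU count:", cpu_count())
    children = [Process(target=report, name=f"child-{i}") for i in range(2)]
    for child in children:
        child.start()
    print("active children:", [c.name for c in active_children()])
    for child in children:
        child.join()
    print("still alive?", [c.is_alive() for c in children])  # [False, False]
```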
Advanced process pool for parallel execution with timeout support, worker management, and enhanced error handling.

```python
class Pool:
    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, timeout=None, soft_timeout=None,
                 lost_worker_timeout=None, max_restarts=None, max_restart_freq=1,
                 on_process_up=None, on_process_down=None, on_timeout_set=None,
                 on_timeout_cancel=None, threads=True, semaphore=None, putlocks=False,
                 allow_restart=False, synack=False, on_process_exit=None,
                 context=None, max_memory_per_child=None, enable_timeouts=False): ...
    def apply(self, func, args=(), kwds={}): ...
    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None): ...
    def map(self, func, iterable, chunksize=None): ...
    def close(self): ...
    def terminate(self): ...
    def join(self): ...
```
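A sketch of asynchronous submission using the documented callback hooks; it assumes apply_async returns a standard multiprocessing-style async result whose get() accepts a timeout, and the maxtasksperchild value is illustrative:

```python
from billiard import Pool


def cube(x):
    return x ** 3


def on_success(value):
    print("callback got:", value)


def on_error(exc):
    print("error callback got:", exc)


if __name__ == '__main__':
    # The timeout- and worker-management arguments shown in the signature above
    # are accepted here too; this sketch only exercises the basics.
    with Pool(processes=2, maxtasksperchild=10) as pool:
        async_result = pool.apply_async(
            cube, (3,), callback=on_success, error_callback=on_error)
        print("blocking result:", async_result.get(timeout=10))  # 27
        print("map result:", pool.map(cube, [1, 2, 3]))          # [1, 8, 27]
```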
Thread-safe queues for inter-process communication using pipes, with task completion tracking support.

```python
class Queue:
    def __init__(self, maxsize=0): ...
    def put(self, obj, block=True, timeout=None): ...
    def get(self, block=True, timeout=None): ...
    def qsize(self) -> int: ...
    def empty(self) -> bool: ...

class JoinableQueue(Queue):
    def task_done(self): ...
    def join(self): ...
```
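A sketch of task-completion tracking with JoinableQueue, assuming it is exported from the package top level as in the standard library; the sentinel-based shutdown is just one common pattern:

```python
from billiard import Process, JoinableQueue


def consumer(queue):
    while True:
        item = queue.get()
        if item is None:          # sentinel: stop consuming
            queue.task_done()
            break
        print("processed", item)
        queue.task_done()         # mark this item as finished


if __name__ == '__main__':
    queue = JoinableQueue()
    worker = Process(target=consumer, args=(queue,))
    worker.start()

    for item in range(3):
        queue.put(item)
    queue.put(None)               # tell the consumer to exit

    queue.join()                  # blocks until every put() has a matching task_done()
    worker.join()
```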
Synchronization primitives including locks, semaphores, events, conditions, and barriers for coordinating processes.

```python
class Lock:
    def acquire(self, block=True, timeout=None) -> bool: ...
    def release(self): ...

class Event:
    def set(self): ...
    def clear(self): ...
    def is_set(self) -> bool: ...
    def wait(self, timeout=None) -> bool: ...

class Semaphore:
    def __init__(self, value=1): ...
    def acquire(self, block=True, timeout=None) -> bool: ...
    def release(self): ...
```
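A sketch coordinating two processes with a Lock and an Event, using only the acquire/release/set/wait calls listed above:

```python
from billiard import Process, Lock, Event


def worker(lock, started, n):
    started.set()                  # signal the parent that a worker is running
    lock.acquire()                 # only one worker prints at a time
    try:
        print(f"worker {n} holds the lock")
    finally:
        lock.release()


if __name__ == '__main__':
    lock = Lock()
    started = Event()
    procs = [Process(target=worker, args=(lock, started, n)) for n in range(2)]
    for p in procs:
        p.start()
    started.wait(timeout=5)        # block until at least one worker has started
    for p in procs:
        p.join()
```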
Inter-process communication through pipes and connections with support for both object and byte-level messaging.

```python
def Pipe(duplex=True, rnonblock=False, wnonblock=False) -> tuple[Connection, Connection]: ...

class Connection:
    def send(self, obj): ...
    def recv(self): ...
    def send_bytes(self, buf, offset=0, size=None): ...
    def recv_bytes(self, maxlength=None) -> bytes: ...
    def poll(self, timeout=None) -> bool: ...
    def close(self): ...
```
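A sketch of object messaging over a duplex Pipe; the payload and the 5-second poll timeout are illustrative:

```python
from billiard import Process, Pipe


def child(conn):
    conn.send({"status": "ok", "value": 42})   # any picklable object
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()            # duplex connection pair
    p = Process(target=child, args=(child_conn,))
    p.start()
    if parent_conn.poll(timeout=5):             # wait up to 5 seconds for data
        print(parent_conn.recv())               # {'status': 'ok', 'value': 42}
    p.join()
    parent_conn.close()
```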
Synchronized and unsynchronized shared memory objects for efficient data sharing between processes.

```python
def Value(typecode_or_type, *args, lock=True) -> SynchronizedBase: ...
def Array(typecode_or_type, size_or_initializer, lock=True) -> SynchronizedArray: ...
def RawValue(typecode_or_type, *args): ...
def RawArray(typecode_or_type, size_or_initializer): ...
```
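A sketch of synchronized shared memory; it assumes the synchronized wrappers expose their lock via get_lock(), as in the standard library's sharedctypes:

```python
from billiard import Process, Value, Array


def update(counter, samples):
    with counter.get_lock():        # explicit lock around the read-modify-write
        counter.value += 1
    for i in range(len(samples)):
        samples[i] *= 2


if __name__ == '__main__':
    counter = Value('i', 0)                 # shared signed int (lock=True by default)
    samples = Array('d', [1.0, 2.0, 3.0])   # shared array of doubles
    p = Process(target=update, args=(counter, samples))
    p.start()
    p.join()
    print(counter.value, list(samples))     # 1 [2.0, 4.0, 6.0]
```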
Shared object managers for creating and managing shared objects across multiple processes.

```python
def Manager() -> SyncManager: ...

class SyncManager:
    def start(self): ...
    def shutdown(self): ...
    def dict(self) -> dict: ...
    def list(self) -> list: ...
    def Queue(self) -> Queue: ...
    def Lock(self) -> Lock: ...
```
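A sketch of manager-backed shared objects; it assumes Manager() returns an already-started SyncManager, as in the standard library, so only shutdown() is called explicitly:

```python
from billiard import Process, Manager


def record(shared_dict, shared_list):
    shared_dict["status"] = "done"
    shared_list.append(42)


if __name__ == '__main__':
    manager = Manager()                       # started SyncManager server process
    shared_dict = manager.dict()
    shared_list = manager.list()

    p = Process(target=record, args=(shared_dict, shared_list))
    p.start()
    p.join()

    print(dict(shared_dict), list(shared_list))  # {'status': 'done'} [42]
    manager.shutdown()                        # stop the manager server process
```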
Process context configuration for controlling process start methods and execution environments.

```python
def get_context(method=None) -> BaseContext: ...
def set_start_method(method, force=False): ...
def get_start_method(allow_none=False) -> str: ...
def get_all_start_methods() -> list[str]: ...
```
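A sketch of selecting a start method through a context object, assuming contexts expose the same constructors (Process, Pool, and so on) as the top-level module; 'spawn' is used because it is available on all major platforms:

```python
import billiard as mp


def show_pid():
    print("child pid:", mp.current_process().pid)


if __name__ == '__main__':
    print("available start methods:", mp.get_all_start_methods())
    print("configured start method:", mp.get_start_method(allow_none=True))

    # A context object is pinned to one start method but otherwise mirrors
    # the top-level module API.
    ctx = mp.get_context('spawn')
    p = ctx.Process(target=show_pid)
    p.start()
    p.join()
```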
Additional utility functions for platform support and configuration.

```python
def freeze_support(): ...
def get_logger(): ...
def log_to_stderr(level=None): ...
def allow_connection_pickling(): ...
def set_executable(executable): ...
def set_forkserver_preload(module_names): ...
def soft_timeout_sighandler(signum, frame): ...
```
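A sketch of the logging and freezing helpers; it assumes log_to_stderr() returns the package logger, as the standard library's equivalent does:

```python
import logging

from billiard import Pool, freeze_support, log_to_stderr


def noop(x):
    return x


if __name__ == '__main__':
    freeze_support()                      # no-op unless running as a frozen executable
    logger = log_to_stderr(logging.INFO)  # attach a stderr handler to the package logger
    logger.info("starting pool")
    with Pool(processes=2) as pool:
        pool.map(noop, range(3))
```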
Common exceptions:

```python
class ProcessError(Exception): ...
class TimeoutError(ProcessError): ...
class AuthenticationError(ProcessError): ...
class BufferTooShort(ProcessError): ...
class TimeLimitExceeded(Exception): ...
class SoftTimeLimitExceeded(Exception): ...
class WorkerLostError(Exception): ...
class Terminated(Exception): ...
class RestartFreqExceeded(Exception): ...
class CoroStop(Exception): ...
```
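A hedged sketch of handling the billiard-specific errors around pool results; it assumes the classes are importable from billiard.exceptions (the import path Celery uses) and that worker-side failures surface when the result is retrieved:

```python
from billiard import Pool
from billiard.exceptions import (
    SoftTimeLimitExceeded,
    TimeLimitExceeded,
    WorkerLostError,
)


def work(x):
    return x * 2


if __name__ == '__main__':
    with Pool(processes=2) as pool:
        result = pool.apply_async(work, (21,))
        try:
            print(result.get(timeout=5))              # 42
        except SoftTimeLimitExceeded:
            # the task's soft time limit expired and it did not handle it itself
            print("task exceeded its soft time limit")
        except (TimeLimitExceeded, WorkerLostError) as exc:
            # hard time limit hit, or the worker process died unexpectedly
            print(f"task failed: {exc!r}")
```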