Python multiprocessing fork with improvements and bugfixes for distributed task processing
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core process creation, lifecycle management, and process introspection functionality. Billiard provides an enhanced Process class with additional features for worker management and timeout handling.
Create and manage individual processes with support for daemon processes, process groups, and enhanced lifecycle control.
class Process:
"""
Process objects represent activity running in a separate process.
"""
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None):
"""
Create a new Process object.
Parameters:
- group: reserved for future extension
- target: callable object to invoke by run() method
- name: process name (string)
- args: argument tuple for target invocation
- kwargs: keyword arguments for target invocation
- daemon: whether process is daemon (inherits from parent if None)
"""
def start(self):
"""Start the process's activity."""
def run(self):
"""Method representing the process's activity (override in subclasses)."""
def join(self, timeout=None):
"""
Wait until process terminates.
Parameters:
- timeout: timeout in seconds (None for no timeout)
"""
def terminate(self):
"""Terminate the process using SIGTERM."""
def terminate_controlled(self):
"""Controlled termination with cleanup."""
def is_alive(self) -> bool:
"""Return whether process is alive."""
def close(self):
"""Close the process object and release resources."""
@property
def name(self) -> str:
"""Process name."""
@name.setter
def name(self, value: str):
"""Set process name."""
@property
def daemon(self) -> bool:
"""Daemon flag."""
@daemon.setter
def daemon(self, value: bool):
"""Set daemon flag."""
@property
def pid(self) -> int:
"""Process ID (None if not started)."""
@property
def ident(self) -> int:
"""Alias for pid."""
@property
def exitcode(self) -> int:
"""Exit code (None if not terminated)."""
@property
def authkey(self) -> bytes:
"""Authentication key."""
@authkey.setter
def authkey(self, value: bytes):
"""Set authentication key."""
@property
def sentinel(self) -> int:
"""File descriptor for waiting."""Usage example:
import os
import time

from billiard import Process


def worker_function(name, duration):
    """Announce start (with the worker's PID), sleep `duration` seconds, announce finish."""
    print(f"Worker {name} (PID: {os.getpid()}) starting")
    time.sleep(duration)
    print(f"Worker {name} finished")


# Guard the launch code like the other usage examples do: when child
# processes are created by re-importing this module (spawn start method),
# unguarded top-level code would run again in every child.
if __name__ == '__main__':
    # Create and start process
    process = Process(target=worker_function, args=("Worker-1", 2))
    process.name = "MyWorker"
    process.daemon = False
    process.start()
    print(f"Started process {process.name} with PID {process.pid}")

    # Wait for completion
    process.join(timeout=5)
    if process.is_alive():
        # The join timed out: stop the worker, then wait for it to exit
        print("Process is still running, terminating...")
        process.terminate()
        process.join()

    print(f"Process exit code: {process.exitcode}")
    process.close()

# Functions to inspect the current process and active child processes.
def current_process() -> Process:
    """
    Return Process object representing the current process.
    """
def active_children() -> list[Process]:
    """
    Return list of all alive child processes of current process.

    Calling this has the side effect of joining any processes that have finished.
    """
def cpu_count() -> int:
    """
    Return number of CPUs in the system.

    Raises:
    - NotImplementedError: if number of CPUs cannot be determined
    """

# Usage example:
from billiard import Process, current_process, active_children, cpu_count


def worker_task():
    """Print the name and PID of the process this function runs in."""
    current = current_process()
    print(f"Worker: {current.name} (PID: {current.pid})")


if __name__ == '__main__':
    # Get system information
    print(f"System has {cpu_count()} CPUs")

    # Current process info
    main_process = current_process()
    print(f"Main process: {main_process.name} (PID: {main_process.pid})")

    # Start some workers
    processes = []
    for i in range(3):
        p = Process(target=worker_task, name=f"Worker-{i}")
        p.start()
        processes.append(p)

    # Check active children
    children = active_children()
    print(f"Active children: {len(children)}")

    # Wait for completion
    for p in processes:
        p.join()

    # Check again
    children = active_children()
    print(f"Active children after join: {len(children)}")

# Secure process authentication using authentication keys.
class AuthenticationString(bytes):
    """
    A string-like object which can be passed to multiprocessing functions
    that accept authentication keys.
    """

    def __reduce__(self):
        """Custom pickling to maintain security."""

# Usage example:
import os

from billiard import Process, current_process


def secure_worker():
    """Print the first 8 bytes of the authkey of the process this runs in."""
    process = current_process()
    # NOTE(review): this prints a prefix of the secret key for demonstration
    # only; avoid logging key material in real code.
    print(f"Worker authkey: {process.authkey[:8]}...")


if __name__ == '__main__':
    # Set custom authentication key
    custom_key = os.urandom(32)
    process = Process(target=secure_worker)
    process.authkey = custom_key
    process.start()
    process.join()

SUBDEBUG: int = 5
SUBWARNING: int = 25
# These constants define additional logging levels used by billiard for detailed process debugging and warnings.
Install with Tessl CLI
npx tessl i tessl/pypi-billiard