CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-billiard

Python multiprocessing fork with improvements and bugfixes for distributed task processing

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/process-management.md

Process Management

Core process creation, lifecycle management, and process introspection functionality. Billiard provides an enhanced Process class with additional features for worker management and timeout handling.

Capabilities

Process Creation and Management

Create and manage individual processes with support for daemon processes, process groups, and enhanced lifecycle control.

class Process:
    """
    Process objects represent activity running in a separate process.
    """
    # NOTE(review): `kwargs={}` is a mutable default; presumably it is only
    # read and never mutated - confirm against the implementation.
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None):
        """
        Create a new Process object.

        Parameters:
        - group: reserved for future extension
        - target: callable object invoked by the run() method
        - name: process name (string)
        - args: argument tuple for target invocation
        - kwargs: keyword arguments for target invocation
        - daemon: whether the process is a daemon (inherits from parent if None)
        """

    def start(self):
        """Start the process's activity."""

    def run(self):
        """Method representing the process's activity (override in subclasses)."""

    def join(self, timeout=None):
        """
        Wait until the process terminates.

        Parameters:
        - timeout: timeout in seconds (None for no timeout)
        """

    def terminate(self):
        """Terminate the process using SIGTERM."""

    def terminate_controlled(self):
        """Controlled termination with cleanup."""

    def is_alive(self) -> bool:
        """Return whether the process is alive."""

    def close(self):
        """Close the process object and release resources."""

    @property
    def name(self) -> str:
        """Process name."""

    @name.setter
    def name(self, value: str):
        """Set the process name."""

    @property
    def daemon(self) -> bool:
        """Daemon flag."""

    @daemon.setter
    def daemon(self, value: bool):
        """Set the daemon flag."""

    @property
    def pid(self) -> int | None:
        """Process ID (None if not started)."""

    @property
    def ident(self) -> int | None:
        """Alias for pid (None if not started)."""

    @property
    def exitcode(self) -> int | None:
        """Exit code (None if not terminated)."""

    @property
    def authkey(self) -> bytes:
        """Authentication key."""

    @authkey.setter
    def authkey(self, value: bytes):
        """Set the authentication key."""

    @property
    def sentinel(self) -> int:
        """File descriptor for waiting."""

Usage example:

from billiard import Process
import time
import os

def worker_function(name, duration):
    """
    Announce the worker, sleep, then announce completion.

    Parameters:
    - name: label printed in the start/finish messages
    - duration: number of seconds to sleep between the two messages
    """
    print(f"Worker {name} (PID: {os.getpid()}) starting")
    time.sleep(duration)
    print(f"Worker {name} finished")


# Guard the entry point: with the "spawn" start method the child process
# re-imports this module, and unguarded top-level code would try to create
# and start a new process again on every import.
if __name__ == '__main__':
    # Create and start process
    process = Process(target=worker_function, args=("Worker-1", 2))
    process.name = "MyWorker"
    process.daemon = False
    process.start()

    print(f"Started process {process.name} with PID {process.pid}")

    # Wait for completion (give up waiting after 5 seconds)
    process.join(timeout=5)

    # join() returning does not mean the process ended: the timeout may have
    # expired, so check liveness explicitly before reading the exit code.
    if process.is_alive():
        print("Process is still running, terminating...")
        process.terminate()
        process.join()

    print(f"Process exit code: {process.exitcode}")
    process.close()

Process Introspection

Functions to inspect the current process and active child processes.

def current_process() -> Process:
    """
    Return the Process object representing the current process.
    """

def active_children() -> list[Process]:
    """
    Return a list of all alive child processes of the current process.

    Calling this has the side effect of joining any processes that have
    already finished.
    """

def cpu_count() -> int:
    """
    Return the number of CPUs in the system.

    Raises:
    - NotImplementedError: if the number of CPUs cannot be determined
    """

Usage example:

from billiard import Process, current_process, active_children, cpu_count

def worker_task():
    """Print the name and PID of the process this function runs in."""
    current = current_process()
    print(f"Worker: {current.name} (PID: {current.pid})")

if __name__ == '__main__':
    # Get system information
    print(f"System has {cpu_count()} CPUs")

    # Current process info (here: the parent that spawns the workers)
    main_process = current_process()
    print(f"Main process: {main_process.name} (PID: {main_process.pid})")

    # Start three named workers and keep the handles so they can be joined
    processes = []
    for i in range(3):
        p = Process(target=worker_task, name=f"Worker-{i}")
        p.start()
        processes.append(p)

    # Check active children; only children that are still alive are listed,
    # so the count can be lower than 3 if a worker has already finished
    children = active_children()
    print(f"Active children: {len(children)}")

    # Wait for completion of every worker
    for p in processes:
        p.join()

    # Check again; all workers were joined above, so none should remain alive
    children = active_children()
    print(f"Active children after join: {len(children)}")

Authentication

Secure process authentication using authentication keys.

class AuthenticationString(bytes):
    """
    A bytes subclass (string-like object) which can be passed to
    multiprocessing functions that accept authentication keys.
    """
    def __reduce__(self):
        """Custom pickling to maintain security."""

Usage example:

from billiard import Process, current_process
import os

def secure_worker():
    """Print the first 8 bytes of the current process's authkey."""
    process = current_process()
    # NOTE(review): echoing key material, even a prefix, is presumably for
    # demonstration only - confirm it is not meant to be copied into real code.
    print(f"Worker authkey: {process.authkey[:8]}...")

if __name__ == '__main__':
    # Set custom authentication key: 32 random bytes from the OS
    custom_key = os.urandom(32)

    process = Process(target=secure_worker)
    # Assigned before start(); presumably required for the child to see the
    # custom key - TODO confirm against the library documentation
    process.authkey = custom_key
    process.start()
    process.join()

Constants

# Additional logging levels used by billiard. The numeric values slot into
# the standard `logging` scale (DEBUG=10, INFO=20, WARNING=30).
SUBDEBUG: int = 5  # below DEBUG: detailed process debugging
SUBWARNING: int = 25  # between INFO and WARNING: process warnings

These constants define additional logging levels used by billiard for detailed process debugging and warnings.

Install with Tessl CLI

npx tessl i tessl/pypi-billiard

docs

communication.md

context-management.md

index.md

managers.md

process-management.md

process-pools.md

queues.md

shared-memory.md

synchronization.md

tile.json