CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-billiard

Python multiprocessing fork with improvements and bugfixes for distributed task processing

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/context-management.md

Context Management

Process context configuration for controlling process start methods and execution environments. Billiard provides flexible context management for different multiprocessing scenarios.

Capabilities

Context Functions

Global functions for managing the default process context.

def get_context(method=None):
    """
    Get a process context.

    Parameters:
    - method: start method ('fork', 'spawn', 'forkserver'), or None for the
      context of the current default start method

    Returns:
    BaseContext object configured for the specified method. Objects used by
    a process (queues, locks, shared values) should be created from the same
    context as the process itself.

    Raises:
    - ValueError: if the method is unknown or not available on this platform
    """

def set_start_method(method, force=False):
    """
    Set the default start method for processes.

    Call this at most once, early, from the main module's
    ``if __name__ == '__main__':`` block, before any process is created.

    Parameters:
    - method: start method ('fork', 'spawn', 'forkserver')
    - force: if True, allow changing an already set method

    Raises:
    - RuntimeError: if method already set and force=False
    - ValueError: if method is not supported on current platform
    """

def get_start_method(allow_none=False) -> "str | None":
    """
    Get the current default start method.

    Parameters:
    - allow_none: if True and no start method has been set yet, return None;
      if False, fix the start method to the platform default and return it

    Returns:
    Name of the current start method, or None (only when allow_none=True and
    no method has been set yet)
    """

def get_all_start_methods() -> list[str]:
    """
    Get list of all available start methods on current platform.

    Returns:
    List of start method names usable on this platform (e.g. only 'spawn' on
    Windows); any of them can be passed to get_context() or set_start_method()
    """

Usage example:

import billiard as mp
from billiard import Process
import os

def worker_info():
    """Report this worker's PID, its parent's PID and the active start method."""
    pid, ppid = os.getpid(), os.getppid()
    method = mp.get_start_method()
    print(f"Worker PID: {pid}")
    print(f"Parent PID: {ppid}")
    print(f"Start method: {method}")

def context_basics_example():
    """Show the available start methods, switch to spawn, then run one worker."""
    methods = mp.get_all_start_methods()
    print(f"Available start methods: {methods}")
    print(f"Current start method: {mp.get_start_method()}")

    # The start method has to be chosen before any process is created;
    # force=True allows overriding one that is already set.
    try:
        mp.set_start_method('spawn', force=True)
    except ValueError as err:
        print(f"Could not set start method: {err}")
    else:
        print(f"Set start method to: {mp.get_start_method()}")

    # The Process class uses whichever start method is now the default
    child = Process(target=worker_info)
    child.start()
    child.join()

# Run the demo only when executed as a script: children started with spawn
# re-import the main module and must not re-run it.
if __name__ == '__main__':
    context_basics_example()

Context Objects

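Context objects expose the same API as the billiard module, but create processes and synchronization objects using their own start method, so several start methods can be used in one program. Objects created by one context may not be compatible with processes started by another. In particular, locks (and therefore queues) created with the fork context cannot be passed to processes started with spawn or forkserver.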

class BaseContext:
    """
    Base class for multiprocessing contexts.

    A context offers the same factory API as the top-level ``billiard``
    module; the context returned by ``get_context(method)`` is configured
    for that start method. Create the objects a process uses (queues,
    locks, shared values, ...) from the same context as the process.
    """
    # Process management
    # NOTE(review): the mutable default ``kwargs={}`` mirrors the stdlib
    # multiprocessing signature; pass a fresh dict rather than relying on it.
    def Process(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None):
        """Create a Process using this context; it runs target(*args, **kwargs)."""
    
    def current_process(self):
        """Get the Process object representing the calling process."""
    
    def active_children(self) -> list:
        """Get list of live child processes of the current process."""
    
    def cpu_count(self) -> int:
        """Get number of CPUs in the system."""
    
    # Synchronization primitives
    def Lock(self):
        """Create a (non-recursive) Lock using this context."""
    
    def RLock(self):
        """Create a recursive RLock using this context."""
    
    def Semaphore(self, value: int = 1):
        """Create a Semaphore with initial counter *value* using this context."""
    
    def BoundedSemaphore(self, value: int = 1):
        """Create a BoundedSemaphore with initial counter *value* using this context."""
    
    def Condition(self, lock=None):
        """Create a Condition using this context (a new RLock is used if lock is None)."""
    
    def Event(self):
        """Create an Event using this context."""
    
    def Barrier(self, parties: int, action=None, timeout=None):
        """Create a Barrier for *parties* waiting processes using this context."""
    
    # Communication
    def Pipe(self, duplex: bool = True):
        """Create a Pipe using this context.

        Returns a pair of connection objects. With duplex=False the pipe is
        one-way: the first connection only receives, the second only sends.
        """
    
    def Queue(self, maxsize: int = 0):
        """Create a Queue using this context (maxsize <= 0 means unbounded)."""
    
    def JoinableQueue(self, maxsize: int = 0):
        """Create a JoinableQueue (a Queue with task_done()/join()) using this context."""
    
    def SimpleQueue(self):
        """Create a simplified, unbounded SimpleQueue using this context."""
    
    # Shared memory
    def Value(self, typecode_or_type, *args, lock=True):
        """Create a shared Value using this context.

        *args are passed to the type's constructor; ``lock`` is keyword-only
        (True: guard access with a new lock, False: no synchronization).
        """
    
    def Array(self, typecode_or_type, size_or_initializer, lock=True):
        """Create a shared Array using this context.

        size_or_initializer is either the array length or a sequence of
        initial values.
        """
    
    # Pool
    # NOTE(review): billiard's Pool likely accepts further keyword arguments
    # beyond these; confirm against billiard.pool.Pool.
    def Pool(self, processes=None, initializer=None, initargs=(), 
             maxtasksperchild=None):
        """Create a Pool using this context.

        processes=None uses cpu_count() workers; maxtasksperchild is the
        number of tasks a worker completes before it is replaced (None: never).
        """
    
    # Manager
    def Manager(self):
        """Create and start a Manager (a server process holding shared objects) using this context."""

Usage example:

import billiard as mp
import os
import time

def context_worker(context_name, queue):
    """Put a line naming *context_name* and this process's PID on *queue*."""
    queue.put(f"Worker from {context_name} context: PID {os.getpid()}")

def multiple_contexts_example():
    """Demonstrate using multiple contexts in one program.

    Starts one worker per context (fork when the platform offers it, and
    always spawn) and prints the message each worker reports.
    """
    # Get different contexts; fork is not available on every platform
    fork_ctx = mp.get_context('fork') if 'fork' in mp.get_all_start_methods() else None
    spawn_ctx = mp.get_context('spawn')

    processes = []
    # Create the shared queue from the spawn context. A queue (and the locks
    # inside it) created with a fork context cannot be passed to a spawn
    # process, whereas a spawn-context queue can still be inherited by a fork
    # child, so this single queue works for both workers. A plain mp.Queue()
    # would use the default start method, which may be fork.
    results_queue = spawn_ctx.Queue()

    # Create processes with different contexts
    if fork_ctx is not None:
        p1 = fork_ctx.Process(target=context_worker,
                              args=('fork', results_queue))
        processes.append(p1)

    p2 = spawn_ctx.Process(target=context_worker,
                           args=('spawn', results_queue))
    processes.append(p2)

    # Start all processes
    for p in processes:
        p.start()

    # Collect results before joining: a process that has put data on a queue
    # does not exit until that data has been flushed to the underlying pipe.
    for _ in processes:
        result = results_queue.get()
        print(result)

    # Wait for completion
    for p in processes:
        p.join()

def isolated_worker(queue, context_id):
    """Put a message tagged with *context_id* on *queue*.

    Defined at module level on purpose: the spawn start method pickles the
    process target by its qualified name, and a function nested inside
    another function cannot be pickled, so Process.start() would fail.
    """
    queue.put(f"Message from context {context_id}")


def context_isolation_example():
    """Demonstrate that separate queues keep messages apart.

    get_context('spawn') returns a shared context object rather than a new
    one on each call, so ctx1 and ctx2 below refer to the same context; the
    separation comes from each process using its own Queue.
    """
    ctx1 = mp.get_context('spawn')
    ctx2 = mp.get_context('spawn')

    # Each process gets its own queue
    queue1 = ctx1.Queue()
    queue2 = ctx2.Queue()

    # The target is a module-level function so it can be pickled for spawn
    p1 = ctx1.Process(target=isolated_worker, args=(queue1, 1))
    p2 = ctx2.Process(target=isolated_worker, args=(queue2, 2))

    p1.start()
    p2.start()

    # Read before joining so neither child blocks on unflushed queue data
    print("Context 1:", queue1.get())
    print("Context 2:", queue2.get())

    p1.join()
    p2.join()

# Run both demos only when executed as a script; children started with
# spawn re-import the main module and must not re-run them.
if __name__ == '__main__':
    multiple_contexts_example()
    context_isolation_example()

Start Methods

Different process creation methods with distinct characteristics.

Fork Method

# Only available on Unix-like systems; check
# 'fork' in mp.get_all_start_methods() before relying on it
ctx = mp.get_context('fork')

Characteristics:

  • Fast startup: Child inherits parent's memory space
  • Shared resources: All imported modules and objects are available
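  • Threading issues: only the calling thread is copied into the child, so locks held by other threads at fork time stay locked forever; forking a multithreaded parent can deadlock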
  • Memory efficient: Copy-on-write memory sharing
  • Unix only: Not available on Windows

Spawn Method

# Available on all platforms (and the only method on Windows)
ctx = mp.get_context('spawn')

Characteristics:

  • Clean startup: Fresh Python interpreter for each process
  • Thread safe: the child does not inherit the parent's threads or the locks they hold
  • Slower startup: Must import modules and initialize objects
  • Cross-platform: Works on Windows, macOS, and Linux
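  • Isolated: each process has a separate memory space, so the target and its arguments are pickled. Targets must therefore be defined at module level; nested functions and lambdas fail.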

Forkserver Method

# Unix only, and not supported on every Unix platform; check
# 'forkserver' in mp.get_all_start_methods() before relying on it
ctx = mp.get_context('forkserver')

Characteristics:

  • Server-based: Uses dedicated fork server process
  • Thread safe: Avoids threading issues of fork
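  • Resource efficient: children are forked from a small, single-threaded server process, which can be configured to pre-import modules
  • Isolation: children do not inherit the parent's threads or other unneeded resources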
  • Unix only: Not available on Windows

Usage example:

import billiard as mp
import os
import threading
import time

def threaded_parent_worker():
    """Run three short sleeping threads, wait for all of them, then report."""

    def sleep_and_report(idx):
        time.sleep(0.1)
        print(f"Thread {idx} completed")

    # Build every thread first, then launch and finally wait for them
    pool = [threading.Thread(target=sleep_and_report, args=(idx,))
            for idx in range(3)]
    for th in pool:
        th.start()
    for th in pool:
        th.join()

    print("All threads completed in parent")

def child_process_work(method_name):
    """Print this process's PID/PPID (tagged with *method_name*) and its thread count."""
    pid, ppid = os.getpid(), os.getppid()
    thread_count = threading.active_count()
    print(f"Child process ({method_name}): PID {pid}, PPID {ppid}")
    print(f"Active threads in child: {thread_count}")

def start_method_comparison():
    """Start one child per available start method while parent threads run."""
    background = threading.Thread(target=threaded_parent_worker)
    background.start()

    time.sleep(0.05)  # give the background threads a moment to start

    methods = mp.get_all_start_methods()
    print(f"Available methods: {methods}")
    print(f"Active threads in parent: {threading.active_count()}")

    # One child per start method; failures are reported, not raised
    for method in methods:
        print(f"\n--- Testing {method} method ---")
        try:
            child = mp.get_context(method).Process(
                target=child_process_work, args=(method,))
            child.start()
            child.join()
        except Exception as exc:
            print(f"Error with {method}: {exc}")

    background.join()

# Run the comparison only when executed as a script; spawn and forkserver
# children re-import the main module and must not re-run it.
if __name__ == '__main__':
    start_method_comparison()

Context Best Practices

Application Structure

import billiard as mp

def main():
    """Main application entry point.

    Fixes the global start method, starts four workers that each report a
    result on a shared queue, collects those results, then joins the workers.
    """
    # Set the start method once, early; calling set_start_method again
    # without force=True raises RuntimeError.
    mp.set_start_method('spawn')  # or 'fork', 'forkserver'

    # With no argument, get_context() returns the context for the method set above
    ctx = mp.get_context()

    # Create every object from the same context the processes use
    queue = ctx.Queue()
    processes = []

    for i in range(4):
        p = ctx.Process(target=worker, args=(queue, i))
        processes.append(p)
        p.start()

    # Drain the queue before joining: a process that has put items on a queue
    # does not terminate until they are flushed to the underlying pipe, so
    # joining first can deadlock once the pipe buffer fills up.
    results = [queue.get() for _ in processes]

    for p in processes:
        p.join()

    for result in results:
        print(result)

def worker(queue, worker_id):
    """Report a result line for *worker_id* on *queue*."""
    message = f"Result from worker {worker_id}"
    queue.put(message)

# The guard is required here: main() starts spawn processes, and each child
# re-imports the main module, so unguarded code would run again in every child.
if __name__ == '__main__':
    main()

Context Selection Guidelines

import billiard as mp
import sys
import platform

def select_optimal_context():
    """Select a context based on the platform and whether threads are running.

    Returns:
    A context for spawn on Windows and macOS. On other platforms: fork when
    the process is single-threaded and fork is available, otherwise
    forkserver (if available) or spawn.
    """
    # Imported here because the snippet's header imports only mp, sys and
    # platform; without it the Linux branch raised NameError.
    import threading

    if sys.platform == 'win32':
        # Windows only supports spawn
        return mp.get_context('spawn')

    if sys.platform == 'darwin':
        # macOS: spawn is often safer due to framework issues
        return mp.get_context('spawn')

    # Linux/Unix: more options available
    available = mp.get_all_start_methods()
    if threading.active_count() > 1 or 'fork' not in available:
        # Threads present (or fork unavailable): avoid fork
        if 'forkserver' in available:
            return mp.get_context('forkserver')
        return mp.get_context('spawn')

    # No threading issues: fork has the fastest start-up
    return mp.get_context('fork')

def context_factory_pattern(performance_critical=None):
    """Factory pattern for context creation.

    Parameters:
    - performance_critical: if True, prefer the fastest-starting context
      (fork where available). If None (the default), the module-level
      ``performance_critical`` flag is used, or False when it is not defined,
      so the function no longer raises NameError without that global.

    Returns:
    The portable context on Windows, otherwise the high-performance or the
    safe context depending on ``performance_critical``.
    """
    if performance_critical is None:
        performance_critical = globals().get('performance_critical', False)

    class ProcessContextFactory:
        @staticmethod
        def create_high_performance_context():
            """Context optimized for performance"""
            if 'fork' in mp.get_all_start_methods():
                return mp.get_context('fork')
            return mp.get_context('spawn')

        @staticmethod
        def create_safe_context():
            """Context optimized for safety"""
            if 'forkserver' in mp.get_all_start_methods():
                return mp.get_context('forkserver')
            return mp.get_context('spawn')

        @staticmethod
        def create_portable_context():
            """Context for maximum portability"""
            return mp.get_context('spawn')

    # Usage
    factory = ProcessContextFactory()

    if platform.system() == 'Windows':
        ctx = factory.create_portable_context()
    elif performance_critical:
        ctx = factory.create_high_performance_context()
    else:
        ctx = factory.create_safe_context()

    return ctx

# Example usage
performance_critical = True  # flag read by context_factory_pattern()
optimal_ctx = select_optimal_context()
# get_start_method() is the public accessor for a context's start method;
# avoid reading the private ``_name`` attribute.
print(f"Selected context with start method: {optimal_ctx.get_start_method()}")

Platform Considerations

Windows

  • Only spawn method available
  • Code that starts processes must be inside an if __name__ == '__main__': block, because each child re-imports the main module
  • Slower process creation due to full interpreter startup

macOS

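  • All methods are available, but spawn is usually preferred (it has been the default in CPython's multiprocessing since Python 3.8)
  • fork can crash or misbehave when system frameworks are in use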
  • Consider forkserver for mixed threading scenarios

Linux

  • All methods available
  • fork has the fastest process start-up (most useful when many short-lived processes are created)
  • spawn safest for mixed workloads
  • forkserver good compromise for long-running applications

Context Configuration Examples

High-Performance Computing

import billiard as mp


def compute_intensive_function(x):
    """Placeholder CPU-bound task; replace with your own.

    It must be defined at module level so it can be pickled and sent to the
    pool's worker processes.
    """
    return sum(i * i for i in range(x))


if __name__ == '__main__':
    # Configure for HPC workload. This runs inside the main guard because,
    # with spawn, each child re-imports this module and unguarded
    # set_start_method()/Pool() calls would be executed again there.
    if 'fork' in mp.get_all_start_methods():
        mp.set_start_method('fork')  # Fastest startup
    else:
        mp.set_start_method('spawn')

    ctx = mp.get_context()

    data = range(1000, 1100)  # placeholder input; replace with your data

    # Use large pools for parallel computation
    with ctx.Pool(processes=mp.cpu_count()) as pool:
        results = pool.map(compute_intensive_function, data)

Web Service Backend

import billiard as mp

if __name__ == '__main__':
    # Configure for web service (thread-safe). Guarded because spawn
    # children re-import this module and must not repeat this setup.
    mp.set_start_method('spawn')  # Safe with web frameworks
    ctx = mp.get_context()

    # Create worker pool for request processing; each worker is replaced
    # after completing 100 tasks so its resources are released periodically.
    request_pool = ctx.Pool(processes=4, maxtasksperchild=100)
    try:
        pass  # submit work here, e.g. request_pool.apply_async(handle_request, (req,))
    finally:
        # On shutdown: stop accepting work and wait for the workers to exit
        request_pool.close()
        request_pool.join()

Distributed System

import billiard as mp

if __name__ == '__main__':
    # Configure for distributed processing. Guarded because forkserver and
    # spawn children re-import this module and must not repeat this setup.
    if 'forkserver' in mp.get_all_start_methods():
        mp.set_start_method('forkserver')  # Isolated and efficient
    else:
        mp.set_start_method('spawn')

    ctx = mp.get_context()

    # The Manager runs a server process holding shared state; using it as a
    # context manager shuts that process down when the block exits.
    with ctx.Manager() as manager:  # For distributed state
        shared_state = manager.dict()

Install with Tessl CLI

npx tessl i tessl/pypi-billiard

docs

communication.md

context-management.md

index.md

managers.md

process-management.md

process-pools.md

queues.md

shared-memory.md

synchronization.md

tile.json