Python multiprocessing fork with improvements and bugfixes for distributed task processing
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Process context configuration for controlling process start methods and execution environments. Billiard provides flexible context management for different multiprocessing scenarios.
Global functions for managing the default process context.
def get_context(method=None):
"""
Get a process context.
Parameters:
- method: start method ('fork', 'spawn', 'forkserver', or None for default)
Returns:
BaseContext object configured for the specified method
"""
def set_start_method(method, force=False):
"""
Set the default start method for processes.
Parameters:
- method: start method ('fork', 'spawn', 'forkserver')
- force: if True, allow changing an already set method
Raises:
- RuntimeError: if method already set and force=False
- ValueError: if method is not supported on current platform
"""
def get_start_method(allow_none=False) -> str:
"""
Get the current default start method.
Parameters:
- allow_none: if True, return None if no method set; otherwise set default
Returns:
String name of current start method
"""
def get_all_start_methods() -> list[str]:
"""
Get list of all available start methods on current platform.
Returns:
List of available start method names
"""Usage example:
import billiard as mp
from billiard import Process
import os
def worker_info():
"""Print information about worker process"""
print(f"Worker PID: {os.getpid()}")
print(f"Parent PID: {os.getppid()}")
print(f"Start method: {mp.get_start_method()}")
def context_basics_example():
"""Demonstrate basic context management"""
print(f"Available start methods: {mp.get_all_start_methods()}")
print(f"Current start method: {mp.get_start_method()}")
# Set start method (must be done before creating processes)
try:
mp.set_start_method('spawn', force=True)
print(f"Set start method to: {mp.get_start_method()}")
except ValueError as e:
print(f"Could not set start method: {e}")
# Create process with current context
process = Process(target=worker_info)
process.start()
process.join()
if __name__ == '__main__':
context_basics_example()Context objects provide isolated multiprocessing environments with specific configurations.
class BaseContext:
"""
Base class for multiprocessing contexts.
"""
# Process management
def Process(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None):
"""Create a Process using this context."""
def current_process(self):
"""Get current process object."""
def active_children(self):
"""Get list of active child processes."""
def cpu_count(self):
"""Get number of CPUs."""
# Synchronization primitives
def Lock(self):
"""Create a Lock using this context."""
def RLock(self):
"""Create an RLock using this context."""
def Semaphore(self, value=1):
"""Create a Semaphore using this context."""
def BoundedSemaphore(self, value=1):
"""Create a BoundedSemaphore using this context."""
def Condition(self, lock=None):
"""Create a Condition using this context."""
def Event(self):
"""Create an Event using this context."""
def Barrier(self, parties, action=None, timeout=None):
"""Create a Barrier using this context."""
# Communication
def Pipe(self, duplex=True):
"""Create a Pipe using this context."""
def Queue(self, maxsize=0):
"""Create a Queue using this context."""
def JoinableQueue(self, maxsize=0):
"""Create a JoinableQueue using this context."""
def SimpleQueue(self):
"""Create a SimpleQueue using this context."""
# Shared memory
def Value(self, typecode_or_type, *args, lock=True):
"""Create a shared Value using this context."""
def Array(self, typecode_or_type, size_or_initializer, lock=True):
"""Create a shared Array using this context."""
# Pool
def Pool(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None):
"""Create a Pool using this context."""
# Manager
def Manager(self):
"""Create a Manager using this context."""Usage example:
import billiard as mp
import os
import time
def context_worker(context_name, queue):
"""Worker that reports its context"""
pid = os.getpid()
queue.put(f"Worker from {context_name} context: PID {pid}")
def multiple_contexts_example():
"""Demonstrate using multiple contexts"""
# Get different contexts
fork_ctx = mp.get_context('fork') if 'fork' in mp.get_all_start_methods() else None
spawn_ctx = mp.get_context('spawn')
processes = []
results_queue = mp.Queue()
# Create processes with different contexts
if fork_ctx:
p1 = fork_ctx.Process(target=context_worker,
args=('fork', results_queue))
processes.append(p1)
p2 = spawn_ctx.Process(target=context_worker,
args=('spawn', results_queue))
processes.append(p2)
# Start all processes
for p in processes:
p.start()
# Collect results
for _ in processes:
result = results_queue.get()
print(result)
# Wait for completion
for p in processes:
p.join()
def context_isolation_example():
"""Demonstrate context isolation"""
# Create separate contexts
ctx1 = mp.get_context('spawn')
ctx2 = mp.get_context('spawn')
# Each context has its own objects
queue1 = ctx1.Queue()
queue2 = ctx2.Queue()
def isolated_worker(queue, context_id):
queue.put(f"Message from context {context_id}")
# Create processes in different contexts
p1 = ctx1.Process(target=isolated_worker, args=(queue1, 1))
p2 = ctx2.Process(target=isolated_worker, args=(queue2, 2))
p1.start()
p2.start()
print("Context 1:", queue1.get())
print("Context 2:", queue2.get())
p1.join()
p2.join()
if __name__ == '__main__':
multiple_contexts_example()
context_isolation_example()Different process creation methods with distinct characteristics.
# Only available on Unix-like systems
ctx = mp.get_context('fork')Characteristics:
# Available on all platforms
ctx = mp.get_context('spawn')Characteristics:
# Available on Unix with proper setup
ctx = mp.get_context('forkserver')Characteristics:
Usage example:
import billiard as mp
import os
import threading
import time
def threaded_parent_worker():
"""Worker function that creates threads"""
def thread_worker(thread_id):
time.sleep(0.1)
print(f"Thread {thread_id} completed")
# Create threads in parent
threads = []
for i in range(3):
t = threading.Thread(target=thread_worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
print("All threads completed in parent")
def child_process_work(method_name):
"""Work done in child process"""
print(f"Child process ({method_name}): PID {os.getpid()}, "
f"PPID {os.getppid()}")
print(f"Active threads in child: {threading.active_count()}")
def start_method_comparison():
"""Compare different start methods"""
# Start some threads in parent process
parent_thread = threading.Thread(target=threaded_parent_worker)
parent_thread.start()
time.sleep(0.05) # Let threads start
available_methods = mp.get_all_start_methods()
print(f"Available methods: {available_methods}")
print(f"Active threads in parent: {threading.active_count()}")
# Test each available method
for method in available_methods:
print(f"\n--- Testing {method} method ---")
try:
ctx = mp.get_context(method)
process = ctx.Process(target=child_process_work, args=(method,))
process.start()
process.join()
except Exception as e:
print(f"Error with {method}: {e}")
# Wait for parent threads
parent_thread.join()
if __name__ == '__main__':
start_method_comparison()import billiard as mp
def main():
"""Main application entry point"""
# Set start method early in program
mp.set_start_method('spawn') # or 'fork', 'forkserver'
# Create context for specific needs
ctx = mp.get_context()
# Use context consistently
queue = ctx.Queue()
processes = []
for i in range(4):
p = ctx.Process(target=worker, args=(queue, i))
processes.append(p)
p.start()
for p in processes:
p.join()
def worker(queue, worker_id):
"""Worker function"""
queue.put(f"Result from worker {worker_id}")
if __name__ == '__main__':
main()import billiard as mp
import sys
import platform
def select_optimal_context():
"""Select optimal context based on platform and requirements"""
if sys.platform == 'win32':
# Windows only supports spawn
return mp.get_context('spawn')
elif 'darwin' in sys.platform:
# macOS: spawn is often safer due to framework issues
return mp.get_context('spawn')
else:
# Linux/Unix: more options available
if threading.active_count() > 1:
# Threads present: avoid fork
if 'forkserver' in mp.get_all_start_methods():
return mp.get_context('forkserver')
else:
return mp.get_context('spawn')
else:
# No threading issues: fork is fastest
return mp.get_context('fork')
def context_factory_pattern():
"""Factory pattern for context creation"""
class ProcessContextFactory:
@staticmethod
def create_high_performance_context():
"""Context optimized for performance"""
if 'fork' in mp.get_all_start_methods():
return mp.get_context('fork')
return mp.get_context('spawn')
@staticmethod
def create_safe_context():
"""Context optimized for safety"""
if 'forkserver' in mp.get_all_start_methods():
return mp.get_context('forkserver')
return mp.get_context('spawn')
@staticmethod
def create_portable_context():
"""Context for maximum portability"""
return mp.get_context('spawn')
# Usage
factory = ProcessContextFactory()
if platform.system() == 'Windows':
ctx = factory.create_portable_context()
elif performance_critical:
ctx = factory.create_high_performance_context()
else:
ctx = factory.create_safe_context()
return ctx
# Example usage
performance_critical = True
optimal_ctx = select_optimal_context()
print(f"Selected context with start method: {optimal_ctx._name}")spawn method availableif __name__ == '__main__':spawn often preferredforkforkserver for mixed threading scenariosfork fastest for CPU-intensive tasksspawn safest for mixed workloadsforkserver good compromise for long-running applicationsimport billiard as mp
# Configure for HPC workload
if 'fork' in mp.get_all_start_methods():
mp.set_start_method('fork') # Fastest startup
else:
mp.set_start_method('spawn')
ctx = mp.get_context()
# Use large pools for parallel computation
with ctx.Pool(processes=mp.cpu_count()) as pool:
results = pool.map(compute_intensive_function, data)import billiard as mp
# Configure for web service (thread-safe)
mp.set_start_method('spawn') # Safe with web frameworks
ctx = mp.get_context()
# Create worker pool for request processing
request_pool = ctx.Pool(processes=4, maxtasksperchild=100)import billiard as mp
# Configure for distributed processing
if 'forkserver' in mp.get_all_start_methods():
mp.set_start_method('forkserver') # Isolated and efficient
else:
mp.set_start_method('spawn')
ctx = mp.get_context()
manager = ctx.Manager() # For distributed stateInstall with Tessl CLI
npx tessl i tessl/pypi-billiard