CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-multiprocess

A fork of Python's multiprocessing module that extends multiprocessing to provide enhanced serialization using dill

Pending
Overview
Eval results
Files

context-config.mddocs/

Context and Configuration

Context management and configuration options for different process start methods and serialization behavior. Allows fine-grained control over process creation and object serialization in multiprocess applications.

Capabilities

Context Management

Functions for managing different process start methods and contexts.

def get_context(method=None):
    """
    Get a context for a specific start method.
    
    Args:
        method: start method ('fork', 'spawn', 'forkserver', or None)
               None returns the current default context
    
    Returns:
        BaseContext: context object for the specified method
        
    Raises:
        ValueError: if method is not available on the platform
    """

def set_start_method(method, force=False):
    """
    Set the start method for creating processes.
    
    Args:
        method: start method ('fork', 'spawn', 'forkserver')
        force: if True, allow changing after context has been used
        
    Raises:
        RuntimeError: if context has already been set and force=False
        ValueError: if method is not available
    """

def get_start_method(allow_none=False):
    """
    Get the current start method.
    
    Args:
        allow_none: if True, return None if no method has been set
        
    Returns:
        str: current start method name
    """

def get_all_start_methods():
    """
    Get all available start methods for the platform.
    
    Returns:
        list[str]: list of available start methods, default first
    """

Context Objects

Different context types for specific start methods.

class BaseContext:
    """
    Base context class providing multiprocess functionality.
    """
    # All the standard multiprocess functions are available on context objects
    def Process(self, target=None, name=None, args=(), kwargs={}, daemon=None):
        """Create a Process using this context."""
    
    def Pool(self, processes=None, initializer=None, initargs=(), 
             maxtasksperchild=None):
        """Create a Pool using this context."""
    
    def Queue(self, maxsize=0):
        """Create a Queue using this context."""
    
    def Lock(self):
        """Create a Lock using this context."""
    
    def Manager(self):
        """Create a Manager using this context."""
    
    # ... all other multiprocess functions available

Process Configuration

Functions for configuring process behavior and execution environment.

def set_executable(executable):
    """
    Set the path to Python executable for spawned processes.
    
    Useful when embedding Python or using custom Python installations.
    Only affects 'spawn' start method.
    
    Args:
        executable: path to Python executable
    """

def set_forkserver_preload(module_names):
    """
    Set list of module names to preload in forkserver process.
    
    Modules are imported when the forkserver starts, potentially
    improving performance by avoiding repeated imports.
    Only affects 'forkserver' start method.
    
    Args:
        module_names: list of module names to preload
    """

Serialization Configuration

Control over object serialization behavior.

# Context property for controlling serialization
@property
def reducer(self):
    """
    Get the reducer used for object serialization.
    
    Returns:
        module: reduction module (typically dill-enhanced)
    """

@reducer.setter
def reducer(self, reduction):
    """
    Set the reducer for object serialization.
    
    Args:
        reduction: reduction module to use
    """

Advanced Configuration

Functions for specialized configuration scenarios.

def allow_connection_pickling():
    """
    Enable pickling of connection objects.
    
    Allows connection objects to be sent between processes.
    This is generally not recommended for security reasons.
    """

def freeze_support():
    """
    Add support for frozen executables on Windows.
    
    Should be called in the main module of programs that use
    multiprocess and are frozen with tools like py2exe or PyInstaller.
    """

def get_logger():
    """
    Get the logger used by multiprocess.
    
    Returns:
        Logger: The multiprocess logger instance
    """

def log_to_stderr(level=None):
    """
    Turn on logging and add a handler which prints to stderr.
    
    Args:
        level: Optional logging level to set
        
    Returns:
        Logger: The configured logger instance
    """

Start Method Details

Fork Method

Available on Unix-like systems (Linux, macOS). Creates child processes by forking the parent process.

Characteristics:

  • Fast process creation
  • Inherits parent's memory state
  • Shares file descriptors and handles
  • Can have issues with threads and certain libraries
  • Default on Linux

When to use:

  • Fast process startup needed
  • Sharing large amounts of data
  • Unix-only applications

Spawn Method

Available on all platforms. Creates fresh Python interpreter processes.

Characteristics:

  • Slower process creation
  • Clean process state
  • Better isolation
  • Required on Windows
  • Safer with threads
  • Default on Windows and macOS (recent versions)

When to use:

  • Cross-platform compatibility needed
  • Using threading in parent process
  • Need process isolation
  • Windows applications

Forkserver Method

Available on Unix-like systems. Uses a server process to create child processes.

Characteristics:

  • Moderate startup time
  • Clean process state after server startup
  • Server process can preload modules
  • Avoids fork-related issues
  • Requires additional setup

When to use:

  • Want benefits of fork with clean state
  • Using threads that don't work well with fork
  • Need to preload heavy modules

Usage Examples

Basic Context Usage

import multiprocess as mp

# Get default context
ctx = mp.get_context()
print(f"Default method: {ctx.get_start_method()}")

# Get specific context
spawn_ctx = mp.get_context('spawn')
fork_ctx = mp.get_context('fork')  # Unix only

# Use context to create objects
with spawn_ctx.Pool(2) as pool:
    results = pool.map(lambda x: x**2, [1, 2, 3, 4])
    print(results)

Setting Start Method

import multiprocess as mp

def worker():
    print(f"Worker using method: {mp.get_start_method()}")

if __name__ == '__main__':
    # Must set start method before creating processes
    print(f"Available methods: {mp.get_all_start_methods()}")
    
    # Set start method
    mp.set_start_method('spawn')
    print(f"Set method to: {mp.get_start_method()}")
    
    # Create process
    p = mp.Process(target=worker)
    p.start()
    p.join()

Method Comparison

import multiprocess as mp
import time
import os

def simple_worker(method_name):
    """Simple worker to test process creation time"""
    pid = os.getpid()
    print(f"Worker PID {pid} using {method_name}")
    return pid

def benchmark_method(method_name, num_processes=4):
    """Benchmark process creation time for a method"""
    try:
        ctx = mp.get_context(method_name)
    except ValueError:
        print(f"Method {method_name} not available")
        return None
    
    start_time = time.time()
    
    with ctx.Pool(num_processes) as pool:
        results = pool.map(simple_worker, [method_name] * num_processes)
    
    end_time = time.time()
    duration = end_time - start_time
    
    print(f"{method_name}: {duration:.3f} seconds for {num_processes} processes")
    return duration

if __name__ == '__main__':
    print("Benchmarking start methods:")
    
    methods = mp.get_all_start_methods()
    print(f"Available methods: {methods}")
    
    for method in methods:
        benchmark_method(method)

Context Isolation

import multiprocess as mp

# Global variable to test inheritance
global_data = "Original data"

def show_global_data(context_method):
    """Show whether global data is inherited"""
    print(f"Method {context_method}: global_data = '{global_data}'")

def modify_and_show(context_method):
    """Modify global data and show it"""
    global global_data
    global_data = f"Modified by {context_method}"
    print(f"Method {context_method}: modified global_data = '{global_data}'")

if __name__ == '__main__':
    # Test different contexts
    methods = mp.get_all_start_methods()
    
    for method in methods:
        try:
            ctx = mp.get_context(method)
            print(f"\nTesting {method} method:")
            
            # Show original data
            p1 = ctx.Process(target=show_global_data, args=(method,))
            p1.start()
            p1.join()
            
            # Try to modify data
            p2 = ctx.Process(target=modify_and_show, args=(method,))
            p2.start()
            p2.join()
            
            # Show data again to see if modification persisted
            p3 = ctx.Process(target=show_global_data, args=(method,))
            p3.start()
            p3.join()
            
        except ValueError:
            print(f"Method {method} not available")

Custom Executable Configuration

import multiprocess as mp
import sys
import subprocess

def worker_info():
    """Show information about the Python executable"""
    print(f"Executable: {sys.executable}")
    print(f"Version: {sys.version}")
    print(f"Platform: {sys.platform}")

if __name__ == '__main__':
    print("Default executable:")
    p1 = mp.Process(target=worker_info)
    p1.start()
    p1.join()
    
    # Set custom executable (if available)
    # This is just an example - in practice you'd use a different Python
    try:
        # Try to find another Python executable
        result = subprocess.run(['which', 'python3'], 
                               capture_output=True, text=True)
        if result.returncode == 0:
            custom_python = result.stdout.strip()
            if custom_python != sys.executable:
                print(f"\nSetting custom executable: {custom_python}")
                mp.set_executable(custom_python)
                
                p2 = mp.Process(target=worker_info)
                p2.start()
                p2.join()
            else:
                print("No different Python executable found")
        else:
            print("Could not find alternative Python executable")
    except Exception as e:
        print(f"Error setting custom executable: {e}")

Forkserver with Preloading

import multiprocess as mp
import time

# Heavy module to preload
import numpy as np
import json

def worker_with_numpy(data):
    """Worker that uses numpy - should be faster with preloading"""
    start_time = time.time()
    
    # Use numpy (should be already imported in forkserver)
    arr = np.array(data)
    result = np.sum(arr ** 2)
    
    end_time = time.time()
    print(f"Worker processed {len(data)} items in {end_time - start_time:.3f}s")
    return result

if __name__ == '__main__':
    # Only works on systems that support forkserver
    if 'forkserver' in mp.get_all_start_methods():
        # Set forkserver method
        mp.set_start_method('forkserver')
        
        # Preload heavy modules in forkserver
        mp.set_forkserver_preload(['numpy', 'json'])
        
        print("Using forkserver with preloaded modules")
        
        # Create some work
        data_sets = [[i] * 1000 for i in range(1, 6)]
        
        # Process with preloaded modules
        start_time = time.time()
        with mp.Pool(2) as pool:
            results = pool.map(worker_with_numpy, data_sets)
        end_time = time.time()
        
        print(f"Results: {results}")
        print(f"Total time: {end_time - start_time:.3f}s")
    else:
        print("Forkserver method not available on this platform")

Advanced Configuration Example

import multiprocess as mp
import os
import sys

class ConfigurableWorker:
    def __init__(self, config):
        self.config = config
    
    def work(self, data):
        pid = os.getpid()
        method = mp.get_start_method()
        
        result = {
            'pid': pid,
            'method': method,
            'data': data,
            'config': self.config,
            'processed': data * 2
        }
        
        return result

def create_configured_pool(method='spawn', processes=None, preload=None):
    """Create a pool with specific configuration"""
    
    # Set start method
    mp.set_start_method(method, force=True)
    
    # Configure forkserver preloading if applicable
    if method == 'forkserver' and preload:
        mp.set_forkserver_preload(preload)
    
    # Create context
    ctx = mp.get_context(method)
    
    # Configuration for workers
    config = {
        'method': method,
        'preload': preload,
        'python_executable': sys.executable
    }
    
    # Create pool with initializer
    def init_worker():
        global worker_instance
        worker_instance = ConfigurableWorker(config)
    
    def worker_task(data):
        return worker_instance.work(data)
    
    pool = ctx.Pool(processes=processes, 
                   initializer=init_worker)
    
    return pool, worker_task

if __name__ == '__main__':
    # Test different configurations
    configs = [
        {'method': 'spawn', 'processes': 2, 'preload': None},
    ]
    
    # Add forkserver if available
    if 'forkserver' in mp.get_all_start_methods():
        configs.append({
            'method': 'forkserver', 
            'processes': 2, 
            'preload': ['json', 'os']
        })
    
    # Add fork if available
    if 'fork' in mp.get_all_start_methods():
        configs.append({
            'method': 'fork', 
            'processes': 2, 
            'preload': None
        })
    
    data = [1, 2, 3, 4, 5]
    
    for config in configs:
        print(f"\nTesting configuration: {config}")
        
        try:
            with create_configured_pool(**config) as (pool, task_func):
                results = pool.map(task_func, data)
                
                print("Results:")
                for result in results:
                    print(f"  PID {result['pid']} ({result['method']}): "
                          f"{result['data']} -> {result['processed']}")
                          
        except Exception as e:
            print(f"Configuration failed: {e}")

Context Best Practices

import multiprocess as mp
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ProcessManager:
    def __init__(self, method=None, processes=None):
        self.method = method or self._get_best_method()
        self.processes = processes or mp.cpu_count()
        self.context = mp.get_context(self.method)
        
        logger.info(f"Initialized ProcessManager with method '{self.method}', "
                   f"{self.processes} processes")
    
    def _get_best_method(self):
        """Choose the best start method for the platform"""
        available = mp.get_all_start_methods()
        
        # Platform-specific preferences
        if sys.platform == 'win32':
            return 'spawn'  # Only option on Windows
        elif sys.platform == 'darwin':
            return 'spawn'  # Recommended on macOS
        else:
            # On Linux, prefer forkserver if available, then fork
            if 'forkserver' in available:
                return 'forkserver'
            elif 'fork' in available:
                return 'fork'
            else:
                return 'spawn'
    
    def create_pool(self, **kwargs):
        """Create a pool using the configured context"""
        return self.context.Pool(processes=self.processes, **kwargs)
    
    def create_process(self, **kwargs):
        """Create a process using the configured context"""
        return self.context.Process(**kwargs)
    
    def get_queue(self, **kwargs):
        """Create a queue using the configured context"""
        return self.context.Queue(**kwargs)

def example_task(x):
    return x ** 2

if __name__ == '__main__':
    import sys
    
    # Create process manager
    pm = ProcessManager()
    
    # Use the manager
    with pm.create_pool() as pool:
        data = list(range(10))
        results = pool.map(example_task, data)
        logger.info(f"Results: {results}")
    
    # Create individual processes
    q = pm.get_queue()
    
    def worker():
        q.put(f"Hello from {os.getpid()}")
    
    processes = []
    for i in range(3):
        p = pm.create_process(target=worker)
        p.start()
        processes.append(p)
    
    # Collect results
    for _ in range(3):
        message = q.get()
        logger.info(f"Received: {message}")
    
    # Wait for processes
    for p in processes:
        p.join()
    
    logger.info("All processes completed")

Install with Tessl CLI

npx tessl i tessl/pypi-multiprocess

docs

communication.md

context-config.md

index.md

pools.md

process-management.md

shared-objects.md

synchronization.md

tile.json