A fork of Python's multiprocessing module that extends multiprocessing to
provide enhanced serialization using dill.

Context management and configuration options for different process start
methods and serialization behavior. Allows fine-grained control over process
creation and object serialization in multiprocess applications.

Functions for managing different process start methods and contexts.
def get_context(method=None):
    """
    Return a context object for a specific start method.

    Args:
        method: start method name ('fork', 'spawn', 'forkserver'),
            or None for the current default context.

    Returns:
        BaseContext: context object for the specified method.

    Raises:
        ValueError: if the method is not available on this platform.
    """
def set_start_method(method, force=False):
    """
    Set the start method used for creating processes.

    Args:
        method: start method name ('fork', 'spawn', 'forkserver').
        force: if True, allow changing the method after the context
            has already been used.

    Raises:
        RuntimeError: if the context has already been set and force=False.
        ValueError: if the method is not available on this platform.
    """
def get_start_method(allow_none=False):
    """
    Return the current start method.

    Args:
        allow_none: if True, return None when no method has been set
            instead of initializing the default.

    Returns:
        str: current start method name.
    """
def get_all_start_methods():
    """
    Return all start methods available on this platform.

    Returns:
        list[str]: available start method names, default method first.
    """

# Different context types for specific start methods.
class BaseContext:
    """
    Base context class providing multiprocess functionality.

    All standard module-level multiprocess functions are also available
    as methods on context objects, bound to that context's start method.
    """

    # NOTE: kwargs={} mirrors the stdlib multiprocessing signature; the
    # dict is never mutated, so the shared-default pitfall does not apply.
    def Process(self, target=None, name=None, args=(), kwargs={}, daemon=None):
        """Create a Process using this context."""

    def Pool(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None):
        """Create a Pool using this context."""

    def Queue(self, maxsize=0):
        """Create a Queue using this context."""

    def Lock(self):
        """Create a Lock using this context."""

    def Manager(self):
        """Create a Manager using this context."""

    # ... all other multiprocess functions available

# Functions for configuring process behavior and execution environment.
def set_executable(executable):
    """
    Set the path to the Python executable used for spawned processes.

    Useful when embedding Python or using custom Python installations.
    Only affects the 'spawn' start method.

    Args:
        executable: path to the Python executable.
    """
def set_forkserver_preload(module_names):
    """
    Set the list of module names to preload in the forkserver process.

    Modules are imported when the forkserver starts, potentially improving
    performance by avoiding repeated imports in each child.
    Only affects the 'forkserver' start method.

    Args:
        module_names: list of module names to preload.
    """

# Control over object serialization behavior.
# Context property for controlling serialization.
# NOTE(review): this property is documented as living on context objects
# (BaseContext); it appears here at top level because of doc extraction.
@property
def reducer(self):
    """
    Return the reducer used for object serialization.

    Returns:
        module: reduction module (typically dill-enhanced).
    """

@reducer.setter
def reducer(self, reduction):
    """
    Set the reducer used for object serialization.

    Args:
        reduction: reduction module to use.
    """

# Functions for specialized configuration scenarios.
def allow_connection_pickling():
    """
    Enable pickling of connection objects.

    Allows connection objects to be sent between processes.
    This is generally not recommended for security reasons.
    """
def freeze_support():
    """
    Add support for frozen executables on Windows.

    Should be called in the main module of programs that use multiprocess
    and are frozen with tools like py2exe or PyInstaller.
    """
def get_logger():
    """
    Return the logger used by multiprocess.

    Returns:
        Logger: the multiprocess logger instance.
    """
def log_to_stderr(level=None):
    """
    Turn on logging and add a handler which prints to stderr.

    Args:
        level: optional logging level to set.

    Returns:
        Logger: the configured logger instance.
    """

# fork: Available on Unix-like systems (Linux, macOS). Creates child
# processes by forking the parent process.
Characteristics: the child starts as a copy of the parent process, inheriting
its memory, imported modules, and open state; process creation is fast.
When to use: on Unix, when inheriting the parent's state is acceptable.

spawn — Available on all platforms. Creates fresh Python interpreter processes.
Characteristics: the child starts with a clean interpreter; arguments and
required objects must be serialized to reach it, so startup is slower.
When to use: the portable, safest default; the only method on Windows.

forkserver — Available on Unix-like systems. Uses a server process to create
child processes.
Characteristics: a single forkserver process is started once and forks children
on request; modules can be preloaded into the server (see
set_forkserver_preload).
When to use: when fork-speed is wanted without inheriting the full parent state.
import multiprocess as mp

if __name__ == '__main__':
    # Guard the entry point: spawn-based workers re-import the main module,
    # so unguarded process creation would recurse (required on Windows/macOS).
    # Get default context
    ctx = mp.get_context()
    print(f"Default method: {ctx.get_start_method()}")

    # Get specific context
    spawn_ctx = mp.get_context('spawn')

    # 'fork' is Unix-only; an unconditional lookup raises ValueError on
    # Windows, so check availability first.
    if 'fork' in mp.get_all_start_methods():
        fork_ctx = mp.get_context('fork')  # Unix only

    # Use context to create objects (dill serialization lets pool.map
    # accept a lambda, which stock pickle cannot handle).
    with spawn_ctx.Pool(2) as pool:
        results = pool.map(lambda x: x**2, [1, 2, 3, 4])
    print(results)

import multiprocess as mp
import multiprocess as mp

def worker():
    """Report which start method the child process sees."""
    print(f"Worker using method: {mp.get_start_method()}")

if __name__ == '__main__':
    # The start method must be chosen before any process is created.
    print(f"Available methods: {mp.get_all_start_methods()}")

    mp.set_start_method('spawn')
    print(f"Set method to: {mp.get_start_method()}")

    child = mp.Process(target=worker)
    child.start()
    child.join()

import multiprocess as mp
import multiprocess as mp
import time
import os

def simple_worker(method_name):
    """Simple worker to test process creation time"""
    pid = os.getpid()
    print(f"Worker PID {pid} using {method_name}")
    return pid

def benchmark_method(method_name, num_processes=4):
    """Benchmark process creation time for a method.

    Returns the elapsed seconds, or None if the method is unavailable.
    """
    try:
        ctx = mp.get_context(method_name)
    except ValueError:
        print(f"Method {method_name} not available")
        return None
    # perf_counter is monotonic and high-resolution -- the correct clock
    # for measuring elapsed time (time.time can jump with wall-clock
    # adjustments, skewing benchmarks).
    start_time = time.perf_counter()
    with ctx.Pool(num_processes) as pool:
        results = pool.map(simple_worker, [method_name] * num_processes)
    end_time = time.perf_counter()
    duration = end_time - start_time
    print(f"{method_name}: {duration:.3f} seconds for {num_processes} processes")
    return duration

if __name__ == '__main__':
    print("Benchmarking start methods:")
    methods = mp.get_all_start_methods()
    print(f"Available methods: {methods}")
    for method in methods:
        benchmark_method(method)

import multiprocess as mp
import multiprocess as mp

# Global variable to test inheritance
global_data = "Original data"

def show_global_data(context_method):
    """Show whether global data is inherited"""
    print(f"Method {context_method}: global_data = '{global_data}'")

def modify_and_show(context_method):
    """Modify global data and show it"""
    global global_data
    global_data = f"Modified by {context_method}"
    print(f"Method {context_method}: modified global_data = '{global_data}'")

if __name__ == '__main__':
    # Probe each available context: show the global, modify it in a child,
    # then show it again to see whether the modification persisted.
    for method in mp.get_all_start_methods():
        try:
            ctx = mp.get_context(method)
        except ValueError:
            print(f"Method {method} not available")
            continue
        print(f"\nTesting {method} method:")
        for target in (show_global_data, modify_and_show, show_global_data):
            child = ctx.Process(target=target, args=(method,))
            child.start()
            child.join()

import multiprocess as mp
import multiprocess as mp
import sys
import subprocess
import shutil

def worker_info():
    """Show information about the Python executable"""
    print(f"Executable: {sys.executable}")
    print(f"Version: {sys.version}")
    print(f"Platform: {sys.platform}")

if __name__ == '__main__':
    print("Default executable:")
    p1 = mp.Process(target=worker_info)
    p1.start()
    p1.join()

    # Set custom executable (if available).
    # shutil.which is the portable stdlib way to locate an executable on
    # PATH -- unlike shelling out to the Unix-only `which` command, it also
    # works on Windows and avoids spawning a subprocess.
    try:
        custom_python = shutil.which('python3')
        if custom_python:
            if custom_python != sys.executable:
                print(f"\nSetting custom executable: {custom_python}")
                mp.set_executable(custom_python)
                p2 = mp.Process(target=worker_info)
                p2.start()
                p2.join()
            else:
                print("No different Python executable found")
        else:
            print("Could not find alternative Python executable")
    except Exception as e:
        print(f"Error setting custom executable: {e}")

import multiprocess as mp
import multiprocess as mp
import time
# Heavy module to preload
import numpy as np
import json

def worker_with_numpy(data):
    """Worker that uses numpy - should be faster with preloading"""
    start_time = time.time()
    # Use numpy (should be already imported in forkserver)
    arr = np.array(data)
    result = np.sum(arr ** 2)
    end_time = time.time()
    print(f"Worker processed {len(data)} items in {end_time - start_time:.3f}s")
    return result

if __name__ == '__main__':
    # Only works on systems that support forkserver.
    if 'forkserver' not in mp.get_all_start_methods():
        print("Forkserver method not available on this platform")
    else:
        mp.set_start_method('forkserver')
        # Preload heavy modules into the forkserver so children fork with
        # them already imported.
        mp.set_forkserver_preload(['numpy', 'json'])
        print("Using forkserver with preloaded modules")

        data_sets = [[i] * 1000 for i in range(1, 6)]

        start_time = time.time()
        with mp.Pool(2) as pool:
            results = pool.map(worker_with_numpy, data_sets)
        end_time = time.time()

        print(f"Results: {results}")
        print(f"Total time: {end_time - start_time:.3f}s")

import multiprocess as mp
import multiprocess as mp
import os
import sys
from contextlib import contextmanager

class ConfigurableWorker:
    """Worker whose processing is parameterized by a config dict."""

    def __init__(self, config):
        self.config = config

    def work(self, data):
        """Process one item, returning provenance info alongside the result."""
        pid = os.getpid()
        method = mp.get_start_method()
        result = {
            'pid': pid,
            'method': method,
            'data': data,
            'config': self.config,
            'processed': data * 2
        }
        return result

@contextmanager
def create_configured_pool(method='spawn', processes=None, preload=None):
    """Create a pool with specific configuration.

    Yields:
        (pool, task_func): the configured pool and a task callable.

    BUG FIX: the original returned a bare ``(pool, task)`` tuple, which is
    not a context manager, so the ``with create_configured_pool(...) as
    (pool, task_func)`` usage below raised TypeError.  Wrapping the factory
    in contextlib.contextmanager makes the ``with`` usage work and also
    guarantees the pool is cleaned up on exit.
    """
    # Set start method (force=True allows switching between configurations).
    mp.set_start_method(method, force=True)
    # Configure forkserver preloading if applicable.
    if method == 'forkserver' and preload:
        mp.set_forkserver_preload(preload)
    # Create context
    ctx = mp.get_context(method)
    # Configuration for workers
    config = {
        'method': method,
        'preload': preload,
        'python_executable': sys.executable
    }
    # The initializer runs once per worker and stashes a ConfigurableWorker
    # globally; dill serialization lets these closures reach the children.
    def init_worker():
        global worker_instance
        worker_instance = ConfigurableWorker(config)
    def worker_task(data):
        return worker_instance.work(data)
    pool = ctx.Pool(processes=processes,
                    initializer=init_worker)
    try:
        yield pool, worker_task
    finally:
        pool.terminate()
        pool.join()

if __name__ == '__main__':
    # Test different configurations
    configs = [
        {'method': 'spawn', 'processes': 2, 'preload': None},
    ]
    # Add forkserver if available
    if 'forkserver' in mp.get_all_start_methods():
        configs.append({
            'method': 'forkserver',
            'processes': 2,
            'preload': ['json', 'os']
        })
    # Add fork if available
    if 'fork' in mp.get_all_start_methods():
        configs.append({
            'method': 'fork',
            'processes': 2,
            'preload': None
        })
    data = [1, 2, 3, 4, 5]
    for config in configs:
        print(f"\nTesting configuration: {config}")
        try:
            with create_configured_pool(**config) as (pool, task_func):
                results = pool.map(task_func, data)
            print("Results:")
            for result in results:
                print(f"  PID {result['pid']} ({result['method']}): "
                      f"{result['data']} -> {result['processed']}")
        except Exception as e:
            print(f"Configuration failed: {e}")

import multiprocess as mp
import multiprocess as mp
import logging
import os
import sys

# BUG FIXES vs original: `os` was used by worker() but never imported, and
# `sys` (needed by ProcessManager._get_best_method) was imported only inside
# the __main__ guard -- both are now module-level imports.

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ProcessManager:
    """Picks a start method for the platform and hands out pools,
    processes, and queues bound to that context."""

    def __init__(self, method=None, processes=None):
        self.method = method or self._get_best_method()
        self.processes = processes or mp.cpu_count()
        self.context = mp.get_context(self.method)
        logger.info(f"Initialized ProcessManager with method '{self.method}', "
                    f"{self.processes} processes")

    def _get_best_method(self):
        """Choose the best start method for the platform"""
        available = mp.get_all_start_methods()
        # Platform-specific preferences
        if sys.platform == 'win32':
            return 'spawn'  # Only option on Windows
        elif sys.platform == 'darwin':
            return 'spawn'  # Recommended on macOS
        else:
            # On Linux, prefer forkserver if available, then fork
            if 'forkserver' in available:
                return 'forkserver'
            elif 'fork' in available:
                return 'fork'
            else:
                return 'spawn'

    def create_pool(self, **kwargs):
        """Create a pool using the configured context"""
        return self.context.Pool(processes=self.processes, **kwargs)

    def create_process(self, **kwargs):
        """Create a process using the configured context"""
        return self.context.Process(**kwargs)

    def get_queue(self, **kwargs):
        """Create a queue using the configured context"""
        return self.context.Queue(**kwargs)

def example_task(x):
    return x ** 2

if __name__ == '__main__':
    # Create process manager
    pm = ProcessManager()

    # Use the manager
    with pm.create_pool() as pool:
        data = list(range(10))
        results = pool.map(example_task, data)
        logger.info(f"Results: {results}")

    # Create individual processes
    q = pm.get_queue()

    def worker():
        q.put(f"Hello from {os.getpid()}")

    processes = []
    for i in range(3):
        p = pm.create_process(target=worker)
        p.start()
        processes.append(p)

    # Collect results
    for _ in range(3):
        message = q.get()
        logger.info(f"Received: {message}")

    # Wait for processes
    for p in processes:
        p.join()
    logger.info("All processes completed")

# Install with Tessl CLI
#   npx tessl i tessl/pypi-multiprocess