A Python package for easy multiprocessing, but faster than multiprocessing with advanced features including worker state management, progress bars, and performance insights.
—
Utility functions for task chunking, CPU affinity management, timing operations, and other helper functionality. These utilities provide fine-grained control over multiprocessing performance and resource management.
Functions for managing CPU resources and system information.
def cpu_count() -> int
def set_cpu_affinity(pid: int, mask: List[int]) -> None

cpu_count: Get the number of available CPU cores (imported from multiprocessing).
set_cpu_affinity: Set CPU affinity for a process to specific CPU cores.
pid (int): Process ID to set affinity for
mask (List[int]): List of CPU core IDs to bind the process to

Functions for optimizing task distribution and chunking strategies.
def chunk_tasks(iterable_of_args: Iterable, iterable_len: Optional[int] = None,
n_jobs: Optional[int] = None, n_tasks_per_job: Optional[int] = None,
chunk_size: Optional[int] = None, n_splits: Optional[int] = None) -> Generator
def apply_numpy_chunking(iterable_of_args: Iterable, iterable_len: Optional[int] = None,
n_jobs: Optional[int] = None, n_tasks_per_job: Optional[int] = None,
chunk_size: Optional[int] = None, n_splits: Optional[int] = None) -> Generator
def get_n_chunks(iterable_of_args: Iterable, iterable_len: Optional[int] = None,
chunk_size: Optional[int] = None, n_jobs: Optional[int] = None,
n_tasks_per_job: Optional[int] = None, n_splits: Optional[int] = None) -> int

chunk_tasks: Split an iterable into optimally-sized chunks for parallel processing.
apply_numpy_chunking: Apply numpy-specific chunking optimizations for array processing.
get_n_chunks: Calculate the optimal number of chunks for given parameters.
Functions for preparing arguments for parallel processing.
def make_single_arguments(iterable_of_args: Iterable, generator: bool = True) -> Union[List, Generator]

make_single_arguments: Convert multi-argument tuples to single arguments for functions expecting individual parameters.
Functions for timing operations and formatting output.
def format_seconds(seconds: Optional[Union[int, float]], with_milliseconds: bool) -> str
class TimeIt:
def __init__(self, label: str = "Operation") -> None
def __enter__(self) -> 'TimeIt'
def __exit__(self, exc_type, exc_val, exc_tb) -> None

format_seconds: Format seconds into human-readable time strings.
TimeIt: Context manager for timing code blocks with automatic reporting.
Functions for creating multiprocessing managers and communication objects.
def create_sync_manager(use_dill: bool) -> SyncManager
class NonPickledSyncManager:
"""Synchronization manager that doesn't require pickling"""
pass

create_sync_manager: Create a multiprocessing SyncManager with optional dill support.
NonPickledSyncManager: Alternative sync manager for scenarios where pickling is problematic.
from mpire import WorkerPool
from mpire.utils import set_cpu_affinity
import os
import time


def cpu_intensive_task(x):
    """CPU-bound task that benefits from CPU pinning."""
    # Report which worker process handles this input.
    pid = os.getpid()
    print(f"Process {pid} processing {x}")
    # CPU-intensive computation: sum of 0..x*100000-1.
    return sum(range(x * 100000))


# Pin workers to specific CPUs
cpu_assignments = [0, 1, 2, 3]  # Use first 4 CPUs
with WorkerPool(n_jobs=4, cpu_ids=cpu_assignments) as pool:
    results = pool.map(cpu_intensive_task, range(8))
    print(f"Results: {results}")


# Manual CPU affinity setting
def set_process_affinity():
    """Pin the current process to CPUs 0 and 2 via mpire's helper."""
    pid = os.getpid()
    set_cpu_affinity(pid, [0, 2])  # Pin to CPUs 0 and 2
    print(f"Process {pid} pinned to CPUs 0 and 2")


set_process_affinity()

from mpire import WorkerPool
from mpire.utils import chunk_tasks, get_n_chunks
import time


def quick_task(x):
    """Fast task that benefits from larger chunks."""
    return x * 2


def slow_task(x):
    """Slow task that benefits from smaller chunks."""
    time.sleep(0.01)
    return x ** 2


# Analyze chunking for different scenarios
data = range(1000)
print("=== Chunking Analysis ===")

# Quick tasks - use larger chunks to reduce overhead
quick_chunks = list(chunk_tasks(data, n_jobs=4, n_tasks_per_job=50))
print(f"Quick task chunks: {len(quick_chunks)} chunks")
print(f"Chunk sizes: {[len(chunk) for chunk in quick_chunks[:5]]}...")

# Slow tasks - use smaller chunks for better load balancing
slow_chunks = list(chunk_tasks(data, n_jobs=4, n_tasks_per_job=10))
print(f"Slow task chunks: {len(slow_chunks)} chunks")
print(f"Chunk sizes: {[len(chunk) for chunk in slow_chunks[:5]]}...")

# Calculate optimal chunk count
optimal_chunks = get_n_chunks(data, n_jobs=4, n_tasks_per_job=25)
print(f"Optimal chunk count: {optimal_chunks}")

# Test with WorkerPool
with WorkerPool(n_jobs=4) as pool:
    # Quick tasks with large chunks
    t0 = time.time()
    results1 = pool.map(quick_task, data, chunk_size=50)
    quick_time = time.time() - t0

    # Slow tasks with small chunks
    t0 = time.time()
    results2 = pool.map(slow_task, range(100), chunk_size=5)
    slow_time = time.time() - t0

    print(f"Quick tasks time: {quick_time:.2f}s")
    print(f"Slow tasks time: {slow_time:.2f}s")

import numpy as np
from mpire import WorkerPool
from mpire.utils import apply_numpy_chunking


def process_array_chunk(chunk):
    """Return the sum of squares of a numpy array chunk."""
    # Simulate array processing
    return np.sum(chunk ** 2)


# Create large numpy array
large_array = np.random.rand(10000)
print("=== Numpy Chunking ===")

# Apply numpy-specific chunking
chunks = list(apply_numpy_chunking(large_array, n_jobs=4, chunk_size=1000))
print(f"Created {len(chunks)} chunks")
print(f"Chunk shapes: {[chunk.shape for chunk in chunks[:3]]}...")

# Process with WorkerPool
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(process_array_chunk, chunks)
    total_result = sum(results)
    print(f"Total processing result: {total_result:.2f}")

# Compare with direct numpy processing
direct_result = np.sum(large_array ** 2)
print(f"Direct numpy result: {direct_result:.2f}")
# BUG FIX: chunked summation reorders floating-point additions, so an
# absolute tolerance of 1e-10 on a sum of this magnitude (~3300) can fail
# spuriously. Compare with a relative tolerance instead.
print(f"Results match: {np.isclose(total_result, direct_result)}")

from mpire import WorkerPool
from mpire.utils import make_single_arguments


def multi_arg_function(a, b, c):
    """Function that expects multiple arguments."""
    return a + b * c


def single_arg_function(args):
    """Function that expects a single tuple argument."""
    # Unpack the tuple and delegate to the same arithmetic.
    first, second, third = args
    return first + second * third


# Original data as tuples
multi_arg_data = [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12)]

with WorkerPool(n_jobs=2) as pool:
    # Method 1: Use starmap-like functionality (MPIRE handles this automatically)
    results1 = pool.map(multi_arg_function, multi_arg_data)
    print(f"Multi-arg results: {results1}")

    # Method 2: Convert to single arguments if needed
    single_args = make_single_arguments(multi_arg_data, generator=False)
    results2 = pool.map(single_arg_function, single_args)
    print(f"Single-arg results: {results2}")

    # Verify results are the same
    print(f"Results match: {results1 == results2}")

from mpire import WorkerPool
from mpire.utils import TimeIt, format_seconds
import time


def timed_operation(duration):
    """Operation with known duration."""
    time.sleep(duration)
    return f"Slept for {duration} seconds"


# Time individual operations
with TimeIt("Single operation"):
    result = timed_operation(0.5)

# Time parallel operations
with TimeIt("Parallel operations"):
    with WorkerPool(n_jobs=3) as pool:
        results = pool.map(timed_operation, [0.2, 0.3, 0.4, 0.1, 0.2])

# Manual timing with formatting
t0 = time.time()
with WorkerPool(n_jobs=2) as pool:
    results = pool.map(timed_operation, [0.1] * 10)
elapsed_time = time.time() - t0
formatted_time = format_seconds(elapsed_time, with_milliseconds=True)
print(f"Manual timing: {formatted_time}")

# Timing with different formatting options
test_times = [0.001, 0.1, 1.5, 65.3, 3661.7]
for t in test_times:
    with_ms = format_seconds(t, with_milliseconds=True)
    without_ms = format_seconds(t, with_milliseconds=False)
    print(f"{t:8.3f}s -> With MS: {with_ms:>15} | Without MS: {without_ms:>10}")

from mpire import WorkerPool
from mpire.utils import create_sync_manager, NonPickledSyncManager
import multiprocessing


def worker_with_shared_dict(shared_dict, worker_id, items):
    """Worker that updates a shared dictionary.

    shared_dict is injected by MPIRE as the shared object; worker_id and
    items come from each task tuple.
    """
    for item in items:
        shared_dict[f"worker_{worker_id}_item_{item}"] = item ** 2
    return len(items)


def _increment_counter(shared_counter, increment):
    """Atomically add `increment` to the shared counter; return the new value.

    BUG FIX: defined at module level (the original nested it inside
    simple_shared_counter), because the default pickler cannot serialize a
    locally-defined function for the worker processes.
    """
    # The lock guards the read-modify-write on the shared value.
    with shared_counter.get_lock():
        shared_counter.value += increment
        return shared_counter.value


# Example 1: Standard sync manager
print("=== Standard Sync Manager ===")
with create_sync_manager(use_dill=False) as manager:
    shared_dict = manager.dict()

    with WorkerPool(n_jobs=3, shared_objects=shared_dict) as pool:
        results = pool.map(
            worker_with_shared_dict,
            [(0, [1, 2, 3]), (1, [4, 5, 6]), (2, [7, 8, 9])],
            pass_worker_id=False  # Pass worker_id manually in args
        )
    print(f"Processed items: {sum(results)}")
    print(f"Shared dict contents: {dict(shared_dict)}")

# Example 2: Dill-enabled sync manager (for complex objects)
print("\n=== Dill Sync Manager ===")
try:
    with create_sync_manager(use_dill=True) as manager:
        # Create shared objects that might need dill
        shared_list = manager.list()
        shared_dict = manager.dict()

        def complex_worker(shared_objects, data):
            shared_list, shared_dict = shared_objects
            # Process complex data types
            shared_list.append(len(data))
            shared_dict[f"len_{len(data)}"] = data
            return sum(data)

        test_data = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]

        # BUG FIX: complex_worker is defined in a local scope, which the
        # standard pickler cannot serialize -- the pool itself must use dill
        # to ship the function to the workers.
        with WorkerPool(n_jobs=2, shared_objects=(shared_list, shared_dict),
                        use_dill=True) as pool:
            results = pool.map(complex_worker, test_data)
        print(f"Results: {results}")
        print(f"Shared list: {list(shared_list)}")
        print(f"Shared dict keys: {list(shared_dict.keys())}")
except ImportError:
    print("Dill not available, skipping dill manager example")

# Example 3: Non-pickled manager for special cases
print("\n=== Non-Pickled Manager ===")


def simple_shared_counter():
    """Example using a simple shared counter."""
    counter = multiprocessing.Value('i', 0)

    with WorkerPool(n_jobs=2, shared_objects=counter) as pool:
        results = pool.map(_increment_counter, [1, 2, 3, 4, 5])

    print(f"Final counter value: {counter.value}")
    print(f"Worker results: {results}")


simple_shared_counter()

from mpire import WorkerPool, cpu_count
from mpire.utils import TimeIt, format_seconds, chunk_tasks, set_cpu_affinity
import numpy as np
import time


def _array_processing_task(chunk):
    """Process an array chunk, returning (value, per-chunk processing time).

    BUG FIX: defined at module level (the original nested it inside
    comprehensive_utility_example), because the default pickler cannot
    serialize a locally-defined function for the worker processes.
    """
    start_time = time.time()
    result = np.sum(chunk ** 2) + np.mean(chunk)
    processing_time = time.time() - start_time
    return result, processing_time


def comprehensive_utility_example():
    """Example combining multiple utilities: chunking, timing, CPU pinning
    and worker insights."""
    print("=== System Information ===")
    print(f"Available CPUs: {cpu_count()}")

    # Generate test data
    data_size = 10000
    test_array = np.random.rand(data_size)

    # Optimize chunking strategy: aim for ~4 chunks per worker.
    n_workers = min(4, cpu_count())
    optimal_chunks = list(chunk_tasks(
        test_array,
        n_jobs=n_workers,
        n_tasks_per_job=data_size // (n_workers * 4)
    ))
    print(f"Created {len(optimal_chunks)} chunks for {n_workers} workers")
    print(f"Chunk sizes: {[len(chunk) for chunk in optimal_chunks[:3]]}...")

    # Process with timing and CPU pinning
    with TimeIt("Complete parallel processing"):
        cpu_ids = list(range(min(n_workers, cpu_count())))
        with WorkerPool(n_jobs=n_workers, cpu_ids=cpu_ids, enable_insights=True) as pool:
            results = pool.map(_array_processing_task, optimal_chunks)
            # BUG FIX: show insights while the pool context is still open;
            # the original called pool.print_insights() after the with-block
            # had already terminated the pool.
            pool.print_insights()

    # Extract results and timings
    values, timings = zip(*results)
    total_value = sum(values)
    avg_chunk_time = np.mean(timings)

    print(f"Total processing value: {total_value:.4f}")
    print(f"Average chunk processing time: {format_seconds(avg_chunk_time, True)}")

    # Compare with serial processing
    with TimeIt("Serial processing"):
        serial_result, serial_time = _array_processing_task(test_array)

    print(f"\nComparison:")
    print(f"Parallel result: {total_value:.4f}")
    print(f"Serial result: {serial_result:.4f}")
    # BUG FIX: chunked summation reorders float additions, so an absolute
    # 1e-10 tolerance can fail spuriously on sums of this magnitude; use a
    # relative comparison instead.
    print(f"Results match: {np.isclose(total_value, serial_result)}")
    print(f"Serial processing time: {format_seconds(serial_time, True)}")
comprehensive_utility_example()

Install with Tessl CLI
npx tessl i tessl/pypi-mpire