CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-mpire

A Python package for easy multiprocessing, but faster than multiprocessing with advanced features including worker state management, progress bars, and performance insights.

Pending
Overview
Eval results
Files

docs/utility-functions.md

Utility Functions

Utility functions for task chunking, CPU affinity management, timing operations, and other helper functionality. These utilities provide fine-grained control over multiprocessing performance and resource management.

Capabilities

CPU and System Utilities

Functions for managing CPU resources and system information.

def cpu_count() -> int
def set_cpu_affinity(pid: int, mask: List[int]) -> None

cpu_count: Get the number of available CPU cores (imported from multiprocessing).

set_cpu_affinity: Set CPU affinity for a process to specific CPU cores.

  • pid (int): Process ID to set affinity for
  • mask (List[int]): List of CPU core IDs to bind the process to

Task Chunking Utilities

Functions for optimizing task distribution and chunking strategies.

def chunk_tasks(iterable_of_args: Iterable, iterable_len: Optional[int] = None,
                n_jobs: Optional[int] = None, n_tasks_per_job: Optional[int] = None,
                chunk_size: Optional[int] = None, n_splits: Optional[int] = None) -> Generator

def apply_numpy_chunking(iterable_of_args: Iterable, iterable_len: Optional[int] = None,
                        n_jobs: Optional[int] = None, n_tasks_per_job: Optional[int] = None,
                        chunk_size: Optional[int] = None, n_splits: Optional[int] = None) -> Generator

def get_n_chunks(iterable_of_args: Iterable, iterable_len: Optional[int] = None,
                chunk_size: Optional[int] = None, n_jobs: Optional[int] = None,
                n_tasks_per_job: Optional[int] = None, n_splits: Optional[int] = None) -> int

chunk_tasks: Split an iterable into optimally-sized chunks for parallel processing.

apply_numpy_chunking: Apply numpy-specific chunking optimizations for array processing.

get_n_chunks: Calculate the optimal number of chunks for given parameters.

Argument Processing

Functions for preparing arguments for parallel processing.

def make_single_arguments(iterable_of_args: Iterable, generator: bool = True) -> Union[List, Generator]

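make_single_arguments: Wrap each item of an iterable so that it is passed to the function as one argument (for example, a whole tuple) instead of being unpacked into separate parameters. Returns a generator by default, or a list when generator=False.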

Time and Formatting Utilities

Functions for timing operations and formatting output.

def format_seconds(seconds: Optional[Union[int, float]], with_milliseconds: bool) -> str

class TimeIt:
    def __init__(self, label: str = "Operation") -> None
    def __enter__(self) -> 'TimeIt'
    def __exit__(self, exc_type, exc_val, exc_tb) -> None

format_seconds: Format seconds into human-readable time strings.

TimeIt: Context manager for timing code blocks with automatic reporting.

Manager and Communication Utilities

Functions for creating multiprocessing managers and communication objects.

def create_sync_manager(use_dill: bool) -> SyncManager

class NonPickledSyncManager:
    """Synchronization manager that doesn't require pickling"""
    pass

create_sync_manager: Create a multiprocessing SyncManager with optional dill support.

NonPickledSyncManager: Alternative sync manager for scenarios where pickling is problematic.

Usage Examples

CPU Affinity Management

from mpire import WorkerPool
from mpire.utils import set_cpu_affinity
import os
import time

def cpu_intensive_task(x):
    """Sum the integers below ``x * 100000`` as a stand-in for CPU-bound work.

    Prints the ID of the process handling the task, so the output shows which
    worker process handled which input, and returns the computed sum.
    """
    # Report which process picked up this task
    pid = os.getpid()
    print(f"Process {pid} processing {x}")

    # CPU-intensive computation: add up every integer in the range
    upper_bound = x * 100000
    return sum(range(upper_bound))

# Pin workers to specific CPUs
cpu_assignments = [0, 1, 2, 3]  # Use first 4 CPUs

with WorkerPool(n_jobs=4, cpu_ids=cpu_assignments) as pool:
    results = pool.map(cpu_intensive_task, range(8))
    print(f"Results: {results}")

# Manual CPU affinity setting
def set_process_affinity():
    """Pin the calling process to CPU cores 0 and 2 and report the change."""
    pid = os.getpid()
    target_cores = [0, 2]
    set_cpu_affinity(pid, target_cores)  # Pin to CPUs 0 and 2
    print(f"Process {pid} pinned to CPUs 0 and 2")

set_process_affinity()

Task Chunking Optimization

from mpire import WorkerPool
from mpire.utils import chunk_tasks, get_n_chunks
import time

def quick_task(x):
    """Return ``x`` doubled; a fast task that benefits from larger chunks."""
    doubled = x * 2
    return doubled

def slow_task(x):
    """Return ``x`` squared after a 10 ms pause.

    The pause stands in for slow work, the kind of task that benefits from
    smaller chunks.
    """
    simulated_delay = 0.01  # seconds
    time.sleep(simulated_delay)
    squared = x ** 2
    return squared

# Analyze chunking for different scenarios
data = range(1000)

print("=== Chunking Analysis ===")

# Quick tasks - use larger chunks to reduce overhead
quick_chunks = list(chunk_tasks(data, n_jobs=4, n_tasks_per_job=50))
print(f"Quick task chunks: {len(quick_chunks)} chunks")
print(f"Chunk sizes: {[len(chunk) for chunk in quick_chunks[:5]]}...")

# Slow tasks - use smaller chunks for better load balancing
slow_chunks = list(chunk_tasks(data, n_jobs=4, n_tasks_per_job=10))
print(f"Slow task chunks: {len(slow_chunks)} chunks")
print(f"Chunk sizes: {[len(chunk) for chunk in slow_chunks[:5]]}...")

# Calculate optimal chunk count
optimal_chunks = get_n_chunks(data, n_jobs=4, n_tasks_per_job=25)
print(f"Optimal chunk count: {optimal_chunks}")

# Test with WorkerPool
with WorkerPool(n_jobs=4) as pool:
    # Quick tasks with large chunks
    start_time = time.time()
    results1 = pool.map(quick_task, data, chunk_size=50)
    quick_time = time.time() - start_time
    
    # Slow tasks with small chunks  
    start_time = time.time()
    results2 = pool.map(slow_task, range(100), chunk_size=5)
    slow_time = time.time() - start_time
    
    print(f"Quick tasks time: {quick_time:.2f}s")
    print(f"Slow tasks time: {slow_time:.2f}s")

Numpy Array Chunking

import numpy as np
from mpire import WorkerPool
from mpire.utils import apply_numpy_chunking

def process_array_chunk(chunk):
    """Return the sum of the squared elements of ``chunk``."""
    # Simulate array processing: square element-wise, then reduce
    squared = chunk ** 2
    return np.sum(squared)

# Create large numpy array
large_array = np.random.rand(10000)

print("=== Numpy Chunking ===")

# Apply numpy-specific chunking
chunks = list(apply_numpy_chunking(large_array, n_jobs=4, chunk_size=1000))
print(f"Created {len(chunks)} chunks")
print(f"Chunk shapes: {[chunk.shape for chunk in chunks[:3]]}...")

# Process with WorkerPool
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(process_array_chunk, chunks)
    total_result = sum(results)
    print(f"Total processing result: {total_result:.2f}")

# Compare with direct numpy processing
direct_result = np.sum(large_array ** 2)
print(f"Direct numpy result: {direct_result:.2f}")
print(f"Results match: {abs(total_result - direct_result) < 1e-10}")

Argument Processing

from mpire import WorkerPool
from mpire.utils import make_single_arguments

def multi_arg_function(a, b, c):
    """Return ``a + b * c`` from three separately passed arguments."""
    product = b * c
    return a + product

def single_arg_function(args):
    """Return ``a + b * c`` for one ``(a, b, c)`` sequence passed as a single argument."""
    first, second, third = args
    return first + second * third

# Original data as tuples
multi_arg_data = [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12)]

with WorkerPool(n_jobs=2) as pool:
    # Method 1: Use starmap-like functionality (MPIRE handles this automatically)
    results1 = pool.map(multi_arg_function, multi_arg_data)
    print(f"Multi-arg results: {results1}")
    
    # Method 2: Convert to single arguments if needed
    single_args = make_single_arguments(multi_arg_data, generator=False)
    results2 = pool.map(single_arg_function, single_args)
    print(f"Single-arg results: {results2}")
    
    # Verify results are the same
    print(f"Results match: {results1 == results2}")

Timing Operations

from mpire import WorkerPool
from mpire.utils import TimeIt, format_seconds
import time

def timed_operation(duration):
    """Sleep for ``duration`` seconds and return a message stating that duration."""
    time.sleep(duration)
    message = f"Slept for {duration} seconds"
    return message

# Time individual operations
with TimeIt("Single operation"):
    result = timed_operation(0.5)

# Time parallel operations
with TimeIt("Parallel operations"):
    with WorkerPool(n_jobs=3) as pool:
        results = pool.map(timed_operation, [0.2, 0.3, 0.4, 0.1, 0.2])

# Manual timing with formatting
start_time = time.time()
with WorkerPool(n_jobs=2) as pool:
    results = pool.map(timed_operation, [0.1] * 10)
elapsed_time = time.time() - start_time

formatted_time = format_seconds(elapsed_time, with_milliseconds=True)
print(f"Manual timing: {formatted_time}")

# Timing with different formatting options
test_times = [0.001, 0.1, 1.5, 65.3, 3661.7]
for t in test_times:
    with_ms = format_seconds(t, with_milliseconds=True)
    without_ms = format_seconds(t, with_milliseconds=False)
    print(f"{t:8.3f}s -> With MS: {with_ms:>15} | Without MS: {without_ms:>10}")

Custom Sync Manager Usage

from mpire import WorkerPool
from mpire.utils import create_sync_manager, NonPickledSyncManager
import multiprocessing

def worker_with_shared_dict(shared_dict, worker_id, items):
    """Store the square of each item in ``shared_dict`` and return the item count.

    Each entry is stored under the key ``worker_<worker_id>_item_<item>``.
    """
    for item in items:
        key = f"worker_{worker_id}_item_{item}"
        shared_dict[key] = item ** 2
    return len(items)

# Example 1: Standard sync manager
# A manager-backed dict is handed to every worker through shared_objects, so
# the worker function receives it as its first argument.
print("=== Standard Sync Manager ===")
with create_sync_manager(use_dill=False) as manager:
    shared_dict = manager.dict()

    with WorkerPool(n_jobs=3, shared_objects=shared_dict) as pool:
        # Each task tuple is (worker_id, items). The worker ID is supplied
        # manually in the arguments, so automatic worker-ID passing is left at
        # the pool default (off). pass_worker_id is a WorkerPool option, not a
        # map() keyword, so it must not be passed here.
        results = pool.map(
            worker_with_shared_dict,
            [(0, [1, 2, 3]), (1, [4, 5, 6]), (2, [7, 8, 9])],
        )

    # Read the shared dict while the manager is still running
    print(f"Processed items: {sum(results)}")
    print(f"Shared dict contents: {dict(shared_dict)}")

# Example 2: Dill-enabled sync manager (for complex objects)
# The whole example sits inside try/except so that a missing dill dependency
# (surfacing as ImportError) skips it instead of aborting the script.
print("\n=== Dill Sync Manager ===")
try:
    with create_sync_manager(use_dill=True) as manager:
        # Create shared objects that might need dill
        shared_list = manager.list()
        shared_dict = manager.dict()
        
        def complex_worker(shared_objects, data):
            """Record the length of ``data`` in both shared containers and return its sum."""
            # shared_objects is the (list, dict) tuple given to the WorkerPool below
            shared_list, shared_dict = shared_objects
            # Process complex data types
            shared_list.append(len(data))
            shared_dict[f"len_{len(data)}"] = data
            return sum(data)
        
        test_data = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
        
        # NOTE(review): use_dill is set on the manager only, not on the WorkerPool —
        # confirm the locally defined complex_worker can be sent to the workers
        # under the start method in use.
        # NOTE(review): each task here is a list — confirm the pool passes it to
        # complex_worker as the single `data` argument rather than unpacking it.
        with WorkerPool(n_jobs=2, shared_objects=(shared_list, shared_dict)) as pool:
            results = pool.map(complex_worker, test_data)
        
        # Copy the proxies into plain containers while the manager is still alive
        print(f"Results: {results}")
        print(f"Shared list: {list(shared_list)}")
        print(f"Shared dict keys: {list(shared_dict.keys())}")

except ImportError:
    print("Dill not available, skipping dill manager example")

# Example 3: Lock-protected shared counter for special cases
# NOTE: this example shares a multiprocessing.Value directly between workers;
# it does not create a NonPickledSyncManager.
print("\n=== Non-Pickled Manager ===")
def simple_shared_counter():
    """Increment a shared integer counter from several workers and print the outcome.

    A ``multiprocessing.Value`` is passed to the pool as the shared object.
    Each task adds its increment to the counter and returns the running total
    it observed immediately after its own update.
    """
    counter = multiprocessing.Value('i', 0)

    def increment_counter(shared_counter, increment):
        """Add ``increment`` to the counter and return the resulting total."""
        with shared_counter.get_lock():
            shared_counter.value += increment
            # Read the total while still holding the lock; reading it after
            # release could pick up increments made by other workers.
            return shared_counter.value

    with WorkerPool(n_jobs=2, shared_objects=counter) as pool:
        results = pool.map(increment_counter, [1, 2, 3, 4, 5])

    print(f"Final counter value: {counter.value}")
    print(f"Worker results: {results}")

simple_shared_counter()

Combined Utility Usage

from mpire import WorkerPool, cpu_count
from mpire.utils import TimeIt, format_seconds, chunk_tasks, set_cpu_affinity
import numpy as np
import time

def comprehensive_utility_example():
    """Example combining multiple utilities.

    Splits a random array into chunks, processes the chunks in a CPU-pinned
    WorkerPool while timing the run, prints the pool insights, and finally
    compares the combined parallel result with one serial pass over the
    whole array.
    """

    print("=== System Information ===")
    print(f"Available CPUs: {cpu_count()}")

    # Generate test data
    data_size = 10000
    test_array = np.random.rand(data_size)

    def array_processing_task(chunk):
        """Return ``(sum of squares + sum, seconds spent)`` for one chunk.

        Both terms are plain sums, so adding up the per-chunk results gives
        the same value as processing the whole array at once. A per-chunk
        mean would not have this property.
        """
        start_time = time.time()
        result = np.sum(chunk ** 2) + np.sum(chunk)
        processing_time = time.time() - start_time
        return result, processing_time

    # Optimize chunking strategy: aim for roughly four chunks per worker
    n_workers = min(4, cpu_count())
    chunk_size = max(1, data_size // (n_workers * 4))
    # Convert every chunk to an array so the arithmetic in the task works
    # regardless of the container type chunk_tasks yields.
    optimal_chunks = [
        np.asarray(chunk)
        for chunk in chunk_tasks(test_array, chunk_size=chunk_size)
    ]

    print(f"Created {len(optimal_chunks)} chunks for {n_workers} workers")
    print(f"Chunk sizes: {[len(chunk) for chunk in optimal_chunks[:3]]}...")

    # Process with timing and CPU pinning
    with TimeIt("Complete parallel processing"):
        # n_workers never exceeds cpu_count(), so these core IDs are valid
        cpu_ids = list(range(n_workers))

        with WorkerPool(n_jobs=n_workers, cpu_ids=cpu_ids, enable_insights=True) as pool:
            results = pool.map(array_processing_task, optimal_chunks)

            # Extract results and timings
            values, timings = zip(*results)
            total_value = sum(values)
            avg_chunk_time = np.mean(timings)

            print(f"Total processing value: {total_value:.4f}")
            print(f"Average chunk processing time: {format_seconds(avg_chunk_time, True)}")

            # Show insights
            pool.print_insights()

    # Compare with serial processing
    with TimeIt("Serial processing"):
        serial_result, serial_time = array_processing_task(test_array)

    # Summation order differs between the two runs, so compare with a
    # floating-point tolerance rather than a fixed absolute threshold.
    results_match = bool(np.isclose(total_value, serial_result))

    print("\nComparison:")
    print(f"Parallel result: {total_value:.4f}")
    print(f"Serial result: {serial_result:.4f}")
    print(f"Results match: {results_match}")
    print(f"Serial processing time: {format_seconds(serial_time, True)}")

comprehensive_utility_example()

Install with Tessl CLI

npx tessl i tessl/pypi-mpire

docs

apply-functions.md

async-results.md

dashboard-integration.md

exception-handling.md

index.md

parallel-map.md

performance-insights.md

utility-functions.md

worker-configuration.md

workerpool-management.md

tile.json