CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-mpire

A Python package for easy multiprocessing, but faster than multiprocessing with advanced features including worker state management, progress bars, and performance insights.

Pending
Overview
Eval results
Files

performance-insights.mddocs/

Performance Insights

Worker performance monitoring and insights for analyzing multiprocessing efficiency. The insights system provides detailed timing data, task completion statistics, and performance metrics to help optimize parallel processing workloads.

Capabilities

Insights Management

Functions for retrieving and displaying worker performance insights.

def print_insights(self) -> None
def get_insights(self) -> Dict
def get_exit_results(self) -> List

print_insights: Print formatted performance insights to console with timing breakdown and efficiency metrics.

get_insights: Return performance insights as a dictionary with detailed worker statistics.

get_exit_results: Get results returned by worker exit functions (if any).

WorkerInsights Class

Internal class for managing worker performance data.

class WorkerInsights:
    def __init__(self, ctx: multiprocessing.context.BaseContext, n_jobs: int, use_dill: bool) -> None
    def enable_insights(self) -> None
    def disable_insights(self) -> None
    def reset_insights(self, start_time: float) -> None

Usage Examples

Basic Insights Usage

from mpire import WorkerPool
import time

def slow_function(x):
    time.sleep(0.1)  # Simulate work
    return x * x

# Enable insights during pool creation
with WorkerPool(n_jobs=4, enable_insights=True) as pool:
    results = pool.map(slow_function, range(20))
    
    # Print formatted insights
    pool.print_insights()
    
    # Get insights as dictionary
    insights = pool.get_insights()
    print("Total tasks:", insights['n_completed_tasks'])
    print("Average task time:", insights['avg_task_duration'])

Detailed Performance Analysis

from mpire import WorkerPool
import time
import random

def variable_workload(x):
    # Simulate variable processing time
    sleep_time = random.uniform(0.01, 0.2)
    time.sleep(sleep_time)
    return x ** 2

with WorkerPool(n_jobs=4, enable_insights=True) as pool:
    results = pool.map(variable_workload, range(100), chunk_size=10)
    
    insights = pool.get_insights()
    
    print("=== Performance Insights ===")
    print(f"Total completed tasks: {insights['n_completed_tasks']}")
    print(f"Total execution time: {insights['total_elapsed_time']:.2f}s")
    print(f"Average task duration: {insights['avg_task_duration']:.4f}s")
    print(f"Worker efficiency: {insights['efficiency']:.2%}")
    
    # Per-worker statistics
    for worker_id in range(4):
        worker_stats = insights[f'worker_{worker_id}']
        print(f"Worker {worker_id}:")
        print(f"  Tasks completed: {worker_stats['n_completed_tasks']}")
        print(f"  Working time: {worker_stats['working_time']:.2f}s")
        print(f"  Waiting time: {worker_stats['waiting_time']:.2f}s")
        print(f"  Efficiency: {worker_stats['efficiency']:.2%}")

Comparing Different Configurations

from mpire import WorkerPool
import time

def cpu_bound_task(n):
    # CPU-intensive task
    result = 0
    for i in range(n * 1000):
        result += i
    return result

def benchmark_configuration(n_jobs, chunk_size, data):
    """Benchmark a specific configuration"""
    with WorkerPool(n_jobs=n_jobs, enable_insights=True) as pool:
        results = pool.map(cpu_bound_task, data, chunk_size=chunk_size)
        insights = pool.get_insights()
        
        return {
            'n_jobs': n_jobs,
            'chunk_size': chunk_size,
            'total_time': insights['total_elapsed_time'],
            'efficiency': insights['efficiency'],
            'avg_task_time': insights['avg_task_duration']
        }

# Test different configurations
data = range(100)
configurations = [
    (2, 5), (2, 10), (2, 25),
    (4, 5), (4, 10), (4, 25),
    (8, 5), (8, 10), (8, 25)
]

results = []
for n_jobs, chunk_size in configurations:
    result = benchmark_configuration(n_jobs, chunk_size, data)
    results.append(result)
    print(f"Jobs: {n_jobs}, Chunk: {chunk_size:2d} => "
          f"Time: {result['total_time']:.2f}s, "
          f"Efficiency: {result['efficiency']:.2%}")

# Find best configuration
best = min(results, key=lambda x: x['total_time'])
print(f"\nBest configuration: {best['n_jobs']} jobs, chunk size {best['chunk_size']}")

Worker Initialization Insights

def expensive_init(worker_state):
    """Simulate expensive worker initialization"""
    import time
    time.sleep(1)  # Expensive setup
    worker_state['start_time'] = time.time()
    return "initialization_complete"

def quick_task(worker_state, x):
    return x * 2

with WorkerPool(n_jobs=4, use_worker_state=True, enable_insights=True) as pool:
    results = pool.map(
        quick_task,
        range(20),
        worker_init=expensive_init,
        chunk_size=5
    )
    
    insights = pool.get_insights()
    
    print("=== Initialization Impact ===")
    print(f"Total init time: {insights['total_init_time']:.2f}s")
    print(f"Average init time per worker: {insights['avg_init_time']:.2f}s")
    print(f"Total working time: {insights['total_working_time']:.2f}s")
    print(f"Init overhead: {insights['init_overhead_percentage']:.1%}")
    
    # Check if initialization time dominates
    if insights['init_overhead_percentage'] > 0.5:
        print("Consider using keep_alive=True for better performance")

Memory and Resource Insights

import psutil
import os

def memory_intensive_task(size):
    """Task that uses significant memory"""
    data = list(range(size * 1000))
    return sum(data)

def monitor_memory_usage():
    """Get current memory usage"""
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024  # MB

# Monitor memory usage with insights
initial_memory = monitor_memory_usage()

with WorkerPool(n_jobs=2, enable_insights=True) as pool:
    results = pool.map(memory_intensive_task, range(50))
    
    peak_memory = monitor_memory_usage()
    insights = pool.get_insights()
    
    print("=== Memory Usage Analysis ===")
    print(f"Initial memory: {initial_memory:.1f} MB")
    print(f"Peak memory: {peak_memory:.1f} MB")
    print(f"Memory increase: {peak_memory - initial_memory:.1f} MB")
    print(f"Tasks completed: {insights['n_completed_tasks']}")
    print(f"Memory per task: {(peak_memory - initial_memory) / insights['n_completed_tasks']:.2f} MB")
    
    pool.print_insights()

Worker Exit Results

def init_worker(worker_state):
    worker_state['processed_items'] = []
    worker_state['error_count'] = 0

def process_item(worker_state, item):
    try:
        result = item * 2
        worker_state['processed_items'].append(item)
        return result
    except Exception:
        worker_state['error_count'] += 1
        raise

def cleanup_worker(worker_state):
    """Return summary of worker's work"""
    return {
        'total_processed': len(worker_state['processed_items']),
        'error_count': worker_state['error_count'],
        'first_item': worker_state['processed_items'][0] if worker_state['processed_items'] else None,
        'last_item': worker_state['processed_items'][-1] if worker_state['processed_items'] else None
    }

with WorkerPool(n_jobs=3, use_worker_state=True, enable_insights=True) as pool:
    results = pool.map(
        process_item,
        range(30),
        worker_init=init_worker,
        worker_exit=cleanup_worker,
        chunk_size=5
    )
    
    # Get worker exit results
    exit_results = pool.get_exit_results()
    
    print("=== Worker Exit Results ===")
    for i, worker_result in enumerate(exit_results):
        if worker_result:  # Some workers might not have exit results
            print(f"Worker {i}:")
            print(f"  Processed: {worker_result['total_processed']} items")
            print(f"  Errors: {worker_result['error_count']}")
            print(f"  Range: {worker_result['first_item']} to {worker_result['last_item']}")
    
    pool.print_insights()

Install with Tessl CLI

npx tessl i tessl/pypi-mpire

docs

apply-functions.md

async-results.md

dashboard-integration.md

exception-handling.md

index.md

parallel-map.md

performance-insights.md

utility-functions.md

worker-configuration.md

workerpool-management.md

tile.json