A Python package for easy multiprocessing that is faster than the standard multiprocessing module, with advanced features including worker state management, progress bars, and performance insights.
—
Worker performance monitoring and insights for analyzing multiprocessing efficiency. The insights system provides detailed timing data, task completion statistics, and performance metrics to help optimize parallel processing workloads.
Functions for retrieving and displaying worker performance insights.
def print_insights(self) -> None
def get_insights(self) -> Dict
def get_exit_results(self) -> List

print_insights: Print formatted performance insights to console with timing breakdown and efficiency metrics.
get_insights: Return performance insights as a dictionary with detailed worker statistics.
get_exit_results: Get results returned by worker exit functions (if any).
Internal class for managing worker performance data.
class WorkerInsights:
def __init__(self, ctx: multiprocessing.context.BaseContext, n_jobs: int, use_dill: bool) -> None
def enable_insights(self) -> None
def disable_insights(self) -> None
def reset_insights(self, start_time: float) -> None

from mpire import WorkerPool
import time


def slow_function(x):
    """Return x squared after simulating 0.1 s of work."""
    time.sleep(0.1)  # Simulate work
    return x * x


# Enable insights during pool creation
with WorkerPool(n_jobs=4, enable_insights=True) as pool:
    results = pool.map(slow_function, range(20))

    # Print formatted insights
    pool.print_insights()

    # Get insights as dictionary
    # NOTE(review): the key names 'n_completed_tasks' and 'avg_task_duration'
    # are assumed — confirm against the dict actually returned by
    # get_insights() for the installed mpire version.
    insights = pool.get_insights()
    print("Total tasks:", insights['n_completed_tasks'])
    print("Average task time:", insights['avg_task_duration'])

from mpire import WorkerPool
import time
import random


def variable_workload(x):
    """Sleep a random 0.01–0.2 s, then return x squared."""
    # Simulate variable processing time
    sleep_time = random.uniform(0.01, 0.2)
    time.sleep(sleep_time)
    return x ** 2


with WorkerPool(n_jobs=4, enable_insights=True) as pool:
    results = pool.map(variable_workload, range(100), chunk_size=10)
    insights = pool.get_insights()

    print("=== Performance Insights ===")
    # NOTE(review): the aggregate keys and the 'worker_<id>' layout below are
    # assumed — confirm against the dict actually returned by get_insights().
    print(f"Total completed tasks: {insights['n_completed_tasks']}")
    print(f"Total execution time: {insights['total_elapsed_time']:.2f}s")
    print(f"Average task duration: {insights['avg_task_duration']:.4f}s")
    print(f"Worker efficiency: {insights['efficiency']:.2%}")

    # Per-worker statistics
    for worker_id in range(4):
        worker_stats = insights[f'worker_{worker_id}']
        print(f"Worker {worker_id}:")
        print(f" Tasks completed: {worker_stats['n_completed_tasks']}")
        print(f" Working time: {worker_stats['working_time']:.2f}s")
        print(f" Waiting time: {worker_stats['waiting_time']:.2f}s")
        print(f" Efficiency: {worker_stats['efficiency']:.2%}")

from mpire import WorkerPool
import time


def cpu_bound_task(n):
    """CPU-intensive task: sum the integers in range(n * 1000)."""
    result = 0
    for i in range(n * 1000):
        result += i
    return result


def benchmark_configuration(n_jobs, chunk_size, data):
    """Benchmark a specific configuration"""
    with WorkerPool(n_jobs=n_jobs, enable_insights=True) as pool:
        results = pool.map(cpu_bound_task, data, chunk_size=chunk_size)
        # NOTE(review): insight key names are assumed — verify against the
        # dict returned by get_insights() for the installed mpire version.
        insights = pool.get_insights()
        return {
            'n_jobs': n_jobs,
            'chunk_size': chunk_size,
            'total_time': insights['total_elapsed_time'],
            'efficiency': insights['efficiency'],
            'avg_task_time': insights['avg_task_duration']
        }


# Test different configurations
data = range(100)
configurations = [
    (2, 5), (2, 10), (2, 25),
    (4, 5), (4, 10), (4, 25),
    (8, 5), (8, 10), (8, 25)
]

results = []
for n_jobs, chunk_size in configurations:
    result = benchmark_configuration(n_jobs, chunk_size, data)
    results.append(result)
    print(f"Jobs: {n_jobs}, Chunk: {chunk_size:2d} => "
          f"Time: {result['total_time']:.2f}s, "
          f"Efficiency: {result['efficiency']:.2%}")

# Find best configuration
best = min(results, key=lambda x: x['total_time'])
print(f"\nBest configuration: {best['n_jobs']} jobs, chunk size {best['chunk_size']}")


def expensive_init(worker_state):
    """Simulate expensive worker initialization"""
    import time
    time.sleep(1)  # Expensive setup
    worker_state['start_time'] = time.time()
    return "initialization_complete"


def quick_task(worker_state, x):
    """Trivial per-task work, so initialization cost dominates."""
    return x * 2


with WorkerPool(n_jobs=4, use_worker_state=True, enable_insights=True) as pool:
    results = pool.map(
        quick_task,
        range(20),
        worker_init=expensive_init,
        chunk_size=5
    )
    insights = pool.get_insights()

    print("=== Initialization Impact ===")
    # NOTE(review): the init/overhead key names below are assumed — confirm
    # against the dict actually returned by get_insights().
    print(f"Total init time: {insights['total_init_time']:.2f}s")
    print(f"Average init time per worker: {insights['avg_init_time']:.2f}s")
    print(f"Total working time: {insights['total_working_time']:.2f}s")
    print(f"Init overhead: {insights['init_overhead_percentage']:.1%}")

    # Check if initialization time dominates
    if insights['init_overhead_percentage'] > 0.5:
        print("Consider using keep_alive=True for better performance")

import psutil
import os


def memory_intensive_task(size):
    """Task that uses significant memory"""
    data = list(range(size * 1000))
    return sum(data)


def monitor_memory_usage():
    """Get current memory usage"""
    # NOTE: measures the parent process's RSS only; worker-process memory is
    # not included in this number.
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024  # MB


# Monitor memory usage with insights
initial_memory = monitor_memory_usage()

with WorkerPool(n_jobs=2, enable_insights=True) as pool:
    results = pool.map(memory_intensive_task, range(50))
    peak_memory = monitor_memory_usage()
    insights = pool.get_insights()

    print("=== Memory Usage Analysis ===")
    print(f"Initial memory: {initial_memory:.1f} MB")
    print(f"Peak memory: {peak_memory:.1f} MB")
    print(f"Memory increase: {peak_memory - initial_memory:.1f} MB")
    print(f"Tasks completed: {insights['n_completed_tasks']}")
    print(f"Memory per task: {(peak_memory - initial_memory) / insights['n_completed_tasks']:.2f} MB")
pool.print_insights()


def init_worker(worker_state):
    """Initialize per-worker bookkeeping state."""
    worker_state['processed_items'] = []
    worker_state['error_count'] = 0


def process_item(worker_state, item):
    """Double *item*, recording it (or any failure) in worker_state."""
    try:
        result = item * 2
        worker_state['processed_items'].append(item)
        return result
    except Exception:
        worker_state['error_count'] += 1
        raise


def cleanup_worker(worker_state):
    """Return summary of worker's work"""
    return {
        'total_processed': len(worker_state['processed_items']),
        'error_count': worker_state['error_count'],
        'first_item': worker_state['processed_items'][0] if worker_state['processed_items'] else None,
        'last_item': worker_state['processed_items'][-1] if worker_state['processed_items'] else None
    }


with WorkerPool(n_jobs=3, use_worker_state=True, enable_insights=True) as pool:
    results = pool.map(
        process_item,
        range(30),
        worker_init=init_worker,
        worker_exit=cleanup_worker,
        chunk_size=5
    )

    # Get worker exit results
    exit_results = pool.get_exit_results()

    print("=== Worker Exit Results ===")
    for i, worker_result in enumerate(exit_results):
        if worker_result:  # Some workers might not have exit results
            print(f"Worker {i}:")
            print(f" Processed: {worker_result['total_processed']} items")
            print(f" Errors: {worker_result['error_count']}")
            print(f" Range: {worker_result['first_item']} to {worker_result['last_item']}")
pool.print_insights()

Install with Tessl CLI
npx tessl i tessl/pypi-mpire