A Python package for easy multiprocessing, but faster than multiprocessing with advanced features including worker state management, progress bars, and performance insights.
—
Optional web dashboard for monitoring multiprocessing jobs with real-time progress tracking and performance visualization. The dashboard provides a web interface for monitoring MPIRE worker pools and their progress in real-time.
Functions for starting, connecting to, and shutting down the MPIRE dashboard.
def start_dashboard(port_range: Sequence = range(8080, 8100)) -> Dict[str, Union[int, str]]
def shutdown_dashboard() -> None
def connect_to_dashboard(manager_port_nr: int, manager_host: Optional[Union[bytes, str]] = None) -> None

start_dashboard: Start the dashboard server on an available port within the specified range.
port_range (Sequence): Range of ports to try for the dashboard server

shutdown_dashboard: Stop the running dashboard server.
connect_to_dashboard: Connect a WorkerPool to an existing dashboard for monitoring.
manager_port_nr (int): Port number of the dashboard manager
manager_host (Optional[Union[bytes, str]]): Host address of the dashboard manager

Utility functions for dashboard configuration and management.
def get_stacklevel() -> int
def set_stacklevel(stacklevel: int) -> None

get_stacklevel: Get the current stack level for dashboard function detection.
set_stacklevel: Set the stack level for dashboard function detection.
from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard
import time
def slow_computation(x):
    """Square *x* after a short artificial delay (stands in for real work)."""
    time.sleep(0.1)  # pretend this is an expensive computation
    return x * x
# Start the dashboard and report where it can be reached.
dashboard_info = start_dashboard()
print(f"Dashboard started at: {dashboard_info['dashboard_url']}")

try:
    # Route the pool's progress bar to the dashboard instead of the terminal.
    with WorkerPool(n_jobs=4) as pool:
        results = pool.map(
            slow_computation,
            range(100),
            progress_bar=True,
            progress_bar_style='dashboard',  # Use dashboard progress bar
        )
    print("Processing completed!")
finally:
    # Always stop the dashboard server, even if the pool raised.
    shutdown_dashboard()

from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard, connect_to_dashboard
import time
import threading
def cpu_task(x):
    """CPU-intensive task: sum of all integers in [0, x * 1000)."""
    # Equivalent to the original manual accumulation loop, at C speed.
    return sum(range(x * 1000))
def io_task(x):
    """I/O-bound task: wait briefly, then tag the input as processed."""
    time.sleep(0.05)  # stand-in for a network or disk wait
    return f"processed_{x}"
# Start the dashboard once; both pools report to it.
dashboard_info = start_dashboard()
print(f"Dashboard available at: {dashboard_info['dashboard_url']}")

def run_cpu_pool():
    """Run the CPU-bound tasks in a 2-worker pool, tracked on the dashboard."""
    with WorkerPool(n_jobs=2) as pool:
        connect_to_dashboard(dashboard_info['manager_port'])
        pool.map(
            cpu_task,
            range(50),
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'CPU Tasks'},
        )

def run_io_pool():
    """Run the I/O-bound tasks in a 4-worker pool, tracked on the dashboard."""
    with WorkerPool(n_jobs=4) as pool:
        connect_to_dashboard(dashboard_info['manager_port'])
        pool.map(
            io_task,
            range(100),
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'I/O Tasks'},
        )

try:
    # Drive the two pools from separate threads so they run concurrently.
    pool_threads = [
        threading.Thread(target=run_cpu_pool),
        threading.Thread(target=run_io_pool),
    ]
    for t in pool_threads:
        t.start()
    for t in pool_threads:
        t.join()
    print("All tasks completed!")
finally:
    shutdown_dashboard()

from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard
import time
import random
def variable_workload(task_id):
    """Simulate a task whose duration varies between 0.05 and 0.5 seconds."""
    duration = random.uniform(0.05, 0.5)
    time.sleep(duration)
    # Report the simulated processing time back to the caller.
    return f"Task {task_id} completed in {duration:.2f}s"
# Start the dashboard, restricting it to ports 9000-9009.
dashboard_info = start_dashboard(port_range=range(9000, 9010))
print(f"Dashboard URL: {dashboard_info['dashboard_url']}")

try:
    # enable_insights=True collects per-worker timing statistics as well.
    with WorkerPool(n_jobs=3, enable_insights=True) as pool:
        results = pool.map(
            variable_workload,
            range(50),
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={
                'desc': 'Variable Workload',
                'unit': 'tasks',
                'colour': 'green',
                'position': 0,
                'leave': True,
            },
        )
    print("Processing complete!")
finally:
    shutdown_dashboard()

from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard
import time
import numpy as np
def data_processing_task(data_chunk):
    """Simulate processing of one data chunk.

    The simulated cost scales with the chunk length; returns the chunk
    sum, or 0 for an empty chunk.
    """
    size = len(data_chunk)
    time.sleep(size * 0.01)  # longer chunks take proportionally longer
    if size > 0:
        return np.sum(data_chunk)
    return 0
# Build reproducible test data: 30 chunks of random length between 10 and 99.
np.random.seed(42)
data_chunks = [np.random.rand(size) for size in np.random.randint(10, 100, 30)]

# Start dashboard
dashboard_info = start_dashboard()
print(f"Monitor progress at: {dashboard_info['dashboard_url']}")

try:
    with WorkerPool(n_jobs=4, enable_insights=True) as pool:
        results = pool.map(
            data_processing_task,
            data_chunks,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={
                'desc': 'Data Processing',
                'unit': 'chunks',
                'miniters': 1,
                'mininterval': 0.1,
            },
        )
        # Dump the collected worker insights once all chunks are done.
        pool.print_insights()
    print(f"Processed {len(results)} data chunks")
finally:
    shutdown_dashboard()

from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard
import time
def unreliable_task(x):
    """Double *x*, deliberately failing for some inputs.

    Raises:
        RuntimeError: when the last decimal digit of x is 7
            (i.e. x % 10 == 7: 7, 17, 27, ...).
    """
    time.sleep(0.1)
    # NOTE: the original comment claimed "multiples of 7", but the condition
    # actually fires for numbers whose last digit is 7.
    if x % 10 == 7:
        raise RuntimeError(f"Task {x} failed")
    return x * 2
# Starting the dashboard can fail (e.g. no free port); degrade gracefully.
try:
    dashboard_info = start_dashboard()
    dashboard_started = True
    print(f"Dashboard started: {dashboard_info['dashboard_url']}")
except Exception as e:
    print(f"Could not start dashboard: {e}")
    print("Continuing without dashboard...")
    dashboard_started = False

try:
    with WorkerPool(n_jobs=3) as pool:
        # Use dashboard style if available, otherwise fall back to standard
        progress_style = 'dashboard' if dashboard_started else 'std'
        try:
            results = pool.map(
                unreliable_task,
                range(50),
                progress_bar=True,
                progress_bar_style=progress_style,
                progress_bar_options={'desc': 'Unreliable Tasks'},
            )
            print(f"Completed {len(results)} tasks successfully")
        except Exception as e:
            print(f"Some tasks failed: {e}")
            # Continue processing with error handling...
finally:
    if dashboard_started:
        shutdown_dashboard()

from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard
import time
import concurrent.futures
def preprocessing_task(x):
    """Pipeline stage 1: double the input after a short delay."""
    time.sleep(0.05)
    return 2 * x
def main_processing_task(x):
    """Pipeline stage 2: square the input after a short delay."""
    time.sleep(0.1)
    return x * x
def postprocessing_task(x):
    """Pipeline stage 3: offset the input by 10 after a short delay."""
    time.sleep(0.03)
    return 10 + x
# Start dashboard
dashboard_info = start_dashboard()
print(f"Monitor at: {dashboard_info['dashboard_url']}")

try:
    # Three-stage pipeline: each stage gets its own pool; the 'position'
    # option keeps each stage's progress bar on its own dashboard row.
    input_data = range(30)

    # Stage 1: Preprocessing
    with WorkerPool(n_jobs=2) as pool:
        preprocessed = pool.map(
            preprocessing_task,
            input_data,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'Preprocessing', 'position': 0},
        )

    # Stage 2: Main processing
    with WorkerPool(n_jobs=3) as pool:
        processed = pool.map(
            main_processing_task,
            preprocessed,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'Main Processing', 'position': 1},
        )

    # Stage 3: Postprocessing
    with WorkerPool(n_jobs=2) as pool:
        final_results = pool.map(
            postprocessing_task,
            processed,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'Postprocessing', 'position': 2},
        )

    print(f"Pipeline completed! Final results: {final_results[:5]}...")
finally:
    shutdown_dashboard()
def check_dashboard_availability():
    """Return True when the optional dashboard dependencies are importable."""
    try:
        from mpire.dashboard import start_dashboard, shutdown_dashboard
        return True
    except ImportError:
        # Dashboard extras (pip install mpire[dashboard]) are not installed.
        return False
def run_with_optional_dashboard():
    """Run a worker pool, attaching the dashboard only when it is available."""
    dashboard_started = False
    if check_dashboard_availability():
        try:
            from mpire.dashboard import start_dashboard, shutdown_dashboard
            dashboard_info = start_dashboard()
            dashboard_started = True
            print(f"Dashboard available at: {dashboard_info['dashboard_url']}")
        except Exception as e:
            # The dashboard is optional: report the failure and keep going.
            print(f"Dashboard start failed: {e}")
    else:
        print("Dashboard dependencies not installed. Install with: pip install mpire[dashboard]")

    def compute_task(x):
        # Small stand-in workload so the progress bar has something to track.
        import time
        time.sleep(0.1)
        return x ** 2

    try:
        from mpire import WorkerPool
        with WorkerPool(n_jobs=4) as pool:
            # Fall back to the standard progress bar when no dashboard runs.
            progress_style = 'dashboard' if dashboard_started else 'std'
            results = pool.map(
                compute_task,
                range(20),
                progress_bar=True,
                progress_bar_style=progress_style,
            )
        print(f"Completed {len(results)} tasks")
    finally:
        if dashboard_started:
            shutdown_dashboard()
# Run the example
run_with_optional_dashboard()

Install with Tessl CLI:
npx tessl i tessl/pypi-mpire