CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-mpire

A Python package for easy multiprocessing, but faster than multiprocessing with advanced features including worker state management, progress bars, and performance insights.

Pending
Overview
Eval results
Files

docs/dashboard-integration.md

Dashboard Integration

Optional web dashboard for monitoring multiprocessing jobs with real-time progress tracking and performance visualization. The dashboard provides a web interface for monitoring MPIRE worker pools and their progress in real-time.

Capabilities

Dashboard Management

Functions for starting, connecting to, and shutting down the MPIRE dashboard.

def start_dashboard(port_range: Sequence = range(8080, 8100)) -> Dict[str, Union[int, str]]
def shutdown_dashboard() -> None
def connect_to_dashboard(manager_port_nr: int, manager_host: Optional[Union[bytes, str]] = None) -> None

start_dashboard: Start the dashboard server on an available port within the specified range.

  • port_range (Sequence): Range of ports to try for the dashboard server
  • Returns: Dictionary containing the dashboard port number and the manager host and port number (keys: dashboard_port_nr, manager_host, manager_port_nr)

shutdown_dashboard: Stop the running dashboard server.

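connect_to_dashboard: Connect the current process to an existing dashboard (for example, one started in another process or on another machine) so that its WorkerPools are monitored there.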

  • manager_port_nr (int): Port number of the dashboard manager
  • manager_host (Optional[Union[bytes, str]]): Host address of the dashboard manager

Dashboard Utilities

Utility functions for dashboard configuration and management.

def get_stacklevel() -> int
def set_stacklevel(stacklevel: int) -> None

get_stacklevel: Get the current stack level for dashboard function detection.

set_stacklevel: Set the stack level for dashboard function detection.

Usage Examples

Basic Dashboard Usage

from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard
import time

def slow_computation(x):
    """Return ``x`` squared after a 0.1 second pause that stands in for real work."""
    time.sleep(0.1)  # placeholder for an expensive computation
    squared = x ** 2
    return squared

# Start the dashboard. The returned dict holds connection details
# ('dashboard_port_nr', 'manager_host', 'manager_port_nr'), not a ready-made
# URL, so the URL is built from the dashboard port.
dashboard_info = start_dashboard()
print(f"Dashboard started at: http://localhost:{dashboard_info['dashboard_port_nr']}")

try:
    # Use WorkerPool with dashboard progress tracking
    with WorkerPool(n_jobs=4) as pool:
        results = pool.map(
            slow_computation,
            range(100),
            progress_bar=True,
            progress_bar_style='dashboard'  # Use dashboard progress bar
        )

    print("Processing completed!")

finally:
    # Shut the dashboard down even if the map call raised
    shutdown_dashboard()

Multiple WorkerPools with Dashboard

from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard, connect_to_dashboard
import time
import threading

def cpu_task(x):
    """Return the sum of all integers in ``range(x * 1000)`` as a CPU-bound workload."""
    # Same arithmetic as an explicit accumulation loop, expressed with the built-in
    return sum(range(x * 1000))

def io_task(x):
    """Return ``"processed_<x>"`` after a 0.05 second pause that imitates an I/O wait."""
    time.sleep(0.05)  # stand-in for a blocking I/O call
    label = "processed_{}".format(x)
    return label

# Start dashboard. The returned dict has the keys 'dashboard_port_nr',
# 'manager_host' and 'manager_port_nr'; the URL is built from the dashboard port.
dashboard_info = start_dashboard()
print(f"Dashboard available at: http://localhost:{dashboard_info['dashboard_port_nr']}")

def run_cpu_pool():
    """Map ``cpu_task`` over 50 inputs on a 2-worker pool, shown as 'CPU Tasks'."""
    # No connect_to_dashboard() call here: this pool runs in the process that
    # called start_dashboard(), so its progress bars are already reported there.
    # NOTE(review): connect_to_dashboard() is meant for attaching from a
    # different process or machine, using the 'manager_port_nr' and
    # 'manager_host' values returned by start_dashboard() — confirm against
    # the installed mpire version.
    with WorkerPool(n_jobs=2) as pool:
        pool.map(
            cpu_task,
            range(50),
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'CPU Tasks'}
        )

def run_io_pool():
    """Map ``io_task`` over 100 inputs on a 4-worker pool, shown as 'I/O Tasks'."""
    # No connect_to_dashboard() call here: this pool runs in the process that
    # called start_dashboard(), so its progress bars are already reported there.
    # NOTE(review): connect_to_dashboard() is meant for attaching from a
    # different process or machine, using the 'manager_port_nr' and
    # 'manager_host' values returned by start_dashboard() — confirm against
    # the installed mpire version.
    with WorkerPool(n_jobs=4) as pool:
        pool.map(
            io_task,
            range(100),
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'I/O Tasks'}
        )

try:
    # Drive each pool from its own thread so both run at the same time
    cpu_thread = threading.Thread(target=run_cpu_pool)
    io_thread = threading.Thread(target=run_io_pool)
    pool_threads = (cpu_thread, io_thread)

    for pool_thread in pool_threads:
        pool_thread.start()

    # Wait for both pools to finish before reporting completion
    for pool_thread in pool_threads:
        pool_thread.join()

    print("All tasks completed!")

finally:
    shutdown_dashboard()

Dashboard with Custom Progress Options

from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard
import time
import random

def variable_workload(task_id):
    """Sleep for a random 0.05-0.5 second interval and return a message reporting it."""
    # Draw the delay first so the same value is both slept and reported
    delay = random.uniform(0.05, 0.5)
    time.sleep(delay)
    return "Task {} completed in {:.2f}s".format(task_id, delay)

# Start dashboard with custom port range. The returned dict has the keys
# 'dashboard_port_nr', 'manager_host' and 'manager_port_nr'; the URL is built
# from the dashboard port.
dashboard_info = start_dashboard(port_range=range(9000, 9010))
print(f"Dashboard URL: http://localhost:{dashboard_info['dashboard_port_nr']}")

try:
    with WorkerPool(n_jobs=3, enable_insights=True) as pool:
        results = pool.map(
            variable_workload,
            range(50),
            progress_bar=True,
            progress_bar_style='dashboard',
            # Options forwarded to the progress bar
            progress_bar_options={
                'desc': 'Variable Workload',
                'unit': 'tasks',
                'colour': 'green',
                'position': 0,
                'leave': True
            }
        )

    print("Processing complete!")

finally:
    # Shut the dashboard down even if the map call raised
    shutdown_dashboard()

Dashboard with Worker Insights

from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard
import time
import numpy as np

def data_processing_task(data_chunk):
    """Return the sum of ``data_chunk`` after sleeping 0.01 seconds per element.

    An empty chunk yields the plain integer ``0`` without calling NumPy.
    """
    n_items = len(data_chunk)

    # Longer chunks take proportionally longer, imitating variable complexity
    time.sleep(n_items * 0.01)

    if n_items == 0:
        return 0
    return np.sum(data_chunk)

# Generate test data: 30 random arrays with lengths drawn from [10, 100)
np.random.seed(42)
data_chunks = [np.random.rand(size) for size in np.random.randint(10, 100, 30)]

# Start dashboard. The returned dict has the keys 'dashboard_port_nr',
# 'manager_host' and 'manager_port_nr'; the URL is built from the dashboard port.
dashboard_info = start_dashboard()
print(f"Monitor progress at: http://localhost:{dashboard_info['dashboard_port_nr']}")

try:
    with WorkerPool(n_jobs=4, enable_insights=True) as pool:
        results = pool.map(
            data_processing_task,
            data_chunks,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={
                'desc': 'Data Processing',
                'unit': 'chunks',
                'miniters': 1,
                'mininterval': 0.1
            }
        )

    # Print insights after completion
    pool.print_insights()
    print(f"Processed {len(results)} data chunks")

finally:
    # Shut the dashboard down even if the map call raised
    shutdown_dashboard()

Dashboard Error Handling

from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard
import time

def unreliable_task(x):
    """Return ``x * 2`` after a 0.1 second pause, failing for some inputs.

    Raises:
        RuntimeError: if the last decimal digit of ``x`` is 7 (7, 17, 27, ...).
    """
    time.sleep(0.1)
    # Simulate occasional failures
    if x % 10 == 7:  # Fail when x ends in 7 (not on every multiple of 7)
        raise RuntimeError(f"Task {x} failed")
    return x * 2

# Try to start the dashboard. Only the start call itself is guarded, so a
# failure while printing cannot reset the flag and leave a running dashboard
# that is never shut down.
try:
    dashboard_info = start_dashboard()
except Exception as e:
    print(f"Could not start dashboard: {e}")
    print("Continuing without dashboard...")
    dashboard_started = False
else:
    dashboard_started = True
    # start_dashboard() returns 'dashboard_port_nr', 'manager_host' and
    # 'manager_port_nr'; the URL is built from the dashboard port.
    print(f"Dashboard started: http://localhost:{dashboard_info['dashboard_port_nr']}")

try:
    with WorkerPool(n_jobs=3) as pool:
        # Use dashboard style if available, otherwise fall back to standard
        progress_style = 'dashboard' if dashboard_started else 'std'

        try:
            results = pool.map(
                unreliable_task,
                range(50),
                progress_bar=True,
                progress_bar_style=progress_style,
                progress_bar_options={'desc': 'Unreliable Tasks'}
            )
            print(f"Completed {len(results)} tasks successfully")

        except Exception as e:
            print(f"Some tasks failed: {e}")
            # Continue processing with error handling...

finally:
    # Only shut down a dashboard that this script actually started
    if dashboard_started:
        shutdown_dashboard()

Dashboard with Multiple Progress Bars

from mpire import WorkerPool
from mpire.dashboard import start_dashboard, shutdown_dashboard
import time
import concurrent.futures

def preprocessing_task(x):
    """Stage 1: return ``x`` doubled after a 0.05 second simulated delay."""
    time.sleep(0.05)
    doubled = x * 2
    return doubled

def main_processing_task(x):
    """Stage 2: return ``x`` squared after a 0.1 second simulated delay."""
    time.sleep(0.1)
    squared = x ** 2
    return squared

def postprocessing_task(x):
    """Stage 3: return ``x`` plus 10 after a 0.03 second simulated delay."""
    time.sleep(0.03)
    shifted = x + 10
    return shifted

# Start dashboard. The returned dict has the keys 'dashboard_port_nr',
# 'manager_host' and 'manager_port_nr'; the URL is built from the dashboard port.
dashboard_info = start_dashboard()
print(f"Monitor at: http://localhost:{dashboard_info['dashboard_port_nr']}")

try:
    # Create data pipeline with multiple stages; each stage feeds the next
    input_data = range(30)

    # Stage 1: Preprocessing
    with WorkerPool(n_jobs=2) as pool:
        preprocessed = pool.map(
            preprocessing_task,
            input_data,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'Preprocessing', 'position': 0}
        )

    # Stage 2: Main processing
    with WorkerPool(n_jobs=3) as pool:
        processed = pool.map(
            main_processing_task,
            preprocessed,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'Main Processing', 'position': 1}
        )

    # Stage 3: Postprocessing
    with WorkerPool(n_jobs=2) as pool:
        final_results = pool.map(
            postprocessing_task,
            processed,
            progress_bar=True,
            progress_bar_style='dashboard',
            progress_bar_options={'desc': 'Postprocessing', 'position': 2}
        )

    print(f"Pipeline completed! Final results: {final_results[:5]}...")

finally:
    # Shut the dashboard down even if one of the stages raised
    shutdown_dashboard()

Dashboard Installation Check

def check_dashboard_availability():
    """Return True if the dashboard entry points can be imported, False otherwise."""
    try:
        # The names are imported only to probe for an ImportError
        from mpire.dashboard import start_dashboard, shutdown_dashboard
    except ImportError:
        importable = False
    else:
        importable = True
    return importable

def run_with_optional_dashboard():
    """Run a small parallel job, showing progress on the dashboard when it can be started.

    Falls back to the 'std' progress bar style when the dashboard cannot be
    imported or started. A dashboard started here is always shut down again.
    """
    dashboard_started = False

    if check_dashboard_availability():
        # Guard only the import and the start call, so a failure while
        # printing is not misreported as a failed start.
        try:
            from mpire.dashboard import start_dashboard, shutdown_dashboard
            dashboard_info = start_dashboard()
        except Exception as e:
            print(f"Dashboard start failed: {e}")
        else:
            dashboard_started = True
            # start_dashboard() returns 'dashboard_port_nr', 'manager_host' and
            # 'manager_port_nr'; the URL is built from the dashboard port.
            print(f"Dashboard available at: http://localhost:{dashboard_info['dashboard_port_nr']}")
    else:
        print("Dashboard dependencies not installed. Install with: pip install mpire[dashboard]")

    # Processing function
    def compute_task(x):
        import time
        time.sleep(0.1)
        return x ** 2

    try:
        from mpire import WorkerPool

        with WorkerPool(n_jobs=4) as pool:
            progress_style = 'dashboard' if dashboard_started else 'std'

            results = pool.map(
                compute_task,
                range(20),
                progress_bar=True,
                progress_bar_style=progress_style
            )

        print(f"Completed {len(results)} tasks")

    finally:
        # shutdown_dashboard is only bound when the import above succeeded,
        # which is also the only way dashboard_started can be True.
        if dashboard_started:
            shutdown_dashboard()

# Run the example
run_with_optional_dashboard()

Install with Tessl CLI

npx tessl i tessl/pypi-mpire

docs

apply-functions.md

async-results.md

dashboard-integration.md

exception-handling.md

index.md

parallel-map.md

performance-insights.md

utility-functions.md

worker-configuration.md

workerpool-management.md

tile.json