CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-mpire

A Python package for easy multiprocessing that is faster than the standard library's multiprocessing module, with advanced features including worker state management, progress bars, and performance insights.

Pending
Overview
Eval results
Files

docs/worker-configuration.md

Worker Configuration

Advanced worker configuration including CPU pinning, shared objects, worker state management, and initialization/exit functions. These features enable fine-tuned control over worker behavior and resource management.

Capabilities

Worker Settings Configuration

Dynamic configuration methods for modifying worker behavior after WorkerPool creation.

def pass_on_worker_id(self, pass_on: bool = True) -> None
def set_shared_objects(self, shared_objects: Any = None) -> None
def set_use_worker_state(self, use_worker_state: bool = True) -> None
def set_keep_alive(self, keep_alive: bool = True) -> None
def set_order_tasks(self, order_tasks: bool = True) -> None

pass_on_worker_id: Configure whether to pass worker ID as the first argument to target functions.

set_shared_objects: Set or update shared objects that are passed to all workers (copy-on-write with fork).

set_use_worker_state: Enable or disable worker state functionality.

set_keep_alive: Configure whether to keep workers alive between map operations.

set_order_tasks: Configure whether tasks are distributed to workers in order.

Usage Examples

Worker ID Access

from mpire import WorkerPool

def process_with_id(worker_id, data):
    """Build a worker-tagged result string for *data* and log which worker handled it."""
    tagged = f"worker_{worker_id}_{data}"
    print(f"Worker {worker_id} processing: {data}")
    return tagged

# Enable worker ID passing during initialization
with WorkerPool(n_jobs=3, pass_worker_id=True) as pool:
    results = pool.map(process_with_id, range(10))

# Or enable it dynamically
with WorkerPool(n_jobs=3) as pool:
    pool.pass_on_worker_id(True)  # toggled after construction, before the map call
    results = pool.map(process_with_id, range(10))

Shared Objects

import numpy as np
from mpire import WorkerPool

# Large shared data structure, built once in the parent process.
# Per the description above, with the fork start method workers see this
# copy-on-write rather than receiving a pickled copy per task.
shared_data = {
    'lookup_table': np.random.rand(1000, 1000),
    'constants': {'pi': 3.14159, 'e': 2.71828},
    'config': {'threshold': 0.5, 'max_iterations': 100}
}

def process_with_shared(shared_objects, item):
    """Count entries above the shared threshold in the lookup row for *item*."""
    cfg = shared_objects['config']
    row = shared_objects['lookup_table'][item % 1000]
    # Boolean comparison then .sum() counts the True entries.
    return (row > cfg['threshold']).sum()

# Set shared objects during initialization
with WorkerPool(n_jobs=4, shared_objects=shared_data) as pool:
    results = pool.map(process_with_shared, range(100))

# Or set them dynamically
with WorkerPool(n_jobs=4) as pool:
    pool.set_shared_objects(shared_data)  # updates what workers receive as their first argument
    results = pool.map(process_with_shared, range(100))

Worker State Management

def init_worker_state(worker_state):
    """Populate per-worker state with a DB connection, a counter, and a cache."""
    import sqlite3
    worker_state.update(
        db=sqlite3.connect(':memory:'),
        processed_count=0,
        cache={},
    )

def process_with_state(worker_state, item):
    """Square *item*, memoizing results in the worker's persistent cache."""
    worker_state['processed_count'] += 1

    cache = worker_state['cache']
    if item not in cache:
        # Stand-in for an expensive computation; store so repeats are free.
        cache[item] = item ** 2
    return cache[item]

def cleanup_worker_state(worker_state):
    """Close the worker's DB connection and report how many items it processed."""
    count = worker_state['processed_count']
    worker_state['db'].close()
    print(f"Worker processed {count} items")

# Enable worker state during initialization
with WorkerPool(n_jobs=3, use_worker_state=True) as pool:
    results = pool.map(
        process_with_state,
        range(100),
        worker_init=init_worker_state,    # per-worker setup (see init_worker_state above)
        worker_exit=cleanup_worker_state  # per-worker teardown (see cleanup_worker_state above)
    )

# Or enable it dynamically
with WorkerPool(n_jobs=3) as pool:
    pool.set_use_worker_state(True)  # same effect as the use_worker_state constructor flag
    results = pool.map(
        process_with_state,
        range(100),
        worker_init=init_worker_state,
        worker_exit=cleanup_worker_state
    )

Worker Reuse with Keep Alive

def expensive_init():
    """Simulate expensive initialization.

    Returns the string "initialized" after a 2-second sleep that stands in
    for costly setup work; the keep-alive demo below shows this cost is
    paid only once per worker when workers are reused.
    """
    import time
    time.sleep(2)  # Expensive setup
    return "initialized"

def init_worker(worker_state):
    """Run the expensive setup once and stash its handle in the worker's state."""
    resource = expensive_init()
    worker_state['resource'] = resource

def process_item(worker_state, item):
    """Tag *item* with the resource initialized in this worker's state."""
    resource = worker_state['resource']
    return f"{resource}_{item}"

# Workers stay alive between operations
pool = WorkerPool(n_jobs=2, use_worker_state=True, keep_alive=True)

# First operation - workers initialize (each pays the expensive_init cost once)
results1 = pool.map(process_item, range(5), worker_init=init_worker)

# Second operation - reuses existing workers (no re-initialization)
# NOTE: worker_init is omitted here; the state set in the first map is still present.
results2 = pool.map(process_item, range(5, 10))

# Cleanup - required because keep_alive pools are not torn down after map()
pool.stop_and_join()

# Or configure dynamically
pool = WorkerPool(n_jobs=2, use_worker_state=True)
pool.set_keep_alive(True)
# ... use pool ...
pool.stop_and_join()

Task Ordering

def process_with_order_info(worker_id, item):
    """Report which worker received which item, for demonstrating task ordering."""
    message = f"Worker {worker_id} got item {item}"
    return message

# Tasks distributed in order: worker 0 gets first chunk, worker 1 gets second, etc.
with WorkerPool(n_jobs=3, pass_worker_id=True, order_tasks=True) as pool:
    results = pool.map(process_with_order_info, range(15), chunk_size=5)
    for result in results:
        print(result)

# Configure ordering dynamically
with WorkerPool(n_jobs=3, pass_worker_id=True) as pool:
    pool.set_order_tasks(True)  # same effect as the order_tasks constructor flag
    results = pool.map(process_with_order_info, range(15), chunk_size=5)

CPU Pinning

# Pin workers to specific CPUs during initialization
# NOTE: cpu_intensive_function is a placeholder, not defined on this page.
cpu_assignments = [0, 1, 2, 3]  # One CPU per worker
with WorkerPool(n_jobs=4, cpu_ids=cpu_assignments) as pool:
    results = pool.map(cpu_intensive_function, range(100))

# Pin all workers to the same CPUs
# With fewer CPU ids than workers, all workers share the listed CPUs.
cpu_set = [0, 1]  # Workers can use CPU 0 or 1
with WorkerPool(n_jobs=4, cpu_ids=cpu_set) as pool:
    results = pool.map(cpu_intensive_function, range(100))

# Different CPU sets per worker
cpu_per_worker = [[0], [1], [2, 3], [4, 5]]  # Worker-specific CPU assignments
with WorkerPool(n_jobs=4, cpu_ids=cpu_per_worker) as pool:
    results = pool.map(cpu_intensive_function, range(100))

Combined Configuration

def comprehensive_example():
    """Example showing multiple configuration options together.

    Combines CPU pinning, shared objects, worker IDs, worker state,
    keep-alive worker reuse, ordered task distribution, and insights.
    """

    # Shared data
    # NOTE: load_model_weights / load_lookup_tables are placeholders,
    # not defined on this page.
    shared_resources = {
        'model_weights': load_model_weights(),
        'lookup_tables': load_lookup_tables()
    }

    # Because pass_worker_id, shared_objects, and use_worker_state are all
    # enabled below, every callback receives (worker_id, shared_objects,
    # worker_state) before its own arguments.
    def init_worker(worker_id, shared_objects, worker_state):
        worker_state['session_id'] = f"session_{worker_id}"
        worker_state['processed'] = 0
        print(f"Worker {worker_id} initialized with shared resources")

    def process_item(worker_id, shared_objects, worker_state, item):
        worker_state['processed'] += 1
        model = shared_objects['model_weights']
        result = f"Worker {worker_id} processed {item} (total: {worker_state['processed']})"
        return result

    def cleanup_worker(worker_id, shared_objects, worker_state):
        print(f"Worker {worker_id} processed {worker_state['processed']} items total")

    with WorkerPool(
        n_jobs=4,
        cpu_ids=[0, 1, 2, 3],  # CPU pinning
        shared_objects=shared_resources,  # Shared data
        pass_worker_id=True,  # Pass worker ID
        use_worker_state=True,  # Enable worker state
        keep_alive=True,  # Reuse workers
        order_tasks=True,  # Ordered task distribution
        enable_insights=True  # Performance monitoring
    ) as pool:

        # First batch
        results1 = pool.map(
            process_item,
            range(20),
            worker_init=init_worker,
            worker_exit=cleanup_worker,
            chunk_size=5
        )

        # Second batch reuses workers (keep_alive=True, so no re-initialization)
        results2 = pool.map(process_item, range(20, 40), chunk_size=5)

        # Print performance insights
        pool.print_insights()

comprehensive_example()

Install with Tessl CLI

npx tessl i tessl/pypi-mpire

docs

apply-functions.md

async-results.md

dashboard-integration.md

exception-handling.md

index.md

parallel-map.md

performance-insights.md

utility-functions.md

worker-configuration.md

workerpool-management.md

tile.json