A Python package for easy multiprocessing that is faster than the standard multiprocessing module, with advanced features including worker state management, progress bars, and performance insights.
—
Advanced worker configuration including CPU pinning, shared objects, worker state management, and initialization/exit functions. These features enable fine-tuned control over worker behavior and resource management.
Dynamic configuration methods for modifying worker behavior after WorkerPool creation.
def pass_on_worker_id(self, pass_on: bool = True) -> None
def set_shared_objects(self, shared_objects: Any = None) -> None
def set_use_worker_state(self, use_worker_state: bool = True) -> None
def set_keep_alive(self, keep_alive: bool = True) -> None
def set_order_tasks(self, order_tasks: bool = True) -> None

pass_on_worker_id: Configure whether to pass worker ID as the first argument to target functions.
set_shared_objects: Set or update shared objects that are passed to all workers (copy-on-write with fork).
set_use_worker_state: Enable or disable worker state functionality.
set_keep_alive: Configure whether to keep workers alive between map operations.
set_order_tasks: Configure whether tasks are distributed to workers in order.
# Example: passing the worker ID as the first argument to the target function.
from mpire import WorkerPool


def process_with_id(worker_id, data):
    """Process one item; the pool prepends the worker's ID as the first argument."""
    print(f"Worker {worker_id} processing: {data}")
    return f"worker_{worker_id}_{data}"


# Enable worker ID passing during initialization
with WorkerPool(n_jobs=3, pass_worker_id=True) as pool:
    results = pool.map(process_with_id, range(10))

# Or enable it dynamically
with WorkerPool(n_jobs=3) as pool:
    pool.pass_on_worker_id(True)
    results = pool.map(process_with_id, range(10))

import numpy as np
from mpire import WorkerPool

# Example: sharing large read-only data with all workers (copy-on-write with fork).

# Large shared data structure
shared_data = {
    'lookup_table': np.random.rand(1000, 1000),
    'constants': {'pi': 3.14159, 'e': 2.71828},
    'config': {'threshold': 0.5, 'max_iterations': 100},
}


def process_with_shared(shared_objects, item):
    """Count the entries of one lookup-table row that exceed the configured threshold.

    `shared_objects` is injected by the pool as the first argument; it is the same
    object given to WorkerPool/set_shared_objects and is not copied per task.
    """
    # Access shared data without copying
    threshold = shared_objects['config']['threshold']
    lookup = shared_objects['lookup_table'][item % 1000]
    return (lookup > threshold).sum()


# Set shared objects during initialization
with WorkerPool(n_jobs=4, shared_objects=shared_data) as pool:
    results = pool.map(process_with_shared, range(100))

# Or set them dynamically
with WorkerPool(n_jobs=4) as pool:
    pool.set_shared_objects(shared_data)
    results = pool.map(process_with_shared, range(100))


def init_worker_state(worker_state):
    """Initialize worker with persistent state"""
    import sqlite3
    worker_state['db'] = sqlite3.connect(':memory:')
    worker_state['processed_count'] = 0
    worker_state['cache'] = {}
def process_with_state(worker_state, item):
    """Process item using worker state"""
    worker_state['processed_count'] += 1
    # Return the cached result when this worker has already seen the item
    if item in worker_state['cache']:
        return worker_state['cache'][item]
    # Expensive computation
    result = item ** 2
    worker_state['cache'][item] = result
    return result


def cleanup_worker_state(worker_state):
    """Clean up worker state"""
    worker_state['db'].close()
    print(f"Worker processed {worker_state['processed_count']} items")


# Enable worker state during initialization
with WorkerPool(n_jobs=3, use_worker_state=True) as pool:
    results = pool.map(
        process_with_state,
        range(100),
        worker_init=init_worker_state,
        worker_exit=cleanup_worker_state
    )

# Or enable it dynamically
with WorkerPool(n_jobs=3) as pool:
    pool.set_use_worker_state(True)
    results = pool.map(
        process_with_state,
        range(100),
        worker_init=init_worker_state,
        worker_exit=cleanup_worker_state
    )


def expensive_init():
    """Simulate expensive initialization"""
    import time
    time.sleep(2)  # Expensive setup
    return "initialized"
def init_worker(worker_state):
    """Store the expensively created resource in this worker's persistent state."""
    worker_state['resource'] = expensive_init()


def process_item(worker_state, item):
    """Tag each item with the resource initialized once per worker."""
    return f"{worker_state['resource']}_{item}"


# Workers stay alive between operations
pool = WorkerPool(n_jobs=2, use_worker_state=True, keep_alive=True)

# First operation - workers initialize
results1 = pool.map(process_item, range(5), worker_init=init_worker)

# Second operation - reuses existing workers (no re-initialization)
results2 = pool.map(process_item, range(5, 10))

# Cleanup
pool.stop_and_join()

# Or configure dynamically
pool = WorkerPool(n_jobs=2, use_worker_state=True)
pool.set_keep_alive(True)
# ... use pool ...
pool.stop_and_join()


def process_with_order_info(worker_id, item):
    """Report which worker received the item (requires pass_worker_id=True)."""
    return f"Worker {worker_id} got item {item}"
# Tasks distributed in order: worker 0 gets first chunk, worker 1 gets second, etc.
with WorkerPool(n_jobs=3, pass_worker_id=True, order_tasks=True) as pool:
    results = pool.map(process_with_order_info, range(15), chunk_size=5)
    for result in results:
        print(result)

# Configure ordering dynamically
with WorkerPool(n_jobs=3, pass_worker_id=True) as pool:
    pool.set_order_tasks(True)
    results = pool.map(process_with_order_info, range(15), chunk_size=5)

# Pin workers to specific CPUs during initialization
cpu_assignments = [0, 1, 2, 3]  # One CPU per worker
with WorkerPool(n_jobs=4, cpu_ids=cpu_assignments) as pool:
    results = pool.map(cpu_intensive_function, range(100))

# Pin all workers to the same CPUs
cpu_set = [0, 1]  # Workers can use CPU 0 or 1
with WorkerPool(n_jobs=4, cpu_ids=cpu_set) as pool:
    results = pool.map(cpu_intensive_function, range(100))

# Different CPU sets per worker
cpu_per_worker = [[0], [1], [2, 3], [4, 5]]  # Worker-specific CPU assignments
with WorkerPool(n_jobs=4, cpu_ids=cpu_per_worker) as pool:
    results = pool.map(cpu_intensive_function, range(100))


def comprehensive_example():
    """Example showing multiple configuration options together"""
    # Shared data
    shared_resources = {
        'model_weights': load_model_weights(),
        'lookup_tables': load_lookup_tables(),
    }

    def init_worker(worker_id, shared_objects, worker_state):
        # With pass_worker_id, shared_objects and worker state all enabled,
        # worker_init receives all three before any task arguments.
        worker_state['session_id'] = f"session_{worker_id}"
        worker_state['processed'] = 0
        print(f"Worker {worker_id} initialized with shared resources")

    def process_item(worker_id, shared_objects, worker_state, item):
        worker_state['processed'] += 1
        model = shared_objects['model_weights']  # shared, not copied per task
        result = f"Worker {worker_id} processed {item} (total: {worker_state['processed']})"
        return result

    def cleanup_worker(worker_id, shared_objects, worker_state):
        print(f"Worker {worker_id} processed {worker_state['processed']} items total")

    with WorkerPool(
        n_jobs=4,
        cpu_ids=[0, 1, 2, 3],             # CPU pinning
        shared_objects=shared_resources,  # Shared data
        pass_worker_id=True,              # Pass worker ID
        use_worker_state=True,            # Enable worker state
        keep_alive=True,                  # Reuse workers
        order_tasks=True,                 # Ordered task distribution
        enable_insights=True              # Performance monitoring
    ) as pool:
        # First batch
        results1 = pool.map(
            process_item,
            range(20),
            worker_init=init_worker,
            worker_exit=cleanup_worker,
            chunk_size=5
        )

        # Second batch reuses workers
        results2 = pool.map(process_item, range(20, 40), chunk_size=5)

        # Print performance insights
        pool.print_insights()
comprehensive_example()

Install with Tessl CLI
npx tessl i tessl/pypi-mpire