A Python package for easy multiprocessing, but faster than multiprocessing with advanced features including worker state management, progress bars, and performance insights.
—
Map-style parallel execution functions for processing iterables across multiple workers. Includes ordered and unordered variants, plus iterator versions for memory-efficient processing of large datasets.
Process iterables in parallel while maintaining result order.
def map(self, func: Callable, iterable_of_args: Union[Sized, Iterable],
iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
worker_lifespan: Optional[int] = None, progress_bar: bool = False,
concatenate_numpy_output: bool = True, worker_init: Optional[Callable] = None,
worker_exit: Optional[Callable] = None, task_timeout: Optional[float] = None,
worker_init_timeout: Optional[float] = None, worker_exit_timeout: Optional[float] = None,
progress_bar_options: Optional[Dict[str, Any]] = None,
progress_bar_style: Optional[str] = None) -> Any

Parameters:
func (Callable): Function to apply to each item
iterable_of_args (Union[Sized, Iterable]): Arguments to process
iterable_len (Optional[int]): Length of iterable if not sized
max_tasks_active (Optional[int]): Maximum number of active tasks to prevent memory issues
chunk_size (Optional[int]): Number of tasks per chunk for worker processing
n_splits (Optional[int]): Number of splits for automatic chunking
worker_lifespan (Optional[int]): Number of tasks before worker restart
progress_bar (bool): Show progress bar during execution
concatenate_numpy_output (bool): Whether to concatenate numpy array outputs
progress_bar_options (Optional[Dict]): Custom tqdm progress bar options
progress_bar_style (Optional[str]): Progress bar style ('std', 'notebook', 'dashboard')
enable_insights (bool): Enable worker performance insights
worker_init (Optional[Callable]): Function called when worker starts
worker_exit (Optional[Callable]): Function called when worker exits
task_timeout (Optional[float]): Timeout in seconds for individual tasks
worker_init_timeout (Optional[float]): Timeout for worker initialization
worker_exit_timeout (Optional[float]): Timeout for worker exit

Process iterables in parallel without maintaining result order for better performance.
def map_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable],
iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
worker_lifespan: Optional[int] = None, progress_bar: bool = False,
progress_bar_options: Optional[Dict[str, Any]] = None,
progress_bar_style: Optional[str] = None, enable_insights: bool = False,
worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
worker_exit_timeout: Optional[float] = None) -> List

Memory-efficient iterator versions that yield results as they become available.
def imap(self, func: Callable, iterable_of_args: Union[Sized, Iterable],
iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
worker_lifespan: Optional[int] = None, progress_bar: bool = False,
progress_bar_options: Optional[Dict[str, Any]] = None,
progress_bar_style: Optional[str] = None, enable_insights: bool = False,
worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
worker_exit_timeout: Optional[float] = None) -> Iterator
def imap_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable],
iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
worker_lifespan: Optional[int] = None, progress_bar: bool = False,
progress_bar_options: Optional[Dict[str, Any]] = None,
progress_bar_style: Optional[str] = None, enable_insights: bool = False,
worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
worker_exit_timeout: Optional[float] = None) -> Iterator

from mpire import WorkerPool
def square(x):
    """Return the square of *x*."""
    squared = x * x
    return squared
with WorkerPool(n_jobs=4) as pool:
# Ordered results
results = pool.map(square, range(10))
print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# Unordered results (potentially faster)
results = pool.map_unordered(square, range(10))
print(sorted(results)) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# Memory-efficient processing of large datasets
with WorkerPool(n_jobs=4) as pool:
# Process results as they become available
for result in pool.imap(expensive_function, large_dataset):
process_result(result)
# Unordered iterator for maximum performance
for result in pool.imap_unordered(expensive_function, large_dataset):
process_result(result)

# Basic progress bar
with WorkerPool(n_jobs=4) as pool:
results = pool.map(slow_function, range(100), progress_bar=True)
# Custom progress bar options
progress_options = {
'desc': 'Processing items',
'unit': 'items',
'disable': False
}
with WorkerPool(n_jobs=4) as pool:
results = pool.map(
slow_function,
range(100),
progress_bar=True,
progress_bar_options=progress_options
)

# Manual chunk size control
with WorkerPool(n_jobs=4) as pool:
results = pool.map(
quick_function,
range(10000),
chunk_size=50 # Process 50 items per chunk
)
# Automatic chunking with splits
with WorkerPool(n_jobs=4) as pool:
results = pool.map(
function,
data,
n_splits=20 # Split into 20 chunks automatically
)
# Memory management with active task limit
with WorkerPool(n_jobs=4) as pool:
results = pool.map(
memory_intensive_function,
large_dataset,
max_tasks_active=8 # Limit active tasks to prevent memory issues
)

def init_worker(worker_state):
"""Initialize worker with expensive resources"""
worker_state['model'] = load_machine_learning_model()
worker_state['database'] = connect_to_database()
def exit_worker(worker_state):
    """Clean up worker resources by closing the worker's database handle."""
    db = worker_state['database']
    db.close()
def process_item(worker_state, item):
    """Run the worker's model on *item*, persist the result, and return the prediction."""
    model = worker_state['model']
    prediction = model.predict(item)
    db = worker_state['database']
    db.save_result(item, prediction)
    return prediction
with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
results = pool.map(
process_item,
items,
worker_init=init_worker,
worker_exit=exit_worker,
worker_lifespan=100 # Restart workers every 100 tasks
)

# Function that might hang
def unreliable_function(x):
    """Double *x*, but simulate a rare hang (~10% of calls) to demo task_timeout."""
    import random
    import time
    roll = random.random()
    if roll < 0.1:  # 10% chance of hanging
        time.sleep(1000)
    return x * 2
with WorkerPool(n_jobs=4) as pool:
results = pool.map(
unreliable_function,
range(100),
task_timeout=5.0, # 5 second timeout per task
worker_init_timeout=10.0, # 10 second worker init timeout
worker_exit_timeout=5.0 # 5 second worker exit timeout
)

Install with Tessl CLI
npx tessl i tessl/pypi-mpire