tessl/pypi-mpire

A Python package for easy multiprocessing, faster than the standard multiprocessing module, with advanced features including worker state management, progress bars, and performance insights.

docs/parallel-map.md

Parallel Map Functions

Map-style parallel execution functions for processing iterables across multiple workers. Includes ordered and unordered variants, plus iterator versions for memory-efficient processing of large datasets.

Capabilities

Ordered Map Functions

Process iterables in parallel while maintaining result order.

def map(self, func: Callable, iterable_of_args: Union[Sized, Iterable], 
        iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
        chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
        worker_lifespan: Optional[int] = None, progress_bar: bool = False,
        concatenate_numpy_output: bool = True, worker_init: Optional[Callable] = None,
        worker_exit: Optional[Callable] = None, task_timeout: Optional[float] = None,
        worker_init_timeout: Optional[float] = None, worker_exit_timeout: Optional[float] = None,
        progress_bar_options: Optional[Dict[str, Any]] = None,
        progress_bar_style: Optional[str] = None) -> Any

Parameters:

  • func (Callable): Function to apply to each item
  • iterable_of_args (Union[Sized, Iterable]): Arguments to process
  • iterable_len (Optional[int]): Length of the iterable when it has no len(), e.g. a generator (see the sketch after this list)
  • max_tasks_active (Optional[int]): Maximum number of tasks queued at once, to prevent memory issues
  • chunk_size (Optional[int]): Number of tasks per chunk for worker processing
  • n_splits (Optional[int]): Number of chunks to split the iterable into automatically
  • worker_lifespan (Optional[int]): Number of tasks a worker processes before it is restarted
  • progress_bar (bool): Show a progress bar during execution
  • concatenate_numpy_output (bool): Whether to concatenate numpy array outputs into a single array
  • worker_init (Optional[Callable]): Function called when a worker starts
  • worker_exit (Optional[Callable]): Function called when a worker exits
  • task_timeout (Optional[float]): Timeout in seconds for individual tasks
  • worker_init_timeout (Optional[float]): Timeout in seconds for worker initialization
  • worker_exit_timeout (Optional[float]): Timeout in seconds for worker exit
  • progress_bar_options (Optional[Dict[str, Any]]): Custom tqdm progress bar options
  • progress_bar_style (Optional[str]): Progress bar style ('std', 'notebook', 'dashboard')
  • enable_insights (bool): Enable worker performance insights (accepted by the map_unordered, imap, and imap_unordered variants shown below)
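
When iterable_of_args has no length (for example, a generator), iterable_len tells the pool the total number of tasks so chunking and the progress bar work correctly. A minimal sketch; the square function and the sizes are illustrative only:

from mpire import WorkerPool

def square(x):
    return x * x

# Generators have no len(), so pass iterable_len explicitly so that
# chunking and the progress bar know the total number of tasks
args = (i for i in range(1000))

with WorkerPool(n_jobs=4) as pool:
    results = pool.map(square, args, iterable_len=1000, progress_bar=True)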

Unordered Map Functions

Process iterables in parallel without maintaining result order for better performance.

def map_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable], 
                  iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
                  chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
                  worker_lifespan: Optional[int] = None, progress_bar: bool = False,
                  progress_bar_options: Optional[Dict[str, Any]] = None,
                  progress_bar_style: Optional[str] = None, enable_insights: bool = False,
                  worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
                  task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
                  worker_exit_timeout: Optional[float] = None) -> List

Iterator Map Functions

Memory-efficient iterator versions that yield results as they become available.

def imap(self, func: Callable, iterable_of_args: Union[Sized, Iterable], 
         iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
         chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
         worker_lifespan: Optional[int] = None, progress_bar: bool = False,
         progress_bar_options: Optional[Dict[str, Any]] = None,
         progress_bar_style: Optional[str] = None, enable_insights: bool = False,
         worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
         task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
         worker_exit_timeout: Optional[float] = None) -> Iterator

def imap_unordered(self, func: Callable, iterable_of_args: Union[Sized, Iterable], 
                   iterable_len: Optional[int] = None, max_tasks_active: Optional[int] = None,
                   chunk_size: Optional[int] = None, n_splits: Optional[int] = None,
                   worker_lifespan: Optional[int] = None, progress_bar: bool = False,
                   progress_bar_options: Optional[Dict[str, Any]] = None,
                   progress_bar_style: Optional[str] = None, enable_insights: bool = False,
                   worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
                   task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
                   worker_exit_timeout: Optional[float] = None) -> Iterator

Usage Examples

Basic Map Operations

from mpire import WorkerPool

def square(x):
    return x * x

with WorkerPool(n_jobs=4) as pool:
    # Ordered results
    results = pool.map(square, range(10))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    
    # Unordered results (potentially faster)
    results = pool.map_unordered(square, range(10))
    print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Iterator Processing

# Memory-efficient processing of large datasets
with WorkerPool(n_jobs=4) as pool:
    # Process results as they become available
    for result in pool.imap(expensive_function, large_dataset):
        process_result(result)
    
    # Unordered iterator for maximum performance
    for result in pool.imap_unordered(expensive_function, large_dataset):
        process_result(result)

Progress Tracking

# Basic progress bar
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(slow_function, range(100), progress_bar=True)

# Custom progress bar options
progress_options = {
    'desc': 'Processing items',
    'unit': 'items',
    'disable': False
}

with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        slow_function, 
        range(100), 
        progress_bar=True,
        progress_bar_options=progress_options
    )
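
The progress_bar_style parameter controls how the bar is rendered. A brief sketch, assuming it runs inside a Jupyter notebook so the 'notebook' style applies (slow_function is the same placeholder as above):

# Render the progress bar as a notebook widget instead of the default style
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        slow_function,
        range(100),
        progress_bar=True,
        progress_bar_style='notebook'
    )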

Task Chunking and Performance Tuning

# Manual chunk size control
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        quick_function, 
        range(10000), 
        chunk_size=50  # Process 50 items per chunk
    )

# Automatic chunking with splits
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        function, 
        data, 
        n_splits=20  # Split into 20 chunks automatically
    )

# Memory management with active task limit
with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        memory_intensive_function, 
        large_dataset, 
        max_tasks_active=8  # Limit active tasks to prevent memory issues
    )

Worker Lifecycle Management

def init_worker(worker_state):
    """Initialize worker with expensive resources"""
    worker_state['model'] = load_machine_learning_model()
    worker_state['database'] = connect_to_database()

def exit_worker(worker_state):
    """Clean up worker resources"""
    worker_state['database'].close()

def process_item(worker_state, item):
    """Process item using worker state"""
    prediction = worker_state['model'].predict(item)
    worker_state['database'].save_result(item, prediction)
    return prediction

with WorkerPool(n_jobs=4, use_worker_state=True) as pool:
    results = pool.map(
        process_item,
        items,
        worker_init=init_worker,
        worker_exit=exit_worker,
        worker_lifespan=100  # Restart workers every 100 tasks
    )

Timeout Management

# Function that might hang
def unreliable_function(x):
    import random, time
    if random.random() < 0.1:  # 10% chance of hanging
        time.sleep(1000)
    return x * 2

with WorkerPool(n_jobs=4) as pool:
    results = pool.map(
        unreliable_function,
        range(100),
        task_timeout=5.0,  # 5 second timeout per task
        worker_init_timeout=10.0,  # 10 second worker init timeout
        worker_exit_timeout=5.0   # 5 second worker exit timeout
    )
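
If a task exceeds task_timeout, the run is aborted rather than hanging indefinitely. A minimal sketch, assuming a TimeoutError propagates to the calling process when that happens:

with WorkerPool(n_jobs=4) as pool:
    try:
        results = pool.map(unreliable_function, range(100), task_timeout=5.0)
    except TimeoutError:
        # Handle the aborted run, e.g. log it and retry with a longer timeout
        results = None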

Install with Tessl CLI

npx tessl i tessl/pypi-mpire

docs

  • apply-functions.md
  • async-results.md
  • dashboard-integration.md
  • exception-handling.md
  • index.md
  • parallel-map.md
  • performance-insights.md
  • utility-functions.md
  • worker-configuration.md
  • workerpool-management.md

tile.json