CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-mpire

A Python package for easy multiprocessing, but faster than multiprocessing with advanced features including worker state management, progress bars, and performance insights.

Pending
Overview
Eval results
Files

apply-functions.mddocs/

Apply Functions

Apply-style parallel execution for single function calls and asynchronous operations. These functions are ideal for submitting individual tasks rather than processing iterables.

Capabilities

Synchronous Apply

Execute a single function call synchronously in a worker process.

def apply(self, func: Callable, args: Any = (), kwargs: Dict = None, 
          callback: Optional[Callable] = None, error_callback: Optional[Callable] = None,
          worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
          task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
          worker_exit_timeout: Optional[float] = None) -> Any

Parameters:

  • func (Callable): Function to execute
  • args (Any): Positional arguments for the function. Default: ()
  • kwargs (Dict): Keyword arguments for the function. Default: None
  • callback (Optional[Callable]): Function called with result when task succeeds
  • error_callback (Optional[Callable]): Function called with exception when task fails
  • worker_init (Optional[Callable]): Function called when worker starts
  • worker_exit (Optional[Callable]): Function called when worker exits
  • task_timeout (Optional[float]): Timeout in seconds for the task
  • worker_init_timeout (Optional[float]): Timeout for worker initialization
  • worker_exit_timeout (Optional[float]): Timeout for worker exit

Returns: The result of the function call

Asynchronous Apply

Execute a single function call asynchronously and return an AsyncResult object.

def apply_async(self, func: Callable, args: Any = (), kwargs: Dict = None,
                callback: Optional[Callable] = None, error_callback: Optional[Callable] = None,
                worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
                task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
                worker_exit_timeout: Optional[float] = None) -> AsyncResult

Parameters: Same as apply() method

Returns: AsyncResult object for retrieving the result when ready

Usage Examples

Basic Apply Operations

from mpire import WorkerPool

def expensive_calculation(x, y, multiplier=1):
    import time
    time.sleep(1)  # Simulate expensive work
    return (x + y) * multiplier

with WorkerPool(n_jobs=4) as pool:
    # Synchronous apply - blocks until result is ready
    result = pool.apply(expensive_calculation, args=(10, 20), kwargs={'multiplier': 2})
    print(result)  # 60
    
    # Asynchronous apply - returns immediately
    async_result = pool.apply_async(expensive_calculation, args=(5, 15), kwargs={'multiplier': 3})
    # Do other work...
    result = async_result.get()  # Blocks until ready
    print(result)  # 60

Callback Functions

def process_data(data):
    # Some processing
    return data.upper()

def success_callback(result):
    print(f"Task completed successfully: {result}")

def error_callback(exception):
    print(f"Task failed with error: {exception}")

with WorkerPool(n_jobs=4) as pool:
    # Apply with callbacks
    result = pool.apply(
        process_data,
        args=("hello world",),
        callback=success_callback,
        error_callback=error_callback
    )
    
    # Async apply with callbacks
    async_result = pool.apply_async(
        process_data,
        args=("hello async",),
        callback=success_callback,
        error_callback=error_callback
    )

Multiple Async Tasks

from mpire import WorkerPool

def compute_factorial(n):
    import math
    return math.factorial(n)

with WorkerPool(n_jobs=4) as pool:
    # Submit multiple async tasks
    async_results = []
    for i in range(10, 20):
        result = pool.apply_async(compute_factorial, args=(i,))
        async_results.append(result)
    
    # Collect results as they become available
    results = []
    for async_result in async_results:
        result = async_result.get(timeout=10)  # 10 second timeout
        results.append(result)
    
    print("Factorials:", results)

Worker State with Apply

def init_worker(worker_state):
    worker_state['counter'] = 0

def increment_counter(worker_state, value):
    worker_state['counter'] += 1
    return worker_state['counter'] * value

with WorkerPool(n_jobs=2, use_worker_state=True) as pool:
    # Each apply call will reuse the same worker state
    result1 = pool.apply(increment_counter, args=(10,), worker_init=init_worker)
    result2 = pool.apply(increment_counter, args=(20,))
    result3 = pool.apply(increment_counter, args=(30,))
    
    print(f"Results: {result1}, {result2}, {result3}")  # Results depend on worker assignment

Error Handling

def risky_function(x):
    if x < 0:
        raise ValueError("Negative values not allowed")
    return x ** 2

def handle_error(exception):
    print(f"Caught exception: {type(exception).__name__}: {exception}")

with WorkerPool(n_jobs=2) as pool:
    try:
        # This will succeed
        result = pool.apply(risky_function, args=(5,))
        print(f"Success: {result}")
        
        # This will fail
        result = pool.apply(risky_function, args=(-3,), error_callback=handle_error)
    except Exception as e:
        print(f"Apply failed: {e}")
    
    # Async version with error handling
    async_result = pool.apply_async(risky_function, args=(-5,), error_callback=handle_error)
    try:
        result = async_result.get()
    except Exception as e:
        print(f"Async apply failed: {e}")

Timeouts

def slow_function(duration):
    import time
    time.sleep(duration)
    return f"Slept for {duration} seconds"

with WorkerPool(n_jobs=2) as pool:
    try:
        # This will succeed
        result = pool.apply(slow_function, args=(1,), task_timeout=5.0)
        print(result)
        
        # This will timeout
        result = pool.apply(slow_function, args=(10,), task_timeout=2.0)
        print(result)
    except TimeoutError:
        print("Task timed out")
    
    # Async version with timeout
    async_result = pool.apply_async(slow_function, args=(3,), task_timeout=5.0)
    try:
        result = async_result.get(timeout=2.0)  # Different timeout for getting result
        print(result)
    except TimeoutError:
        print("Getting result timed out")

Install with Tessl CLI

npx tessl i tessl/pypi-mpire

docs

apply-functions.md

async-results.md

dashboard-integration.md

exception-handling.md

index.md

parallel-map.md

performance-insights.md

utility-functions.md

worker-configuration.md

workerpool-management.md

tile.json