CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-mpire

A Python package for easy multiprocessing that is faster than the standard multiprocessing module, with advanced features including worker state management, progress bars, and performance insights.

Pending
Overview
Eval results
Files

async-results.mddocs/

Async Results

Asynchronous result handling with support for callbacks, timeouts, and iterators for processing results as they become available. The AsyncResult system provides fine-grained control over task execution and result retrieval.

Capabilities

AsyncResult Class

Main class for handling asynchronous results from apply_async operations.

# API sketch (signatures only; method bodies are omitted in this document).
class AsyncResult:
    def __init__(self, cache: Dict, callback: Optional[Callable], error_callback: Optional[Callable],
                 job_id: Optional[int] = None, delete_from_cache: bool = True, 
                 timeout: Optional[float] = None) -> None
    # True once the task has completed, either successfully or with an error.
    def ready(self) -> bool
    # Whether the task completed successfully; only valid after ready() is True.
    def successful(self) -> bool
    # Block until the result is available; re-raises the task's exception on failure.
    def get(self, timeout: Optional[float] = None) -> Any
    # Block until the task completes, without retrieving the result.
    def wait(self, timeout: Optional[float] = None) -> None

ready: Check if the task has completed (either successfully or with error).

successful: Check if the task completed successfully (only valid after ready() returns True).

get: Retrieve the result, blocking until ready. Raises the exception if task failed.

  • timeout (Optional[float]): Maximum time to wait for result in seconds

wait: Wait for the task to complete without retrieving the result.

  • timeout (Optional[float]): Maximum time to wait in seconds

AsyncResult Iterator Classes

Iterator classes for handling collections of async results.

# API sketch (signatures only): iterator over a collection of async results
# identified by job_ids; presumably yields results as they become available
# (the "Unordered" name suggests completion order) — confirm against mpire docs.
class UnorderedAsyncResultIterator:
    def __init__(self, cache: Dict, job_ids: List[int]) -> None
    def __iter__(self) -> Iterator
    def __next__(self) -> Any

class AsyncResultWithExceptionGetter(AsyncResult):
    """AsyncResult subclass with enhanced exception handling."""

class UnorderedAsyncExitResultIterator(UnorderedAsyncResultIterator):
    """Iterator over the results returned by workers' exit functions."""

Usage Examples

Basic AsyncResult Usage

from mpire import WorkerPool
import time

def slow_computation(x):
    """Sleep proportionally to x (0.1s per unit), then return its square."""
    time.sleep(0.1 * x)
    return x * x

with WorkerPool(n_jobs=4) as pool:
    # Kick off the task asynchronously and keep the handle around.
    pending = pool.apply_async(slow_computation, args=(5,))

    # Overlap other work with the running task.
    print("Task submitted, doing other work...")
    time.sleep(0.2)

    # Poll once: either block for the value, or report completion and fetch it.
    if not pending.ready():
        print("Task still running, waiting...")
        value = pending.get()  # blocks until the task finishes
        print(f"Result: {value}")
    else:
        print("Task completed!")
        if pending.successful():
            value = pending.get()
            print(f"Result: {value}")

Multiple Async Tasks

from mpire import WorkerPool
import time

def factorial(n):
    """Return n! computed iteratively; defined as 1 for n <= 1."""
    product = 1
    for factor in range(n, 1, -1):
        product *= factor
    return product

with WorkerPool(n_jobs=3) as pool:
    # Submit multiple async tasks, keeping each input value with its handle.
    tasks = []
    for i in range(1, 11):
        async_result = pool.apply_async(factorial, args=(i,))
        tasks.append((i, async_result))

    # Process results as they become available.
    # Track completed indices in a set: O(1) membership tests on every polling
    # pass, instead of the O(n) scans a list would cost (matches the set-based
    # polling example later in this document).
    completed = set()
    while len(completed) < len(tasks):
        for i, (input_val, async_result) in enumerate(tasks):
            if i not in completed and async_result.ready():
                if async_result.successful():
                    result = async_result.get()
                    print(f"Factorial of {input_val} = {result}")
                else:
                    print(f"Task {input_val} failed")
                completed.add(i)

        time.sleep(0.01)  # Small delay to prevent busy waiting

Timeout Handling

from mpire import WorkerPool
import time

def unreliable_task(duration):
    """Simulate work by sleeping `duration` seconds, then report completion."""
    time.sleep(duration)
    message = f"Completed after {duration} seconds"
    return message

with WorkerPool(n_jobs=2) as pool:
    # One quick task and one that will outlive the 2-second deadline.
    quick = pool.apply_async(unreliable_task, args=(1,))
    lengthy = pool.apply_async(unreliable_task, args=(5,))

    # Fetch each result, giving up after 2 seconds per task.
    try:
        print(f"Fast task: {quick.get(timeout=2.0)}")
    except TimeoutError:
        print("Fast task timed out")

    try:
        print(f"Slow task: {lengthy.get(timeout=2.0)}")
    except TimeoutError:
        print("Slow task timed out")

    # This time, block indefinitely until the long task finishes.
    print("Waiting for slow task to complete...")
    final = lengthy.get()
    print(f"Slow task finally completed: {final}")

Callback Functions with AsyncResult

from mpire import WorkerPool
import time

def process_data(data):
    """Double `data` after a short delay; reject negative values."""
    time.sleep(0.5)
    if data >= 0:
        return data * 2
    raise ValueError(f"Negative data not allowed: {data}")

def success_callback(result):
    """Report a successfully completed task's result."""
    message = f"✓ Task succeeded with result: {result}"
    print(message)

def error_callback(exception):
    """Report a failed task's exception type and message."""
    kind = type(exception).__name__
    print(f"✗ Task failed with error: {kind}: {exception}")

with WorkerPool(n_jobs=2) as pool:
    # Submit one task per datum, wiring up both callbacks.
    pending = []
    for datum in [1, 2, -1, 3, -2, 4]:
        handle = pool.apply_async(
            process_data,
            args=(datum,),
            callback=success_callback,
            error_callback=error_callback,
        )
        pending.append(handle)

    # Block until every task has finished; callbacks fire as each completes.
    for handle in pending:
        handle.wait()

    print("All tasks completed")

Conditional Result Processing

from mpire import WorkerPool
import time
import random

def random_computation(x):
    """Square x after a random delay, failing roughly 20% of the time."""
    # Simulate variable processing time.
    time.sleep(random.uniform(0.1, 1.0))

    # Occasionally fail.
    if random.random() < 0.2:
        raise RuntimeError(f"Random failure for input {x}")

    return x ** 2

with WorkerPool(n_jobs=3) as pool:
    # Submit the whole batch up front.
    handles = [pool.apply_async(random_computation, args=(i,)) for i in range(10)]

    # Sort each outcome into a success or failure bucket.
    successful_results = []
    failed_results = []

    for idx, handle in enumerate(handles):
        handle.wait()  # Block until this task finished, one way or the other.

        if not handle.successful():
            try:
                handle.get()  # Re-raises the worker's exception here.
            except Exception as exc:
                failed_results.append((idx, str(exc)))
                print(f"✗ Task {idx}: {exc}")
        else:
            value = handle.get()
            successful_results.append((idx, value))
            print(f"✓ Task {idx}: {value}")

    print(f"\nSummary: {len(successful_results)} succeeded, {len(failed_results)} failed")

Polling for Results

from mpire import WorkerPool
import time

def long_running_task(task_id):
    """Sleep for task_id * 0.5 seconds, then describe the run."""
    duration = 0.5 * task_id
    time.sleep(duration)
    return f"Task {task_id} completed after {duration}s"

with WorkerPool(n_jobs=2) as pool:
    # Submit several long-running tasks, keyed by their ids.
    handles = [(i, pool.apply_async(long_running_task, args=(i,)))
               for i in range(1, 6)]

    # Poll and hand each result off as soon as it is ready.
    finished = set()

    while len(finished) < len(handles):
        for task_id, handle in handles:
            if task_id in finished or not handle.ready():
                continue
            print(f"Completed: {handle.get()}")
            finished.add(task_id)

        # Show progress
        print(f"Progress: {len(finished)}/{len(handles)} tasks completed")
        time.sleep(0.1)

    print("All tasks completed!")

Advanced Error Handling

from mpire import WorkerPool
import time

def risky_operation(operation_id, fail_probability=0.3):
    """Simulate a flaky operation that sometimes raises.

    Sleeps briefly, then fails with probability `fail_probability`:
    even ids raise ValueError, odd ids raise RuntimeError. Otherwise a
    success message is returned.

    Args:
        operation_id: Identifier echoed in the result or error message.
        fail_probability: Chance in [0, 1] that the operation fails.

    Returns:
        A success message string.

    Raises:
        ValueError: On failure when operation_id is even.
        RuntimeError: On failure when operation_id is odd.
    """
    # Hoisted to the top of the function (was buried after the sleep);
    # kept local so the example stays self-contained.
    import random

    time.sleep(0.5)

    if random.random() < fail_probability:
        # Alternate exception types so callers can exercise both handlers.
        if operation_id % 2 == 0:
            raise ValueError(f"ValueError in operation {operation_id}")
        else:
            raise RuntimeError(f"RuntimeError in operation {operation_id}")

    return f"Operation {operation_id} successful"

with WorkerPool(n_jobs=3) as pool:
    # Submit operations with error handling, keyed by id.
    submitted = [(i, pool.apply_async(risky_operation, args=(i,)))
                 for i in range(10)]

    # Tally outcomes by category.
    success_count = 0
    value_errors = 0
    runtime_errors = 0
    other_errors = 0

    for operation_id, handle in submitted:
        handle.wait()

        if handle.successful():
            print(f"✓ {handle.get()}")
            success_count += 1
            continue

        try:
            handle.get()  # Re-raises the worker's exception for dispatch below.
        except ValueError as exc:
            print(f"ValueError in operation {operation_id}: {exc}")
            value_errors += 1
        except RuntimeError as exc:
            print(f"RuntimeError in operation {operation_id}: {exc}")
            runtime_errors += 1
        except Exception as exc:
            print(f"Unexpected error in operation {operation_id}: {exc}")
            other_errors += 1

    print(f"\nResults Summary:")
    print(f"Successful: {success_count}")
    print(f"ValueErrors: {value_errors}")
    print(f"RuntimeErrors: {runtime_errors}")
    print(f"Other errors: {other_errors}")

Install with Tessl CLI

npx tessl i tessl/pypi-mpire

docs

apply-functions.md

async-results.md

dashboard-integration.md

exception-handling.md

index.md

parallel-map.md

performance-insights.md

utility-functions.md

worker-configuration.md

workerpool-management.md

tile.json