A Python package for easy multiprocessing, but faster than multiprocessing with advanced features including worker state management, progress bars, and performance insights.
—
Asynchronous result handling with support for callbacks, timeouts, and iterators for processing results as they become available. The AsyncResult system provides fine-grained control over task execution and result retrieval.
Main class for handling asynchronous results from apply_async operations.
class AsyncResult:
def __init__(self, cache: Dict, callback: Optional[Callable], error_callback: Optional[Callable],
job_id: Optional[int] = None, delete_from_cache: bool = True,
timeout: Optional[float] = None) -> None
def ready(self) -> bool
def successful(self) -> bool
def get(self, timeout: Optional[float] = None) -> Any
def wait(self, timeout: Optional[float] = None) -> Noneready: Check if the task has completed (either successfully or with error).
successful: Check if the task completed successfully (only valid after ready() returns True).
get: Retrieve the result, blocking until ready. Raises the exception if task failed.
timeout (Optional[float]): Maximum time to wait for result in secondswait: Wait for the task to complete without retrieving the result.
timeout (Optional[float]): Maximum time to wait in secondsIterator classes for handling collections of async results.
class UnorderedAsyncResultIterator:
def __init__(self, cache: Dict, job_ids: List[int]) -> None
def __iter__(self) -> Iterator
def __next__(self) -> Any
class AsyncResultWithExceptionGetter(AsyncResult):
"""AsyncResult subclass with enhanced exception handling"""
pass
class UnorderedAsyncExitResultIterator(UnorderedAsyncResultIterator):
"""Iterator for worker exit results"""
passfrom mpire import WorkerPool
import time
def slow_computation(x):
time.sleep(x * 0.1)
return x ** 2
with WorkerPool(n_jobs=4) as pool:
# Submit async task
async_result = pool.apply_async(slow_computation, args=(5,))
# Do other work while task runs
print("Task submitted, doing other work...")
time.sleep(0.2)
# Check if ready
if async_result.ready():
print("Task completed!")
if async_result.successful():
result = async_result.get()
print(f"Result: {result}")
else:
print("Task still running, waiting...")
result = async_result.get() # Block until ready
print(f"Result: {result}")from mpire import WorkerPool
import time
def factorial(n):
if n <= 1:
return 1
result = 1
for i in range(2, n + 1):
result *= i
return result
with WorkerPool(n_jobs=3) as pool:
# Submit multiple async tasks
tasks = []
for i in range(1, 11):
async_result = pool.apply_async(factorial, args=(i,))
tasks.append((i, async_result))
# Process results as they become available
completed = []
while len(completed) < len(tasks):
for i, (input_val, async_result) in enumerate(tasks):
if i not in completed and async_result.ready():
if async_result.successful():
result = async_result.get()
print(f"Factorial of {input_val} = {result}")
else:
print(f"Task {input_val} failed")
completed.append(i)
time.sleep(0.01) # Small delay to prevent busy waitingfrom mpire import WorkerPool
import time
def unreliable_task(duration):
time.sleep(duration)
return f"Completed after {duration} seconds"
with WorkerPool(n_jobs=2) as pool:
# Submit tasks with different durations
fast_task = pool.apply_async(unreliable_task, args=(1,))
slow_task = pool.apply_async(unreliable_task, args=(5,))
# Get results with timeout
try:
result1 = fast_task.get(timeout=2.0)
print(f"Fast task: {result1}")
except TimeoutError:
print("Fast task timed out")
try:
result2 = slow_task.get(timeout=2.0)
print(f"Slow task: {result2}")
except TimeoutError:
print("Slow task timed out")
# Wait for slow task without timeout
print("Waiting for slow task to complete...")
result2 = slow_task.get() # No timeout
print(f"Slow task finally completed: {result2}")from mpire import WorkerPool
import time
def process_data(data):
time.sleep(0.5)
if data < 0:
raise ValueError(f"Negative data not allowed: {data}")
return data * 2
def success_callback(result):
print(f"✓ Task succeeded with result: {result}")
def error_callback(exception):
print(f"✗ Task failed with error: {type(exception).__name__}: {exception}")
with WorkerPool(n_jobs=2) as pool:
# Submit tasks with callbacks
tasks = []
test_data = [1, 2, -1, 3, -2, 4]
for data in test_data:
async_result = pool.apply_async(
process_data,
args=(data,),
callback=success_callback,
error_callback=error_callback
)
tasks.append(async_result)
# Wait for all tasks to complete
for async_result in tasks:
async_result.wait()
print("All tasks completed")from mpire import WorkerPool
import time
import random
def random_computation(x):
# Simulate variable processing time
sleep_time = random.uniform(0.1, 1.0)
time.sleep(sleep_time)
# Occasionally fail
if random.random() < 0.2:
raise RuntimeError(f"Random failure for input {x}")
return x ** 2
with WorkerPool(n_jobs=3) as pool:
# Submit batch of tasks
async_results = []
for i in range(10):
result = pool.apply_async(random_computation, args=(i,))
async_results.append(result)
# Process results with different strategies
successful_results = []
failed_results = []
for i, async_result in enumerate(async_results):
async_result.wait() # Wait for completion
if async_result.successful():
result = async_result.get()
successful_results.append((i, result))
print(f"✓ Task {i}: {result}")
else:
try:
async_result.get() # This will raise the exception
except Exception as e:
failed_results.append((i, str(e)))
print(f"✗ Task {i}: {e}")
print(f"\nSummary: {len(successful_results)} succeeded, {len(failed_results)} failed")from mpire import WorkerPool
import time
def long_running_task(task_id):
# Simulate different task durations
duration = task_id * 0.5
time.sleep(duration)
return f"Task {task_id} completed after {duration}s"
with WorkerPool(n_jobs=2) as pool:
# Submit multiple long-running tasks
async_results = []
for i in range(1, 6):
result = pool.apply_async(long_running_task, args=(i,))
async_results.append((i, result))
# Poll for results and process them as they complete
completed_tasks = set()
while len(completed_tasks) < len(async_results):
for task_id, async_result in async_results:
if task_id not in completed_tasks and async_result.ready():
result = async_result.get()
print(f"Completed: {result}")
completed_tasks.add(task_id)
# Show progress
print(f"Progress: {len(completed_tasks)}/{len(async_results)} tasks completed")
time.sleep(0.1)
print("All tasks completed!")from mpire import WorkerPool
import time
def risky_operation(operation_id, fail_probability=0.3):
time.sleep(0.5)
import random
if random.random() < fail_probability:
if operation_id % 2 == 0:
raise ValueError(f"ValueError in operation {operation_id}")
else:
raise RuntimeError(f"RuntimeError in operation {operation_id}")
return f"Operation {operation_id} successful"
with WorkerPool(n_jobs=3) as pool:
async_results = []
# Submit operations with error handling
for i in range(10):
result = pool.apply_async(risky_operation, args=(i,))
async_results.append((i, result))
# Categorize results by outcome
success_count = 0
value_errors = 0
runtime_errors = 0
other_errors = 0
for operation_id, async_result in async_results:
async_result.wait()
if async_result.successful():
result = async_result.get()
print(f"✓ {result}")
success_count += 1
else:
try:
async_result.get()
except ValueError as e:
print(f"ValueError in operation {operation_id}: {e}")
value_errors += 1
except RuntimeError as e:
print(f"RuntimeError in operation {operation_id}: {e}")
runtime_errors += 1
except Exception as e:
print(f"Unexpected error in operation {operation_id}: {e}")
other_errors += 1
print(f"\nResults Summary:")
print(f"Successful: {success_count}")
print(f"ValueErrors: {value_errors}")
print(f"RuntimeErrors: {runtime_errors}")
print(f"Other errors: {other_errors}")Install with Tessl CLI
npx tessl i tessl/pypi-mpire