A Python package for easy multiprocessing that is faster than the standard `multiprocessing` module, with advanced features including worker state management, progress bars, and performance insights.
—
Apply-style parallel execution for single function calls and asynchronous operations. These functions are ideal for submitting individual tasks rather than processing iterables.
Execute a single function call synchronously in a worker process.
def apply(self, func: Callable, args: Any = (), kwargs: Dict = None,
callback: Optional[Callable] = None, error_callback: Optional[Callable] = None,
worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
worker_exit_timeout: Optional[float] = None) -> Any

Parameters:
- func (Callable): Function to execute
- args (Any): Positional arguments for the function. Default: ()
- kwargs (Dict): Keyword arguments for the function. Default: None
- callback (Optional[Callable]): Function called with result when task succeeds
- error_callback (Optional[Callable]): Function called with exception when task fails
- worker_init (Optional[Callable]): Function called when worker starts
- worker_exit (Optional[Callable]): Function called when worker exits
- task_timeout (Optional[float]): Timeout in seconds for the task
- worker_init_timeout (Optional[float]): Timeout for worker initialization
- worker_exit_timeout (Optional[float]): Timeout for worker exit

Returns: The result of the function call
Execute a single function call asynchronously and return an AsyncResult object.
def apply_async(self, func: Callable, args: Any = (), kwargs: Dict = None,
callback: Optional[Callable] = None, error_callback: Optional[Callable] = None,
worker_init: Optional[Callable] = None, worker_exit: Optional[Callable] = None,
task_timeout: Optional[float] = None, worker_init_timeout: Optional[float] = None,
worker_exit_timeout: Optional[float] = None) -> AsyncResult

Parameters: Same as apply() method
Returns: AsyncResult object for retrieving the result when ready
from mpire import WorkerPool
def expensive_calculation(x, y, multiplier=1):
    """Return (x + y) * multiplier after a one-second simulated delay."""
    import time
    time.sleep(1)  # Simulate expensive work
    total = x + y
    return total * multiplier
# Example: synchronous vs. asynchronous apply. Extraction had collapsed the
# with-block body to column 0 (invalid Python) and fused the following
# definition onto the last print line; indentation restored here.
with WorkerPool(n_jobs=4) as pool:
    # Synchronous apply - blocks until result is ready
    result = pool.apply(expensive_calculation, args=(10, 20), kwargs={'multiplier': 2})
    print(result)  # 60

    # Asynchronous apply - returns immediately
    async_result = pool.apply_async(expensive_calculation, args=(5, 15), kwargs={'multiplier': 3})
    # Do other work...
    result = async_result.get()  # Blocks until ready
    print(result)  # 60


def process_data(data):
    # Some processing
    return data.upper()
def success_callback(result):
    """Callback invoked with the task's result on success."""
    message = f"Task completed successfully: {result}"
    print(message)
def error_callback(exception):
    """Callback invoked with the raised exception on task failure."""
    message = f"Task failed with error: {exception}"
    print(message)
# Example: apply/apply_async with success and error callbacks. Extraction had
# collapsed the with-block body to column 0 and fused the next example's import
# onto the closing parenthesis; restored here.
with WorkerPool(n_jobs=4) as pool:
    # Apply with callbacks
    result = pool.apply(
        process_data,
        args=("hello world",),
        callback=success_callback,
        error_callback=error_callback
    )

    # Async apply with callbacks
    async_result = pool.apply_async(
        process_data,
        args=("hello async",),
        callback=success_callback,
        error_callback=error_callback
    )

# Next example:
from mpire import WorkerPool
def compute_factorial(n):
    """Return n! computed with math.factorial."""
    import math
    value = math.factorial(n)
    return value
# Example: submitting many async tasks and collecting their results. Extraction
# had collapsed the with-block body to column 0 and fused the next example's
# init_worker definition onto the final print; restored here.
with WorkerPool(n_jobs=4) as pool:
    # Submit multiple async tasks
    async_results = []
    for i in range(10, 20):
        result = pool.apply_async(compute_factorial, args=(i,))
        async_results.append(result)

    # Collect results as they become available
    results = []
    for async_result in async_results:
        result = async_result.get(timeout=10)  # 10 second timeout
        results.append(result)
    print("Factorials:", results)


def init_worker(worker_state):
    worker_state['counter'] = 0
def increment_counter(worker_state, value):
    """Bump the per-worker counter, then return counter * value."""
    new_count = worker_state['counter'] + 1
    worker_state['counter'] = new_count
    return new_count * value
# Example: worker state shared across apply calls on the same worker.
# Extraction had collapsed the with-block body to column 0 and fused the next
# example's risky_function definition onto the final print; restored here.
with WorkerPool(n_jobs=2, use_worker_state=True) as pool:
    # Each apply call will reuse the same worker state
    result1 = pool.apply(increment_counter, args=(10,), worker_init=init_worker)
    result2 = pool.apply(increment_counter, args=(20,))
    result3 = pool.apply(increment_counter, args=(30,))
    print(f"Results: {result1}, {result2}, {result3}")  # Results depend on worker assignment


def risky_function(x):
    if x < 0:
        raise ValueError("Negative values not allowed")
    return x ** 2
def handle_error(exception):
    """Print a caught exception's type name and message."""
    exc_name = type(exception).__name__
    print(f"Caught exception: {exc_name}: {exception}")
# Example: exception handling with apply and apply_async. Extraction had
# collapsed the with/try bodies to column 0 and fused the next example's
# slow_function definition onto the final print; restored here.
with WorkerPool(n_jobs=2) as pool:
    try:
        # This will succeed
        result = pool.apply(risky_function, args=(5,))
        print(f"Success: {result}")
        # This will fail
        result = pool.apply(risky_function, args=(-3,), error_callback=handle_error)
    except Exception as e:
        print(f"Apply failed: {e}")

    # Async version with error handling
    async_result = pool.apply_async(risky_function, args=(-5,), error_callback=handle_error)
    try:
        result = async_result.get()
    except Exception as e:
        print(f"Async apply failed: {e}")


def slow_function(duration):
    import time
    time.sleep(duration)
    return f"Slept for {duration} seconds"
with WorkerPool(n_jobs=2) as pool:
try:
# This will succeed
result = pool.apply(slow_function, args=(1,), task_timeout=5.0)
print(result)
# This will timeout
result = pool.apply(slow_function, args=(10,), task_timeout=2.0)
print(result)
except TimeoutError:
print("Task timed out")
# Async version with timeout
async_result = pool.apply_async(slow_function, args=(3,), task_timeout=5.0)
try:
result = async_result.get(timeout=2.0) # Different timeout for getting result
print(result)
except TimeoutError:
print("Getting result timed out")Install with Tessl CLI
npx tessl i tessl/pypi-mpire