multiprocess is a fork of Python's multiprocessing module that extends it with enhanced serialization using dill.

It provides parallel task execution using worker process pools. The Pool class offers a convenient way to distribute tasks across multiple processes, with various execution patterns and result-handling options.

Pool is the main class for managing a pool of worker processes for parallel task execution.
class Pool:
    """
    A pool of worker processes for parallel task execution.

    Args:
        processes: number of worker processes (default: cpu_count())
        initializer: callable to run on each worker process startup
        initargs: arguments for the initializer function
        maxtasksperchild: maximum tasks per worker before restart (default: None)
        context: multiprocess context to use for creating processes
    """

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None): ...

    def map(self, func, iterable, chunksize=None):
        """
        Apply function to every item of iterable and return a list of results.

        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes

        Returns:
            list: results in same order as input
        """

    def map_async(self, func, iterable, chunksize=None, callback=None,
                  error_callback=None):
        """
        Asynchronous version of map() method.

        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes
            callback: function to call with results when complete
            error_callback: function to call if an error occurs

        Returns:
            AsyncResult: result object for async operation
        """

    def imap(self, func, iterable, chunksize=1):
        """
        Lazy version of map() that returns an iterator.

        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes

        Returns:
            iterator: iterator over results
        """

    def imap_unordered(self, func, iterable, chunksize=1):
        """
        Like imap() but results can be returned in any order.

        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes

        Returns:
            iterator: iterator over results in arbitrary order
        """

    def starmap(self, func, iterable, chunksize=None):
        """
        Like map() but arguments are unpacked from tuples.

        Args:
            func: function to apply (called with *args from each tuple)
            iterable: sequence of tuples containing arguments
            chunksize: items per task sent to worker processes

        Returns:
            list: results in same order as input
        """

    def starmap_async(self, func, iterable, chunksize=None, callback=None,
                      error_callback=None):
        """
        Asynchronous version of starmap() method.

        Args:
            func: function to apply (called with *args from each tuple)
            iterable: sequence of tuples containing arguments
            chunksize: items per task sent to worker processes
            callback: function to call with results when complete
            error_callback: function to call if an error occurs

        Returns:
            AsyncResult: result object for async operation
        """

    def apply(self, func, args=(), kwds={}):
        """
        Apply function with arguments and return the result.

        Note: the mutable ``kwds={}`` default mirrors the stdlib
        multiprocessing.Pool signature and is never mutated.

        Args:
            func: function to call
            args: positional arguments for func
            kwds: keyword arguments for func

        Returns:
            object: result of function call
        """

    def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
        """
        Asynchronous version of apply() method.

        Args:
            func: function to call
            args: positional arguments for func
            kwds: keyword arguments for func
            callback: function to call with result when complete
            error_callback: function to call if an error occurs

        Returns:
            AsyncResult: result object for async operation
        """

    def close(self):
        """
        Prevent any more tasks from being submitted to the pool.

        Once closed, no new tasks can be submitted.
        """

    def terminate(self):
        """
        Stop the worker processes immediately without completing work.
        """

    def join(self):
        """
        Wait for the worker processes to exit.

        Must call close() or terminate() before using join().
        """

    def __enter__(self):
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - closes pool and joins workers."""


# Object representing the result of an asynchronous operation.
class AsyncResult:
    """
    Result object for asynchronous pool operations.
    """

    def get(self, timeout=None):
        """
        Return the result when it arrives.

        Args:
            timeout: maximum time to wait (seconds)

        Returns:
            object: result of the operation

        Raises:
            TimeoutError: if timeout exceeded
        """

    def wait(self, timeout=None):
        """
        Wait until the result is available.

        Args:
            timeout: maximum time to wait (seconds)

        Returns:
            bool: True if result is available, False if timeout
        """

    def ready(self):
        """
        Return True if the operation is complete.

        Returns:
            bool: True if operation is complete
        """

    def successful(self):
        """
        Return True if the operation completed without error.

        Must call ready() first to ensure operation is complete.

        Returns:
            bool: True if successful

        Raises:
            ValueError: if operation is not yet complete
        """


from multiprocess import Pool
def square(x):
    """Return the square of x."""
    return x * x
if __name__ == '__main__':
    # The __main__ guard is required: workers re-import this module on start.
    with Pool(processes=4) as pool:
        numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        # map() blocks until every item has been processed, preserving order.
        results = pool.map(square, numbers)
        print(f"Results: {results}")
        # Output: Results: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

from multiprocess import Pool
import time
def slow_function(x):
    """Return x squared after a one-second delay."""
    time.sleep(1)  # Simulate slow work
    return x * x
def result_callback(result):
    """Success callback: print the result delivered by the pool."""
    print(f"Got result: {result}")
def error_callback(error):
    """Error callback: print the exception raised by a worker task."""
    print(f"Got error: {error}")
if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # Submit async job
        async_result = pool.map_async(
            slow_function,
            [1, 2, 3, 4],
            callback=result_callback,
            error_callback=error_callback
        )
        # Do other work while waiting
        print("Doing other work...")
        time.sleep(0.5)
        print("Still working...")
        # Get results (blocks until complete)
        results = async_result.get(timeout=10)
        print(f"Final results: {results}")

from multiprocess import Pool
def multiply(x, y):
    """Return the product of x and y."""
    return x * y
def power(base, exponent):
    """Return base raised to exponent."""
    return base ** exponent
if __name__ == '__main__':
    with Pool(processes=3) as pool:
        # Each tuple contains arguments for the function
        multiply_args = [(2, 3), (4, 5), (6, 7)]
        multiply_results = pool.starmap(multiply, multiply_args)
        print(f"Multiply results: {multiply_results}")
        # Output: Multiply results: [6, 20, 42]
        power_args = [(2, 3), (3, 2), (4, 2), (5, 2)]
        power_results = pool.starmap(power, power_args)
        print(f"Power results: {power_results}")
        # Output: Power results: [8, 9, 16, 25]

from multiprocess import Pool
import time
def process_item(x):
    """Return x squared after a delay proportional to x."""
    # Simulate variable processing time
    time.sleep(x * 0.1)
    return x * x
if __name__ == '__main__':
    with Pool(processes=2) as pool:
        items = range(1, 11)
        # Ordered iterator (results in input order)
        print("Ordered results:")
        for result in pool.imap(process_item, items, chunksize=2):
            print(f"Got result: {result}")
        print("\nUnordered results:")
        # Unordered iterator (results as they complete)
        for result in pool.imap_unordered(process_item, items, chunksize=2):
            print(f"Got result: {result}")

from multiprocess import Pool
import os
# Global variable in worker processes
worker_state = None


def init_worker(initial_value):
    """Pool initializer: run once per worker to seed its module-level state."""
    global worker_state
    worker_state = initial_value
    print(f"Worker {os.getpid()} initialized with {initial_value}")


def worker_task(x):
    """Return x plus the worker's initialized state."""
    # No `global` needed here: worker_state is only read, never rebound.
    pid = os.getpid()
    result = x + worker_state
    print(f"Worker {pid} processed {x} with state {worker_state} = {result}")
    return result
if __name__ == '__main__':
    # Each worker will be initialized with value 100
    with Pool(processes=2, initializer=init_worker, initargs=(100,)) as pool:
        tasks = [1, 2, 3, 4, 5]
        results = pool.map(worker_task, tasks)
        print(f"Results: {results}")

from multiprocess import Pool
import random
def unreliable_function(x):
    """Return x squared, raising ValueError ~30% of the time."""
    if random.random() < 0.3:  # 30% chance of error
        raise ValueError(f"Error processing {x}")
    return x * x
def handle_result(result):
    """Success callback: report a completed task's result."""
    print(f"Success: {result}")
def handle_error(error):
    """Error callback: report a task's exception."""
    print(f"Error occurred: {error}")
if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # Submit multiple async tasks
        async_results = []
        for i in range(10):
            result = pool.apply_async(
                unreliable_function,
                (i,),
                callback=handle_result,
                error_callback=handle_error
            )
            async_results.append(result)
        # Wait for all tasks and handle individual results
        for i, async_result in enumerate(async_results):
            try:
                result = async_result.get(timeout=5)
                print(f"Task {i} completed successfully: {result}")
            except Exception as e:
                # Broad catch is deliberate in this demo: re-raised worker
                # errors of any type are reported per task, not propagated.
                print(f"Task {i} failed: {e}")

from multiprocess import Pool
import time
import os
def cpu_intensive_task(n):
    """Simulate CPU-intensive work.

    Returns a dict with the worker pid, the input, the computed sum of
    squares over n million integers, and the wall-clock duration.
    """
    pid = os.getpid()
    start_time = time.time()
    # Simulate computation: sum of i*i for the first n million integers.
    total = sum(i * i for i in range(n * 1000000))
    end_time = time.time()
    duration = end_time - start_time
    return {
        'pid': pid,
        'input': n,
        'result': total,
        'duration': duration
    }
if __name__ == '__main__':
    tasks = [10, 20, 30, 40, 50]
    # Using context manager ensures proper cleanup
    with Pool(processes=3) as pool:
        print("Starting parallel processing...")
        start_time = time.time()
        # Process tasks in parallel
        results = pool.map(cpu_intensive_task, tasks)
        end_time = time.time()
        total_time = end_time - start_time
        print(f"\nAll tasks completed in {total_time:.2f} seconds")
        print("\nResults:")
        for result in results:
            print(f"PID {result['pid']}: input={result['input']}, "
                  f"duration={result['duration']:.3f}s")

from multiprocess import Pool
import time
def simple_task(x):
    """Return the square of x (trivially cheap per-item work)."""
    return x * x
def benchmark_chunking(items, pool_size, chunk_sizes):
    """Benchmark different chunk sizes.

    Args:
        items: items mapped over by each benchmark run
        pool_size: number of worker processes per run
        chunk_sizes: chunk sizes to time, one fresh pool per size
    """
    for chunk_size in chunk_sizes:
        # A fresh pool per chunk size keeps runs independent (at the cost of
        # including pool startup in the measured interval).
        with Pool(processes=pool_size) as pool:
            start_time = time.time()
            results = pool.map(simple_task, items, chunksize=chunk_size)
            end_time = time.time()
            duration = end_time - start_time
            print(f"Chunk size {chunk_size}: {duration:.3f} seconds")
if __name__ == '__main__':
    # Large dataset
    items = list(range(10000))
    pool_size = 4
    # Test different chunk sizes
    chunk_sizes = [1, 10, 50, 100, 500, 1000]
    print("Benchmarking chunk sizes:")
    benchmark_chunking(items, pool_size, chunk_sizes)

from multiprocess import Pool
import json
import time
def fetch_and_process_data(item_id):
    """Simulate fetching and processing data for one item.

    Returns a dict with the item id, a derived value, a processing
    timestamp, and a completion status.
    """
    # Simulate network delay
    time.sleep(0.1)
    # Simulate data processing
    data = {
        'id': item_id,
        'value': item_id * 10,
        'processed_at': time.time(),
        'status': 'completed'
    }
    return data
def save_result(result):
    """Callback to save each result as it completes.

    Writes the result dict to result_<id>.json in the current directory;
    runs in the parent process, so file access is not racy across workers.
    """
    with open(f"result_{result['id']}.json", 'w') as f:
        json.dump(result, f)
    print(f"Saved result for item {result['id']}")
if __name__ == '__main__':
    item_ids = list(range(1, 21))  # Process 20 items
    with Pool(processes=4) as pool:
        # Submit all tasks asynchronously with callback
        async_results = []
        for item_id in item_ids:
            result = pool.apply_async(
                fetch_and_process_data,
                (item_id,),
                callback=save_result
            )
            async_results.append(result)
        # Monitor progress by polling ready() until every task reports done.
        completed = 0
        while completed < len(async_results):
            ready_count = sum(1 for r in async_results if r.ready())
            if ready_count > completed:
                completed = ready_count
                print(f"Progress: {completed}/{len(async_results)} tasks completed")
            time.sleep(0.5)
        print("All tasks completed!")

# Install with Tessl CLI
npx tessl i tessl/pypi-multiprocess