CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-multiprocess

A fork of Python's multiprocessing module that provides enhanced serialization using dill.

Pending
Overview
Eval results
Files

docs/pools.md

Process Pools

Parallel task execution using worker process pools. The Pool class provides a convenient way to distribute tasks across multiple processes with various execution patterns and result handling options.

Capabilities

Pool Class

Main class for managing a pool of worker processes for parallel task execution.

class Pool:
    """
    A pool of worker processes for parallel task execution.

    NOTE(review): this interface mirrors the stdlib ``multiprocessing.Pool``;
    per the package description, tasks are serialized with dill rather than
    pickle — confirm against the implementation.
    
    Args:
        processes: number of worker processes (default: cpu_count())
        initializer: callable to run on each worker process startup
        initargs: arguments for the initializer function
        maxtasksperchild: maximum tasks per worker before restart (default: None)
        context: multiprocess context to use for creating processes
    """
    def __init__(self, processes=None, initializer=None, initargs=(), 
                 maxtasksperchild=None, context=None): ...
    
    def map(self, func, iterable, chunksize=None):
        """
        Apply function to every item of iterable and return a list of results.
        Blocks until all results are ready; see map_async() for the
        non-blocking variant.
        
        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes
            
        Returns:
            list: results in same order as input
        """
    
    def map_async(self, func, iterable, chunksize=None, callback=None, 
                  error_callback=None):
        """
        Asynchronous version of map() method.
        
        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes
            callback: function to call with results when complete
            error_callback: function to call if an error occurs
            
        Returns:
            AsyncResult: result object for async operation
        """
    
    def imap(self, func, iterable, chunksize=1):
        """
        Lazy version of map() that returns an iterator.
        Note that chunksize defaults to 1 here, unlike map().
        
        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes
            
        Returns:
            iterator: iterator over results
        """
    
    def imap_unordered(self, func, iterable, chunksize=1):
        """
        Like imap() but results can be returned in any order.
        
        Args:
            func: function to apply to each item
            iterable: items to process
            chunksize: items per task sent to worker processes
            
        Returns:
            iterator: iterator over results in arbitrary order
        """
    
    def starmap(self, func, iterable, chunksize=None):
        """
        Like map() but arguments are unpacked from tuples.
        
        Args:
            func: function to apply (called with *args from each tuple)
            iterable: sequence of tuples containing arguments
            chunksize: items per task sent to worker processes
            
        Returns:
            list: results in same order as input
        """
    
    def starmap_async(self, func, iterable, chunksize=None, callback=None, 
                      error_callback=None):
        """
        Asynchronous version of starmap() method.
        
        Args:
            func: function to apply (called with *args from each tuple)
            iterable: sequence of tuples containing arguments
            chunksize: items per task sent to worker processes
            callback: function to call with results when complete
            error_callback: function to call if an error occurs
            
        Returns:
            AsyncResult: result object for async operation
        """
    
    def apply(self, func, args=(), kwds={}):
        """
        Apply function with arguments and return the result.
        Blocks until the call completes; see apply_async() for the
        non-blocking variant.

        Note: the mutable ``kwds={}`` default mirrors the stdlib
        multiprocessing signature.
        
        Args:
            func: function to call
            args: positional arguments for func
            kwds: keyword arguments for func
            
        Returns:
            object: result of function call
        """
    
    def apply_async(self, func, args=(), kwds={}, callback=None, 
                    error_callback=None):
        """
        Asynchronous version of apply() method.
        
        Args:
            func: function to call
            args: positional arguments for func
            kwds: keyword arguments for func
            callback: function to call with result when complete
            error_callback: function to call if an error occurs
            
        Returns:
            AsyncResult: result object for async operation
        """
    
    def close(self):
        """
        Prevent any more tasks from being submitted to the pool.
        Once closed, no new tasks can be submitted.
        """
    
    def terminate(self):
        """
        Stop the worker processes immediately without completing work.
        """
    
    def join(self):
        """
        Wait for the worker processes to exit.
        Must call close() or terminate() before using join().
        """
    
    def __enter__(self):
        """Context manager entry."""
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Context manager exit - closes pool and joins workers.

        NOTE(review): stdlib multiprocessing.Pool.__exit__ calls terminate(),
        not close()+join() — confirm this fork actually closes and joins as
        documented here.
        """

AsyncResult Class

Object representing the result of an asynchronous operation.

class AsyncResult:
    """
    Result object for asynchronous pool operations.
    Returned by apply_async(), map_async(), and starmap_async().
    """
    def get(self, timeout=None):
        """
        Return the result when it arrives.

        NOTE(review): with stdlib AsyncResult semantics, an exception raised
        by the worker is re-raised here — confirm this fork matches.
        
        Args:
            timeout: maximum time to wait (seconds)
            
        Returns:
            object: result of the operation
            
        Raises:
            TimeoutError: if timeout exceeded
        """
    
    def wait(self, timeout=None):
        """
        Wait until the result is available.
        
        Args:
            timeout: maximum time to wait (seconds)
            
        Returns:
            bool: True if result is available, False if timeout
        """
    
    def ready(self):
        """
        Return True if the operation is complete.
        
        Returns:
            bool: True if operation is complete
        """
    
    def successful(self):
        """
        Return True if the operation completed without error.
        Must call ready() first to ensure operation is complete.
        
        Returns:
            bool: True if successful
            
        Raises:
            ValueError: if operation is not yet complete
        """

Usage Examples

Basic Pool Map

from multiprocess import Pool

def square(x):
    """Return the square of x."""
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # Distribute the squares across four worker processes.
        values = list(range(1, 11))
        results = pool.map(square, values)
        print(f"Results: {results}")
        # Output: Results: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

Asynchronous Processing

from multiprocess import Pool
import time

def slow_function(x):
    """Square x after a one-second delay (simulated slow work)."""
    time.sleep(1)
    return x * x

def result_callback(result):
    """Report a completed async result."""
    print(f"Got result: {result}")

def error_callback(error):
    """Report a failed async task."""
    print(f"Got error: {error}")

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # Kick off the async job; callbacks fire when it finishes.
        pending = pool.map_async(slow_function,
                                 [1, 2, 3, 4],
                                 callback=result_callback,
                                 error_callback=error_callback)

        # Overlap some foreground work with the pool.
        print("Doing other work...")
        time.sleep(0.5)
        print("Still working...")

        # Block until the pool is done (or 10 seconds elapse).
        results = pending.get(timeout=10)
        print(f"Final results: {results}")

Starmap for Multiple Arguments

from multiprocess import Pool

def multiply(x, y):
    """Return the product of x and y."""
    return x * y

def power(base, exponent):
    """Return base raised to the given exponent."""
    return base ** exponent

if __name__ == '__main__':
    with Pool(processes=3) as pool:
        # Each tuple is unpacked into the function's positional arguments.
        pairs = [(2, 3), (4, 5), (6, 7)]
        multiply_results = pool.starmap(multiply, pairs)
        print(f"Multiply results: {multiply_results}")
        # Output: Multiply results: [6, 20, 42]

        power_results = pool.starmap(power, [(2, 3), (3, 2), (4, 2), (5, 2)])
        print(f"Power results: {power_results}")
        # Output: Power results: [8, 9, 16, 25]

Iterator-based Processing

from multiprocess import Pool
import time

def process_item(x):
    """Square x after a delay proportional to its magnitude."""
    # Simulate variable processing time: larger inputs take longer.
    time.sleep(0.1 * x)
    return x * x

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        items = range(1, 11)

        # imap preserves input order even if later items finish first.
        print("Ordered results:")
        for value in pool.imap(process_item, items, chunksize=2):
            print(f"Got result: {value}")

        print("\nUnordered results:")
        # imap_unordered yields each result as soon as it completes.
        for value in pool.imap_unordered(process_item, items, chunksize=2):
            print(f"Got result: {value}")

Pool with Initializer

from multiprocess import Pool
import os

# Per-worker module-global state, set once by the pool initializer.
worker_state = None

def init_worker(initial_value):
    """Store initial_value as this worker's module-global state."""
    global worker_state
    worker_state = initial_value
    print(f"Worker {os.getpid()} initialized with {initial_value}")

def worker_task(x):
    """Return x plus this worker's initialized state."""
    global worker_state
    worker_pid = os.getpid()
    total = x + worker_state
    print(f"Worker {worker_pid} processed {x} with state {worker_state} = {total}")
    return total

if __name__ == '__main__':
    # Each worker runs init_worker(100) once at startup.
    with Pool(processes=2, initializer=init_worker, initargs=(100,)) as pool:
        results = pool.map(worker_task, [1, 2, 3, 4, 5])
        print(f"Results: {results}")

Error Handling

from multiprocess import Pool
import random

def unreliable_function(x):
    """Square x, failing randomly about 30% of the time."""
    if random.random() < 0.3:
        raise ValueError(f"Error processing {x}")
    return x * x

def handle_result(result):
    """Success callback: report the result."""
    print(f"Success: {result}")

def handle_error(error):
    """Error callback: report the exception."""
    print(f"Error occurred: {error}")

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # Submit multiple async tasks; callbacks fire per task as they finish.
        pending = [
            pool.apply_async(unreliable_function, (i,),
                             callback=handle_result,
                             error_callback=handle_error)
            for i in range(10)
        ]

        # Wait for all tasks and handle each result individually.
        for i, task in enumerate(pending):
            try:
                result = task.get(timeout=5)
                print(f"Task {i} completed successfully: {result}")
            except Exception as e:
                print(f"Task {i} failed: {e}")

Pool with Context Manager and Resource Cleanup

from multiprocess import Pool
import time
import os

def cpu_intensive_task(n):
    """Simulate CPU-intensive work"""
    worker_pid = os.getpid()
    started = time.time()

    # Burn CPU: sum of squares over n million integers.
    total = sum(i * i for i in range(n * 1000000))

    duration = time.time() - started

    return {
        'pid': worker_pid,
        'input': n,
        'result': total,
        'duration': duration,
    }

if __name__ == '__main__':
    tasks = [10, 20, 30, 40, 50]

    # The context manager guarantees workers are cleaned up on exit.
    with Pool(processes=3) as pool:
        print("Starting parallel processing...")
        started = time.time()

        # Fan the tasks out across the workers.
        results = pool.map(cpu_intensive_task, tasks)

        total_time = time.time() - started

        print(f"\nAll tasks completed in {total_time:.2f} seconds")
        print("\nResults:")
        for result in results:
            print(f"PID {result['pid']}: input={result['input']}, "
                  f"duration={result['duration']:.3f}s")

Chunking for Performance

from multiprocess import Pool
import time

def simple_task(x):
    """Return the square of x."""
    return x * x

def benchmark_chunking(items, pool_size, chunk_sizes):
    """Time pool.map over items once per candidate chunk size."""
    for chunk_size in chunk_sizes:
        with Pool(processes=pool_size) as pool:
            started = time.time()
            pool.map(simple_task, items, chunksize=chunk_size)
            duration = time.time() - started
            print(f"Chunk size {chunk_size}: {duration:.3f} seconds")

if __name__ == '__main__':
    # Large dataset processed with four workers.
    items = list(range(10000))

    print("Benchmarking chunk sizes:")
    # Compare a range of chunk sizes against the same workload.
    benchmark_chunking(items, 4, [1, 10, 50, 100, 500, 1000])

Advanced: Custom Result Processing

from multiprocess import Pool
import json
import time

def fetch_and_process_data(item_id):
    """Simulate fetching and processing data"""
    time.sleep(0.1)  # simulated network delay

    # Simulated processing result record.
    return {
        'id': item_id,
        'value': item_id * 10,
        'processed_at': time.time(),
        'status': 'completed',
    }

def save_result(result):
    """Callback to save each result as it completes"""
    path = f"result_{result['id']}.json"
    with open(path, 'w') as f:
        json.dump(result, f)
    print(f"Saved result for item {result['id']}")

if __name__ == '__main__':
    item_ids = list(range(1, 21))  # Process 20 items

    with Pool(processes=4) as pool:
        # Submit every task asynchronously; save_result runs per completion.
        pending = [
            pool.apply_async(fetch_and_process_data, (item_id,),
                             callback=save_result)
            for item_id in item_ids
        ]

        # Poll for progress until every task reports ready.
        completed = 0
        while completed < len(pending):
            ready_count = sum(1 for task in pending if task.ready())
            if ready_count > completed:
                completed = ready_count
                print(f"Progress: {completed}/{len(pending)} tasks completed")
            time.sleep(0.5)

        print("All tasks completed!")

Install with Tessl CLI

npx tessl i tessl/pypi-multiprocess

docs

communication.md

context-config.md

index.md

pools.md

process-management.md

shared-objects.md

synchronization.md

tile.json