CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pebble

Threading and multiprocessing eye-candy with decorator-based concurrent execution and advanced worker management.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/process-pools.md

Process Pools

Managed pools of worker processes for executing CPU-intensive tasks. Process pools bypass Python's Global Interpreter Lock (GIL) to provide true parallelism with advanced features like timeouts, automatic worker restart, and comprehensive error handling with process isolation.

Capabilities

ProcessPool Class

A managed pool of worker processes that can execute multiple tasks concurrently with true parallelism, automatic process lifecycle management, and advanced error handling.

class ProcessPool:
    # API stub documenting pebble's ProcessPool constructor.
    def __init__(
        self,
        max_workers: int = multiprocessing.cpu_count(),  # default is evaluated once, at definition time
        max_tasks: int = 0,
        initializer: Callable = None,  # effectively Optional[Callable]; None means no per-worker setup
        initargs: tuple = (),  # annotation corrected: the default value is a tuple, not a list
        context: multiprocessing.context.BaseContext = multiprocessing  # the multiprocessing module itself serves as the default context
    ):
        """
        Create a process pool for CPU-intensive concurrent task execution.
        
        Parameters:
        - max_workers: Maximum number of worker processes (defaults to CPU count)
        - max_tasks: Maximum tasks per worker before restart (0 = no limit)
        - initializer: Function called when each worker process starts
        - initargs: Arguments passed to initializer function
        - context: Multiprocessing context (spawn, fork, forkserver)
        """

Basic Usage

from pebble import ProcessPool
import time

# Create pool with custom configuration.
# (A default pool is simply ProcessPool(); max_workers defaults to the CPU
# count. Do NOT create a pool you won't use: each ProcessPool starts real
# worker processes that must be close()d and join()ed — the original example
# built a default pool and immediately rebound the name, leaking its workers.)
pool = ProcessPool(max_workers=4, max_tasks=50)

def cpu_intensive_task(n, multiplier=1):
    """Sum multiplier * i**2 for i in [0, n) — pure CPU-bound work."""
    # CPU-intensive computation that benefits from true parallelism
    total = 0
    for i in range(n):
        total += (i ** 2) * multiplier
    return total

# Schedule tasks; schedule() returns a ProcessFuture immediately.
future1 = pool.schedule(cpu_intensive_task, args=(100000,), kwargs={"multiplier": 2})
future2 = pool.schedule(cpu_intensive_task, args=(200000,))

# Get results — result() blocks until the worker completes.
result1 = future1.result()
result2 = future2.result()

print(f"Results: {result1}, {result2}")

# Always clean up: stop accepting new tasks, then wait for workers to exit.
pool.close()
pool.join()

Task Scheduling with Timeouts

Schedule individual tasks with timeout protection to prevent runaway processes:

def schedule(
    self,
    function: Callable,
    args: tuple = (),  # annotation corrected: the default value is a tuple
    kwargs: dict = {},  # NOTE(review): mutable default mirrors pebble's published signature — never mutate it
    timeout: float = None  # effectively Optional[float]; None disables the timeout
) -> ProcessFuture:
    """
    Schedule a function for execution in the process pool.
    
    Parameters:
    - function: The function to execute
    - args: Positional arguments to pass to function
    - kwargs: Keyword arguments to pass to function
    - timeout: Maximum execution time in seconds (raises TimeoutError if exceeded)
    
    Returns:
    ProcessFuture object for retrieving the result
    """

def submit(
    self,
    function: Callable,
    timeout: Optional[float],
    /,  # function and timeout are positional-only (PEP 570)
    *args,
    **kwargs
) -> ProcessFuture:
    """
    Submit a function for execution (compatibility with concurrent.futures).
    
    Unlike schedule(), remaining positional and keyword arguments are passed
    straight through to the function, matching Executor.submit(fn, *args,
    **kwargs), with timeout inserted as a pebble-specific second positional.
    
    Parameters:
    - function: The function to execute
    - timeout: Maximum execution time in seconds (positional-only parameter)
    - args: Positional arguments to pass to function
    - kwargs: Keyword arguments to pass to function
    
    Returns:
    ProcessFuture object for retrieving the result
    """

Scheduling Examples

from pebble import ProcessPool, ProcessExpired
import time
import math

def prime_factorization(n):
    """Return the prime factors of n (with multiplicity), smallest first."""
    factors = []
    candidate = 2
    remaining = n
    # Trial division: only divisors up to sqrt(remaining) need checking.
    while candidate * candidate <= remaining:
        while remaining % candidate == 0:
            factors.append(candidate)
            remaining //= candidate
        candidate += 1
    if remaining > 1:
        # Whatever is left over is itself prime.
        factors.append(remaining)
    return factors

def monte_carlo_pi(iterations):
    """Estimate Pi by sampling random points in the unit square."""
    import random
    hits = 0
    for _ in range(iterations):
        # Draw x then y (same RNG consumption order as a paired sample).
        px = random.random()
        py = random.random()
        if px * px + py * py <= 1:
            hits += 1
    # Fraction inside the quarter circle, scaled to the full circle.
    return 4.0 * hits / iterations

def potentially_slow_task(delay):
    """Sleep for `delay` seconds, then return a completion message.

    Used to demonstrate pebble timeouts: when scheduled with a timeout
    shorter than `delay`, the worker is terminated and the future raises
    TimeoutError instead of returning.
    """
    time.sleep(delay)
    return f"Completed after {delay} seconds"

# CPU-intensive work with timeouts.
# NOTE(review): pebble surfaces timeouts as concurrent.futures.TimeoutError,
# which is the builtin TimeoutError only on Python 3.11+ — confirm the
# target Python version before relying on `except TimeoutError` here.
with ProcessPool(max_workers=4) as pool:
    # Schedule CPU-intensive tasks
    numbers = [982451653, 982451654, 982451655, 982451656]
    factor_futures = []
    
    for num in numbers:
        future = pool.schedule(
            prime_factorization, 
            args=(num,), 
            timeout=30.0  # 30 second timeout
        )
        factor_futures.append(future)
    
    # Monte Carlo Pi estimation
    pi_futures = []
    for iterations in [1000000, 2000000, 3000000]:
        future = pool.schedule(
            monte_carlo_pi,
            args=(iterations,),
            timeout=60.0  # 60 second timeout
        )
        pi_futures.append(future)
    
    # Schedule tasks that might timeout
    timeout_futures = [
        pool.schedule(potentially_slow_task, args=(1,), timeout=5.0),  # Should complete
        pool.schedule(potentially_slow_task, args=(10,), timeout=5.0)  # Should timeout
    ]
    
    # Collect results with error handling
    print("Prime factorizations:")
    for i, future in enumerate(factor_futures):
        try:
            result = future.result()  # blocks until the worker finishes or the timeout fires
            print(f"  {numbers[i]} = {' × '.join(map(str, result))}")
        except TimeoutError:
            print(f"  {numbers[i]} = TIMEOUT")
        except Exception as e:
            print(f"  {numbers[i]} = ERROR: {e}")
    
    print("\nPi estimations:")
    for i, future in enumerate(pi_futures):
        try:
            pi_estimate = future.result()
            iterations = [1000000, 2000000, 3000000][i]
            error = abs(pi_estimate - math.pi)
            print(f"  {iterations:,} iterations: π ≈ {pi_estimate:.6f} (error: {error:.6f})")
        except Exception as e:
            print(f"  ERROR: {e}")
    
    print("\nTimeout examples:")
    for i, future in enumerate(timeout_futures):
        try:
            result = future.result()
            print(f"  Task {i+1}: {result}")
        except TimeoutError:
            print(f"  Task {i+1}: TIMEOUT")
        except ProcessExpired as e:
            # ProcessExpired means the worker process itself died (crash/signal).
            print(f"  Task {i+1}: PROCESS DIED: {e}")
Bulk Operations with Map

Execute a function across multiple inputs efficiently using process pools:

def map(
    self,
    function: Callable,
    *iterables,
    chunksize: int = None,  # effectively Optional[int]; None lets the pool choose a chunk size
    timeout: float = None  # effectively Optional[float]
) -> ProcessMapFuture:
    """
    Apply function to every item of iterables in parallel using processes.
    
    Parameters:
    - function: Function to apply to each item
    - iterables: One or more iterables to process
    - chunksize: Number of items per chunk sent to each process
    - timeout: Maximum time to wait for all results
    
    NOTE(review): in pebble the timeout is reported while iterating the
    results, not at the map() call itself — confirm against the library docs.
    
    Returns:
    ProcessMapFuture object that yields results as they become available
    """

Map Usage Examples

from pebble import ProcessPool
import math
import time

def cpu_bound_function(x):
    """Burn CPU: sum of sin(i)*cos(i) over i in [0, x*1000)."""
    # sum() accumulates left-to-right from 0, matching an explicit += loop.
    return sum(math.sin(i) * math.cos(i) for i in range(x * 1000))

def data_processing_pipeline(data_chunk):
    """Derive a feature dict (square, sqrt, capped factorial) for each item."""
    return [
        {
            'original': item,
            'squared': item ** 2,
            'sqrt': math.sqrt(abs(item)),
            # Cap the factorial argument at 10 to keep results small.
            'factorial': math.factorial(min(abs(item), 10)),
        }
        for item in data_chunk
    ]

def matrix_operation(matrix_row):
    """Map each element x of the row to x**2 + sin(x)."""
    transformed = []
    for value in matrix_row:
        transformed.append(value ** 2 + math.sin(value))
    return transformed

# Efficient parallel processing with map.
# NOTE(review): pebble's map() returns a ProcessMapFuture whose docs obtain
# the result iterator via future.result(); iterating the future object
# directly, as below, may need to be `list(results.result())` — confirm
# against the pebble documentation before running.
with ProcessPool(max_workers=6) as pool:
    # Process large dataset
    large_dataset = list(range(1, 101))
    
    print("Processing large dataset...")
    start_time = time.time()
    
    # Use map with optimal chunk size
    results = pool.map(
        cpu_bound_function, 
        large_dataset, 
        chunksize=10,  # Process 10 items per chunk
        timeout=120    # 2 minute timeout for entire operation
    )
    
    # Convert to list to get all results
    processed_results = list(results)
    end_time = time.time()
    
    print(f"Processed {len(processed_results)} items in {end_time - start_time:.2f} seconds")
    print(f"Average result: {sum(processed_results) / len(processed_results):.2f}")
    
    # Data processing pipeline
    raw_data = [list(range(i*10, (i+1)*10)) for i in range(20)]  # 20 chunks of 10 items each
    
    print("\nRunning data processing pipeline...")
    pipeline_results = pool.map(
        data_processing_pipeline,
        raw_data,
        chunksize=2,   # 2 data chunks per process
        timeout=60
    )
    
    # Flatten results (each task returns a list of processed items)
    all_processed = []
    for chunk_result in pipeline_results:
        all_processed.extend(chunk_result)
    
    print(f"Processed {len(all_processed)} data items through pipeline")
    
    # Matrix operations
    matrix = [[i+j for j in range(100)] for i in range(50)]  # 50x100 matrix
    
    print("\nPerforming matrix operations...")
    matrix_results = pool.map(
        matrix_operation,
        matrix,
        chunksize=5,   # 5 rows per process
        timeout=30
    )
    
    processed_matrix = list(matrix_results)
    print(f"Processed matrix with {len(processed_matrix)} rows")
Multiprocessing Context Configuration

Configure the multiprocessing context for different process creation methods:

import multiprocessing
from pebble import ProcessPool

def worker_task(data, worker_id=None):
    """Echo the input along with the executing process's PID and worker id."""
    import os
    pid = os.getpid()
    return {'data': data, 'worker_pid': pid, 'worker_id': worker_id}

# Different multiprocessing contexts
def context_examples():
    """Demonstrate ProcessPool with the spawn/fork/forkserver start methods.

    fork and forkserver are Unix-only. multiprocessing.get_context raises
    ValueError (not RuntimeError) for an unsupported start method, so the
    availability probes below catch ValueError; RuntimeError is kept as a
    belt-and-braces fallback for older/odd platforms.
    """
    # Spawn context (creates fresh Python interpreter) - available everywhere
    spawn_ctx = multiprocessing.get_context('spawn')
    spawn_pool = ProcessPool(max_workers=2, context=spawn_ctx)
    
    # Fork context (copies current process) - Unix only
    try:
        fork_ctx = multiprocessing.get_context('fork')
        fork_pool = ProcessPool(max_workers=2, context=fork_ctx)
    except (ValueError, RuntimeError):  # bug fix: get_context raises ValueError
        print("Fork context not available on this platform")
        fork_pool = None
    
    # Forkserver context (hybrid approach) - Unix only
    try:
        forkserver_ctx = multiprocessing.get_context('forkserver')
        forkserver_pool = ProcessPool(max_workers=2, context=forkserver_ctx)
    except (ValueError, RuntimeError):  # bug fix: get_context raises ValueError
        print("Forkserver context not available on this platform")
        forkserver_pool = None
    
    # Test different contexts
    test_data = list(range(10))
    
    print("Testing spawn context:")
    with spawn_pool:
        spawn_results = [
            spawn_pool.schedule(worker_task, args=(data, f"spawn-{data}"))
            for data in test_data
        ]
        for future in spawn_results:
            print(f"  {future.result()}")
    
    if fork_pool:
        print("\nTesting fork context:")
        with fork_pool:
            fork_results = [
                fork_pool.schedule(worker_task, args=(data, f"fork-{data}"))
                for data in test_data
            ]
            for future in fork_results:
                print(f"  {future.result()}")
    
    if forkserver_pool:
        print("\nTesting forkserver context:")
        with forkserver_pool:
            forkserver_results = [
                forkserver_pool.schedule(worker_task, args=(data, f"forkserver-{data}"))
                for data in test_data
            ]
            for future in forkserver_results:
                print(f"  {future.result()}")

# Run context examples
context_examples()

Process Initialization and Cleanup

Initialize worker processes with shared resources and handle cleanup:

from pebble import ProcessPool
import multiprocessing
import logging
import os

# Global state for worker processes.
# Each worker process gets its own copy of this module, so the pool's
# initializer rebinds worker_state per process — nothing is shared
# between workers or with the parent.
worker_state = {}

def init_worker_process(config, log_level):
    """Initializer run once in each new worker: configure logging, seed state."""
    global worker_state

    pid = os.getpid()

    # Tag every log line from this worker with its PID.
    logging.basicConfig(
        level=log_level,
        format=f'PID-{pid}: %(levelname)s - %(message)s'
    )
    logger = logging.getLogger(__name__)

    # Fresh per-process state; task_count tracks work done by this worker.
    worker_state = {
        'config': config,
        'logger': logger,
        'task_count': 0,
        'process_id': pid
    }

    logger.info(f"Worker process {pid} initialized with config: {config}")

def worker_task_with_state(task_data):
    """Process one item using the per-process state set by the initializer."""
    global worker_state

    worker_state['task_count'] += 1
    log = worker_state['logger']
    log.info(f"Processing task {worker_state['task_count']}: {task_data}")

    # Scale the input by the configured multiplier (default 1).
    result = task_data * worker_state['config'].get('multiplier', 1)

    # Simulate processing latency.
    import time
    time.sleep(0.1)

    log.info(f"Task completed. Result: {result}")

    return {
        'input': task_data,
        'result': result,
        'task_number': worker_state['task_count'],
        'process_id': worker_state['process_id']
    }

# Create pool with worker initialization
config = {'multiplier': 3, 'timeout': 30}

pool = ProcessPool(
    max_workers=3,
    max_tasks=5,  # Restart workers every 5 tasks
    initializer=init_worker_process,
    initargs=(config, logging.INFO)
)

try:
    # Schedule multiple tasks to see worker behavior
    # (15 tasks over 3 workers recycled every 5 tasks guarantees recycling)
    tasks = list(range(1, 16))  # 15 tasks
    futures = []
    
    for task in tasks:
        future = pool.schedule(worker_task_with_state, args=(task,))
        futures.append(future)
    
    # Collect results
    results = []
    for future in futures:
        try:
            result = future.result(timeout=10)
            results.append(result)
        except Exception as e:
            print(f"Task failed: {e}")
    
    # Print results showing worker process recycling
    print(f"\nProcessed {len(results)} tasks:")
    for result in results:
        print(f"  Task {result['task_number']} in PID {result['process_id']}: "
              f"{result['input']} -> {result['result']}")
    
    # Group by process ID to see worker recycling
    by_process = {}
    for result in results:
        pid = result['process_id']
        if pid not in by_process:
            by_process[pid] = []
        by_process[pid].append(result['task_number'])
    
    print(f"\nTasks by worker process:")
    for pid, task_numbers in by_process.items():
        print(f"  PID {pid}: tasks {task_numbers}")
        
finally:
    # Always shut down, even if collection above raised: stop accepting
    # tasks, then wait for all workers to exit.
    pool.close()
    pool.join()

Error Handling and Recovery

Handle various error conditions specific to process-based execution:

from pebble import ProcessPool, ProcessExpired
import signal
import time
import os

def normal_task(x):
    """Well-behaved task: return x doubled."""
    return x * 2

def crashing_task():
    """Simulate a hard worker crash.

    os._exit(1) terminates the process immediately, bypassing exception
    handling and cleanup; the pool reports this as ProcessExpired.
    """
    # This will cause the process to crash
    os._exit(1)  # Immediate process termination

def hanging_task():
    """Loop forever — demonstrates timeouts and cancellation.

    Never returns; without a pebble timeout (or an explicit cancel) a
    worker running this blocks pool shutdown.
    """
    # This task will hang indefinitely
    while True:
        time.sleep(1)

def memory_intensive_task(size):
    """Allocate a list of `size` zeros; huge sizes may raise MemoryError."""
    buffer = [0] * size
    return len(buffer)

def signal_task():
    """Send SIGTERM to the current worker process (POSIX only).

    The pool sees the worker die from the signal and surfaces it as
    ProcessExpired on the corresponding future.
    """
    # This task will receive a signal
    import signal
    os.kill(os.getpid(), signal.SIGTERM)

# Comprehensive error handling.
# NOTE(review): `except TimeoutError` assumes Python 3.11+, where the builtin
# aliases concurrent.futures.TimeoutError — confirm the target version.
with ProcessPool(max_workers=4) as pool:
    # Schedule various types of tasks
    futures = {
        'normal': pool.schedule(normal_task, args=(42,)),
        'crashing': pool.schedule(crashing_task),
        'hanging': pool.schedule(hanging_task),
        'memory': pool.schedule(memory_intensive_task, args=(10**9,)),  # Huge allocation
        'timeout': pool.schedule(hanging_task, timeout=2.0),  # Will timeout
        'signal': pool.schedule(signal_task)
    }
    
    # Handle each type of error
    for task_name, future in futures.items():
        try:
            if task_name == 'hanging':
                # Don't wait for hanging task
                result = future.result(timeout=1.0)
            else:
                result = future.result(timeout=10.0)
            print(f"{task_name}: SUCCESS - {result}")
            
        except TimeoutError:
            print(f"{task_name}: TIMEOUT - Task exceeded time limit")
            
        except ProcessExpired as e:
            print(f"{task_name}: PROCESS DIED - PID: {e.pid}, Exit code: {e.exitcode}")
            
        except MemoryError:
            print(f"{task_name}: MEMORY ERROR - Not enough memory")
            
        except OSError as e:
            print(f"{task_name}: OS ERROR - {e}")
            
        except Exception as e:
            print(f"{task_name}: UNEXPECTED ERROR - {type(e).__name__}: {e}")
    
    # Bug fix: the 'hanging' task was scheduled WITHOUT a pebble timeout, so
    # its worker would run forever and the context-manager exit (close +
    # join) would deadlock. ProcessFuture.cancel() terminates the running
    # worker, allowing the pool to shut down cleanly.
    futures['hanging'].cancel()
    
    print("\nAll error handling completed")

Advanced Pool Configuration

Configure pools for specific performance and reliability requirements:

from pebble import ProcessPool
import multiprocessing
import time

# High-performance pool for CPU-intensive work
def create_high_performance_pool():
    """Throughput-oriented pool: oversubscribed workers, no recycling."""
    return ProcessPool(
        max_workers=multiprocessing.cpu_count() * 2,  # Oversubscribe for mixed workloads
        max_tasks=0,  # No worker recycling for maximum performance
        context=multiprocessing.get_context('spawn')  # Clean process creation
    )

# Reliable pool with frequent worker recycling
def create_reliable_pool():
    """Reliability-oriented pool: one worker per CPU, recycled every 10 tasks."""
    return ProcessPool(
        max_workers=multiprocessing.cpu_count(),
        max_tasks=10,  # Recycle workers frequently to prevent memory leaks
        context=multiprocessing.get_context('spawn')
    )

# Memory-conscious pool
def create_memory_conscious_pool():
    """Low-footprint pool: half the CPUs (at least 1), aggressive recycling."""
    return ProcessPool(
        max_workers=max(1, multiprocessing.cpu_count() // 2),  # Fewer workers
        max_tasks=5,  # Frequent recycling to free memory
        context=multiprocessing.get_context('spawn')
    )

def benchmark_task(iterations):
    """CPU benchmark: sum of sin(i)*cos(i) for i in [0, iterations)."""
    import math
    # sum() accumulates left-to-right from 0, matching an explicit += loop.
    return sum(math.sin(i) * math.cos(i) for i in range(iterations))

# Benchmark different pool configurations
configurations = {
    'high_performance': create_high_performance_pool(),
    'reliable': create_reliable_pool(),
    'memory_conscious': create_memory_conscious_pool()
}

for config_name, pool in configurations.items():
    print(f"\nTesting {config_name} configuration:")
    
    start_time = time.time()
    
    # The with-block closes and joins the pool on exit, so each
    # configuration's workers are fully shut down before the next run.
    # Note: the measured time therefore includes pool shutdown.
    with pool:
        # Submit benchmark tasks
        futures = [
            pool.schedule(benchmark_task, args=(100000,))
            for _ in range(20)
        ]
        
        # Wait for completion
        results = [f.result() for f in futures]
    
    end_time = time.time()
    
    print(f"  Completed {len(results)} tasks in {end_time - start_time:.2f} seconds")
    print(f"  Average result: {sum(results) / len(results):.2f}")

Install with Tessl CLI

npx tessl i tessl/pypi-pebble

docs

asynchronous-decorators.md

concurrent-decorators.md

future-types-exceptions.md

index.md

process-pools.md

synchronization-utilities.md

thread-pools.md

tile.json