CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pebble

Threading and multiprocessing eye-candy with decorator-based concurrent execution and advanced worker management.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/future-types-exceptions.md

Future Types and Exceptions

Enhanced Future objects and exception types for handling concurrent execution results, timeouts, and error conditions specific to Pebble's execution model. These types provide advanced capabilities beyond standard concurrent.futures functionality.

Capabilities

ProcessFuture

Enhanced Future object specifically designed for process-based execution with additional capabilities for process management and timeout handling.

class ProcessFuture(concurrent.futures.Future):
    # Interface stub: every method below carries only its docstring; no
    # implementation is shown in this document.
    def cancel(self) -> bool:
        """
        Attempt to cancel the process execution.
        
        Returns:
        True if cancellation was successful, False otherwise
        """
    
    def result(self, timeout=None):
        """
        Get the result of the process execution.
        
        Parameters:
        - timeout: Maximum time to wait for result in seconds
          (None, the default, means no limit is passed)
        
        Returns:
        The result of the executed function
        
        Raises:
        - TimeoutError: If timeout is exceeded. Note that
          concurrent.futures.TimeoutError is an alias of the builtin
          TimeoutError only on Python 3.11+; on older interpreters catch
          concurrent.futures.TimeoutError explicitly.
        - ProcessExpired: If the process died unexpectedly
        - CancelledError: If the execution was cancelled
        - Exception: Any exception raised by the executed function
        """
    
    def exception(self, timeout=None):
        """
        Get the exception raised by the process execution.
        
        Parameters:
        - timeout: Maximum time to wait for completion in seconds
        
        Returns:
        Exception raised by the function, or None if successful
        """
    
    def add_done_callback(self, fn):
        """
        Add a callback to be called when the process completes.
        
        Parameters:
        - fn: Callback function that takes the Future as argument
        """

ProcessFuture Usage Examples

from pebble.concurrent import process
from pebble import ProcessExpired
import time

@process(timeout=5.0)
def cpu_intensive_task(n):
    """Return the sum of the squares of every integer in ``range(n)``.

    The function is wrapped by ``@process(timeout=5.0)``, so callers receive
    a future and obtain the sum through ``future.result()``.
    """
    return sum(value ** 2 for value in range(n))

@process
def potentially_failing_task(should_fail=False):
    """Return ``"Success"``, or raise ``ValueError`` when ``should_fail`` is truthy."""
    if not should_fail:
        return "Success"
    raise ValueError("Task was configured to fail")

@process
def long_running_task():
    """Sleep for ten seconds and then return a completion message."""
    sleep_seconds = 10
    time.sleep(sleep_seconds)
    return "Finally done"

# Using ProcessFuture features
def test_process_future():
    """Exercise callbacks, result retrieval and cancellation on process futures.

    Schedules three decorated tasks, attaches the same completion callback to
    each of them, collects the first two results and finally tries to cancel
    the third task.
    """
    # concurrent.futures.TimeoutError only became an alias of the builtin
    # TimeoutError in Python 3.11. Importing it explicitly keeps the handlers
    # below effective on older interpreters as well.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    # Schedule tasks
    future1 = cpu_intensive_task(1000000)
    future2 = potentially_failing_task(should_fail=True)
    future3 = long_running_task()

    # Add callbacks
    def completion_callback(future):
        try:
            result = future.result(timeout=0)  # Don't wait
            print(f"Task completed successfully: {result}")
        except Exception as e:
            print(f"Task failed: {type(e).__name__}: {e}")

    for future in (future1, future2, future3):
        future.add_done_callback(completion_callback)

    # Handle different scenarios
    try:
        # This should succeed
        result1 = future1.result(timeout=10)
        print(f"CPU task result: {result1}")
    except FutureTimeoutError:
        print("CPU task timed out")
    except ProcessExpired as e:
        print(f"CPU task process died: {e}")

    try:
        # This should raise ValueError
        result2 = future2.result(timeout=5)
        print(f"Failing task result: {result2}")
    except ValueError as e:
        print(f"Expected failure: {e}")
    except ProcessExpired as e:
        print(f"Process died: {e}")

    # Try to cancel long-running task
    print("Attempting to cancel long-running task...")
    cancelled = future3.cancel()
    print(f"Cancellation {'successful' if cancelled else 'failed'}")

    if not cancelled:
        try:
            result3 = future3.result(timeout=2)
            print(f"Long task result: {result3}")
        except FutureTimeoutError:
            print("Long task timed out as expected")


# Guard the entry point: with the spawn/forkserver start methods the child
# process re-imports this module, and an unguarded call would run again there.
if __name__ == "__main__":
    test_process_future()

MapFuture and ProcessMapFuture

Future objects specifically designed for handling map operations in thread and process pools, with iteration capabilities and bulk result handling.

class MapFuture(concurrent.futures.Future):
    # Interface stub: the methods below carry only their docstrings.
    # NOTE(review): upstream Pebble documentation describes consuming a map
    # future through the iterator returned by result(); confirm that direct
    # iteration (__iter__) and a list-valued result() match the installed
    # library version.
    def __init__(self, futures, timeout=None):
        """
        Future for thread pool map operations.
        
        Parameters:
        - futures: List of individual Future objects
        - timeout: Overall timeout for the map operation
        """
    
    def __iter__(self):
        """
        Iterate over results as they become available.
        
        Yields:
        Results in the order they were submitted
        """
    
    def cancel(self) -> bool:
        """Cancel all underlying futures."""
    
    def result(self, timeout=None):
        """Get all results as a list."""

class ProcessMapFuture(concurrent.futures.Future):
    # Interface stub: the methods below carry only their docstrings.
    # NOTE(review): upstream Pebble documentation describes consuming a map
    # future through the iterator returned by result(); confirm that direct
    # iteration (__iter__) and a list-valued result() match the installed
    # library version.
    def __init__(self, futures, timeout=None):
        """
        Future for process pool map operations.
        
        Parameters:
        - futures: List of individual ProcessFuture objects
        - timeout: Overall timeout for the map operation
        """
    
    def __iter__(self):
        """
        Iterate over results as they become available.
        
        Yields:
        Results in the order they were submitted
        """
    
    def cancel(self) -> bool:
        """Cancel all underlying futures."""
    
    def result(self, timeout=None):
        """Get all results as a list."""

Map Future Usage Examples

from pebble import ThreadPool, ProcessPool
import time
import math

def io_bound_task(delay):
    """Block for ``delay`` seconds, then return a message stating the delay."""
    time.sleep(delay)
    message = f"IO task completed after {delay}s"
    return message

def cpu_bound_task(n):
    """Return the sum of ``math.sin(i)`` for every ``i`` in ``range(n)``."""
    sines = map(math.sin, range(n))
    return sum(sines)

# Thread pool map operations
def test_map_futures():
    """Run a map operation on a thread pool and on a process pool.

    The results of each map future are consumed through ``future.result()``,
    which is iterable and yields the results in submission order.
    """
    # concurrent.futures.TimeoutError only became an alias of the builtin
    # TimeoutError in Python 3.11; import it explicitly so the handler below
    # also works on older interpreters.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    # ThreadPool map
    with ThreadPool(max_workers=4) as thread_pool:
        delays = [0.5, 1.0, 1.5, 2.0, 0.3]

        # Get MapFuture
        map_future = thread_pool.map(io_bound_task, delays, timeout=10)

        print("Thread pool map results:")

        # Iterate over the results in submission order; each step blocks
        # until the corresponding task has finished.
        for i, result in enumerate(map_future.result()):
            print(f"  Task {i}: {result}")

        # Alternative: collect everything at once with
        # list(map_future.result())

    # ProcessPool map
    with ProcessPool(max_workers=3) as process_pool:
        work_sizes = [10000, 20000, 15000, 25000]

        # Get ProcessMapFuture
        process_map_future = process_pool.map(
            cpu_bound_task,
            work_sizes,
            chunksize=2,
            timeout=30,
        )

        print("\nProcess pool map results:")

        try:
            for i, result in enumerate(process_map_future.result()):
                print(f"  CPU task {i} (size {work_sizes[i]}): {result:.2f}")
        except FutureTimeoutError:
            print("  Map operation timed out")
        except Exception as e:
            print(f"  Map operation failed: {e}")


# Guard the entry point: with the spawn/forkserver start methods the worker
# processes re-import this module, and an unguarded call would run again there.
if __name__ == "__main__":
    test_map_futures()

ProcessExpired Exception

Exception raised when a worker process dies unexpectedly, providing detailed information about the process failure.

class ProcessExpired(OSError):
    """Exception for unexpected termination of a worker process.

    Attributes are set in ``__init__``:
    ``exitcode`` holds the ``code`` argument and ``pid`` the ``pid`` argument.
    """

    def __init__(self, msg, code=0, pid=None):
        """
        Exception for unexpected process termination.
        
        Parameters:
        - msg: Error message describing the failure
        - code: Process exit code
        - pid: Process ID of the failed process
        """
        super().__init__(msg)
        self.exitcode = code
        self.pid = pid

    def __str__(self):
        """String representation including exit code and PID.

        Returns:
        The message passed to the constructor followed by the exit code and
        the PID. A docstring-only ``__str__`` would return None and make
        ``str(error)`` raise TypeError, so a real string is built here.
        """
        message = super().__str__()
        return f"{message} (exit code: {self.exitcode}, pid: {self.pid})"

ProcessExpired Handling Examples

from pebble.concurrent import process
from pebble import ProcessExpired, ProcessPool
import os
import signal
import time

@process
def crashing_task(crash_type="exit"):
    """Terminate the current process in the way selected by ``crash_type``.

    Parameters:
    - crash_type: One of "exit", "abort", "segfault" or "signal"

    Raises:
    - ValueError: If ``crash_type`` is not one of the supported values
    """
    if crash_type == "exit":
        # Clean exit with code. sys.exit() is the programmatic equivalent of
        # the exit() helper, which is injected by the ``site`` module and is
        # therefore missing under ``python -S`` or in embedded interpreters.
        import sys
        sys.exit(42)
    elif crash_type == "abort":
        # Abnormal termination
        os.abort()
    elif crash_type == "segfault":
        # Simulate segmentation fault (platform dependent)
        import ctypes
        ctypes.string_at(0)
    elif crash_type == "signal":
        # Kill self with signal
        os.kill(os.getpid(), signal.SIGKILL)
    else:
        raise ValueError("Invalid crash type")

@process(timeout=2.0)
def timeout_task():
    """Sleep for ten seconds, far longer than the 2.0 s passed to ``@process``."""
    nap_seconds = 10
    time.sleep(nap_seconds)  # Will be killed by timeout
    return "Should never reach here"

def test_process_expired():
    """Trigger several kinds of worker death and print what the future reports.

    Runs ``crashing_task`` once per crash type and then ``timeout_task``,
    printing the exception details obtained from ``future.result()``.
    """
    # concurrent.futures.TimeoutError only became an alias of the builtin
    # TimeoutError in Python 3.11; import it explicitly so the handler below
    # also works on older interpreters.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    crash_types = ["exit", "abort", "signal"]

    for crash_type in crash_types:
        print(f"\nTesting {crash_type} crash:")

        future = crashing_task(crash_type)

        try:
            result = future.result(timeout=5)
            print(f"  Unexpected success: {result}")
        except ProcessExpired as e:
            print(f"  Process expired: {e}")
            print(f"  Exit code: {e.exitcode}")
            print(f"  Process PID: {e.pid}")
            print(f"  Exception args: {e.args}")
        except Exception as e:
            print(f"  Other exception: {type(e).__name__}: {e}")

    # Test timeout-induced process expiration
    print("\nTesting timeout:")
    future = timeout_task()

    try:
        future.result()
    except FutureTimeoutError:
        print("  Task timed out as expected")
    except ProcessExpired as e:
        print(f"  Process expired due to timeout: {e}")
    except Exception as e:
        print(f"  Unexpected exception: {type(e).__name__}: {e}")

# Pool-level ProcessExpired handling
def _unstable_task(task_id, should_crash=False):
    """Return a success message, or fail when ``should_crash`` is truthy.

    Even task ids hard-exit the worker with code 1; odd task ids raise
    ``RuntimeError``. The function lives at module level and is NOT wrapped
    by ``@process``: a pool has to pickle the callable it ships to a worker
    (nested functions cannot be pickled), and a decorated callable would
    return a future from inside the worker instead of the task's result.
    """
    if should_crash:
        if task_id % 2 == 0:
            os._exit(1)  # Hard exit
        raise RuntimeError(f"Task {task_id} crashed")
    return f"Task {task_id} succeeded"


def test_pool_process_expired():
    """Schedule a mix of stable and crashing tasks on a pool and report each outcome."""
    with ProcessPool(max_workers=3) as pool:
        # Submit mix of stable and unstable tasks
        futures = []
        for i in range(10):
            should_crash = i % 3 == 0  # Every third task crashes
            future = pool.schedule(_unstable_task, args=(i, should_crash))
            futures.append((i, future))

        print("Pool task results:")
        for task_id, future in futures:
            try:
                result = future.result(timeout=5)
                print(f"  Task {task_id}: {result}")
            except ProcessExpired as e:
                print(f"  Task {task_id}: Process died (PID: {e.pid}, code: {e.exitcode})")
            except RuntimeError as e:
                print(f"  Task {task_id}: Runtime error: {e}")
            except Exception as e:
                print(f"  Task {task_id}: Unexpected error: {type(e).__name__}: {e}")

# Guard the entry point: with the spawn/forkserver start methods the worker
# processes re-import this module, and unguarded calls would run again there.
if __name__ == "__main__":
    test_process_expired()
    test_pool_process_expired()

Advanced Error Handling Patterns

Comprehensive error handling strategies using Pebble's future types and exceptions:

from pebble import ProcessPool, ThreadPool, ProcessExpired
from pebble.concurrent import process, thread
import time
import random
import logging

# Setup logging for error tracking: configure the root logger at INFO level
# and create the module-named logger used by the code below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TaskManager:
    """Run tasks with retry support while counting how each attempt ended."""

    def __init__(self):
        # Outcome counters; get_stats() reports them.
        self.successful_tasks = 0
        self.failed_tasks = 0
        self.expired_processes = 0
        self.timeout_tasks = 0

    def execute_with_retry(self, task_func, max_retries=3, *args, **kwargs):
        """Execute a task with retry logic for different failure types.

        Parameters:
        - task_func: Callable to run. If it exposes ``__wrapped__`` it is
          treated as already decorated and called directly; otherwise it is
          wrapped with ``process(timeout=10.0)``.
        - max_retries: Number of retries after the first attempt (>= 0)
        - *args, **kwargs: Forwarded to ``task_func``

        Returns:
        The value returned by ``future.result(timeout=15.0)``

        Raises:
        - ValueError: If ``max_retries`` is negative
        - ProcessExpired / TimeoutError: Re-raised once the last attempt fails
        - Exception: Any other exception is re-raised immediately, no retry
        """
        if max_retries < 0:
            # range(max_retries + 1) would be empty and the method would
            # silently return None without ever running the task.
            raise ValueError("max_retries must be >= 0")

        # concurrent.futures.TimeoutError only became an alias of the builtin
        # TimeoutError in Python 3.11; import it explicitly so the handler
        # below also works on older interpreters.
        from concurrent.futures import TimeoutError as FutureTimeoutError

        is_decorated = hasattr(task_func, '__wrapped__')  # It's a decorated function

        for attempt in range(max_retries + 1):
            try:
                # Create future for this attempt
                if is_decorated:
                    future = task_func(*args, **kwargs)
                else:
                    # Decorate task_func itself instead of a nested closure:
                    # a closure defined here cannot be located by name when
                    # the child process is not created by fork.
                    future = process(timeout=10.0)(task_func)(*args, **kwargs)

                # Wait for result
                result = future.result(timeout=15.0)

                self.successful_tasks += 1
                logger.info(f"Task succeeded on attempt {attempt + 1}")
                return result

            except ProcessExpired as e:
                self.expired_processes += 1
                logger.warning(f"Process expired on attempt {attempt + 1}: {e}")

                if attempt == max_retries:
                    logger.error(f"Task failed after {max_retries + 1} attempts (process expiration)")
                    raise

                # Wait before retry
                time.sleep(0.5 * (attempt + 1))

            except FutureTimeoutError:
                self.timeout_tasks += 1
                logger.warning(f"Task timed out on attempt {attempt + 1}")

                if attempt == max_retries:
                    logger.error(f"Task failed after {max_retries + 1} attempts (timeout)")
                    raise

                # Wait before retry
                time.sleep(1.0 * (attempt + 1))

            except Exception as e:
                self.failed_tasks += 1
                logger.error(f"Task failed with exception on attempt {attempt + 1}: {e}")

                # Don't retry for regular exceptions
                raise

    def get_stats(self):
        """Return the four outcome counters as a dictionary."""
        return {
            'successful': self.successful_tasks,
            'failed': self.failed_tasks,
            'expired': self.expired_processes,
            'timeout': self.timeout_tasks
        }

# Unreliable tasks for testing
def _pick_failure_mode(failure_rate):
    """Return a failure mode with probability ``failure_rate``, else None.

    The mode is drawn only from real failure modes, so ``failure_rate`` is
    the actual probability that the task misbehaves. Keeping a 'success'
    entry among the choices would dilute it to 0.75 * failure_rate.
    """
    if random.random() < failure_rate:
        return random.choice(('crash', 'timeout', 'exception'))
    return None


def unreliable_task(task_id, failure_rate=0.3):
    """Task that randomly fails in different ways.

    Parameters:
    - task_id: Identifier used in the returned message and in the error text
    - failure_rate: Probability (0.0 - 1.0) that a failure mode is triggered

    Returns:
    A completion message containing ``task_id``

    Raises:
    - ValueError: When the 'exception' failure mode is drawn
    """
    failure_type = _pick_failure_mode(failure_rate)

    if failure_type == 'crash':
        import os
        os._exit(1)
    elif failure_type == 'timeout':
        time.sleep(20)  # Will cause timeout
    elif failure_type == 'exception':
        raise ValueError(f"Task {task_id} random failure")

    # Simulate work
    time.sleep(random.uniform(0.1, 1.0))
    return f"Task {task_id} completed successfully"

# Test comprehensive error handling
def test_comprehensive_error_handling():
    """Run one task with retries and a batch on a pool, then print a summary.

    Each batch task is scheduled with its own timeout so that a hung worker
    is stopped by the pool rather than left running after the caller gave up
    waiting.
    """
    # concurrent.futures.TimeoutError only became an alias of the builtin
    # TimeoutError in Python 3.11; import it explicitly so the handler below
    # also works on older interpreters.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    manager = TaskManager()

    # Test individual task retry
    print("Testing individual task retry:")
    try:
        result = manager.execute_with_retry(unreliable_task, 3, "test-task", 0.7)
        print(f"Result: {result}")
    except Exception as e:
        print(f"Final failure: {type(e).__name__}: {e}")

    # Test batch processing with error handling
    print("\nTesting batch processing:")

    results = {}
    errors = {}

    with ProcessPool(max_workers=4) as pool:
        # Submit batch of unreliable tasks. The timeout is attached to the
        # task itself, so the limit applies to its execution rather than to
        # the time it spends queued behind other tasks.
        futures = [
            (i, pool.schedule(unreliable_task, args=(i, 0.4), timeout=5.0))
            for i in range(20)
        ]

        # Process results with different error handling strategies
        for task_id, future in futures:
            try:
                # Blocks until the task finishes or its own timeout expires
                result = future.result()
                results[task_id] = result
                manager.successful_tasks += 1

            except ProcessExpired as e:
                error_msg = f"Process died (PID: {e.pid}, code: {e.exitcode})"
                errors[task_id] = error_msg
                manager.expired_processes += 1
                logger.warning(f"Task {task_id}: {error_msg}")

            except FutureTimeoutError:
                error_msg = "Task timed out"
                errors[task_id] = error_msg
                manager.timeout_tasks += 1
                logger.warning(f"Task {task_id}: {error_msg}")

            except Exception as e:
                error_msg = f"{type(e).__name__}: {e}"
                errors[task_id] = error_msg
                manager.failed_tasks += 1
                logger.error(f"Task {task_id}: {error_msg}")

    # Print summary
    stats = manager.get_stats()
    total_tasks = sum(stats.values())

    def share(count):
        # Percentage of all recorded outcomes; 0.0 when nothing was recorded
        return count / total_tasks * 100 if total_tasks else 0.0

    print("\nExecution Summary:")
    print(f"  Total tasks: {total_tasks}")
    print(f"  Successful: {stats['successful']} ({share(stats['successful']):.1f}%)")
    print(f"  Failed: {stats['failed']} ({share(stats['failed']):.1f}%)")
    print(f"  Process expired: {stats['expired']} ({share(stats['expired']):.1f}%)")
    print(f"  Timeout: {stats['timeout']} ({share(stats['timeout']):.1f}%)")

    print(f"\nSuccessful results: {len(results)}")
    print(f"Error conditions: {len(errors)}")

# Advanced callback and monitoring patterns
@process(timeout=3.0)
def _monitored_task(task_id, duration):
    """Sleep for ``duration`` seconds and return a message with ``task_id``.

    Durations above 2.5 s sleep ten more seconds so that the 3.0 s limit
    passed to ``@process`` is exceeded. The function is defined at module
    level so a child process that re-imports the module can find it by name.
    """
    time.sleep(duration)
    if duration > 2.5:  # Will timeout
        time.sleep(10)
    return f"Monitored task {task_id}"


def test_advanced_monitoring():
    """Test advanced monitoring using callbacks and custom Future handling"""

    completed_tasks = []
    failed_tasks = []

    def success_callback(future):
        try:
            result = future.result(timeout=0)
        except Exception:
            # Failures are reported by failure_callback; catching Exception
            # (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
            return
        completed_tasks.append(result)
        print(f"✓ Task completed: {result}")

    def failure_callback(future):
        try:
            future.result(timeout=0)
        except Exception as e:
            failed_tasks.append(str(e))
            print(f"✗ Task failed: {type(e).__name__}: {e}")

    print("Testing advanced monitoring:")

    # Create tasks with different durations
    durations = [0.5, 1.0, 1.5, 2.0, 3.0]  # Last one will timeout
    futures = []

    for i, duration in enumerate(durations):
        future = _monitored_task(i, duration)

        # Add callbacks
        future.add_done_callback(success_callback)
        future.add_done_callback(failure_callback)

        futures.append(future)

    # Wait for all to complete
    time.sleep(5)

    print("\nMonitoring results:")
    print(f"  Completed: {len(completed_tasks)}")
    print(f"  Failed: {len(failed_tasks)}")

# Run comprehensive tests
# Guard the entry point: with the spawn/forkserver start methods the worker
# processes re-import this module, and unguarded calls would run again there.
if __name__ == "__main__":
    test_comprehensive_error_handling()
    test_advanced_monitoring()

Install with Tessl CLI

npx tessl i tessl/pypi-pebble

docs

asynchronous-decorators.md

concurrent-decorators.md

future-types-exceptions.md

index.md

process-pools.md

synchronization-utilities.md

thread-pools.md

tile.json