Threading and multiprocessing eye-candy with decorator-based concurrent execution and advanced worker management.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Enhanced Future objects and exception types for handling concurrent execution results, timeouts, and error conditions specific to Pebble's execution model. These types provide advanced capabilities beyond standard concurrent.futures functionality.
Enhanced Future object specifically designed for process-based execution with additional capabilities for process management and timeout handling.
class ProcessFuture(concurrent.futures.Future):
    """API stub: enhanced Future for process-based execution (pebble).

    Documents the methods pebble adds/overrides on top of
    concurrent.futures.Future. Bodies are intentionally empty — this is
    reference documentation, not the implementation.
    """

    def cancel(self) -> bool:
        """
        Attempt to cancel the process execution.

        Returns:
            True if cancellation was successful, False otherwise
        """

    def result(self, timeout=None):
        """
        Get the result of the process execution.

        Parameters:
        - timeout: Maximum time to wait for result in seconds

        Returns:
            The result of the executed function

        Raises:
        - TimeoutError: If timeout is exceeded
        - ProcessExpired: If the process died unexpectedly
        - CancelledError: If the execution was cancelled
        - Exception: Any exception raised by the executed function
        """

    def exception(self, timeout=None):
        """
        Get the exception raised by the process execution.

        Parameters:
        - timeout: Maximum time to wait for completion in seconds

        Returns:
            Exception raised by the function, or None if successful
        """

    def add_done_callback(self, fn):
        """
        Add a callback to be called when the process completes.

        Parameters:
        - fn: Callback function that takes the Future as argument
        """

# Fixed: this import was fused onto the docstring close above by the extractor.
from pebble.concurrent import process
from pebble import ProcessExpired
import time
@process(timeout=5.0)
def cpu_intensive_task(n):
    """Sum the squares of 0..n-1 in a worker process (5s hard limit)."""
    return sum(i ** 2 for i in range(n))
@process
def potentially_failing_task(should_fail=False):
    """Return "Success", or raise ValueError when should_fail is set."""
    if not should_fail:
        return "Success"
    raise ValueError("Task was configured to fail")
@process
def long_running_task():
    """Block for ten seconds in a worker process, then report completion."""
    nap_seconds = 10
    time.sleep(nap_seconds)
    return "Finally done"
# Using ProcessFuture features
def test_process_future():
    """Demonstrate ProcessFuture: callbacks, timeouts, errors, cancellation."""
    # Schedule the three example workloads.
    fut_cpu = cpu_intensive_task(1000000)
    fut_fail = potentially_failing_task(should_fail=True)
    fut_slow = long_running_task()

    def completion_callback(future):
        # Runs when the future settles; never blocks.
        try:
            result = future.result(timeout=0)  # Don't wait
            print(f"Task completed successfully: {result}")
        except Exception as e:
            print(f"Task failed: {type(e).__name__}: {e}")

    for fut in (fut_cpu, fut_fail, fut_slow):
        fut.add_done_callback(completion_callback)

    # Scenario 1: normal completion.
    try:
        result1 = fut_cpu.result(timeout=10)
        print(f"CPU task result: {result1}")
    except TimeoutError:
        print("CPU task timed out")
    except ProcessExpired as e:
        print(f"CPU task process died: {e}")

    # Scenario 2: the task raises inside the worker.
    try:
        result2 = fut_fail.result(timeout=5)
        print(f"Failing task result: {result2}")
    except ValueError as e:
        print(f"Expected failure: {e}")
    except ProcessExpired as e:
        print(f"Process died: {e}")

    # Scenario 3: cancelling a still-running task.
    print("Attempting to cancel long-running task...")
    cancelled = fut_slow.cancel()
    print(f"Cancellation {'successful' if cancelled else 'failed'}")
    if not cancelled:
        try:
            result3 = fut_slow.result(timeout=2)
            print(f"Long task result: {result3}")
        except TimeoutError:
            print("Long task timed out as expected")
test_process_future()
Future objects specifically designed for handling map operations in thread and process pools, with iteration capabilities and bulk result handling.
class MapFuture(concurrent.futures.Future):
    """API stub: aggregate Future returned by ThreadPool.map().

    Bodies are intentionally empty — this documents the interface only.
    """

    def __init__(self, futures, timeout=None):
        """
        Future for thread pool map operations.

        Parameters:
        - futures: List of individual Future objects
        - timeout: Overall timeout for the map operation
        """

    def __iter__(self):
        """
        Iterate over results as they become available.

        Yields:
            Results in the order they were submitted
        """

    def cancel(self) -> bool:
        """Cancel all underlying futures."""

    def result(self, timeout=None):
        """Get all results as a list."""
class ProcessMapFuture(concurrent.futures.Future):
    """API stub: aggregate Future returned by ProcessPool.map().

    Bodies are intentionally empty — this documents the interface only.
    """

    def __init__(self, futures, timeout=None):
        """
        Future for process pool map operations.

        Parameters:
        - futures: List of individual ProcessFuture objects
        - timeout: Overall timeout for the map operation
        """

    def __iter__(self):
        """
        Iterate over results as they become available.

        Yields:
            Results in the order they were submitted
        """

    def cancel(self) -> bool:
        """Cancel all underlying futures."""

    def result(self, timeout=None):
        """Get all results as a list."""

# Fixed: this import was fused onto the docstring close above by the extractor.
from pebble import ThreadPool, ProcessPool
import time
import math
def io_bound_task(delay):
    """Mimic blocking I/O by sleeping `delay` seconds, then report it."""
    time.sleep(delay)
    message = f"IO task completed after {delay}s"
    return message
def cpu_bound_task(n):
    """Accumulate sin(0) + sin(1) + ... + sin(n-1)."""
    acc = 0
    for k in range(n):
        acc = acc + math.sin(k)
    return acc
# Thread pool map operations
def test_map_futures():
    """Stream results from map() on both pool flavours."""
    # --- ThreadPool map: yields a MapFuture ---
    with ThreadPool(max_workers=4) as tpool:
        delays = [0.5, 1.0, 1.5, 2.0, 0.3]
        pending = tpool.map(io_bound_task, delays, timeout=10)
        print("Thread pool map results:")
        # Results come back in submission order as tasks finish.
        for i, result in enumerate(pending):
            print(f" Task {i}: {result}")
        # Alternative: gather everything at once
        # all_results = list(pending)
        # print(f"All results: {all_results}")

    # --- ProcessPool map: yields a ProcessMapFuture ---
    with ProcessPool(max_workers=3) as ppool:
        work_sizes = [10000, 20000, 15000, 25000]
        process_map_future = ppool.map(
            cpu_bound_task, work_sizes, chunksize=2, timeout=30
        )
        print("\nProcess pool map results:")
        try:
            for i, result in enumerate(process_map_future):
                print(f" CPU task {i} (size {work_sizes[i]}): {result:.2f}")
        except TimeoutError:
            print(" Map operation timed out")
        except Exception as e:
            print(f" Map operation failed: {e}")
test_map_futures()
Exception raised when a worker process dies unexpectedly, providing detailed information about the process failure.
class ProcessExpired(OSError):
    """Raised when a worker process terminates unexpectedly."""

    def __init__(self, msg, code=0, pid=None):
        """
        Exception for unexpected process termination.

        Parameters:
        - msg: Error message describing the failure
        - code: Process exit code
        - pid: Process ID of the failed process
        """
        super().__init__(msg)
        self.exitcode = code  # exit status of the dead worker
        self.pid = pid        # OS process id, if known

    def __str__(self):
        """String representation including exit code and PID."""
        # Implemented (was a docstring-only stub, which would make
        # str(e) raise TypeError by returning None); the upstream
        # implementation may format this differently.
        return f"{super().__str__()} (exitcode={self.exitcode}, pid={self.pid})"

# Fixed: this import was fused onto the docstring close above by the extractor.
from pebble.concurrent import process
from pebble import ProcessExpired, ProcessPool
import os
import signal
import time
@process
def crashing_task(crash_type="exit"):
    """Bring down the worker process in the requested way."""
    if crash_type == "exit":
        # Clean interpreter exit with a nonzero status code.
        exit(42)
    elif crash_type == "abort":
        # Abnormal termination (SIGABRT-style).
        os.abort()
    elif crash_type == "segfault":
        # Dereference a NULL pointer via ctypes (platform dependent).
        import ctypes
        ctypes.string_at(0)
    elif crash_type == "signal":
        # Deliver an uncatchable SIGKILL to ourselves.
        os.kill(os.getpid(), signal.SIGKILL)
    else:
        raise ValueError("Invalid crash type")
@process(timeout=2.0)
def timeout_task():
    """Sleep past the 2-second limit; pebble kills the worker first."""
    time.sleep(10)  # Will be killed by timeout
    return "Should never reach here"
def test_process_expired():
    """Trigger several process-death modes and inspect ProcessExpired."""
    for kind in ("exit", "abort", "signal"):
        print(f"\nTesting {kind} crash:")
        fut = crashing_task(kind)
        try:
            outcome = fut.result(timeout=5)
        except ProcessExpired as e:
            print(f" Process expired: {e}")
            print(f" Exit code: {e.exitcode}")
            print(f" Process PID: {e.pid}")
            print(f" Exception args: {e.args}")
        except Exception as e:
            print(f" Other exception: {type(e).__name__}: {e}")
        else:
            print(f" Unexpected success: {outcome}")

    # A timeout kills the worker; depending on timing this surfaces as
    # TimeoutError or ProcessExpired.
    print("\nTesting timeout:")
    fut = timeout_task()
    try:
        fut.result()
    except TimeoutError:
        print(" Task timed out as expected")
    except ProcessExpired as e:
        print(f" Process expired due to timeout: {e}")
    except Exception as e:
        print(f" Unexpected exception: {type(e).__name__}: {e}")
# Pool-level ProcessExpired handling
def test_pool_process_expired():
@process
def unstable_task(task_id, should_crash=False):
if should_crash:
if task_id % 2 == 0:
os._exit(1) # Hard exit
else:
raise RuntimeError(f"Task {task_id} crashed")
return f"Task {task_id} succeeded"
with ProcessPool(max_workers=3) as pool:
# Submit mix of stable and unstable tasks
futures = []
for i in range(10):
should_crash = i % 3 == 0 # Every third task crashes
future = pool.schedule(unstable_task, args=(i, should_crash))
futures.append((i, future))
print("Pool task results:")
for task_id, future in futures:
try:
result = future.result(timeout=5)
print(f" Task {task_id}: {result}")
except ProcessExpired as e:
print(f" Task {task_id}: Process died (PID: {e.pid}, code: {e.exitcode})")
except RuntimeError as e:
print(f" Task {task_id}: Runtime error: {e}")
except Exception as e:
print(f" Task {task_id}: Unexpected error: {type(e).__name__}: {e}")
# Run the crash/timeout demonstrations defined above.
test_process_expired()
test_pool_process_expired()
Comprehensive error handling strategies using Pebble's future types and exceptions:
from pebble import ProcessPool, ThreadPool, ProcessExpired
from pebble.concurrent import process, thread
import time
import random
import logging
# Set up logging for error tracking in the examples below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)  # module-level logger, per convention
class TaskManager:
    """Track task outcomes and retry tasks across the common failure modes."""

    def __init__(self):
        # Counters for each terminal outcome of a task attempt.
        self.successful_tasks = 0
        self.failed_tasks = 0
        self.expired_processes = 0
        self.timeout_tasks = 0

    def execute_with_retry(self, task_func, max_retries=3, *args, **kwargs):
        """Execute a task with retry logic for different failure types"""
        # One initial attempt plus up to max_retries retries.
        for attempt in range(max_retries + 1):
            try:
                # Create future for this attempt
                if hasattr(task_func, '__wrapped__'): # It's a decorated function
                    future = task_func(*args, **kwargs)
                else:
                    # Use process decorator
                    # NOTE(review): wrapping a closure for a worker process
                    # assumes it can be transferred to the child — confirm
                    # this works with pebble's @process on this platform.
                    @process(timeout=10.0)
                    def wrapped_task():
                        return task_func(*args, **kwargs)
                    future = wrapped_task()
                # Wait for result
                result = future.result(timeout=15.0)
                self.successful_tasks += 1
                logger.info(f"Task succeeded on attempt {attempt + 1}")
                return result
            except ProcessExpired as e:
                # Worker died: retriable — the crash may be transient.
                self.expired_processes += 1
                logger.warning(f"Process expired on attempt {attempt + 1}: {e}")
                if attempt == max_retries:
                    logger.error(f"Task failed after {max_retries + 1} attempts (process expiration)")
                    raise
                # Wait before retry (linear back-off)
                time.sleep(0.5 * (attempt + 1))
            except TimeoutError:
                # Timed out: retriable, with a longer back-off.
                self.timeout_tasks += 1
                logger.warning(f"Task timed out on attempt {attempt + 1}")
                if attempt == max_retries:
                    logger.error(f"Task failed after {max_retries + 1} attempts (timeout)")
                    raise
                # Wait before retry
                time.sleep(1.0 * (attempt + 1))
            except Exception as e:
                # Application-level failure: not retriable by design.
                self.failed_tasks += 1
                logger.error(f"Task failed with exception on attempt {attempt + 1}: {e}")
                # Don't retry for regular exceptions
                raise

    def get_stats(self):
        """Return the outcome counters as a dict."""
        return {
            'successful': self.successful_tasks,
            'failed': self.failed_tasks,
            'expired': self.expired_processes,
            'timeout': self.timeout_tasks
        }
# Unreliable tasks for testing
def unreliable_task(task_id, failure_rate=0.3):
"""Task that randomly fails in different ways"""
failure_type = random.choice(['success', 'crash', 'timeout', 'exception'])
if random.random() < failure_rate:
if failure_type == 'crash':
import os
os._exit(1)
elif failure_type == 'timeout':
time.sleep(20) # Will cause timeout
elif failure_type == 'exception':
raise ValueError(f"Task {task_id} random failure")
# Simulate work
time.sleep(random.uniform(0.1, 1.0))
return f"Task {task_id} completed successfully"
# Test comprehensive error handling
def test_comprehensive_error_handling():
    """Drive TaskManager and a ProcessPool through every failure path."""
    manager = TaskManager()
    # Test individual task retry
    print("Testing individual task retry:")
    try:
        result = manager.execute_with_retry(unreliable_task, 3, "test-task", 0.7)
        print(f"Result: {result}")
    except Exception as e:
        print(f"Final failure: {type(e).__name__}: {e}")
    # Test batch processing with error handling
    print("\nTesting batch processing:")
    with ProcessPool(max_workers=4) as pool:
        futures = []
        # Submit batch of unreliable tasks
        for i in range(20):
            future = pool.schedule(unreliable_task, args=(i, 0.4))
            futures.append((i, future))
        # Process results with different error handling strategies;
        # counters are bumped directly on the manager here (not via
        # execute_with_retry) so the summary covers both phases.
        results = {}
        errors = {}
        for task_id, future in futures:
            try:
                # Use shorter timeout for individual tasks
                result = future.result(timeout=5.0)
                results[task_id] = result
                manager.successful_tasks += 1
            except ProcessExpired as e:
                error_msg = f"Process died (PID: {e.pid}, code: {e.exitcode})"
                errors[task_id] = error_msg
                manager.expired_processes += 1
                logger.warning(f"Task {task_id}: {error_msg}")
            except TimeoutError:
                error_msg = "Task timed out"
                errors[task_id] = error_msg
                manager.timeout_tasks += 1
                logger.warning(f"Task {task_id}: {error_msg}")
            except Exception as e:
                error_msg = f"{type(e).__name__}: {e}"
                errors[task_id] = error_msg
                manager.failed_tasks += 1
                logger.error(f"Task {task_id}: {error_msg}")
    # Print summary (total is nonzero: at least the retry attempt above
    # and the 20 pool tasks were counted).
    stats = manager.get_stats()
    total_tasks = sum(stats.values())
    print(f"\nExecution Summary:")
    print(f" Total tasks: {total_tasks}")
    print(f" Successful: {stats['successful']} ({stats['successful']/total_tasks*100:.1f}%)")
    print(f" Failed: {stats['failed']} ({stats['failed']/total_tasks*100:.1f}%)")
    print(f" Process expired: {stats['expired']} ({stats['expired']/total_tasks*100:.1f}%)")
    print(f" Timeout: {stats['timeout']} ({stats['timeout']/total_tasks*100:.1f}%)")
    print(f"\nSuccessful results: {len(results)}")
    print(f"Error conditions: {len(errors)}")
# Advanced callback and monitoring patterns
def test_advanced_monitoring():
    """Test advanced monitoring using callbacks and custom Future handling"""
    completed_tasks = []
    failed_tasks = []

    def success_callback(future):
        # Record the result if the future finished cleanly.
        try:
            result = future.result(timeout=0)
            completed_tasks.append(result)
            print(f"✓ Task completed: {result}")
        except Exception:
            # Narrowed from a bare `except:` so SystemExit and
            # KeyboardInterrupt are not swallowed; failures are reported
            # by failure_callback instead.
            pass  # Not completed yet or failed

    def failure_callback(future):
        # Record the exception if the future failed.
        try:
            future.result(timeout=0)
        except Exception as e:
            failed_tasks.append(str(e))
            print(f"✗ Task failed: {type(e).__name__}: {e}")

    @process(timeout=3.0)
    def monitored_task(task_id, duration):
        # Sleeps `duration`; the extra sleep pushes past the 3s limit.
        time.sleep(duration)
        if duration > 2.5:  # Will timeout
            time.sleep(10)
        return f"Monitored task {task_id}"

    print("Testing advanced monitoring:")
    # Create tasks with different durations
    durations = [0.5, 1.0, 1.5, 2.0, 3.0]  # Last one will timeout
    futures = []
    for i, duration in enumerate(durations):
        future = monitored_task(i, duration)
        # Both callbacks fire on every future; each one self-selects.
        future.add_done_callback(success_callback)
        future.add_done_callback(failure_callback)
        futures.append(future)

    # Wait for all to complete
    time.sleep(5)
    print(f"\nMonitoring results:")
    print(f" Completed: {len(completed_tasks)}")
    print(f" Failed: {len(failed_tasks)}")
# Run comprehensive tests
test_comprehensive_error_handling()
test_advanced_monitoring()
Install with Tessl CLI
npx tessl i tessl/pypi-pebble