0
# Future Types and Exceptions
1
2
Enhanced Future objects and exception types for handling concurrent execution results, timeouts, and error conditions specific to Pebble's execution model. These types provide advanced capabilities beyond standard concurrent.futures functionality.
3
4
## Capabilities
5
6
### ProcessFuture
7
8
Enhanced Future object specifically designed for process-based execution with additional capabilities for process management and timeout handling.
9
10
```python { .api }
11
class ProcessFuture(concurrent.futures.Future):
12
def cancel(self) -> bool:
13
"""
14
Attempt to cancel the process execution.
15
16
Returns:
17
True if cancellation was successful, False otherwise
18
"""
19
20
def result(self, timeout=None):
21
"""
22
Get the result of the process execution.
23
24
Parameters:
25
- timeout: Maximum time to wait for result in seconds
26
27
Returns:
28
The result of the executed function
29
30
Raises:
31
- TimeoutError: If timeout is exceeded
32
- ProcessExpired: If the process died unexpectedly
33
- CancelledError: If the execution was cancelled
34
- Exception: Any exception raised by the executed function
35
"""
36
37
def exception(self, timeout=None):
38
"""
39
Get the exception raised by the process execution.
40
41
Parameters:
42
- timeout: Maximum time to wait for completion in seconds
43
44
Returns:
45
Exception raised by the function, or None if successful
46
"""
47
48
def add_done_callback(self, fn):
49
"""
50
Add a callback to be called when the process completes.
51
52
Parameters:
53
- fn: Callback function that takes the Future as argument
54
"""
55
```
56
57
#### ProcessFuture Usage Examples
58
59
```python
60
from pebble.concurrent import process
61
from pebble import ProcessExpired
62
import time
63
64
@process(timeout=5.0)
65
def cpu_intensive_task(n):
66
total = 0
67
for i in range(n):
68
total += i ** 2
69
return total
70
71
@process
72
def potentially_failing_task(should_fail=False):
73
if should_fail:
74
raise ValueError("Task was configured to fail")
75
return "Success"
76
77
@process
78
def long_running_task():
79
time.sleep(10)
80
return "Finally done"
81
82
# Using ProcessFuture features
83
def test_process_future():
84
# Schedule tasks
85
future1 = cpu_intensive_task(1000000)
86
future2 = potentially_failing_task(should_fail=True)
87
future3 = long_running_task()
88
89
# Add callbacks
90
def completion_callback(future):
91
try:
92
result = future.result(timeout=0) # Don't wait
93
print(f"Task completed successfully: {result}")
94
except Exception as e:
95
print(f"Task failed: {type(e).__name__}: {e}")
96
97
future1.add_done_callback(completion_callback)
98
future2.add_done_callback(completion_callback)
99
future3.add_done_callback(completion_callback)
100
101
# Handle different scenarios
102
try:
103
# This should succeed
104
result1 = future1.result(timeout=10)
105
print(f"CPU task result: {result1}")
106
except TimeoutError:
107
print("CPU task timed out")
108
except ProcessExpired as e:
109
print(f"CPU task process died: {e}")
110
111
try:
112
# This should raise ValueError
113
result2 = future2.result(timeout=5)
114
print(f"Failing task result: {result2}")
115
except ValueError as e:
116
print(f"Expected failure: {e}")
117
except ProcessExpired as e:
118
print(f"Process died: {e}")
119
120
# Try to cancel long-running task
121
print(f"Attempting to cancel long-running task...")
122
cancelled = future3.cancel()
123
print(f"Cancellation {'successful' if cancelled else 'failed'}")
124
125
if not cancelled:
126
try:
127
result3 = future3.result(timeout=2)
128
print(f"Long task result: {result3}")
129
except TimeoutError:
130
print("Long task timed out as expected")
131
132
test_process_future()
133
```
134
135
### MapFuture and ProcessMapFuture
136
137
Future objects specifically designed for handling map operations in thread and process pools, with iteration capabilities and bulk result handling.
138
139
```python { .api }
140
class MapFuture(concurrent.futures.Future):
141
def __init__(self, futures, timeout=None):
142
"""
143
Future for thread pool map operations.
144
145
Parameters:
146
- futures: List of individual Future objects
147
- timeout: Overall timeout for the map operation
148
"""
149
150
def __iter__(self):
151
"""
152
Iterate over results as they become available.
153
154
Yields:
155
Results in the order they were submitted
156
"""
157
158
def cancel(self) -> bool:
159
"""Cancel all underlying futures."""
160
161
def result(self, timeout=None):
162
"""Get all results as a list."""
163
164
class ProcessMapFuture(concurrent.futures.Future):
165
def __init__(self, futures, timeout=None):
166
"""
167
Future for process pool map operations.
168
169
Parameters:
170
- futures: List of individual ProcessFuture objects
171
- timeout: Overall timeout for the map operation
172
"""
173
174
def __iter__(self):
175
"""
176
Iterate over results as they become available.
177
178
Yields:
179
Results in the order they were submitted
180
"""
181
182
def cancel(self) -> bool:
183
"""Cancel all underlying futures."""
184
185
def result(self, timeout=None):
186
"""Get all results as a list."""
187
```
188
189
#### Map Future Usage Examples
190
191
```python
192
from pebble import ThreadPool, ProcessPool
193
import time
194
import math
195
196
def io_bound_task(delay):
197
time.sleep(delay)
198
return f"IO task completed after {delay}s"
199
200
def cpu_bound_task(n):
201
return sum(math.sin(i) for i in range(n))
202
203
# Thread pool map operations
204
def test_map_futures():
205
# ThreadPool map
206
with ThreadPool(max_workers=4) as thread_pool:
207
delays = [0.5, 1.0, 1.5, 2.0, 0.3]
208
209
# Get MapFuture
210
map_future = thread_pool.map(io_bound_task, delays, timeout=10)
211
212
print("Thread pool map results:")
213
214
# Iterate over results as they complete
215
for i, result in enumerate(map_future):
216
print(f" Task {i}: {result}")
217
218
# Alternative: get all results at once
219
# all_results = list(map_future)
220
# print(f"All results: {all_results}")
221
222
# ProcessPool map
223
with ProcessPool(max_workers=3) as process_pool:
224
work_sizes = [10000, 20000, 15000, 25000]
225
226
# Get ProcessMapFuture
227
process_map_future = process_pool.map(
228
cpu_bound_task,
229
work_sizes,
230
chunksize=2,
231
timeout=30
232
)
233
234
print("\nProcess pool map results:")
235
236
try:
237
for i, result in enumerate(process_map_future):
238
print(f" CPU task {i} (size {work_sizes[i]}): {result:.2f}")
239
except TimeoutError:
240
print(" Map operation timed out")
241
except Exception as e:
242
print(f" Map operation failed: {e}")
243
244
test_map_futures()
245
```
246
247
### ProcessExpired Exception
248
249
Exception raised when a worker process dies unexpectedly, providing detailed information about the process failure.
250
251
```python { .api }
252
class ProcessExpired(OSError):
253
def __init__(self, msg, code=0, pid=None):
254
"""
255
Exception for unexpected process termination.
256
257
Parameters:
258
- msg: Error message describing the failure
259
- code: Process exit code
260
- pid: Process ID of the failed process
261
"""
262
super().__init__(msg)
263
self.exitcode = code
264
self.pid = pid
265
266
def __str__(self):
267
"""String representation including exit code and PID."""
268
```
269
270
#### ProcessExpired Handling Examples
271
272
```python
273
from pebble.concurrent import process
274
from pebble import ProcessExpired, ProcessPool
275
import os
276
import signal
277
import time
278
279
@process
280
def crashing_task(crash_type="exit"):
281
if crash_type == "exit":
282
# Clean exit with code
283
exit(42)
284
elif crash_type == "abort":
285
# Abnormal termination
286
os.abort()
287
elif crash_type == "segfault":
288
# Simulate segmentation fault (platform dependent)
289
import ctypes
290
ctypes.string_at(0)
291
elif crash_type == "signal":
292
# Kill self with signal
293
os.kill(os.getpid(), signal.SIGKILL)
294
else:
295
raise ValueError("Invalid crash type")
296
297
@process(timeout=2.0)
298
def timeout_task():
299
time.sleep(10) # Will be killed by timeout
300
return "Should never reach here"
301
302
def test_process_expired():
303
crash_types = ["exit", "abort", "signal"]
304
305
for crash_type in crash_types:
306
print(f"\nTesting {crash_type} crash:")
307
308
future = crashing_task(crash_type)
309
310
try:
311
result = future.result(timeout=5)
312
print(f" Unexpected success: {result}")
313
except ProcessExpired as e:
314
print(f" Process expired: {e}")
315
print(f" Exit code: {e.exitcode}")
316
print(f" Process PID: {e.pid}")
317
print(f" Exception args: {e.args}")
318
except Exception as e:
319
print(f" Other exception: {type(e).__name__}: {e}")
320
321
# Test timeout-induced process expiration
322
print(f"\nTesting timeout:")
323
future = timeout_task()
324
325
try:
326
result = future.result()
327
except TimeoutError:
328
print(" Task timed out as expected")
329
except ProcessExpired as e:
330
print(f" Process expired due to timeout: {e}")
331
except Exception as e:
332
print(f" Unexpected exception: {type(e).__name__}: {e}")
333
334
# Pool-level ProcessExpired handling
335
def test_pool_process_expired():
336
@process
337
def unstable_task(task_id, should_crash=False):
338
if should_crash:
339
if task_id % 2 == 0:
340
os._exit(1) # Hard exit
341
else:
342
raise RuntimeError(f"Task {task_id} crashed")
343
return f"Task {task_id} succeeded"
344
345
with ProcessPool(max_workers=3) as pool:
346
# Submit mix of stable and unstable tasks
347
futures = []
348
for i in range(10):
349
should_crash = i % 3 == 0 # Every third task crashes
350
future = pool.schedule(unstable_task, args=(i, should_crash))
351
futures.append((i, future))
352
353
print("Pool task results:")
354
for task_id, future in futures:
355
try:
356
result = future.result(timeout=5)
357
print(f" Task {task_id}: {result}")
358
except ProcessExpired as e:
359
print(f" Task {task_id}: Process died (PID: {e.pid}, code: {e.exitcode})")
360
except RuntimeError as e:
361
print(f" Task {task_id}: Runtime error: {e}")
362
except Exception as e:
363
print(f" Task {task_id}: Unexpected error: {type(e).__name__}: {e}")
364
365
test_process_expired()
366
test_pool_process_expired()
367
```
368
369
### Advanced Error Handling Patterns
370
371
Comprehensive error handling strategies using Pebble's future types and exceptions:
372
373
```python
374
from pebble import ProcessPool, ThreadPool, ProcessExpired
375
from pebble.concurrent import process, thread
376
import time
377
import random
378
import logging
379
380
# Setup logging for error tracking
381
logging.basicConfig(level=logging.INFO)
382
logger = logging.getLogger(__name__)
383
384
class TaskManager:
385
def __init__(self):
386
self.successful_tasks = 0
387
self.failed_tasks = 0
388
self.expired_processes = 0
389
self.timeout_tasks = 0
390
391
def execute_with_retry(self, task_func, max_retries=3, *args, **kwargs):
392
"""Execute a task with retry logic for different failure types"""
393
394
for attempt in range(max_retries + 1):
395
try:
396
# Create future for this attempt
397
if hasattr(task_func, '__wrapped__'): # It's a decorated function
398
future = task_func(*args, **kwargs)
399
else:
400
# Use process decorator
401
@process(timeout=10.0)
402
def wrapped_task():
403
return task_func(*args, **kwargs)
404
future = wrapped_task()
405
406
# Wait for result
407
result = future.result(timeout=15.0)
408
409
self.successful_tasks += 1
410
logger.info(f"Task succeeded on attempt {attempt + 1}")
411
return result
412
413
except ProcessExpired as e:
414
self.expired_processes += 1
415
logger.warning(f"Process expired on attempt {attempt + 1}: {e}")
416
417
if attempt == max_retries:
418
logger.error(f"Task failed after {max_retries + 1} attempts (process expiration)")
419
raise
420
421
# Wait before retry
422
time.sleep(0.5 * (attempt + 1))
423
424
except TimeoutError:
425
self.timeout_tasks += 1
426
logger.warning(f"Task timed out on attempt {attempt + 1}")
427
428
if attempt == max_retries:
429
logger.error(f"Task failed after {max_retries + 1} attempts (timeout)")
430
raise
431
432
# Wait before retry
433
time.sleep(1.0 * (attempt + 1))
434
435
except Exception as e:
436
self.failed_tasks += 1
437
logger.error(f"Task failed with exception on attempt {attempt + 1}: {e}")
438
439
# Don't retry for regular exceptions
440
raise
441
442
def get_stats(self):
443
return {
444
'successful': self.successful_tasks,
445
'failed': self.failed_tasks,
446
'expired': self.expired_processes,
447
'timeout': self.timeout_tasks
448
}
449
450
# Unreliable tasks for testing
451
def unreliable_task(task_id, failure_rate=0.3):
452
"""Task that randomly fails in different ways"""
453
454
failure_type = random.choice(['success', 'crash', 'timeout', 'exception'])
455
456
if random.random() < failure_rate:
457
if failure_type == 'crash':
458
import os
459
os._exit(1)
460
elif failure_type == 'timeout':
461
time.sleep(20) # Will cause timeout
462
elif failure_type == 'exception':
463
raise ValueError(f"Task {task_id} random failure")
464
465
# Simulate work
466
time.sleep(random.uniform(0.1, 1.0))
467
return f"Task {task_id} completed successfully"
468
469
# Test comprehensive error handling
470
def test_comprehensive_error_handling():
471
manager = TaskManager()
472
473
# Test individual task retry
474
print("Testing individual task retry:")
475
try:
476
result = manager.execute_with_retry(unreliable_task, 3, "test-task", 0.7)
477
print(f"Result: {result}")
478
except Exception as e:
479
print(f"Final failure: {type(e).__name__}: {e}")
480
481
# Test batch processing with error handling
482
print("\nTesting batch processing:")
483
484
with ProcessPool(max_workers=4) as pool:
485
futures = []
486
487
# Submit batch of unreliable tasks
488
for i in range(20):
489
future = pool.schedule(unreliable_task, args=(i, 0.4))
490
futures.append((i, future))
491
492
# Process results with different error handling strategies
493
results = {}
494
errors = {}
495
496
for task_id, future in futures:
497
try:
498
# Use shorter timeout for individual tasks
499
result = future.result(timeout=5.0)
500
results[task_id] = result
501
manager.successful_tasks += 1
502
503
except ProcessExpired as e:
504
error_msg = f"Process died (PID: {e.pid}, code: {e.exitcode})"
505
errors[task_id] = error_msg
506
manager.expired_processes += 1
507
logger.warning(f"Task {task_id}: {error_msg}")
508
509
except TimeoutError:
510
error_msg = "Task timed out"
511
errors[task_id] = error_msg
512
manager.timeout_tasks += 1
513
logger.warning(f"Task {task_id}: {error_msg}")
514
515
except Exception as e:
516
error_msg = f"{type(e).__name__}: {e}"
517
errors[task_id] = error_msg
518
manager.failed_tasks += 1
519
logger.error(f"Task {task_id}: {error_msg}")
520
521
# Print summary
522
stats = manager.get_stats()
523
total_tasks = sum(stats.values())
524
525
print(f"\nExecution Summary:")
526
print(f" Total tasks: {total_tasks}")
527
print(f" Successful: {stats['successful']} ({stats['successful']/total_tasks*100:.1f}%)")
528
print(f" Failed: {stats['failed']} ({stats['failed']/total_tasks*100:.1f}%)")
529
print(f" Process expired: {stats['expired']} ({stats['expired']/total_tasks*100:.1f}%)")
530
print(f" Timeout: {stats['timeout']} ({stats['timeout']/total_tasks*100:.1f}%)")
531
532
print(f"\nSuccessful results: {len(results)}")
533
print(f"Error conditions: {len(errors)}")
534
535
# Advanced callback and monitoring patterns
536
def test_advanced_monitoring():
537
"""Test advanced monitoring using callbacks and custom Future handling"""
538
539
completed_tasks = []
540
failed_tasks = []
541
542
def success_callback(future):
543
try:
544
result = future.result(timeout=0)
545
completed_tasks.append(result)
546
print(f"✓ Task completed: {result}")
547
except:
548
pass # Not completed yet or failed
549
550
def failure_callback(future):
551
try:
552
future.result(timeout=0)
553
except Exception as e:
554
failed_tasks.append(str(e))
555
print(f"✗ Task failed: {type(e).__name__}: {e}")
556
557
@process(timeout=3.0)
558
def monitored_task(task_id, duration):
559
time.sleep(duration)
560
if duration > 2.5: # Will timeout
561
time.sleep(10)
562
return f"Monitored task {task_id}"
563
564
print("Testing advanced monitoring:")
565
566
# Create tasks with different durations
567
durations = [0.5, 1.0, 1.5, 2.0, 3.0] # Last one will timeout
568
futures = []
569
570
for i, duration in enumerate(durations):
571
future = monitored_task(i, duration)
572
573
# Add callbacks
574
future.add_done_callback(success_callback)
575
future.add_done_callback(failure_callback)
576
577
futures.append(future)
578
579
# Wait for all to complete
580
time.sleep(5)
581
582
print(f"\nMonitoring results:")
583
print(f" Completed: {len(completed_tasks)}")
584
print(f" Failed: {len(failed_tasks)}")
585
586
# Run comprehensive tests
587
test_comprehensive_error_handling()
588
test_advanced_monitoring()
589
```