# Future Types and Exceptions

Enhanced Future objects and exception types for handling concurrent execution results, timeouts, and error conditions specific to Pebble's execution model. These types provide advanced capabilities beyond standard concurrent.futures functionality.
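
For orientation, here is a minimal sketch of where these types surface (it assumes the `pebble` package is installed; the `square` function is illustrative):

```python
# Minimal sketch: where Pebble's future and exception types appear.
from concurrent.futures import TimeoutError

from pebble import ProcessPool, ProcessExpired

def square(x):
    return x * x

if __name__ == "__main__":
    with ProcessPool() as pool:
        future = pool.schedule(square, args=(3,))  # returns a ProcessFuture
        try:
            print(future.result(timeout=5))        # 9
        except TimeoutError:
            print("timed out")
        except ProcessExpired as e:
            print(f"worker died: exit code {e.exitcode}")
```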

## Capabilities

### ProcessFuture

Enhanced Future object specifically designed for process-based execution with additional capabilities for process management and timeout handling.

```python { .api }
class ProcessFuture(concurrent.futures.Future):
    def cancel(self) -> bool:
        """
        Attempt to cancel the process execution.

        Returns:
        True if cancellation was successful, False otherwise
        """

    def result(self, timeout=None):
        """
        Get the result of the process execution.

        Parameters:
        - timeout: Maximum time to wait for the result, in seconds

        Returns:
        The result of the executed function

        Raises:
        - TimeoutError: If the timeout is exceeded
        - ProcessExpired: If the process died unexpectedly
        - CancelledError: If the execution was cancelled
        - Exception: Any exception raised by the executed function
        """

    def exception(self, timeout=None):
        """
        Get the exception raised by the process execution.

        Parameters:
        - timeout: Maximum time to wait for completion, in seconds

        Returns:
        The exception raised by the function, or None if it succeeded
        """

    def add_done_callback(self, fn):
        """
        Add a callback to be called when the process completes.

        Parameters:
        - fn: Callback function that takes the Future as its only argument
        """
```

#### ProcessFuture Usage Examples

```python
# pebble raises concurrent.futures.TimeoutError, which is only an alias of the
# builtin TimeoutError on Python 3.11+; import it explicitly for older versions.
from concurrent.futures import TimeoutError

from pebble.concurrent import process
from pebble import ProcessExpired
import time

@process(timeout=5.0)
def cpu_intensive_task(n):
    total = 0
    for i in range(n):
        total += i ** 2
    return total

@process
def potentially_failing_task(should_fail=False):
    if should_fail:
        raise ValueError("Task was configured to fail")
    return "Success"

@process
def long_running_task():
    time.sleep(10)
    return "Finally done"

# Using ProcessFuture features
def test_process_future():
    # Schedule tasks
    future1 = cpu_intensive_task(1000000)
    future2 = potentially_failing_task(should_fail=True)
    future3 = long_running_task()

    # Add callbacks
    def completion_callback(future):
        try:
            result = future.result(timeout=0)  # Don't wait
            print(f"Task completed successfully: {result}")
        except Exception as e:
            print(f"Task failed: {type(e).__name__}: {e}")

    future1.add_done_callback(completion_callback)
    future2.add_done_callback(completion_callback)
    future3.add_done_callback(completion_callback)

    # Handle different scenarios
    try:
        # This should succeed
        result1 = future1.result(timeout=10)
        print(f"CPU task result: {result1}")
    except TimeoutError:
        print("CPU task timed out")
    except ProcessExpired as e:
        print(f"CPU task process died: {e}")

    try:
        # This should raise ValueError
        result2 = future2.result(timeout=5)
        print(f"Failing task result: {result2}")
    except ValueError as e:
        print(f"Expected failure: {e}")
    except ProcessExpired as e:
        print(f"Process died: {e}")

    # Try to cancel the long-running task
    print("Attempting to cancel long-running task...")
    cancelled = future3.cancel()
    print(f"Cancellation {'successful' if cancelled else 'failed'}")

    if not cancelled:
        try:
            result3 = future3.result(timeout=2)
            print(f"Long task result: {result3}")
        except TimeoutError:
            print("Long task timed out as expected")

# Guard the entry point so spawned worker processes can import this module safely
if __name__ == "__main__":
    test_process_future()
```
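
The examples above retrieve results with `result()`, which re-raises failures. `exception()` offers a non-raising alternative when you want to inspect an error without wrapping every call in try/except. A minimal sketch, reusing the `potentially_failing_task` function defined above:

```python
# Sketch: inspecting a failure via exception() instead of catching it.
future = potentially_failing_task(should_fail=True)

error = future.exception(timeout=5)  # Blocks until done; None on success
if error is None:
    print(f"Result: {future.result()}")
else:
    print(f"Task raised {type(error).__name__}: {error}")
```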

### MapFuture and ProcessMapFuture

Future objects specifically designed for handling map operations in thread and process pools, with iteration capabilities and bulk result handling.

```python { .api }
class MapFuture(concurrent.futures.Future):
    def __init__(self, futures, timeout=None):
        """
        Future for thread pool map operations.

        Parameters:
        - futures: List of individual Future objects
        - timeout: Overall timeout for the map operation
        """

    def __iter__(self):
        """
        Iterate over results as they become available.

        Yields:
        Results in the order they were submitted
        """

    def cancel(self) -> bool:
        """Cancel all underlying futures."""

    def result(self, timeout=None):
        """Get all results as a list."""

class ProcessMapFuture(concurrent.futures.Future):
    def __init__(self, futures, timeout=None):
        """
        Future for process pool map operations.

        Parameters:
        - futures: List of individual ProcessFuture objects
        - timeout: Overall timeout for the map operation
        """

    def __iter__(self):
        """
        Iterate over results as they become available.

        Yields:
        Results in the order they were submitted
        """

    def cancel(self) -> bool:
        """Cancel all underlying futures."""

    def result(self, timeout=None):
        """Get all results as a list."""
```

#### Map Future Usage Examples

```python
from concurrent.futures import TimeoutError

from pebble import ThreadPool, ProcessPool
import time
import math

def io_bound_task(delay):
    time.sleep(delay)
    return f"IO task completed after {delay}s"

def cpu_bound_task(n):
    return sum(math.sin(i) for i in range(n))

# Thread and process pool map operations
def test_map_futures():
    # ThreadPool map
    with ThreadPool(max_workers=4) as thread_pool:
        delays = [0.5, 1.0, 1.5, 2.0, 0.3]

        # Get a MapFuture
        map_future = thread_pool.map(io_bound_task, delays, timeout=10)

        print("Thread pool map results:")

        # Iterate over results as they complete
        for i, result in enumerate(map_future):
            print(f"  Task {i}: {result}")

        # Alternative: collect all results at once
        # all_results = list(map_future)
        # print(f"All results: {all_results}")

    # ProcessPool map
    with ProcessPool(max_workers=3) as process_pool:
        work_sizes = [10000, 20000, 15000, 25000]

        # Get a ProcessMapFuture
        process_map_future = process_pool.map(
            cpu_bound_task,
            work_sizes,
            chunksize=2,
            timeout=30
        )

        print("\nProcess pool map results:")

        try:
            for i, result in enumerate(process_map_future):
                print(f"  CPU task {i} (size {work_sizes[i]}): {result:.2f}")
        except TimeoutError:
            print("  Map operation timed out")
        except Exception as e:
            print(f"  Map operation failed: {e}")

# Guard the entry point so spawned worker processes can import this module safely
if __name__ == "__main__":
    test_map_futures()
```
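
An exception raised by any mapped item surfaces when that item is reached during iteration, which can end the loop early. One robust pattern is to catch errors inside the mapped function and return them as values, so iteration always completes. A sketch under that pattern (the `flaky` and `safe_flaky` functions are hypothetical; iteration follows the behavior described in the API section above):

```python
# Sketch: keep map iteration alive by returning errors as values.
from pebble import ProcessPool

def flaky(n):
    if n == 2:
        raise ValueError(f"item {n} failed")
    return n * n

def safe_flaky(n):
    # Convert exceptions into (ok, payload) tuples so no item raises
    try:
        return True, flaky(n)
    except Exception as e:
        return False, f"{type(e).__name__}: {e}"

if __name__ == "__main__":
    with ProcessPool(max_workers=2) as pool:
        future = pool.map(safe_flaky, [0, 1, 2, 3])
        for ok, payload in future:
            print("result:" if ok else "error:", payload)
```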

### ProcessExpired Exception

Exception raised when a worker process dies unexpectedly, providing detailed information about the process failure.

```python { .api }
class ProcessExpired(OSError):
    def __init__(self, msg, code=0, pid=None):
        """
        Exception for unexpected process termination.

        Parameters:
        - msg: Error message describing the failure
        - code: Process exit code
        - pid: Process ID of the failed process
        """
        super().__init__(msg)
        self.exitcode = code
        self.pid = pid

    def __str__(self):
        """String representation including exit code and PID."""
```

#### ProcessExpired Handling Examples

```python
from concurrent.futures import TimeoutError

from pebble.concurrent import process
from pebble import ProcessExpired, ProcessPool
import os
import signal
import time

@process
def crashing_task(crash_type="exit"):
    if crash_type == "exit":
        # Exit immediately with a code, bypassing cleanup, so the
        # parent reliably observes a dead process
        os._exit(42)
    elif crash_type == "abort":
        # Abnormal termination
        os.abort()
    elif crash_type == "segfault":
        # Simulate a segmentation fault (platform dependent)
        import ctypes
        ctypes.string_at(0)
    elif crash_type == "signal":
        # Kill self with a signal
        os.kill(os.getpid(), signal.SIGKILL)
    else:
        raise ValueError("Invalid crash type")

@process(timeout=2.0)
def timeout_task():
    time.sleep(10)  # Will be killed by the timeout
    return "Should never reach here"

def test_process_expired():
    crash_types = ["exit", "abort", "signal"]

    for crash_type in crash_types:
        print(f"\nTesting {crash_type} crash:")

        future = crashing_task(crash_type)

        try:
            result = future.result(timeout=5)
            print(f"  Unexpected success: {result}")
        except ProcessExpired as e:
            print(f"  Process expired: {e}")
            print(f"  Exit code: {e.exitcode}")
            print(f"  Process PID: {e.pid}")
            print(f"  Exception args: {e.args}")
        except Exception as e:
            print(f"  Other exception: {type(e).__name__}: {e}")

    # Test timeout-induced process termination
    print("\nTesting timeout:")
    future = timeout_task()

    try:
        result = future.result()
    except TimeoutError:
        print("  Task timed out as expected")
    except ProcessExpired as e:
        print(f"  Process expired due to timeout: {e}")
    except Exception as e:
        print(f"  Unexpected exception: {type(e).__name__}: {e}")

# Pool workers must be able to pickle the task, so define it at module level
# and schedule the plain function; do not also decorate it with @process.
def unstable_task(task_id, should_crash=False):
    if should_crash:
        if task_id % 2 == 0:
            os._exit(1)  # Hard exit
        else:
            raise RuntimeError(f"Task {task_id} crashed")
    return f"Task {task_id} succeeded"

# Pool-level ProcessExpired handling
def test_pool_process_expired():
    with ProcessPool(max_workers=3) as pool:
        # Submit a mix of stable and unstable tasks
        futures = []
        for i in range(10):
            should_crash = i % 3 == 0  # Every third task crashes
            future = pool.schedule(unstable_task, args=(i, should_crash))
            futures.append((i, future))

        print("Pool task results:")
        for task_id, future in futures:
            try:
                result = future.result(timeout=5)
                print(f"  Task {task_id}: {result}")
            except ProcessExpired as e:
                print(f"  Task {task_id}: Process died (PID: {e.pid}, code: {e.exitcode})")
            except RuntimeError as e:
                print(f"  Task {task_id}: Runtime error: {e}")
            except Exception as e:
                print(f"  Task {task_id}: Unexpected error: {type(e).__name__}: {e}")

if __name__ == "__main__":
    test_process_expired()
    test_pool_process_expired()
```
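
Because `ProcessFuture` subclasses `concurrent.futures.Future`, the standard library's future helpers can be applied to it; `as_completed` is convenient for triaging a batch in completion order rather than submission order. A short sketch reusing `unstable_task` from the example above (compatibility with `as_completed` is assumed from the subclass relationship):

```python
# Sketch: triage pool futures in completion order with as_completed.
from concurrent.futures import as_completed

from pebble import ProcessPool, ProcessExpired

if __name__ == "__main__":
    with ProcessPool(max_workers=3) as pool:
        futures = {pool.schedule(unstable_task, args=(i, i % 3 == 0)): i
                   for i in range(6)}
        for future in as_completed(futures):
            task_id = futures[future]
            try:
                print(f"task {task_id}: {future.result()}")
            except ProcessExpired as e:
                print(f"task {task_id}: worker died (code {e.exitcode})")
            except Exception as e:
                print(f"task {task_id}: {type(e).__name__}: {e}")
```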

### Advanced Error Handling Patterns

Comprehensive error handling strategies using Pebble's future types and exceptions:

```python
from concurrent.futures import TimeoutError

from pebble import ProcessPool, ProcessExpired
from pebble.concurrent import process
import os
import time
import random
import logging

# Set up logging for error tracking
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TaskManager:
    def __init__(self):
        self.successful_tasks = 0
        self.failed_tasks = 0
        self.expired_processes = 0
        self.timeout_tasks = 0

    def execute_with_retry(self, task_func, max_retries=3, *args, **kwargs):
        """Execute a @process-decorated task with retry logic for different failure types"""

        for attempt in range(max_retries + 1):
            try:
                # task_func must be decorated with @process, so calling it
                # returns a ProcessFuture immediately
                future = task_func(*args, **kwargs)

                # Wait for the result
                result = future.result(timeout=15.0)

                self.successful_tasks += 1
                logger.info(f"Task succeeded on attempt {attempt + 1}")
                return result

            except ProcessExpired as e:
                self.expired_processes += 1
                logger.warning(f"Process expired on attempt {attempt + 1}: {e}")

                if attempt == max_retries:
                    logger.error(f"Task failed after {max_retries + 1} attempts (process expiration)")
                    raise

                # Wait before retrying
                time.sleep(0.5 * (attempt + 1))

            except TimeoutError:
                self.timeout_tasks += 1
                logger.warning(f"Task timed out on attempt {attempt + 1}")

                if attempt == max_retries:
                    logger.error(f"Task failed after {max_retries + 1} attempts (timeout)")
                    raise

                # Wait before retrying
                time.sleep(1.0 * (attempt + 1))

            except Exception as e:
                self.failed_tasks += 1
                logger.error(f"Task failed with exception on attempt {attempt + 1}: {e}")

                # Don't retry regular exceptions
                raise

    def get_stats(self):
        return {
            'successful': self.successful_tasks,
            'failed': self.failed_tasks,
            'expired': self.expired_processes,
            'timeout': self.timeout_tasks
        }

# Unreliable task for testing; module level so pool workers can pickle it
def unreliable_task(task_id, failure_rate=0.3):
    """Task that randomly fails in different ways"""

    if random.random() < failure_rate:
        failure_type = random.choice(['crash', 'timeout', 'exception'])
        if failure_type == 'crash':
            os._exit(1)
        elif failure_type == 'timeout':
            time.sleep(20)  # Will cause a timeout
        elif failure_type == 'exception':
            raise ValueError(f"Task {task_id} random failure")

    # Simulate work
    time.sleep(random.uniform(0.1, 1.0))
    return f"Task {task_id} completed successfully"

# Module-level decorated wrapper for execute_with_retry; a nested @process
# closure would not be picklable with spawn-based start methods
@process(timeout=10.0)
def unreliable_task_process(task_id, failure_rate=0.3):
    return unreliable_task(task_id, failure_rate)

# Test comprehensive error handling
def test_comprehensive_error_handling():
    manager = TaskManager()

    # Test individual task retry
    print("Testing individual task retry:")
    try:
        result = manager.execute_with_retry(unreliable_task_process, 3, "test-task", 0.7)
        print(f"Result: {result}")
    except Exception as e:
        print(f"Final failure: {type(e).__name__}: {e}")

    # Test batch processing with error handling
    print("\nTesting batch processing:")

    with ProcessPool(max_workers=4) as pool:
        futures = []

        # Submit a batch of unreliable tasks
        for i in range(20):
            future = pool.schedule(unreliable_task, args=(i, 0.4))
            futures.append((i, future))

        # Process results with different error handling strategies
        results = {}
        errors = {}

        for task_id, future in futures:
            try:
                # Use a shorter timeout for individual tasks
                result = future.result(timeout=5.0)
                results[task_id] = result
                manager.successful_tasks += 1

            except ProcessExpired as e:
                error_msg = f"Process died (PID: {e.pid}, code: {e.exitcode})"
                errors[task_id] = error_msg
                manager.expired_processes += 1
                logger.warning(f"Task {task_id}: {error_msg}")

            except TimeoutError:
                error_msg = "Task timed out"
                errors[task_id] = error_msg
                manager.timeout_tasks += 1
                logger.warning(f"Task {task_id}: {error_msg}")

            except Exception as e:
                error_msg = f"{type(e).__name__}: {e}"
                errors[task_id] = error_msg
                manager.failed_tasks += 1
                logger.error(f"Task {task_id}: {error_msg}")

    # Print a summary
    stats = manager.get_stats()
    total_tasks = sum(stats.values())

    print("\nExecution Summary:")
    print(f"  Total tasks: {total_tasks}")
    print(f"  Successful: {stats['successful']} ({stats['successful']/total_tasks*100:.1f}%)")
    print(f"  Failed: {stats['failed']} ({stats['failed']/total_tasks*100:.1f}%)")
    print(f"  Process expired: {stats['expired']} ({stats['expired']/total_tasks*100:.1f}%)")
    print(f"  Timeout: {stats['timeout']} ({stats['timeout']/total_tasks*100:.1f}%)")

    print(f"\nSuccessful results: {len(results)}")
    print(f"Error conditions: {len(errors)}")

# Module level for picklability, as above
@process(timeout=3.0)
def monitored_task(task_id, duration):
    time.sleep(duration)
    if duration > 2.5:  # Will exceed the timeout
        time.sleep(10)
    return f"Monitored task {task_id}"

# Advanced callback and monitoring patterns
def test_advanced_monitoring():
    """Test advanced monitoring using callbacks and custom Future handling"""

    completed_tasks = []
    failed_tasks = []

    def success_callback(future):
        try:
            result = future.result(timeout=0)  # Future is already done
            completed_tasks.append(result)
            print(f"✓ Task completed: {result}")
        except Exception:
            pass  # Failed; handled by failure_callback

    def failure_callback(future):
        try:
            future.result(timeout=0)
        except Exception as e:
            failed_tasks.append(str(e))
            print(f"✗ Task failed: {type(e).__name__}: {e}")

    print("Testing advanced monitoring:")

    # Create tasks with different durations
    durations = [0.5, 1.0, 1.5, 2.0, 3.0]  # The last one will time out
    futures = []

    for i, duration in enumerate(durations):
        future = monitored_task(i, duration)

        # Add callbacks
        future.add_done_callback(success_callback)
        future.add_done_callback(failure_callback)

        futures.append(future)

    # Wait for all tasks to finish
    time.sleep(5)

    print("\nMonitoring results:")
    print(f"  Completed: {len(completed_tasks)}")
    print(f"  Failed: {len(failed_tasks)}")

# Run the comprehensive tests
if __name__ == "__main__":
    test_comprehensive_error_handling()
    test_advanced_monitoring()
```