or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asynchronous-decorators.mdconcurrent-decorators.mdfuture-types-exceptions.mdindex.mdprocess-pools.mdsynchronization-utilities.mdthread-pools.md

process-pools.mddocs/

0

# Process Pools

1

2

Managed pools of worker processes for executing CPU-intensive tasks. Because each worker process runs its own Python interpreter (with its own Global Interpreter Lock), process pools provide true parallelism for CPU-bound work, with advanced features like timeouts, automatic worker restart, and comprehensive error handling with process isolation.

3

4

## Capabilities

5

6

### ProcessPool Class

7

8

A managed pool of worker processes that can execute multiple tasks concurrently with true parallelism, automatic process lifecycle management, and advanced error handling.

9

10

```python { .api }

11

class ProcessPool:

12

def __init__(

13

self,

14

max_workers: int = multiprocessing.cpu_count(),

15

max_tasks: int = 0,

16

initializer: Callable = None,

17

initargs: list = (),

18

context: multiprocessing.context.BaseContext = multiprocessing

19

):

20

"""

21

Create a process pool for CPU-intensive concurrent task execution.

22

23

Parameters:

24

- max_workers: Maximum number of worker processes (defaults to CPU count)

25

- max_tasks: Maximum tasks per worker before restart (0 = no limit)

26

- initializer: Function called when each worker process starts

27

- initargs: Arguments passed to initializer function

28

- context: Multiprocessing context (spawn, fork, forkserver)

29

"""

30

```

31

32

#### Basic Usage

33

34

```python

35

from pebble import ProcessPool

36

import time

37

38

# Create pool with default settings

39

pool = ProcessPool()

40

41

# Create pool with custom configuration

42

pool = ProcessPool(max_workers=4, max_tasks=50)

43

44

def cpu_intensive_task(n, multiplier=1):

45

# CPU-intensive computation that benefits from true parallelism

46

total = 0

47

for i in range(n):

48

total += (i ** 2) * multiplier

49

return total

50

51

# Schedule tasks

52

future1 = pool.schedule(cpu_intensive_task, args=(100000,), kwargs={"multiplier": 2})

53

future2 = pool.schedule(cpu_intensive_task, args=(200000,))

54

55

# Get results

56

result1 = future1.result()

57

result2 = future2.result()

58

59

print(f"Results: {result1}, {result2}")

60

61

# Always clean up

62

pool.close()

63

pool.join()

64

```

65

66

### Task Scheduling with Timeouts

67

68

Schedule individual tasks with timeout protection to prevent runaway processes:

69

70

```python { .api }

71

def schedule(

72

self,

73

function: Callable,

74

args: list = (),

75

kwargs: dict = {},

76

timeout: float = None

77

) -> ProcessFuture:

78

"""

79

Schedule a function for execution in the process pool.

80

81

Parameters:

82

- function: The function to execute

83

- args: Positional arguments to pass to function

84

- kwargs: Keyword arguments to pass to function

85

- timeout: Maximum execution time in seconds (raises TimeoutError if exceeded)

86

87

Returns:

88

ProcessFuture object for retrieving the result

89

"""

90

91

def submit(

92

self,

93

function: Callable,

94

timeout: Optional[float],

95

/,

96

*args,

97

**kwargs

98

) -> ProcessFuture:

99

"""

100

Submit a function for execution (compatibility with concurrent.futures).

101

102

Parameters:

103

- function: The function to execute

104

- timeout: Maximum execution time in seconds (positional-only parameter)

105

- args: Positional arguments to pass to function

106

- kwargs: Keyword arguments to pass to function

107

108

Returns:

109

ProcessFuture object for retrieving the result

110

"""

111

```

112

113

#### Scheduling Examples

114

115

```python

116

from pebble import ProcessPool, ProcessExpired

117

import time

118

import math

119

120

def prime_factorization(n):

121

"""CPU-intensive task: find prime factors"""

122

factors = []

123

d = 2

124

while d * d <= n:

125

while n % d == 0:

126

factors.append(d)

127

n //= d

128

d += 1

129

if n > 1:

130

factors.append(n)

131

return factors

132

133

def monte_carlo_pi(iterations):

134

"""CPU-intensive task: estimate Pi using Monte Carlo method"""

135

import random

136

inside_circle = 0

137

for _ in range(iterations):

138

x = random.random()

139

y = random.random()

140

if x*x + y*y <= 1:

141

inside_circle += 1

142

return 4.0 * inside_circle / iterations

143

144

def potentially_slow_task(delay):

145

"""Task that might run too long"""

146

time.sleep(delay)

147

return f"Completed after {delay} seconds"

148

149

# CPU-intensive work with timeouts

150

with ProcessPool(max_workers=4) as pool:

151

# Schedule CPU-intensive tasks

152

numbers = [982451653, 982451654, 982451655, 982451656]

153

factor_futures = []

154

155

for num in numbers:

156

future = pool.schedule(

157

prime_factorization,

158

args=(num,),

159

timeout=30.0 # 30 second timeout

160

)

161

factor_futures.append(future)

162

163

# Monte Carlo Pi estimation

164

pi_futures = []

165

for iterations in [1000000, 2000000, 3000000]:

166

future = pool.schedule(

167

monte_carlo_pi,

168

args=(iterations,),

169

timeout=60.0 # 60 second timeout

170

)

171

pi_futures.append(future)

172

173

# Schedule tasks that might timeout

174

timeout_futures = [

175

pool.schedule(potentially_slow_task, args=(1,), timeout=5.0), # Should complete

176

pool.schedule(potentially_slow_task, args=(10,), timeout=5.0) # Should timeout

177

]

178

179

# Collect results with error handling

180

print("Prime factorizations:")

181

for i, future in enumerate(factor_futures):

182

try:

183

result = future.result()

184

print(f" {numbers[i]} = {' × '.join(map(str, result))}")

185

except TimeoutError:

186

print(f" {numbers[i]} = TIMEOUT")

187

except Exception as e:

188

print(f" {numbers[i]} = ERROR: {e}")

189

190

print("\nPi estimations:")

191

for i, future in enumerate(pi_futures):

192

try:

193

pi_estimate = future.result()

194

iterations = [1000000, 2000000, 3000000][i]

195

error = abs(pi_estimate - math.pi)

196

print(f" {iterations:,} iterations: π ≈ {pi_estimate:.6f} (error: {error:.6f})")

197

except Exception as e:

198

print(f" ERROR: {e}")

199

200

print("\nTimeout examples:")

201

for i, future in enumerate(timeout_futures):

202

try:

203

result = future.result()

204

print(f" Task {i+1}: {result}")

205

except TimeoutError:

206

print(f" Task {i+1}: TIMEOUT")

207

except ProcessExpired as e:

208

print(f" Task {i+1}: PROCESS DIED: {e}")

209

```

210

211

### Bulk Operations with Map

212

213

Execute a function across multiple inputs efficiently using process pools:

214

215

```python { .api }

216

def map(

217

self,

218

function: Callable,

219

*iterables,

220

chunksize: int = None,

221

timeout: float = None

222

) -> ProcessMapFuture:

223

"""

224

Apply function to every item of iterables in parallel using processes.

225

226

Parameters:

227

- function: Function to apply to each item

228

- iterables: One or more iterables to process

229

- chunksize: Number of items per chunk sent to each process

230

- timeout: Maximum time in seconds allotted to each chunk of work; a TimeoutError is raised while iterating the results if a chunk exceeds it

231

232

Returns:

233

ProcessMapFuture object; call its result() method to obtain an iterator that yields results as they become available

234

"""

235

```

236

237

#### Map Usage Examples

238

239

```python

240

from pebble import ProcessPool

241

import math

242

import time

243

244

def cpu_bound_function(x):

245

"""Simulate CPU-intensive work"""

246

result = 0

247

for i in range(x * 1000):

248

result += math.sin(i) * math.cos(i)

249

return result

250

251

def data_processing_pipeline(data_chunk):

252

"""Process a chunk of data"""

253

processed = []

254

for item in data_chunk:

255

# Simulate complex processing

256

processed_item = {

257

'original': item,

258

'squared': item ** 2,

259

'sqrt': math.sqrt(abs(item)),

260

'factorial': math.factorial(min(abs(item), 10)) # Limit to prevent huge numbers

261

}

262

processed.append(processed_item)

263

return processed

264

265

def matrix_operation(matrix_row):

266

"""Perform operations on matrix row"""

267

return [x ** 2 + math.sin(x) for x in matrix_row]

268

269

# Efficient parallel processing with map

270

with ProcessPool(max_workers=6) as pool:

271

# Process large dataset

272

large_dataset = list(range(1, 101))

273

274

print("Processing large dataset...")

275

start_time = time.time()

276

277

# Use map with optimal chunk size

278

results = pool.map(

279

cpu_bound_function,

280

large_dataset,

281

chunksize=10, # Process 10 items per chunk

282

timeout=120 # 2 minute timeout for entire operation

283

)

284

285

# Convert to list to get all results

286

processed_results = list(results.result())  # map() returns a ProcessMapFuture; result() yields the iterator

287

end_time = time.time()

288

289

print(f"Processed {len(processed_results)} items in {end_time - start_time:.2f} seconds")

290

print(f"Average result: {sum(processed_results) / len(processed_results):.2f}")

291

292

# Data processing pipeline

293

raw_data = [list(range(i*10, (i+1)*10)) for i in range(20)] # 20 chunks of 10 items each

294

295

print("\nRunning data processing pipeline...")

296

pipeline_results = pool.map(

297

data_processing_pipeline,

298

raw_data,

299

chunksize=2, # 2 data chunks per process

300

timeout=60

301

)

302

303

# Flatten results

304

all_processed = []

305

for chunk_result in pipeline_results.result():

306

all_processed.extend(chunk_result)

307

308

print(f"Processed {len(all_processed)} data items through pipeline")

309

310

# Matrix operations

311

matrix = [[i+j for j in range(100)] for i in range(50)] # 50x100 matrix

312

313

print("\nPerforming matrix operations...")

314

matrix_results = pool.map(

315

matrix_operation,

316

matrix,

317

chunksize=5, # 5 rows per process

318

timeout=30

319

)

320

321

processed_matrix = list(matrix_results.result())

322

print(f"Processed matrix with {len(processed_matrix)} rows")

323

```

324

325

### Multiprocessing Context Configuration

326

327

Configure the multiprocessing context for different process creation methods:

328

329

```python

330

import multiprocessing

331

from pebble import ProcessPool

332

333

def worker_task(data, worker_id=None):

334

import os

335

return {

336

'data': data,

337

'worker_pid': os.getpid(),

338

'worker_id': worker_id

339

}

340

341

# Different multiprocessing contexts

342

def context_examples():

343

# Spawn context (creates fresh Python interpreter)

344

spawn_ctx = multiprocessing.get_context('spawn')

345

spawn_pool = ProcessPool(max_workers=2, context=spawn_ctx)

346

347

# Fork context (copies current process) - Unix only

348

try:

349

fork_ctx = multiprocessing.get_context('fork')

350

fork_pool = ProcessPool(max_workers=2, context=fork_ctx)

351

except RuntimeError:

352

print("Fork context not available on this platform")

353

fork_pool = None

354

355

# Forkserver context (hybrid approach) - Unix only

356

try:

357

forkserver_ctx = multiprocessing.get_context('forkserver')

358

forkserver_pool = ProcessPool(max_workers=2, context=forkserver_ctx)

359

except RuntimeError:

360

print("Forkserver context not available on this platform")

361

forkserver_pool = None

362

363

# Test different contexts

364

test_data = list(range(10))

365

366

print("Testing spawn context:")

367

with spawn_pool:

368

spawn_results = [

369

spawn_pool.schedule(worker_task, args=(data, f"spawn-{data}"))

370

for data in test_data

371

]

372

for future in spawn_results:

373

print(f" {future.result()}")

374

375

if fork_pool:

376

print("\nTesting fork context:")

377

with fork_pool:

378

fork_results = [

379

fork_pool.schedule(worker_task, args=(data, f"fork-{data}"))

380

for data in test_data

381

]

382

for future in fork_results:

383

print(f" {future.result()}")

384

385

if forkserver_pool:

386

print("\nTesting forkserver context:")

387

with forkserver_pool:

388

forkserver_results = [

389

forkserver_pool.schedule(worker_task, args=(data, f"forkserver-{data}"))

390

for data in test_data

391

]

392

for future in forkserver_results:

393

print(f" {future.result()}")

394

395

# Run context examples

396

context_examples()

397

```

398

399

### Process Initialization and Cleanup

400

401

Initialize worker processes with shared resources and handle cleanup:

402

403

```python

404

from pebble import ProcessPool

405

import multiprocessing

406

import logging

407

import os

408

409

# Global state for worker processes

410

worker_state = {}

411

412

def init_worker_process(config, log_level):

413

"""Initialize each worker process"""

414

global worker_state

415

416

# Setup logging for this process

417

logging.basicConfig(

418

level=log_level,

419

format=f'PID-{os.getpid()}: %(levelname)s - %(message)s'

420

)

421

logger = logging.getLogger(__name__)

422

423

# Initialize worker state

424

worker_state = {

425

'config': config,

426

'logger': logger,

427

'task_count': 0,

428

'process_id': os.getpid()

429

}

430

431

logger.info(f"Worker process {os.getpid()} initialized with config: {config}")

432

433

def worker_task_with_state(task_data):

434

"""Task that uses initialized worker state"""

435

global worker_state

436

437

worker_state['task_count'] += 1

438

logger = worker_state['logger']

439

440

logger.info(f"Processing task {worker_state['task_count']}: {task_data}")

441

442

# Simulate work using config

443

multiplier = worker_state['config'].get('multiplier', 1)

444

result = task_data * multiplier

445

446

# Simulate some processing time

447

import time

448

time.sleep(0.1)

449

450

logger.info(f"Task completed. Result: {result}")

451

452

return {

453

'input': task_data,

454

'result': result,

455

'task_number': worker_state['task_count'],

456

'process_id': worker_state['process_id']

457

}

458

459

# Create pool with worker initialization

460

config = {'multiplier': 3, 'timeout': 30}

461

462

pool = ProcessPool(

463

max_workers=3,

464

max_tasks=5, # Restart workers every 5 tasks

465

initializer=init_worker_process,

466

initargs=(config, logging.INFO)

467

)

468

469

try:

470

# Schedule multiple tasks to see worker behavior

471

tasks = list(range(1, 16)) # 15 tasks

472

futures = []

473

474

for task in tasks:

475

future = pool.schedule(worker_task_with_state, args=(task,))

476

futures.append(future)

477

478

# Collect results

479

results = []

480

for future in futures:

481

try:

482

result = future.result(timeout=10)

483

results.append(result)

484

except Exception as e:

485

print(f"Task failed: {e}")

486

487

# Print results showing worker process recycling

488

print(f"\nProcessed {len(results)} tasks:")

489

for result in results:

490

print(f" Task {result['task_number']} in PID {result['process_id']}: "

491

f"{result['input']} -> {result['result']}")

492

493

# Group by process ID to see worker recycling

494

by_process = {}

495

for result in results:

496

pid = result['process_id']

497

if pid not in by_process:

498

by_process[pid] = []

499

by_process[pid].append(result['task_number'])

500

501

print(f"\nTasks by worker process:")

502

for pid, task_numbers in by_process.items():

503

print(f" PID {pid}: tasks {task_numbers}")

504

505

finally:

506

pool.close()

507

pool.join()

508

```

509

510

### Error Handling and Recovery

511

512

Handle various error conditions specific to process-based execution:

513

514

```python

515

from pebble import ProcessPool, ProcessExpired

516

import signal

517

import time

518

import os

519

520

def normal_task(x):

521

return x * 2

522

523

def crashing_task():

524

# This will cause the process to crash

525

os._exit(1) # Immediate process termination

526

527

def hanging_task():

528

# This task will hang indefinitely

529

while True:

530

time.sleep(1)

531

532

def memory_intensive_task(size):

533

# This might run out of memory

534

big_list = [0] * size

535

return len(big_list)

536

537

def signal_task():

538

# This task will receive a signal

539

import signal

540

os.kill(os.getpid(), signal.SIGTERM)

541

542

# Comprehensive error handling

543

with ProcessPool(max_workers=4) as pool:

544

# Schedule various types of tasks

545

futures = {

546

'normal': pool.schedule(normal_task, args=(42,)),

547

'crashing': pool.schedule(crashing_task),

548

'hanging': pool.schedule(hanging_task, timeout=3.0), # pebble timeout so pool.join() can finish on exit

549

'memory': pool.schedule(memory_intensive_task, args=(10**9,)), # Huge allocation

550

'timeout': pool.schedule(hanging_task, timeout=2.0), # Will timeout

551

'signal': pool.schedule(signal_task)

552

}

553

554

# Handle each type of error

555

for task_name, future in futures.items():

556

try:

557

if task_name == 'hanging':

558

# Don't wait for hanging task

559

result = future.result(timeout=1.0)

560

else:

561

result = future.result(timeout=10.0)

562

print(f"{task_name}: SUCCESS - {result}")

563

564

except TimeoutError:

565

print(f"{task_name}: TIMEOUT - Task exceeded time limit")

566

567

except ProcessExpired as e:

568

print(f"{task_name}: PROCESS DIED - PID: {e.pid}, Exit code: {e.exitcode}")

569

570

except MemoryError:

571

print(f"{task_name}: MEMORY ERROR - Not enough memory")

572

573

except OSError as e:

574

print(f"{task_name}: OS ERROR - {e}")

575

576

except Exception as e:

577

print(f"{task_name}: UNEXPECTED ERROR - {type(e).__name__}: {e}")

578

579

print("\nAll error handling completed")

580

```

581

582

### Advanced Pool Configuration

583

584

Configure pools for specific performance and reliability requirements:

585

586

```python

587

from pebble import ProcessPool

588

import multiprocessing

589

import time

590

591

# High-performance pool for CPU-intensive work

592

def create_high_performance_pool():

593

return ProcessPool(

594

max_workers=multiprocessing.cpu_count() * 2, # Oversubscribe for mixed workloads

595

max_tasks=0, # No worker recycling for maximum performance

596

context=multiprocessing.get_context('spawn') # Clean process creation

597

)

598

599

# Reliable pool with frequent worker recycling

600

def create_reliable_pool():

601

return ProcessPool(

602

max_workers=multiprocessing.cpu_count(),

603

max_tasks=10, # Recycle workers frequently to prevent memory leaks

604

context=multiprocessing.get_context('spawn')

605

)

606

607

# Memory-conscious pool

608

def create_memory_conscious_pool():

609

return ProcessPool(

610

max_workers=max(1, multiprocessing.cpu_count() // 2), # Fewer workers

611

max_tasks=5, # Frequent recycling to free memory

612

context=multiprocessing.get_context('spawn')

613

)

614

615

def benchmark_task(iterations):

616

"""Benchmark task for testing pool performance"""

617

import math

618

total = 0

619

for i in range(iterations):

620

total += math.sin(i) * math.cos(i)

621

return total

622

623

# Benchmark different pool configurations

624

configurations = {

625

'high_performance': create_high_performance_pool(),

626

'reliable': create_reliable_pool(),

627

'memory_conscious': create_memory_conscious_pool()

628

}

629

630

for config_name, pool in configurations.items():

631

print(f"\nTesting {config_name} configuration:")

632

633

start_time = time.time()

634

635

with pool:

636

# Submit benchmark tasks

637

futures = [

638

pool.schedule(benchmark_task, args=(100000,))

639

for _ in range(20)

640

]

641

642

# Wait for completion

643

results = [f.result() for f in futures]

644

645

end_time = time.time()

646

647

print(f" Completed {len(results)} tasks in {end_time - start_time:.2f} seconds")

648

print(f" Average result: {sum(results) / len(results):.2f}")

649

```