or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-concurrency.mddebugging.mdgreen-stdlib.mdindex.mdmonkey-patching.mdnetworking.mdresource-pooling.mdsynchronization.mdthread-pools.mdweb-server.md

resource-pooling.mddocs/

0

# Resource Pooling

1

2

Generic resource pooling for managing expensive resources like database connections, file handles, or network connections with automatic lifecycle management and connection reuse.

3

4

## Capabilities

5

6

### Generic Resource Pool

7

8

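A flexible pool for managing any type of resource. Resources are built on demand by a user-supplied `create` callable; validation and cleanup are left to the caller (see the `ManagedPool` example below).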

9

10

```python { .api }

11

class Pool:
    """
    Generic resource pool with configurable creation.

    Resources are built lazily by ``create`` until ``max_size`` exist;
    after that, ``get()`` blocks the calling greenthread until another
    greenthread returns a resource with ``put()``.
    """

    def __init__(self, min_size=0, max_size=4, order_as_stack=False, create=None):
        """
        Create a resource pool.

        Parameters:
        - min_size: int, number of resources to pre-populate (default: 0)
        - max_size: int, maximum number of resources in the pool (default: 4)
        - order_as_stack: bool, hand out the most recently returned
          resource first (LIFO) instead of the oldest (default: False)
        - create: callable taking no arguments that builds a new resource
        """

    def get(self):
        """
        Get a resource from the pool, creating one if necessary.

        Returns:
            Resource from the pool

        Note:
            Resources must be returned to the pool using put()
        """

    def put(self, item):
        """
        Return a resource to the pool.

        Parameters:
        - item: resource to return to the pool

        Returns:
            None
        """

    def resize(self, new_size):
        """
        Resize the pool to a new maximum size.

        Parameters:
        - new_size: int, new maximum pool size

        Returns:
            None
        """

    def free(self):
        """
        Get the number of resources that can be obtained without blocking.

        Note:
            Pool does not implement __len__; use free() (or inspect the
            ``free_items`` deque for idle, already-created resources).

        Returns:
            int: number of get() calls needed to empty the pool
        """

    def waiting(self):
        """
        Get the number of greenthreads blocked waiting for a resource.

        Returns:
            int: number of waiters
        """

    def item(self):
        """
        Context manager for getting and automatically returning a resource.

        Returns:
            Context manager that yields a resource from the pool

        Usage:
            with pool.item() as resource:
                # use resource
                pass
            # resource automatically returned to pool
        """

81

82

class TokenPool:
    """
    Pool whose items are opaque tokens rather than real resources.

    Holding a token is the permission to proceed, which makes the pool a
    simple way to cap concurrency.
    """

    def __init__(self, max_size=4):
        """
        Create a token pool.

        Parameters:
        - max_size: int, how many tokens may be checked out at once
        """

    def get(self):
        """
        Check a token out of the pool.

        Returns:
            Opaque token object

        Note:
            Every token must be handed back with put()
        """

    def put(self, token):
        """
        Hand a token back to the pool.

        Parameters:
        - token: token previously obtained from get()

        Returns:
            None
        """

117

```

118

119

### Database Connection Pooling

120

121

Specialized pools for managing database connections with connection lifecycle and error handling.

122

123

```python { .api }

124

class BaseConnectionPool:
    """
    Base class for database connection pools.
    """

    def __init__(self, db_module, min_size=0, max_size=4, max_idle=10,
                 max_age=30, connect_timeout=5, cleanup=None,
                 *args, **kwargs):
        """
        Create a database connection pool.

        Parameters:
        - db_module: database module (e.g., psycopg2, MySQLdb)
        - min_size: int, connections to open up front (default: 0)
        - max_size: int, maximum number of open connections (default: 4)
        - max_idle: seconds a connection may sit unused before it is
          closed (default: 10)
        - max_age: seconds a connection may live before it is closed
          (default: 30)
        - connect_timeout: seconds to wait for a new connection before
          ConnectTimeout is raised (default: 5)
        - cleanup: callable run on a connection when it is returned
          (eventlet defaults this to a rollback)
        - *args: arguments passed to db_module.connect()
        - **kwargs: keyword arguments passed to db_module.connect()
        """

    def get(self):
        """
        Get a database connection from the pool.

        Returns:
            Database connection object
        """

    def put(self, conn):
        """
        Return a database connection to the pool.

        Parameters:
        - conn: database connection to return

        Returns:
            None
        """

    def clear(self):
        """
        Close every connection the pool currently holds.

        Returns:
            None
        """

157

158

class ConnectionPool(BaseConnectionPool):
    """
    The default pool flavour; equivalent to TpooledConnectionPool.

    Connections are wrapped in tpool.Proxy so database calls run in
    native threads.
    """

163

164

class TpooledConnectionPool(BaseConnectionPool):
    """
    Pool whose connections are wrapped in tpool.Proxy.

    Each database operation is dispatched to the native thread pool.
    """

169

170

class RawConnectionPool(BaseConnectionPool):
    """
    Pool that hands out unwrapped database connections.

    Database operations run in the main greenthread.
    """

175

176

class DatabaseConnector:
    """
    Maintains separate connection pools for different database hosts.
    """

    def __init__(self, module, credentials, conn_pool=None, *args, **kwargs):
        """
        Create a database connector.

        Parameters:
        - module: database module (e.g., psycopg2, MySQLdb)
        - credentials: mapping of hostname to connect arguments (such as
          user and password); a 'default' entry is used for unknown hosts
        - conn_pool: pool class to instantiate per host/database
          (defaults to ConnectionPool)
        - *args: extra positional arguments for each pool
        - **kwargs: extra keyword arguments for each pool
        """

    def credentials_for(self, host):
        """
        Look up the connect arguments for a host.

        Parameters:
        - host: database host

        Returns:
            The credentials entry for host, else the 'default' entry,
            else None
        """

    def get(self, host, dbname):
        """
        Get the connection pool for a specific host/database.

        Parameters:
        - host: database host
        - dbname: database (schema) name

        Returns:
            Connection pool for that host and database; call get()/put()
            on the returned pool to borrow and return connections
        """

215

216

class ConnectTimeout(Exception):
    """
    Raised when opening a database connection takes too long.
    """

221

```

222

223

### Thread Pool for Blocking Operations

224

225

Thread pool for executing blocking operations without blocking the event loop.

226

227

```python { .api }

228

def execute(method, *args, **kwargs):
    """
    Run a callable in the native thread pool.

    Only the calling greenthread waits for the result; other greenthreads
    keep running.

    Parameters:
    - method: callable to run in the thread pool
    - *args: positional arguments forwarded to method
    - **kwargs: keyword arguments forwarded to method

    Returns:
        Whatever method returns

    Raises:
        Whatever method raises
    """

243

244

class Proxy:
    """
    Proxy object that forwards method calls to the native thread pool.
    """

    def __init__(self, obj, autowrap=(), autowrap_names=()):
        """
        Create a proxy for an object.

        Parameters:
        - obj: object to proxy
        - autowrap: tuple of classes; when a proxied call returns an
          instance of one of them, the result is wrapped in a Proxy too
        - autowrap_names: tuple of attribute/method names whose results
          are always wrapped in a Proxy
        """

257

258

def killall():
    """
    Shut down every worker thread in the thread pool.

    Returns:
        None
    """

265

266

def set_num_threads(num_threads):
    """
    Choose how many worker threads the thread pool uses.

    Parameters:
    - num_threads: int, desired number of worker threads

    Returns:
        None
    """

276

```

277

278

## Usage Examples

279

280

### Basic Resource Pool

281

282

```python

283

import eventlet

284

from eventlet import pools

285

import random

286

import time

287

288

class DatabaseConnection:
    """Stand-in for a real database connection, used to demonstrate pooling."""

    def __init__(self, connection_id):
        """Store the id, stamp the creation time, and zero the query counter."""
        self.connection_id = connection_id
        self.created_at = time.time()
        self.query_count = 0
        print(f"Created connection {self.connection_id}")

    def query(self, sql):
        """Pretend to run ``sql`` and return a descriptive result string."""
        self.query_count += 1
        # Cooperative sleep of 0.1-0.3s stands in for query latency
        latency = random.uniform(0.1, 0.3)
        eventlet.sleep(latency)
        return f"Result for '{sql}' from connection {self.connection_id}"

    def close(self):
        """Report how many queries this connection served."""
        print(f"Closed connection {self.connection_id} (executed {self.query_count} queries)")

307

308

def create_connection():
    """Pool factory: build a DatabaseConnection with a random four-digit id."""
    return DatabaseConnection(random.randint(1000, 9999))

312

313

def database_worker(worker_id, pool, queries):
    """Run each query on a connection borrowed from ``pool``, one at a time."""
    print(f"Worker {worker_id} starting")

    for sql in queries:
        borrowed = pool.get()
        try:
            outcome = borrowed.query(sql)
            print(f"Worker {worker_id}: {outcome}")
        finally:
            # Hand the connection back even when the query raised
            pool.put(borrowed)

        # Brief pause so other workers get a turn between queries
        eventlet.sleep(0.1)

    print(f"Worker {worker_id} finished")

333

334

def basic_pool_example():
    """Run four workers against a pool capped at three connections."""

    # At most 3 connections are ever created; a 4th worker waits its turn
    db_pool = pools.Pool(create=create_connection, max_size=3)

    # One list of queries per worker
    worker_queries = [
        ["SELECT * FROM users", "SELECT * FROM orders"],
        ["SELECT COUNT(*) FROM products", "SELECT * FROM categories"],
        ["UPDATE users SET last_login = NOW()", "SELECT * FROM logs"],
        ["SELECT * FROM settings", "INSERT INTO audit_log VALUES (...)"],
    ]

    print("Starting database workers with connection pool...")

    greenthreads = [
        eventlet.spawn(database_worker, worker_id, db_pool, queries)
        for worker_id, queries in enumerate(worker_queries, start=1)
    ]

    # Wait for all workers to complete
    for gt in greenthreads:
        gt.wait()

    # Pool defines no __len__, so len(db_pool) would raise TypeError;
    # free_items is the deque of idle, already-created connections.
    print(f"Pool has {len(db_pool.free_items)} connections remaining")


if __name__ == "__main__":
    basic_pool_example()

364

```

365

366

### Token Pool for Concurrency Control

367

368

```python

369

import eventlet

370

from eventlet import pools

371

import random

372

373

def rate_limited_task(task_id, token_pool, duration):
    """Do ``duration`` seconds of simulated work while holding a pool token."""

    print(f"Task {task_id} waiting for token...")

    # Blocks this greenthread until a token is free
    permit = token_pool.get()

    try:
        print(f"Task {task_id} got token, starting work...")
        eventlet.sleep(duration)  # simulated work
        print(f"Task {task_id} completed work ({duration:.1f}s)")
    finally:
        # The token goes back no matter how the work ended
        token_pool.put(permit)
        print(f"Task {task_id} returned token")

393

394

def token_pool_example():
    """Run five tasks while a TokenPool allows only two at a time."""
    # This example's import block does not bring in ``time``; import it
    # here so time.time() below does not raise NameError.
    import time

    # Two tokens => at most two tasks run concurrently
    token_pool = pools.TokenPool(max_size=2)

    # (task_id, duration in seconds)
    tasks = [
        (1, 2.0),
        (2, 1.5),
        (3, 1.0),
        (4, 2.5),
        (5, 0.5),
    ]

    print("Starting rate-limited tasks (max 2 concurrent)...")
    start_time = time.time()

    greenthreads = [
        eventlet.spawn(rate_limited_task, task_id, token_pool, duration)
        for task_id, duration in tasks
    ]

    # Wait for completion
    for gt in greenthreads:
        gt.wait()

    total_time = time.time() - start_time
    print(f"All tasks completed in {total_time:.1f} seconds")


if __name__ == "__main__":
    token_pool_example()

427

```

428

429

### Database Connection Pool

430

431

```python

432

import eventlet

433

from eventlet import db_pool, tpool

434

import sqlite3

435

import tempfile

436

import os

437

438

def setup_test_database():
    """
    Create a temporary SQLite database with a populated ``users`` table.

    Returns:
        Path of the database file. The caller is responsible for
        deleting it.

    Raises:
        sqlite3.Error: if the schema or data cannot be written; the
        temporary file is removed before the error propagates.
    """
    fd, db_path = tempfile.mkstemp(suffix='.db')
    os.close(fd)  # only the path is needed; sqlite3 opens the file itself

    test_users = [
        ('Alice', 'alice@example.com'),
        ('Bob', 'bob@example.com'),
        ('Charlie', 'charlie@example.com'),
        ('Diana', 'diana@example.com'),
        ('Eve', 'eve@example.com'),
    ]

    try:
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE users (
                    id INTEGER PRIMARY KEY,
                    name TEXT NOT NULL,
                    email TEXT UNIQUE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            cursor.executemany('INSERT INTO users (name, email) VALUES (?, ?)', test_users)
            conn.commit()
        finally:
            # Close the connection whether or not setup succeeded
            conn.close()
    except Exception:
        # Do not leave a half-built temp file behind
        os.unlink(db_path)
        raise

    return db_path

470

471

def database_worker(worker_id, pool, operations):
    """Execute ``(operation, params)`` pairs on connections borrowed from ``pool``."""
    print(f"DB Worker {worker_id} starting")

    for operation, params in operations:
        conn = pool.get()

        try:
            cur = conn.cursor()

            if operation == 'select':
                # params is a substring to look for in user names
                cur.execute("SELECT * FROM users WHERE name LIKE ?", (f"%{params}%",))
                matches = cur.fetchall()
                print(f"Worker {worker_id}: Found {len(matches)} users matching '{params}'")
            elif operation == 'count':
                cur.execute("SELECT COUNT(*) FROM users")
                total = cur.fetchone()[0]
                print(f"Worker {worker_id}: Total users: {total}")
            elif operation == 'insert':
                # params is a (name, email) pair
                name, email = params
                cur.execute("INSERT INTO users (name, email) VALUES (?, ?)", (name, email))
                conn.commit()
                print(f"Worker {worker_id}: Inserted user {name}")

            cur.close()
        except Exception as e:
            print(f"Worker {worker_id} error: {e}")
        finally:
            # The connection always goes back to the pool
            pool.put(conn)

        # Short pause between operations
        eventlet.sleep(0.1)

    print(f"DB Worker {worker_id} finished")

510

511

def database_pool_example():
    """Run four workers against a pooled SQLite database, then clean up."""

    db_path = setup_test_database()
    pool = None

    try:
        # RawConnectionPool hands out plain sqlite3 connections, which
        # keeps the example simple. Their calls block the calling
        # greenthread; use TpooledConnectionPool to run them in threads.
        pool = db_pool.RawConnectionPool(sqlite3, db_path, check_same_thread=False)

        # One list of (operation, params) pairs per worker
        worker_operations = [
            [('count', None), ('select', 'A')],  # Worker 1
            [('select', 'B'), ('count', None)],  # Worker 2
            [('insert', ('Frank', 'frank@example.com')), ('count', None)],  # Worker 3
            [('select', 'e'), ('insert', ('Grace', 'grace@example.com'))],  # Worker 4
        ]

        print("Starting database operations with connection pool...")

        greenthreads = [
            eventlet.spawn(database_worker, worker_id, pool, operations)
            for worker_id, operations in enumerate(worker_operations, start=1)
        ]

        # Wait for completion
        for gt in greenthreads:
            gt.wait()

        print("All database operations completed")

    finally:
        # Close pooled connections before removing the file they point at
        if pool is not None:
            pool.clear()
        os.unlink(db_path)


if __name__ == "__main__":
    database_pool_example()

549

```

550

551

### Thread Pool for Blocking Operations

552

553

```python

554

import eventlet

555

from eventlet import tpool

556

import time

557

import os

558

import hashlib

559

560

def cpu_intensive_task(data, iterations=1000000):
    """Hash ``data`` with SHA-256, then re-hash the hex digest ``iterations`` times.

    Returns a dict with the input, the iteration count, and the first 16
    hex characters of the final digest.
    """
    digest = hashlib.sha256(data.encode()).hexdigest()

    # Each round hashes the previous round's hex digest
    for _ in range(iterations):
        digest = hashlib.sha256(digest.encode()).hexdigest()

    return dict(data=data, iterations=iterations, result=digest[:16])

574

575

def blocking_io_task(filename, line_count=10000):
    """
    Write ``line_count`` lines to ``filename``, read them back, delete the file.

    Parameters:
    - filename: path of the scratch file to create
    - line_count: int, number of lines to write (default: 10000)

    Returns:
        dict with 'filename', 'lines_written' and 'lines_read'

    Raises:
        OSError: if the file cannot be written or read; the scratch file
        is still removed.
    """
    try:
        with open(filename, 'w') as f:
            f.writelines(f"Line {i}: Some data here\n" for i in range(line_count))

        # Read it back
        with open(filename, 'r') as f:
            lines_read = sum(1 for _ in f)
    finally:
        # Clean up even when writing or reading failed part-way
        try:
            os.unlink(filename)
        except FileNotFoundError:
            pass

    return {
        'filename': filename,
        'lines_written': line_count,
        'lines_read': lines_read,
    }

594

595

def mixed_workload_example():
    """Run cooperative I/O, CPU-bound, and blocking-I/O workers side by side."""
    # Imported locally so the function does not rely on the
    # ``import random`` that only runs under the __main__ guard.
    import random
    import tempfile

    print("Starting mixed workload (I/O + CPU tasks)...")
    print("I/O tasks run in greenthreads, CPU tasks in thread pool")

    def io_bound_worker(worker_id):
        """I/O-bound worker using cooperative I/O"""
        start_time = time.time()

        # Simulate network I/O
        eventlet.sleep(random.uniform(0.5, 1.5))

        elapsed = time.time() - start_time
        return f"I/O Worker {worker_id} completed in {elapsed:.2f}s"

    def cpu_bound_worker(worker_id, data):
        """CPU-bound worker using thread pool"""
        start_time = time.time()

        # Hand the hashing loop to a native thread
        result = tpool.execute(cpu_intensive_task, data, 500000)

        elapsed = time.time() - start_time
        return f"CPU Worker {worker_id} completed in {elapsed:.2f}s - hash: {result['result']}"

    def blocking_io_worker(worker_id):
        """Blocking I/O worker using thread pool"""
        start_time = time.time()

        # Use the platform's temp directory instead of a hard-coded /tmp
        filename = os.path.join(tempfile.gettempdir(), f"test_file_{worker_id}.txt")

        # Hand the blocking file I/O to a native thread
        result = tpool.execute(blocking_io_task, filename)

        elapsed = time.time() - start_time
        return f"Blocking I/O Worker {worker_id} completed in {elapsed:.2f}s - {result['lines_read']} lines"

    greenthreads = []

    # I/O-bound tasks (run in greenthreads)
    for i in range(3):
        greenthreads.append(eventlet.spawn(io_bound_worker, i + 1))

    # CPU-bound tasks (run in thread pool)
    for i in range(3):
        greenthreads.append(eventlet.spawn(cpu_bound_worker, i + 1, f"test_data_{i}"))

    # Blocking I/O tasks (run in thread pool)
    for i in range(2):
        greenthreads.append(eventlet.spawn(blocking_io_worker, i + 1))

    # Collect results in spawn order
    print("\nResults:")
    for gt in greenthreads:
        result = gt.wait()
        print(f" {result}")

657

658

def thread_pool_proxy_example():
    """Show tpool.Proxy routing an object's method calls to native threads."""

    class ExpensiveCalculator:
        """Holder for a few deliberately slow computations."""

        def __init__(self, name):
            self.name = name

        def fibonacci(self, n):
            """Naive recursive Fibonacci (slow on purpose)."""
            return n if n <= 1 else self.fibonacci(n - 1) + self.fibonacci(n - 2)

        def factorial(self, n):
            """Recursive factorial."""
            return 1 if n <= 1 else n * self.factorial(n - 1)

        def process_data(self, data_size):
            """Sum the squares of 0..data_size-1."""
            values = list(range(data_size))
            return sum(v * v for v in values)

    # Calls made through the proxy are executed in the thread pool
    proxied_calculator = tpool.Proxy(ExpensiveCalculator("Calculator1"))

    print("Testing thread pool proxy...")

    def run_calculations(calc_id, proxy_calc):
        """Run the three calculations and report how long they took."""
        started = time.time()

        results = {'calc_id': calc_id}
        results['fibonacci_30'] = proxy_calc.fibonacci(30)
        results['factorial_10'] = proxy_calc.factorial(10)
        results['process_data'] = proxy_calc.process_data(10000)
        results['elapsed'] = time.time() - started

        return results

    # Three greenthreads share the proxy; each call blocks only its caller
    greenthreads = [
        eventlet.spawn(run_calculations, calc_id, proxied_calculator)
        for calc_id in range(1, 4)
    ]

    print("Calculations running in thread pool...")

    # Collect results
    for gt in greenthreads:
        result = gt.wait()
        print(f"Calculator {result['calc_id']} results ({result['elapsed']:.2f}s):")
        print(f" Fibonacci(30): {result['fibonacci_30']}")
        print(f" Factorial(10): {result['factorial_10']}")
        print(f" Process data: {result['process_data']}")

724

725

if __name__ == "__main__":
    import random

    # Configure the thread pool BEFORE its first use: worker threads are
    # started lazily on the first tpool call, so changing the count after
    # the examples have run would have no effect on them.
    print("\nConfiguring thread pool...")
    tpool.set_num_threads(4)  # Use 4 threads
    print("Thread pool configured with 4 threads")

    print("=== Mixed Workload Example ===")
    mixed_workload_example()

    print("\n=== Thread Pool Proxy Example ===")
    thread_pool_proxy_example()

738

```

739

740

### Advanced Pool Management

741

742

```python

743

import eventlet

744

from eventlet import pools

745

import time

746

import random

747

748

class ManagedResource:
    """Demo resource that tracks its age, idle time, use count, and validity."""

    def __init__(self, resource_id):
        self.resource_id = resource_id
        self.created_at = time.time()
        self.last_used = time.time()
        self.use_count = 0
        self.is_valid = True
        print(f"Created resource {self.resource_id}")

    def use(self, operation):
        """Perform ``operation``; raises RuntimeError if already invalid."""
        if not self.is_valid:
            raise RuntimeError(f"Resource {self.resource_id} is invalid")

        self.last_used = time.time()
        self.use_count += 1

        # Simulated work
        eventlet.sleep(random.uniform(0.1, 0.5))

        # Roughly one use in ten breaks the resource (simulated connection loss)
        went_bad = random.random() < 0.1
        if went_bad:
            self.is_valid = False
            print(f"Resource {self.resource_id} became invalid")

        return f"Operation '{operation}' completed with resource {self.resource_id}"

    def is_expired(self, max_age=30, max_idle=10):
        """True when the resource is invalid, older than ``max_age``, or idle past ``max_idle``."""
        if not self.is_valid:
            return True
        now = time.time()
        too_old = (now - self.created_at) > max_age
        too_idle = (now - self.last_used) > max_idle
        return too_old or too_idle

    def close(self):
        """Report the resource's final use count."""
        print(f"Closed resource {self.resource_id} (used {self.use_count} times)")

788

789

class ManagedPool:
    """
    Wrapper around pools.Pool that validates resources and retires stale ones.

    Resources must provide ``is_expired(max_age, max_idle)``, ``close()``
    and a ``resource_id`` attribute. Whenever a resource is retired, its
    slot in the underlying pool is either refilled or released, so the
    pool never runs out of capacity because of discarded resources.
    """

    def __init__(self, create_func, max_size=4, max_age=30, max_idle=10):
        """
        Parameters:
        - create_func: callable taking no arguments that builds a resource
        - max_size: int, maximum number of live resources
        - max_age: seconds a resource may live before it is retired
        - max_idle: seconds a resource may sit unused before it is retired
        """
        self.create_func = create_func
        self.max_size = max_size
        self.max_age = max_age
        self.max_idle = max_idle
        self.pool = pools.Pool(create=self._create_resource, max_size=max_size)
        self.all_resources = set()

    def _create_resource(self):
        """Create and track a new resource"""
        resource = self.create_func()
        self.all_resources.add(resource)
        return resource

    def _is_stale(self, resource):
        """Apply this pool's own age/idle limits, not the resource's defaults."""
        return resource.is_expired(self.max_age, self.max_idle)

    def _retire(self, resource):
        """Stop tracking a resource and close it."""
        self.all_resources.discard(resource)
        resource.close()

    def get(self):
        """Get a validated resource from pool"""
        resource = self.pool.get()
        if not self._is_stale(resource):
            return resource

        print(f"Removing expired resource {resource.resource_id}")
        self._retire(resource)
        # The caller now owns the retired resource's pool slot; fill it
        # with a fresh resource instead of asking the pool again, which
        # would consume (and eventually exhaust) another slot.
        return self._create_resource()

    def put(self, resource):
        """Return resource to pool if still valid"""
        if resource not in self.all_resources:
            # Not tracked by this pool: close it, never queue it
            resource.close()
            return

        if self._is_stale(resource):
            # Swap in a replacement so the slot stays usable and any
            # greenthread waiting in get() is woken up.
            self._retire(resource)
            resource = self._create_resource()

        self.pool.put(resource)

    def cleanup_expired(self):
        """Clean up expired resources that are sitting idle in the pool"""
        # Checked-out resources are left alone; put() retires them later.
        idle = self.pool.free_items
        expired = [r for r in idle if self._is_stale(r)]

        for resource in expired:
            idle.remove(resource)
            # Release the slot so the pool can create a replacement on demand
            self.pool.current_size -= 1
            self._retire(resource)

        print(f"Cleaned up {len(expired)} expired resources")

    def stats(self):
        """Get pool statistics"""
        total = len(self.all_resources)
        valid = sum(1 for r in self.all_resources if not self._is_stale(r))

        return {
            'total_resources': total,
            'valid_resources': valid,
            # Pool has no __len__; free_items holds the idle resources
            'pool_size': len(self.pool.free_items),
            'expired_resources': total - valid,
        }

849

850

def create_managed_resource():
    """Pool factory: build a ManagedResource with a random four-digit id."""
    return ManagedResource(random.randint(1000, 9999))

854

855

def managed_pool_example():
    """Run four workers against a ManagedPool while a monitor reports stats."""

    managed_pool = ManagedPool(
        create_func=create_managed_resource,
        max_size=3,
        max_age=10,  # Resources expire after 10 seconds
        max_idle=5,  # Resources expire after 5 seconds of inactivity
    )

    def worker_with_validation(worker_id, operations):
        """Worker that handles resource validation"""
        print(f"Managed worker {worker_id} starting")

        for operation in operations:
            resource = None
            try:
                # Get validated resource
                resource = managed_pool.get()

                # Use resource
                result = resource.use(operation)
                print(f"Worker {worker_id}: {result}")
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")
            finally:
                # Hand the resource back even when use() raised; otherwise
                # its pool slot would be lost for the rest of the run.
                if resource is not None:
                    managed_pool.put(resource)

            # Random delay between operations
            eventlet.sleep(random.uniform(0.5, 2.0))

        print(f"Managed worker {worker_id} finished")

    worker_operations = [
        ['read', 'write', 'update'],
        ['query', 'insert', 'delete'],
        ['backup', 'restore', 'verify'],
        ['sync', 'analyze', 'optimize'],
    ]

    greenthreads = [
        eventlet.spawn(worker_with_validation, worker_id, operations)
        for worker_id, operations in enumerate(worker_operations, start=1)
    ]

    def monitor_pool():
        """Monitor pool statistics"""
        for tick in range(15):  # Monitor for 15 seconds
            eventlet.sleep(1)
            print(f"Pool stats: {managed_pool.stats()}")

            # Periodic cleanup every fifth tick
            if tick % 5 == 0:
                managed_pool.cleanup_expired()

    monitor = eventlet.spawn(monitor_pool)

    # Wait for workers
    for gt in greenthreads:
        gt.wait()

    # The workers are done; stop the monitor instead of leaving it running
    monitor.kill()

    # Final cleanup
    managed_pool.cleanup_expired()
    final_stats = managed_pool.stats()
    print(f"Final pool stats: {final_stats}")


if __name__ == "__main__":
    managed_pool_example()

928

```

929

930

## Pool Configuration Best Practices

931

932

### Connection Pool Sizing

933

934

```python

935

# For database connections
db_pool = eventlet.db_pool.ConnectionPool(
    psycopg2,
    host='localhost',
    database='myapp',
    user='user',
    password='password',
    max_size=20,  # Match your database connection limit
    # The pool's own parameter is connect_timeout; an unknown keyword such
    # as pool_timeout would be forwarded to psycopg2.connect() and rejected.
    connect_timeout=30,  # Seconds to wait for a new connection
)

# For HTTP client connections
http_pool = eventlet.pools.Pool(
    create=requests.Session,
    max_size=50,  # Higher for I/O-bound operations
)

# For limiting concurrent operations
rate_limiter = eventlet.pools.TokenPool(max_size=10)  # Max 10 concurrent

954

```

955

956

### Error Handling and Validation

957

958

```python

959

def validated_pool_get(pool, validate_func, max_retries=3):
    """
    Get a resource from ``pool`` that passes ``validate_func``.

    Parameters:
    - pool: object with a get() method returning resources
    - validate_func: callable(resource) -> truthy when the resource is usable
    - max_retries: int, how many resources to try before giving up
      (default: 3)

    Returns:
        The first resource that validates

    Raises:
        RuntimeError: if no valid resource is found within max_retries
        attempts (including when max_retries is 0)
    """
    for _ in range(max_retries):
        resource = pool.get()
        if validate_func(resource):
            return resource
        # Resource is invalid: close it rather than returning it to the pool
        resource.close()

    raise RuntimeError("Unable to get valid resource")

971

972

def safe_pool_put(pool, resource, cleanup_func=None):
    """
    Return ``resource`` to ``pool`` if it reports itself valid, else clean it up.

    ``resource.is_valid`` may be a method or a plain boolean attribute;
    a resource without ``is_valid`` is treated as not valid.

    Parameters:
    - pool: object with a put(resource) method
    - resource: resource to hand back
    - cleanup_func: optional callable(resource) invoked exactly once when
      the resource is not returned to the pool

    Returns:
        None
    """
    try:
        validity = getattr(resource, 'is_valid', False)
        # Support both ``is_valid()`` and a boolean ``is_valid`` attribute
        usable = validity() if callable(validity) else bool(validity)
        if usable:
            pool.put(resource)
            return
    except Exception as e:
        print(f"Error returning resource to pool: {e}")

    # Reached only when the resource did not make it back into the pool.
    # Kept outside the try so a failing cleanup is never run a second time.
    if cleanup_func:
        cleanup_func(resource)

984

```