# Resource Pooling

Generic resource pooling for managing expensive resources like database connections, file handles, or network connections, with automatic lifecycle management and resource reuse.

## Capabilities

### Generic Resource Pool

A flexible pool for managing any type of resource, with callbacks for creation, validation, and cleanup.

```python { .api }
class Pool:
    """
    Generic resource pool with configurable creation and cleanup.
    """

    def __init__(self, min_size=0, max_size=4, order_as_stack=False, create=None):
        """
        Create a resource pool.

        Parameters:
        - min_size: int, minimum number of resources to pre-populate (default: 0)
        - max_size: int, maximum number of resources in the pool (default: 4)
        - order_as_stack: bool, whether to use LIFO ordering (default: False)
        - create: callable that creates new resources when needed
        """

    def get(self):
        """
        Get a resource from the pool, creating one if necessary.

        Returns:
            Resource from the pool

        Note:
            Resources must be returned to the pool using put()
        """

    def put(self, item):
        """
        Return a resource to the pool.

        Parameters:
        - item: resource to return to the pool

        Returns:
            None
        """

    def resize(self, new_size):
        """
        Resize the pool to a new maximum size.

        Parameters:
        - new_size: int, new maximum pool size

        Returns:
            None
        """

    def __len__(self):
        """
        Get the current number of resources in the pool.

        Returns:
            int: number of resources currently in the pool
        """

    def item(self):
        """
        Context manager for getting and automatically returning a resource.

        Returns:
            Context manager that yields a resource from the pool

        Usage:
            with pool.item() as resource:
                # use resource
                pass
            # resource automatically returned to pool
        """

class TokenPool:
    """
    A pool that gives out unique opaque tokens instead of creating resources.
    Useful for limiting concurrency without managing actual resources.
    """

    def __init__(self, max_size=4):
        """
        Create a token pool.

        Parameters:
        - max_size: int, maximum number of tokens available
        """

    def get(self):
        """
        Get a token from the pool.

        Returns:
            Opaque token object

        Note:
            Tokens must be returned using put()
        """

    def put(self, token):
        """
        Return a token to the pool.

        Parameters:
        - token: token to return

        Returns:
            None
        """
```

### Database Connection Pooling

Specialized pools for managing database connections with connection lifecycle and error handling.

```python { .api }
class BaseConnectionPool:
    """
    Base class for database connection pools.
    """

    def __init__(self, db_module, *args, **kwargs):
        """
        Create a database connection pool.

        Parameters:
        - db_module: database module (e.g., psycopg2, MySQLdb)
        - *args: arguments for database connection
        - **kwargs: keyword arguments for database connection
        """

    def get(self):
        """
        Get a database connection from the pool.

        Returns:
            Database connection object
        """

    def put(self, conn):
        """
        Return a database connection to the pool.

        Parameters:
        - conn: database connection to return

        Returns:
            None
        """

class ConnectionPool(BaseConnectionPool):
    """
    Default database connection pool (alias for TpooledConnectionPool).
    Uses tpool.Proxy to execute database operations in threads.
    """

class TpooledConnectionPool(BaseConnectionPool):
    """
    Connection pool using tpool.Proxy for database connections.
    Database operations are executed in a thread pool.
    """

class RawConnectionPool(BaseConnectionPool):
    """
    Connection pool with plain database connections.
    Database operations run in the main greenthread.
    """

class DatabaseConnector:
    """
    Maintains separate connection pools for different database hosts.
    """

    def __init__(self, db_module, *args, **kwargs):
        """
        Create a database connector.

        Parameters:
        - db_module: database module
        - *args: default connection arguments
        - **kwargs: default connection keyword arguments
        """

    def get(self, host=None, database=None):
        """
        Get a connection for a specific host/database.

        Parameters:
        - host: database host (uses default if None)
        - database: database name (uses default if None)

        Returns:
            Database connection
        """

    def put(self, conn, host=None, database=None):
        """
        Return a connection to the appropriate pool.

        Parameters:
        - conn: database connection
        - host: database host
        - database: database name

        Returns:
            None
        """

class ConnectTimeout(Exception):
    """
    Exception raised when a database connection attempt times out.
    """
    pass
```

### Thread Pool for Blocking Operations

A thread pool for executing blocking operations without blocking the event loop.

```python { .api }
def execute(method, *args, **kwargs):
    """
    Execute a method in the thread pool, blocking only the current greenthread.

    Parameters:
    - method: callable to execute in the thread pool
    - *args: positional arguments for method
    - **kwargs: keyword arguments for method

    Returns:
        Return value of method

    Raises:
        Any exception raised by method
    """

class Proxy:
    """
    Proxy object that forwards method calls to the native thread pool.
    """

    def __init__(self, obj, autowrap=None):
        """
        Create a proxy for an object.

        Parameters:
        - obj: object to proxy
        - autowrap: tuple of method names to automatically proxy
        """

def killall():
    """
    Kill all threads in the thread pool.

    Returns:
        None
    """

def set_num_threads(num_threads):
    """
    Set the number of threads in the thread pool.

    Parameters:
    - num_threads: int, number of threads to use

    Returns:
        None
    """
```

## Usage Examples

### Basic Resource Pool

```python
import eventlet
from eventlet import pools
import random
import time

class DatabaseConnection:
    """Mock database connection"""

    def __init__(self, connection_id):
        self.connection_id = connection_id
        self.created_at = time.time()
        self.query_count = 0
        print(f"Created connection {self.connection_id}")

    def query(self, sql):
        """Execute a query"""
        self.query_count += 1
        # Simulate query execution time
        eventlet.sleep(random.uniform(0.1, 0.3))
        return f"Result for '{sql}' from connection {self.connection_id}"

    def close(self):
        """Close the connection"""
        print(f"Closed connection {self.connection_id} (executed {self.query_count} queries)")

def create_connection():
    """Factory function to create new connections"""
    connection_id = random.randint(1000, 9999)
    return DatabaseConnection(connection_id)

def database_worker(worker_id, pool, queries):
    """Worker that uses database connections from the pool"""
    print(f"Worker {worker_id} starting")

    for query in queries:
        # Get a connection from the pool
        conn = pool.get()

        try:
            # Use the connection
            result = conn.query(query)
            print(f"Worker {worker_id}: {result}")
        finally:
            # Always return the connection to the pool
            pool.put(conn)

        # Small delay between queries
        eventlet.sleep(0.1)

    print(f"Worker {worker_id} finished")

def basic_pool_example():
    """Example of basic resource pool usage"""

    # Create a pool with at most 3 connections
    db_pool = pools.Pool(create=create_connection, max_size=3)

    # Define queries for the workers
    worker_queries = [
        ["SELECT * FROM users", "SELECT * FROM orders"],
        ["SELECT COUNT(*) FROM products", "SELECT * FROM categories"],
        ["UPDATE users SET last_login = NOW()", "SELECT * FROM logs"],
        ["SELECT * FROM settings", "INSERT INTO audit_log VALUES (...)"]
    ]

    print("Starting database workers with connection pool...")

    # Start multiple workers
    greenthreads = []
    for i, queries in enumerate(worker_queries):
        gt = eventlet.spawn(database_worker, i + 1, db_pool, queries)
        greenthreads.append(gt)

    # Wait for all workers to complete
    for gt in greenthreads:
        gt.wait()

    print(f"Pool has {len(db_pool)} connections remaining")

if __name__ == "__main__":
    basic_pool_example()
```

### Token Pool for Concurrency Control

```python
import eventlet
from eventlet import pools
import time

def rate_limited_task(task_id, token_pool, duration):
    """Task that must hold a token to run (limits concurrency)"""

    print(f"Task {task_id} waiting for token...")

    # Get a token (blocks if none are available)
    token = token_pool.get()

    try:
        print(f"Task {task_id} got token, starting work...")

        # Simulate work
        eventlet.sleep(duration)

        print(f"Task {task_id} completed work ({duration:.1f}s)")

    finally:
        # Always return the token
        token_pool.put(token)
        print(f"Task {task_id} returned token")

def token_pool_example():
    """Example using a token pool to limit concurrency"""

    # Create a token pool - at most 2 concurrent tasks
    token_pool = pools.TokenPool(max_size=2)

    # Create tasks with different durations
    tasks = [
        (1, 2.0),  # Task 1 takes 2 seconds
        (2, 1.5),  # Task 2 takes 1.5 seconds
        (3, 1.0),  # Task 3 takes 1 second
        (4, 2.5),  # Task 4 takes 2.5 seconds
        (5, 0.5),  # Task 5 takes 0.5 seconds
    ]

    print("Starting rate-limited tasks (max 2 concurrent)...")
    start_time = time.time()

    # Start all tasks
    greenthreads = []
    for task_id, duration in tasks:
        gt = eventlet.spawn(rate_limited_task, task_id, token_pool, duration)
        greenthreads.append(gt)

    # Wait for completion
    for gt in greenthreads:
        gt.wait()

    total_time = time.time() - start_time
    print(f"All tasks completed in {total_time:.1f} seconds")

if __name__ == "__main__":
    token_pool_example()
```

### Database Connection Pool

```python
import eventlet
from eventlet import db_pool
import sqlite3
import tempfile
import os

def setup_test_database():
    """Create a test SQLite database"""
    fd, db_path = tempfile.mkstemp(suffix='.db')
    os.close(fd)

    # Create a test table and data
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    cursor.execute('''
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')

    # Insert test data
    test_users = [
        ('Alice', 'alice@example.com'),
        ('Bob', 'bob@example.com'),
        ('Charlie', 'charlie@example.com'),
        ('Diana', 'diana@example.com'),
        ('Eve', 'eve@example.com')
    ]

    cursor.executemany('INSERT INTO users (name, email) VALUES (?, ?)', test_users)
    conn.commit()
    conn.close()

    return db_path

def database_worker(worker_id, pool, operations):
    """Worker that performs database operations"""
    print(f"DB Worker {worker_id} starting")

    for operation, params in operations:
        # Get a connection from the pool
        conn = pool.get()

        try:
            cursor = conn.cursor()

            if operation == 'select':
                cursor.execute("SELECT * FROM users WHERE name LIKE ?", (f"%{params}%",))
                results = cursor.fetchall()
                print(f"Worker {worker_id}: Found {len(results)} users matching '{params}'")

            elif operation == 'count':
                cursor.execute("SELECT COUNT(*) FROM users")
                count = cursor.fetchone()[0]
                print(f"Worker {worker_id}: Total users: {count}")

            elif operation == 'insert':
                name, email = params
                cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", (name, email))
                conn.commit()
                print(f"Worker {worker_id}: Inserted user {name}")

            cursor.close()

        except Exception as e:
            print(f"Worker {worker_id} error: {e}")
        finally:
            # Return the connection to the pool
            pool.put(conn)

        # Small delay between operations
        eventlet.sleep(0.1)

    print(f"DB Worker {worker_id} finished")

def database_pool_example():
    """Example using a database connection pool"""

    db_path = setup_test_database()

    try:
        # Create a connection pool backed by SQLite.
        # RawConnectionPool keeps things simple (SQLite doesn't need threading);
        # connection parameters are passed by keyword so they reach
        # sqlite3.connect() rather than being mistaken for pool options.
        pool = db_pool.RawConnectionPool(sqlite3, database=db_path, check_same_thread=False)

        # Define operations for the workers
        worker_operations = [
            [('count', None), ('select', 'A')],                             # Worker 1
            [('select', 'B'), ('count', None)],                             # Worker 2
            [('insert', ('Frank', 'frank@example.com')), ('count', None)],  # Worker 3
            [('select', 'e'), ('insert', ('Grace', 'grace@example.com'))],  # Worker 4
        ]

        print("Starting database operations with connection pool...")

        # Start the workers
        greenthreads = []
        for i, operations in enumerate(worker_operations):
            gt = eventlet.spawn(database_worker, i + 1, pool, operations)
            greenthreads.append(gt)

        # Wait for completion
        for gt in greenthreads:
            gt.wait()

        print("All database operations completed")

    finally:
        # Clean up
        os.unlink(db_path)

if __name__ == "__main__":
    database_pool_example()
```

### Thread Pool for Blocking Operations

```python
import eventlet
from eventlet import tpool
import hashlib
import os
import random
import time

def cpu_intensive_task(data, iterations=1000000):
    """CPU-intensive task that would block the event loop"""

    # This simulates a CPU-bound operation
    result = hashlib.sha256(data.encode()).hexdigest()

    for i in range(iterations):
        result = hashlib.sha256(result.encode()).hexdigest()

    return {
        'data': data,
        'iterations': iterations,
        'result': result[:16]  # First 16 chars of the final hash
    }

def blocking_io_task(filename):
    """Blocking I/O task"""

    # This simulates a blocking I/O operation
    with open(filename, 'w') as f:
        for i in range(10000):
            f.write(f"Line {i}: Some data here\n")

    # Read it back
    with open(filename, 'r') as f:
        lines = f.readlines()

    os.unlink(filename)  # Clean up

    return {
        'filename': filename,
        'lines_written': 10000,
        'lines_read': len(lines)
    }

def mixed_workload_example():
    """Example mixing I/O-bound and CPU-bound tasks"""

    print("Starting mixed workload (I/O + CPU tasks)...")
    print("I/O tasks run in greenthreads, CPU tasks in the thread pool")

    def io_bound_worker(worker_id):
        """I/O-bound worker using cooperative I/O"""
        start_time = time.time()

        # Simulate network I/O
        eventlet.sleep(random.uniform(0.5, 1.5))

        elapsed = time.time() - start_time
        return f"I/O Worker {worker_id} completed in {elapsed:.2f}s"

    def cpu_bound_worker(worker_id, data):
        """CPU-bound worker using the thread pool"""
        start_time = time.time()

        # Execute the CPU-intensive task in the thread pool
        result = tpool.execute(cpu_intensive_task, data, 500000)

        elapsed = time.time() - start_time
        return f"CPU Worker {worker_id} completed in {elapsed:.2f}s - hash: {result['result']}"

    def blocking_io_worker(worker_id):
        """Blocking I/O worker using the thread pool"""
        start_time = time.time()

        filename = f"/tmp/test_file_{worker_id}.txt"

        # Execute the blocking I/O in the thread pool
        result = tpool.execute(blocking_io_task, filename)

        elapsed = time.time() - start_time
        return f"Blocking I/O Worker {worker_id} completed in {elapsed:.2f}s - {result['lines_read']} lines"

    # Start the mixed workload
    greenthreads = []

    # I/O-bound tasks (run in greenthreads)
    for i in range(3):
        gt = eventlet.spawn(io_bound_worker, i + 1)
        greenthreads.append(gt)

    # CPU-bound tasks (run in the thread pool)
    for i in range(3):
        data = f"test_data_{i}"
        gt = eventlet.spawn(cpu_bound_worker, i + 1, data)
        greenthreads.append(gt)

    # Blocking I/O tasks (run in the thread pool)
    for i in range(2):
        gt = eventlet.spawn(blocking_io_worker, i + 1)
        greenthreads.append(gt)

    # Collect results
    print("\nResults:")
    for gt in greenthreads:
        result = gt.wait()
        print(f"  {result}")

def thread_pool_proxy_example():
    """Example using tpool.Proxy for automatic thread pool execution"""

    class ExpensiveCalculator:
        """Class with expensive operations"""

        def __init__(self, name):
            self.name = name

        def fibonacci(self, n):
            """CPU-intensive Fibonacci calculation"""
            if n <= 1:
                return n
            return self.fibonacci(n - 1) + self.fibonacci(n - 2)

        def factorial(self, n):
            """CPU-intensive factorial calculation"""
            if n <= 1:
                return 1
            return n * self.factorial(n - 1)

        def process_data(self, data_size):
            """Simulate data processing"""
            data = list(range(data_size))
            return sum(x * x for x in data)

    # Create a calculator and proxy it to the thread pool
    calculator = ExpensiveCalculator("Calculator1")

    # Wrap with a proxy - all method calls go to the thread pool
    proxied_calculator = tpool.Proxy(calculator)

    print("Testing thread pool proxy...")

    def run_calculations(calc_id, proxy_calc):
        """Run calculations using the proxied calculator"""
        start_time = time.time()

        results = {
            'calc_id': calc_id,
            'fibonacci_30': proxy_calc.fibonacci(30),
            'factorial_10': proxy_calc.factorial(10),
            'process_data': proxy_calc.process_data(10000)
        }

        elapsed = time.time() - start_time
        results['elapsed'] = elapsed

        return results

    # Run multiple calculations concurrently.
    # Each executes in the thread pool without blocking other greenthreads.
    greenthreads = []
    for i in range(3):
        gt = eventlet.spawn(run_calculations, i + 1, proxied_calculator)
        greenthreads.append(gt)

    print("Calculations running in thread pool...")

    # Collect results
    for gt in greenthreads:
        result = gt.wait()
        print(f"Calculator {result['calc_id']} results ({result['elapsed']:.2f}s):")
        print(f"  Fibonacci(30): {result['fibonacci_30']}")
        print(f"  Factorial(10): {result['factorial_10']}")
        print(f"  Process data: {result['process_data']}")

if __name__ == "__main__":
    # Configure the thread pool before it is first used
    print("Configuring thread pool...")
    tpool.set_num_threads(4)  # Use 4 threads
    print("Thread pool configured with 4 threads")

    print("\n=== Mixed Workload Example ===")
    mixed_workload_example()

    print("\n=== Thread Pool Proxy Example ===")
    thread_pool_proxy_example()
```

### Advanced Pool Management

```python
import eventlet
from eventlet import pools
import time
import random

class ManagedResource:
    """Resource with lifecycle management"""

    def __init__(self, resource_id):
        self.resource_id = resource_id
        self.created_at = time.time()
        self.last_used = time.time()
        self.use_count = 0
        self.is_valid = True
        print(f"Created resource {self.resource_id}")

    def use(self, operation):
        """Use the resource"""
        if not self.is_valid:
            raise RuntimeError(f"Resource {self.resource_id} is invalid")

        self.last_used = time.time()
        self.use_count += 1

        # Simulate work
        eventlet.sleep(random.uniform(0.1, 0.5))

        # Occasionally invalidate the resource (simulate connection loss)
        if random.random() < 0.1:  # 10% chance
            self.is_valid = False
            print(f"Resource {self.resource_id} became invalid")

        return f"Operation '{operation}' completed with resource {self.resource_id}"

    def is_expired(self, max_age=30, max_idle=10):
        """Check whether the resource should be expired"""
        now = time.time()
        age = now - self.created_at
        idle = now - self.last_used

        return age > max_age or idle > max_idle or not self.is_valid

    def close(self):
        """Clean up the resource"""
        print(f"Closed resource {self.resource_id} (used {self.use_count} times)")

class ManagedPool:
    """Pool with resource validation and cleanup"""

    def __init__(self, create_func, max_size=4, max_age=30, max_idle=10):
        self.create_func = create_func
        self.max_size = max_size
        self.max_age = max_age
        self.max_idle = max_idle
        self.pool = pools.Pool(create=self._create_resource, max_size=max_size)
        self.all_resources = set()

    def _create_resource(self):
        """Create and track a new resource"""
        resource = self.create_func()
        self.all_resources.add(resource)
        return resource

    def get(self):
        """Get a validated resource from the pool"""
        while True:
            resource = self.pool.get()

            if not resource.is_expired(self.max_age, self.max_idle):
                return resource
            else:
                # The resource is expired: discard it and try again
                print(f"Removing expired resource {resource.resource_id}")
                self.all_resources.discard(resource)
                resource.close()
                # Continue the loop to get a fresh resource

    def put(self, resource):
        """Return the resource to the pool if it is still valid"""
        if resource in self.all_resources and not resource.is_expired():
            self.pool.put(resource)
        else:
            # The resource is expired or invalid
            self.all_resources.discard(resource)
            resource.close()

    def cleanup_expired(self):
        """Clean up expired resources"""
        expired = [r for r in self.all_resources if r.is_expired(self.max_age, self.max_idle)]

        for resource in expired:
            self.all_resources.discard(resource)
            resource.close()

        print(f"Cleaned up {len(expired)} expired resources")

    def stats(self):
        """Get pool statistics"""
        valid_resources = [r for r in self.all_resources if not r.is_expired()]

        return {
            'total_resources': len(self.all_resources),
            'valid_resources': len(valid_resources),
            'pool_size': len(self.pool),
            'expired_resources': len(self.all_resources) - len(valid_resources)
        }

def create_managed_resource():
    """Factory for creating managed resources"""
    resource_id = random.randint(1000, 9999)
    return ManagedResource(resource_id)

def managed_pool_example():
    """Example with resource lifecycle management"""

    # Create a managed pool
    managed_pool = ManagedPool(
        create_func=create_managed_resource,
        max_size=3,
        max_age=10,  # Resources expire after 10 seconds
        max_idle=5   # Resources expire after 5 seconds of inactivity
    )

    def worker_with_validation(worker_id, operations):
        """Worker that handles resource validation"""
        print(f"Managed worker {worker_id} starting")

        for operation in operations:
            try:
                # Get a validated resource
                resource = managed_pool.get()

                # Use the resource
                result = resource.use(operation)
                print(f"Worker {worker_id}: {result}")

                # Return it to the pool
                managed_pool.put(resource)

            except Exception as e:
                print(f"Worker {worker_id} error: {e}")

            # Random delay between operations
            eventlet.sleep(random.uniform(0.5, 2.0))

        print(f"Managed worker {worker_id} finished")

    # Start the workers
    worker_operations = [
        ['read', 'write', 'update'],
        ['query', 'insert', 'delete'],
        ['backup', 'restore', 'verify'],
        ['sync', 'analyze', 'optimize']
    ]

    greenthreads = []
    for i, operations in enumerate(worker_operations):
        gt = eventlet.spawn(worker_with_validation, i + 1, operations)
        greenthreads.append(gt)

    # Monitor pool statistics
    def monitor_pool():
        """Monitor pool statistics"""
        for tick in range(15):  # Monitor for 15 seconds
            eventlet.sleep(1)
            stats = managed_pool.stats()
            print(f"Pool stats: {stats}")

            # Periodic cleanup
            if tick % 5 == 0:
                managed_pool.cleanup_expired()

    eventlet.spawn(monitor_pool)

    # Wait for the workers
    for gt in greenthreads:
        gt.wait()

    # Final cleanup
    managed_pool.cleanup_expired()
    final_stats = managed_pool.stats()
    print(f"Final pool stats: {final_stats}")

if __name__ == "__main__":
    managed_pool_example()
```

## Pool Configuration Best Practices

### Connection Pool Sizing

```python
import eventlet.db_pool
import eventlet.pools
import psycopg2
import requests

# For database connections
db_pool = eventlet.db_pool.ConnectionPool(
    psycopg2,
    host='localhost',
    database='myapp',
    user='user',
    password='password',
    max_size=20,         # Match your database connection limit
    connect_timeout=30   # Seconds to wait when establishing a connection
)

# For HTTP client connections
http_pool = eventlet.pools.Pool(
    create=lambda: requests.Session(),
    max_size=50  # Higher for I/O-bound operations
)

# For limiting concurrent operations
rate_limiter = eventlet.pools.TokenPool(max_size=10)  # At most 10 concurrent
```

### Error Handling and Validation

```python
def validated_pool_get(pool, validate_func):
    """Get a resource, retrying until a valid one is found"""
    max_retries = 3
    for attempt in range(max_retries):
        resource = pool.get()
        if validate_func(resource):
            return resource
        # The resource is invalid, so don't return it to the pool
        resource.close()
    raise RuntimeError("Unable to get a valid resource")

def safe_pool_put(pool, resource, cleanup_func=None):
    """Safely return a resource to the pool"""
    try:
        if hasattr(resource, 'is_valid') and resource.is_valid():
            pool.put(resource)
        else:
            if cleanup_func:
                cleanup_func(resource)
    except Exception as e:
        print(f"Error returning resource to pool: {e}")
        if cleanup_func:
            cleanup_func(resource)
```