Highly concurrent networking library
—
Generic resource pooling for managing expensive resources like database connections, file handles, or network connections with automatic lifecycle management and connection reuse.
A flexible pool for managing any type of resource with creation, validation, and cleanup callbacks.
class Pool:
"""
Generic resource pool with configurable creation and cleanup.
"""
def __init__(self, min_size=0, max_size=4, order_as_stack=False, create=None):
"""
Create a resource pool.
Parameters:
- min_size: int, minimum number of resources to pre-populate (default: 0)
- max_size: int, maximum number of resources in the pool (default: 4)
- order_as_stack: bool, whether to use LIFO ordering (default: False)
- create: callable that creates new resources when needed
"""
def get(self):
"""
Get a resource from the pool, creating one if necessary.
Returns:
Resource from the pool
Note:
Resources must be returned to the pool using put()
"""
def put(self, item):
"""
Return a resource to the pool.
Parameters:
- item: resource to return to the pool
Returns:
None
"""
def resize(self, new_size):
"""
Resize the pool to a new maximum size.
Parameters:
- new_size: int, new maximum pool size
Returns:
None
"""
def __len__(self):
"""
Get the current number of resources in the pool.
Returns:
int: number of resources currently in pool
"""
def item(self):
"""
Context manager for getting and automatically returning a resource.
Returns:
Context manager that yields a resource from the pool
Usage:
with pool.item() as resource:
# use resource
pass
# resource automatically returned to pool
"""
class TokenPool:
"""
A pool that gives out unique opaque tokens instead of creating resources.
Useful for limiting concurrency without managing actual resources.
"""
def __init__(self, max_size=4):
"""
Create a token pool.
Parameters:
- max_size: int, maximum number of tokens available
"""
def get(self):
"""
Get a token from the pool.
Returns:
Opaque token object
Note:
Tokens must be returned using put()
"""
def put(self, token):
"""
Return a token to the pool.
Parameters:
- token: token to return
Returns:
None
"""Specialized pools for managing database connections with connection lifecycle and error handling.
class BaseConnectionPool:
"""
Base class for database connection pools.
"""
def __init__(self, db_module, *args, **kwargs):
"""
Create a database connection pool.
Parameters:
- db_module: database module (e.g., psycopg2, MySQLdb)
- *args: arguments for database connection
- **kwargs: keyword arguments for database connection
"""
def get(self):
"""
Get a database connection from the pool.
Returns:
Database connection object
"""
def put(self, conn):
"""
Return a database connection to the pool.
Parameters:
- conn: database connection to return
Returns:
None
"""
class ConnectionPool(BaseConnectionPool):
"""
Default database connection pool (alias for TpooledConnectionPool).
Uses tpool.Proxy to execute database operations in threads.
"""
class TpooledConnectionPool(BaseConnectionPool):
"""
Connection pool using tpool.Proxy for database connections.
Database operations are executed in a thread pool.
"""
class RawConnectionPool(BaseConnectionPool):
"""
Connection pool with plain database connections.
Database operations run in the main greenthread.
"""
class DatabaseConnector:
"""
Maintains separate connection pools for different database hosts.
"""
def __init__(self, db_module, *args, **kwargs):
"""
Create a database connector.
Parameters:
- db_module: database module
- *args: default connection arguments
- **kwargs: default connection keyword arguments
"""
def get(self, host=None, database=None):
"""
Get a connection for a specific host/database.
Parameters:
- host: database host (uses default if None)
- database: database name (uses default if None)
Returns:
Database connection
"""
def put(self, conn, host=None, database=None):
"""
Return a connection to the appropriate pool.
Parameters:
- conn: database connection
- host: database host
- database: database name
Returns:
None
"""
class ConnectTimeout(Exception):
"""
Exception raised when database connection times out.
"""
passThread pool for executing blocking operations without blocking the event loop.
def execute(method, *args, **kwargs):
"""
Execute a method in the thread pool, blocking the current greenthread.
Parameters:
- method: callable to execute in thread pool
- *args: positional arguments for method
- **kwargs: keyword arguments for method
Returns:
Return value of method
Raises:
Any exception raised by method
"""
class Proxy:
"""
Proxy object that forwards method calls to the native thread pool.
"""
def __init__(self, obj, autowrap=None):
"""
Create a proxy for an object.
Parameters:
- obj: object to proxy
- autowrap: tuple of method names to automatically proxy
"""
def killall():
"""
Kill all threads in the thread pool.
Returns:
None
"""
def set_num_threads(num_threads):
"""
Set the number of threads in the thread pool.
Parameters:
- num_threads: int, number of threads to use
Returns:
None
"""import eventlet
from eventlet import pools
import random
import time
class DatabaseConnection:
"""Mock database connection"""
def __init__(self, connection_id):
self.connection_id = connection_id
self.created_at = time.time()
self.query_count = 0
print(f"Created connection {self.connection_id}")
def query(self, sql):
"""Execute a query"""
self.query_count += 1
# Simulate query execution time
eventlet.sleep(random.uniform(0.1, 0.3))
return f"Result for '{sql}' from connection {self.connection_id}"
def close(self):
"""Close connection"""
print(f"Closed connection {self.connection_id} (executed {self.query_count} queries)")
def create_connection():
"""Factory function to create new connections"""
connection_id = random.randint(1000, 9999)
return DatabaseConnection(connection_id)
def database_worker(worker_id, pool, queries):
"""Worker that uses database connections from pool"""
print(f"Worker {worker_id} starting")
for query in queries:
# Get connection from pool
conn = pool.get()
try:
# Use the connection
result = conn.query(query)
print(f"Worker {worker_id}: {result}")
finally:
# Always return connection to pool
pool.put(conn)
# Small delay between queries
eventlet.sleep(0.1)
print(f"Worker {worker_id} finished")
def basic_pool_example():
"""Example of basic resource pool usage"""
# Create a pool with max 3 connections
db_pool = pools.Pool(create=create_connection, max_size=3)
# Define queries for workers
worker_queries = [
["SELECT * FROM users", "SELECT * FROM orders"],
["SELECT COUNT(*) FROM products", "SELECT * FROM categories"],
["UPDATE users SET last_login = NOW()", "SELECT * FROM logs"],
["SELECT * FROM settings", "INSERT INTO audit_log VALUES (...)"]
]
print("Starting database workers with connection pool...")
# Start multiple workers
greenthreads = []
for i, queries in enumerate(worker_queries):
gt = eventlet.spawn(database_worker, i+1, db_pool, queries)
greenthreads.append(gt)
# Wait for all workers to complete
for gt in greenthreads:
gt.wait()
print(f"Pool has {len(db_pool)} connections remaining")
if __name__ == "__main__":
basic_pool_example()import eventlet
from eventlet import pools
import random
def rate_limited_task(task_id, token_pool, duration):
"""Task that requires a token to run (limits concurrency)"""
print(f"Task {task_id} waiting for token...")
# Get token (blocks if none available)
token = token_pool.get()
try:
print(f"Task {task_id} got token, starting work...")
# Simulate work
eventlet.sleep(duration)
print(f"Task {task_id} completed work ({duration:.1f}s)")
finally:
# Always return the token
token_pool.put(token)
print(f"Task {task_id} returned token")
def token_pool_example():
"""Example using token pool to limit concurrency"""
# Create token pool - max 2 concurrent tasks
token_pool = pools.TokenPool(max_size=2)
# Create tasks with different durations
tasks = [
(1, 2.0), # Task 1 takes 2 seconds
(2, 1.5), # Task 2 takes 1.5 seconds
(3, 1.0), # Task 3 takes 1 second
(4, 2.5), # Task 4 takes 2.5 seconds
(5, 0.5), # Task 5 takes 0.5 seconds
]
print("Starting rate-limited tasks (max 2 concurrent)...")
start_time = time.time()
# Start all tasks
greenthreads = []
for task_id, duration in tasks:
gt = eventlet.spawn(rate_limited_task, task_id, token_pool, duration)
greenthreads.append(gt)
# Wait for completion
for gt in greenthreads:
gt.wait()
total_time = time.time() - start_time
print(f"All tasks completed in {total_time:.1f} seconds")
if __name__ == "__main__":
token_pool_example()import eventlet
from eventlet import db_pool, tpool
import sqlite3
import tempfile
import os
def setup_test_database():
"""Create a test SQLite database"""
fd, db_path = tempfile.mkstemp(suffix='.db')
os.close(fd)
# Create test table and data
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Insert test data
test_users = [
('Alice', 'alice@example.com'),
('Bob', 'bob@example.com'),
('Charlie', 'charlie@example.com'),
('Diana', 'diana@example.com'),
('Eve', 'eve@example.com')
]
cursor.executemany('INSERT INTO users (name, email) VALUES (?, ?)', test_users)
conn.commit()
conn.close()
return db_path
def database_worker(worker_id, pool, operations):
"""Worker that performs database operations"""
print(f"DB Worker {worker_id} starting")
for operation, params in operations:
# Get connection from pool
conn = pool.get()
try:
cursor = conn.cursor()
if operation == 'select':
cursor.execute("SELECT * FROM users WHERE name LIKE ?", (f"%{params}%",))
results = cursor.fetchall()
print(f"Worker {worker_id}: Found {len(results)} users matching '{params}'")
elif operation == 'count':
cursor.execute("SELECT COUNT(*) FROM users")
count = cursor.fetchone()[0]
print(f"Worker {worker_id}: Total users: {count}")
elif operation == 'insert':
name, email = params
cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", (name, email))
conn.commit()
print(f"Worker {worker_id}: Inserted user {name}")
cursor.close()
except Exception as e:
print(f"Worker {worker_id} error: {e}")
finally:
# Return connection to pool
pool.put(conn)
# Small delay between operations
eventlet.sleep(0.1)
print(f"DB Worker {worker_id} finished")
def database_pool_example():
"""Example using database connection pool"""
db_path = setup_test_database()
try:
# Create connection pool with SQLite
# Using RawConnectionPool for simplicity (SQLite doesn't need threading)
pool = db_pool.RawConnectionPool(sqlite3, db_path, check_same_thread=False)
# Define operations for workers
worker_operations = [
[('count', None), ('select', 'A')], # Worker 1
[('select', 'B'), ('count', None)], # Worker 2
[('insert', ('Frank', 'frank@example.com')), ('count', None)], # Worker 3
[('select', 'e'), ('insert', ('Grace', 'grace@example.com'))], # Worker 4
]
print("Starting database operations with connection pool...")
# Start workers
greenthreads = []
for i, operations in enumerate(worker_operations):
gt = eventlet.spawn(database_worker, i+1, pool, operations)
greenthreads.append(gt)
# Wait for completion
for gt in greenthreads:
gt.wait()
print("All database operations completed")
finally:
# Clean up
os.unlink(db_path)
if __name__ == "__main__":
database_pool_example()import eventlet
from eventlet import tpool
import time
import os
import hashlib
def cpu_intensive_task(data, iterations=1000000):
"""CPU-intensive task that would block the event loop"""
# This simulates a CPU-bound operation
result = hashlib.sha256(data.encode()).hexdigest()
for i in range(iterations):
result = hashlib.sha256(result.encode()).hexdigest()
return {
'data': data,
'iterations': iterations,
'result': result[:16] # First 16 chars of final hash
}
def blocking_io_task(filename):
"""Blocking I/O task"""
# This simulates a blocking I/O operation
with open(filename, 'w') as f:
for i in range(10000):
f.write(f"Line {i}: Some data here\n")
# Read it back
with open(filename, 'r') as f:
lines = f.readlines()
os.unlink(filename) # Clean up
return {
'filename': filename,
'lines_written': 10000,
'lines_read': len(lines)
}
def mixed_workload_example():
"""Example mixing I/O-bound and CPU-bound tasks"""
print("Starting mixed workload (I/O + CPU tasks)...")
print("I/O tasks run in greenthreads, CPU tasks in thread pool")
def io_bound_worker(worker_id):
"""I/O-bound worker using cooperative I/O"""
start_time = time.time()
# Simulate network I/O
eventlet.sleep(random.uniform(0.5, 1.5))
elapsed = time.time() - start_time
return f"I/O Worker {worker_id} completed in {elapsed:.2f}s"
def cpu_bound_worker(worker_id, data):
"""CPU-bound worker using thread pool"""
start_time = time.time()
# Execute CPU-intensive task in thread pool
result = tpool.execute(cpu_intensive_task, data, 500000)
elapsed = time.time() - start_time
return f"CPU Worker {worker_id} completed in {elapsed:.2f}s - hash: {result['result']}"
def blocking_io_worker(worker_id):
"""Blocking I/O worker using thread pool"""
start_time = time.time()
filename = f"/tmp/test_file_{worker_id}.txt"
# Execute blocking I/O in thread pool
result = tpool.execute(blocking_io_task, filename)
elapsed = time.time() - start_time
return f"Blocking I/O Worker {worker_id} completed in {elapsed:.2f}s - {result['lines_read']} lines"
# Start mixed workload
greenthreads = []
# I/O-bound tasks (run in greenthreads)
for i in range(3):
gt = eventlet.spawn(io_bound_worker, i+1)
greenthreads.append(gt)
# CPU-bound tasks (run in thread pool)
for i in range(3):
data = f"test_data_{i}"
gt = eventlet.spawn(cpu_bound_worker, i+1, data)
greenthreads.append(gt)
# Blocking I/O tasks (run in thread pool)
for i in range(2):
gt = eventlet.spawn(blocking_io_worker, i+1)
greenthreads.append(gt)
# Collect results
print("\nResults:")
for gt in greenthreads:
result = gt.wait()
print(f" {result}")
def thread_pool_proxy_example():
"""Example using tpool.Proxy for automatic thread pool execution"""
class ExpensiveCalculator:
"""Class with expensive operations"""
def __init__(self, name):
self.name = name
def fibonacci(self, n):
"""CPU-intensive Fibonacci calculation"""
if n <= 1:
return n
return self.fibonacci(n-1) + self.fibonacci(n-2)
def factorial(self, n):
"""CPU-intensive factorial calculation"""
if n <= 1:
return 1
return n * self.factorial(n-1)
def process_data(self, data_size):
"""Simulate data processing"""
data = list(range(data_size))
return sum(x*x for x in data)
# Create calculator and proxy it to thread pool
calculator = ExpensiveCalculator("Calculator1")
# Wrap with proxy - all method calls go to thread pool
proxied_calculator = tpool.Proxy(calculator)
print("Testing thread pool proxy...")
def run_calculations(calc_id, proxy_calc):
"""Run calculations using proxied calculator"""
start_time = time.time()
results = {
'calc_id': calc_id,
'fibonacci_30': proxy_calc.fibonacci(30),
'factorial_10': proxy_calc.factorial(10),
'process_data': proxy_calc.process_data(10000)
}
elapsed = time.time() - start_time
results['elapsed'] = elapsed
return results
# Run multiple calculations concurrently
# Each will execute in the thread pool without blocking other greenthreads
greenthreads = []
for i in range(3):
gt = eventlet.spawn(run_calculations, i+1, proxied_calculator)
greenthreads.append(gt)
print("Calculations running in thread pool...")
# Collect results
for gt in greenthreads:
result = gt.wait()
print(f"Calculator {result['calc_id']} results ({result['elapsed']:.2f}s):")
print(f" Fibonacci(30): {result['fibonacci_30']}")
print(f" Factorial(10): {result['factorial_10']}")
print(f" Process data: {result['process_data']}")
if __name__ == "__main__":
import random
print("=== Mixed Workload Example ===")
mixed_workload_example()
print("\n=== Thread Pool Proxy Example ===")
thread_pool_proxy_example()
# Configure thread pool
print(f"\nConfiguring thread pool...")
tpool.set_num_threads(4) # Use 4 threads
print("Thread pool configured with 4 threads")import eventlet
from eventlet import pools
import time
import random
class ManagedResource:
"""Resource with lifecycle management"""
def __init__(self, resource_id):
self.resource_id = resource_id
self.created_at = time.time()
self.last_used = time.time()
self.use_count = 0
self.is_valid = True
print(f"Created resource {self.resource_id}")
def use(self, operation):
"""Use the resource"""
if not self.is_valid:
raise RuntimeError(f"Resource {self.resource_id} is invalid")
self.last_used = time.time()
self.use_count += 1
# Simulate work
eventlet.sleep(random.uniform(0.1, 0.5))
# Occasionally invalidate resource (simulate connection loss)
if random.random() < 0.1: # 10% chance
self.is_valid = False
print(f"Resource {self.resource_id} became invalid")
return f"Operation '{operation}' completed with resource {self.resource_id}"
def is_expired(self, max_age=30, max_idle=10):
"""Check if resource should be expired"""
now = time.time()
age = now - self.created_at
idle = now - self.last_used
return age > max_age or idle > max_idle or not self.is_valid
def close(self):
"""Clean up resource"""
print(f"Closed resource {self.resource_id} (used {self.use_count} times)")
class ManagedPool:
"""Pool with resource validation and cleanup"""
def __init__(self, create_func, max_size=4, max_age=30, max_idle=10):
self.create_func = create_func
self.max_size = max_size
self.max_age = max_age
self.max_idle = max_idle
self.pool = pools.Pool(create=self._create_resource, max_size=max_size)
self.all_resources = set()
def _create_resource(self):
"""Create and track a new resource"""
resource = self.create_func()
self.all_resources.add(resource)
return resource
def get(self):
"""Get a validated resource from pool"""
while True:
resource = self.pool.get()
if not resource.is_expired(self.max_age, self.max_idle):
return resource
else:
# Resource is expired, remove and create new one
print(f"Removing expired resource {resource.resource_id}")
self.all_resources.discard(resource)
resource.close()
# Continue loop to get a fresh resource
def put(self, resource):
"""Return resource to pool if still valid"""
if resource in self.all_resources and not resource.is_expired():
self.pool.put(resource)
else:
# Resource is expired or invalid
self.all_resources.discard(resource)
resource.close()
def cleanup_expired(self):
"""Clean up expired resources"""
expired = [r for r in self.all_resources if r.is_expired(self.max_age, self.max_idle)]
for resource in expired:
self.all_resources.discard(resource)
resource.close()
print(f"Cleaned up {len(expired)} expired resources")
def stats(self):
"""Get pool statistics"""
valid_resources = [r for r in self.all_resources if not r.is_expired()]
return {
'total_resources': len(self.all_resources),
'valid_resources': len(valid_resources),
'pool_size': len(self.pool),
'expired_resources': len(self.all_resources) - len(valid_resources)
}
def create_managed_resource():
"""Factory for creating managed resources"""
resource_id = random.randint(1000, 9999)
return ManagedResource(resource_id)
def managed_pool_example():
"""Example with resource lifecycle management"""
# Create managed pool
managed_pool = ManagedPool(
create_func=create_managed_resource,
max_size=3,
max_age=10, # Resources expire after 10 seconds
max_idle=5 # Resources expire after 5 seconds of inactivity
)
def worker_with_validation(worker_id, operations):
"""Worker that handles resource validation"""
print(f"Managed worker {worker_id} starting")
for operation in operations:
try:
# Get validated resource
resource = managed_pool.get()
# Use resource
result = resource.use(operation)
print(f"Worker {worker_id}: {result}")
# Return to pool
managed_pool.put(resource)
except Exception as e:
print(f"Worker {worker_id} error: {e}")
# Random delay between operations
eventlet.sleep(random.uniform(0.5, 2.0))
print(f"Managed worker {worker_id} finished")
# Start workers
worker_operations = [
['read', 'write', 'update'],
['query', 'insert', 'delete'],
['backup', 'restore', 'verify'],
['sync', 'analyze', 'optimize']
]
greenthreads = []
for i, operations in enumerate(worker_operations):
gt = eventlet.spawn(worker_with_validation, i+1, operations)
greenthreads.append(gt)
# Monitor pool stats
def monitor_pool():
"""Monitor pool statistics"""
for _ in range(15): # Monitor for 15 seconds
eventlet.sleep(1)
stats = managed_pool.stats()
print(f"Pool stats: {stats}")
# Periodic cleanup
if _ % 5 == 0:
managed_pool.cleanup_expired()
eventlet.spawn(monitor_pool)
# Wait for workers
for gt in greenthreads:
gt.wait()
# Final cleanup
managed_pool.cleanup_expired()
final_stats = managed_pool.stats()
print(f"Final pool stats: {final_stats}")
if __name__ == "__main__":
managed_pool_example()# For database connections
db_pool = eventlet.db_pool.ConnectionPool(
psycopg2,
host='localhost',
database='myapp',
user='user',
password='password',
max_size=20, # Match your database connection limit
pool_timeout=30 # Timeout waiting for connection
)
# For HTTP client connections
http_pool = eventlet.pools.Pool(
create=lambda: requests.Session(),
max_size=50 # Higher for I/O-bound operations
)
# For limiting concurrent operations
rate_limiter = eventlet.pools.TokenPool(max_size=10) # Max 10 concurrentdef validated_pool_get(pool, validate_func):
"""Get resource with validation"""
max_retries = 3
for attempt in range(max_retries):
resource = pool.get()
if validate_func(resource):
return resource
else:
# Resource is invalid, don't return to pool
resource.close()
if attempt == max_retries - 1:
raise RuntimeError("Unable to get valid resource")
def safe_pool_put(pool, resource, cleanup_func=None):
"""Safely return resource to pool"""
try:
if hasattr(resource, 'is_valid') and resource.is_valid():
pool.put(resource)
else:
if cleanup_func:
cleanup_func(resource)
except Exception as e:
print(f"Error returning resource to pool: {e}")
if cleanup_func:
cleanup_func(resource)Install with Tessl CLI
npx tessl i tessl/pypi-eventlet