Highly concurrent networking library
—
Managed pools of green threads for controlling concurrency levels and resource usage in high-throughput applications. Thread pools help prevent resource exhaustion while maximizing performance.
A pool that manages a fixed number of green threads, providing controlled concurrency and resource management.
class GreenPool:
"""
A pool of green threads with a maximum size constraint.
Manages concurrency levels and provides convenient methods for
spawning work across the pool.
"""
def __init__(self, size=1000):
"""
Create a green thread pool.
Parameters:
- size: int, maximum number of concurrent greenthreads
"""
def spawn(self, func, *args, **kwargs):
"""
Spawn a function in the pool, blocking if pool is full.
Parameters:
- func: callable to run in pool
- *args: positional arguments for func
- **kwargs: keyword arguments for func
Returns:
GreenThread instance for retrieving results
"""
def spawn_n(self, func, *args, **kwargs):
"""
Spawn a function in the pool without return value access.
Parameters:
- func: callable to run in pool
- *args: positional arguments for func
- **kwargs: keyword arguments for func
Returns:
None
"""
def waitall(self):
"""
Wait for all spawned greenthreads in the pool to complete.
Returns:
None
"""
def resize(self, new_size):
"""
Change the maximum size of the pool.
Parameters:
- new_size: int, new maximum pool size
Returns:
None
"""
def running(self):
"""
Get the number of currently running greenthreads.
Returns:
int: number of active greenthreads
"""
def free(self):
"""
Get the number of available slots in the pool.
Returns:
int: number of free slots (size - running)
"""
def starmap(self, func, iterable):
"""
Apply func to each item in iterable, spawning across the pool.
Similar to itertools.starmap but with green thread pool execution.
Parameters:
- func: callable to apply to each item
- iterable: iterable of argument tuples for func
Returns:
iterator of results
"""
def imap(self, func, *iterables):
"""
Apply func to items from iterables in parallel across the pool.
Similar to itertools.imap but with green thread pool execution.
Parameters:
- func: callable to apply to items
- *iterables: one or more iterables
Returns:
iterator of results in order
"""A data structure for managing I/O-related tasks that provides easy iteration over results as they become available.
class GreenPile:
    """
    A pile of green threads for I/O operations.

    Results are retrieved by iterating over the pile; they are yielded in
    the order the tasks were spawned.
    """

    def __init__(self, size_or_pool=1000):
        """
        Create a green pile.

        Parameters:
        - size_or_pool: int (pool size) or GreenPool instance
        """

    def spawn(self, func, *args, **kwargs):
        """
        Spawn a function and add its result to the pile.

        Parameters:
        - func: callable to run
        - *args: positional arguments for func
        - **kwargs: keyword arguments for func

        Returns:
        None (the result is obtained by iterating over the pile)
        """

    def __iter__(self):
        """
        Iterate over the results of the spawned tasks.

        Returns:
        iterator yielding results in the order the tasks were spawned,
        waiting for each result until it is available
        """

    def next(self):
        """
        Get the next result, waiting until it is available.

        Returns:
        Result of the next task, in spawn order

        Raises:
        StopIteration: when no more results available
        """

    def __len__(self):
        """
        Get the number of spawned tasks.

        Returns:
        int: number of tasks in the pile
        """

import eventlet
def worker(task_id, duration):
    """Simulate a unit of work lasting ``duration`` seconds.

    Parameters:
    - task_id: identifier used in the progress messages and the result
    - duration: number of seconds passed to eventlet.sleep

    Returns:
    str: "Result <task_id>"
    """
    print(f"Task {task_id} starting")
    eventlet.sleep(duration)
    print(f"Task {task_id} completed")
    return f"Result {task_id}"
# Create a pool with max 5 concurrent greenthreads
pool = eventlet.GreenPool(5)

# Spawn multiple tasks, keeping each GreenThread so its result can be read later
results = [pool.spawn(worker, i, 1.0) for i in range(10)]

# Wait for all tasks to complete
pool.waitall()

# Collect results
final_results = [gt.wait() for gt in results]
print(f"All results: {final_results}")

import eventlet
def fetch_url(url):
    """Simulate fetching a URL.

    Parameters:
    - url: the address to pretend to fetch

    Returns:
    str: "Content from <url>"
    """
    # In real usage, would use eventlet.green.urllib or similar
    eventlet.sleep(0.5)  # Simulate network delay
    return f"Content from {url}"
urls = [
    "http://example.com/page1",
    "http://example.com/page2",
    "http://example.com/page3",
    "http://example.com/page4",
    "http://example.com/page5",
]

pool = eventlet.GreenPool(3)  # Max 3 concurrent requests

# Use imap to process URLs and get results in order
for result in pool.imap(fetch_url, urls):
    print(f"Got: {result}")
# Use starmap with argument tuples
def fetch_with_headers(url, headers):
    """Simulate fetching a URL with request headers.

    Parameters:
    - url: the address to pretend to fetch
    - headers: headers to include in the returned description

    Returns:
    str: "Content from <url> with headers <headers>"
    """
    eventlet.sleep(0.3)
    return f"Content from {url} with headers {headers}"
# Each tuple is unpacked into fetch_with_headers(url, headers) by starmap
url_header_pairs = [
    ("http://api.example.com/users", {"Authorization": "Bearer token1"}),
    ("http://api.example.com/posts", {"Authorization": "Bearer token2"}),
    ("http://api.example.com/comments", {"Authorization": "Bearer token3"}),
]

for result in pool.starmap(fetch_with_headers, url_header_pairs):
    print(f"API result: {result}")

import eventlet
def fetch_data(source):
    """Simulate fetching data from different sources.

    Sleeps for a random delay between 0.1 and 2.0 seconds.

    Parameters:
    - source: name of the data source, echoed in the result

    Returns:
    str: "Data from <source> (took <delay>s)" with the delay to 2 decimals
    """
    import random

    delay = random.uniform(0.1, 2.0)  # Random delay
    eventlet.sleep(delay)
    return f"Data from {source} (took {delay:.2f}s)"
# Create a pile for managing multiple I/O operations
pile = eventlet.GreenPile(10)

# Spawn multiple data fetches
sources = ["database", "api", "cache", "file", "network"]
for source in sources:
    pile.spawn(fetch_data, source)

# Iterating the pile yields results in the order the tasks were spawned;
# each step waits until that task's result is ready
print("Processing results as they arrive:")
for result in pile:
    print(f"Received: {result}")

print("All operations completed")

import eventlet
def monitoring_task():
    """Simulate a monitoring task.

    Sleeps for one second via eventlet.sleep.

    Returns:
    str: "Monitor check complete"
    """
    eventlet.sleep(1.0)
    return "Monitor check complete"
pool = eventlet.GreenPool(2)  # Start with small pool

# Spawn some initial tasks; spawn() blocks while the pool is full, so the
# third call waits until one of the first two tasks has finished
for _ in range(3):
    pool.spawn(monitoring_task)

print(f"Pool stats - Running: {pool.running()}, Free: {pool.free()}")

# Increase pool size based on demand
if pool.free() == 0:
    print("Pool full, increasing size")
    pool.resize(5)

print(f"After resize - Running: {pool.running()}, Free: {pool.free()}")

# Spawn more tasks with increased capacity
for _ in range(4):
    pool.spawn(monitoring_task)

# Wait for completion
pool.waitall()
print("All monitoring tasks completed")

import eventlet
def producer(pool, queue, num_items):
    """Produce items using a pool of workers.

    Parameters:
    - pool: pool whose spawn() is used to create items concurrently
    - queue: queue that receives one greenthread per item
    - num_items: number of items to create

    Returns:
    None
    """

    def create_item(item_id):
        """Build the item string for item_id after a short simulated delay."""
        eventlet.sleep(0.1)  # Simulate work
        return f"item_{item_id}"

    # Use pool to create items concurrently
    for i in range(num_items):
        gt = pool.spawn(create_item, i)
        # Put the greenthread in queue so consumer can wait for result
        queue.put(gt)
def consumer(queue, num_items):
    """Consume items as they're produced.

    Parameters:
    - queue: queue holding the greenthreads put there by the producer
    - num_items: number of items to take from the queue

    Returns:
    None
    """
    for _ in range(num_items):
        gt = queue.get()  # Get the greenthread
        item = gt.wait()  # Wait for the actual result
        print(f"Consumed: {item}")
# Set up producer-consumer with pool
production_pool = eventlet.GreenPool(5)
item_queue = eventlet.Queue()

# Start producer and consumer, keeping the greenthreads so we can wait on them
producer_gt = eventlet.spawn(producer, production_pool, item_queue, 10)
consumer_gt = eventlet.spawn(consumer, item_queue, 10)

# Wait until both have finished instead of sleeping for a fixed time;
# wait() also re-raises any exception raised inside the greenthread
producer_gt.wait()
consumer_gt.wait()

import eventlet
# For I/O-bound tasks (network, disk)
io_pool = eventlet.GreenPool(100)  # Higher concurrency

# For CPU-bound tasks or resource-limited operations
# (green threads are scheduled cooperatively, so CPU-bound work does not
# run in parallel; a small pool just bounds how much is queued at once)
cpu_pool = eventlet.GreenPool(10)  # Lower concurrency

# For database connections (limited by DB connection pool)
db_pool = eventlet.GreenPool(20)  # Match DB pool size

import eventlet
def risky_task(task_id):
    """Task that might fail.

    Parameters:
    - task_id: int, identifier of the task

    Returns:
    str: "Success <task_id>"

    Raises:
    ValueError: when task_id is divisible by 3 (including 0)
    """
    if task_id % 3 == 0:
        raise ValueError(f"Task {task_id} failed!")
    return f"Success {task_id}"
pool = eventlet.GreenPool(5)
greenthreads = []
# Spawn tasks
for i in range(10):
gt = pool.spawn(risky_task, i)
greenthreads.append(gt)
# Collect results with error handling
results = []
for gt in greenthreads:
try:
result = gt.wait()
results.append(result)
except Exception as e:
print(f"Task failed: {e}")
results.append(None)
print(f"Successful results: {[r for r in results if r is not None]}")Install with Tessl CLI
npx tessl i tessl/pypi-eventlet