CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-eventlet

Highly concurrent networking library

Pending
Overview
Eval results
Files

docs/thread-pools.md

Thread Pools

Managed pools of green threads for controlling concurrency levels and resource usage in high-throughput applications. Thread pools help prevent resource exhaustion while maximizing performance.

Capabilities

Green Thread Pool

A pool that manages a fixed number of green threads, providing controlled concurrency and resource management.

class GreenPool:
    """
    A pool of green threads with a maximum size constraint.

    At most ``size`` greenthreads run at once; spawning beyond that
    blocks the caller until a slot frees up.  Provides map-style
    convenience methods for spreading work across the pool.
    """

    def __init__(self, size=1000):
        """
        Create a green thread pool.

        Parameters:
        - size: int, maximum number of concurrent greenthreads
        """

    def spawn(self, func, *args, **kwargs):
        """
        Spawn a function in the pool.

        Blocks the calling greenthread while the pool is at capacity,
        until a slot becomes free.

        Parameters:
        - func: callable to run in pool
        - *args: positional arguments for func
        - **kwargs: keyword arguments for func

        Returns:
        GreenThread instance for retrieving results (via its ``wait()``)
        """

    def spawn_n(self, func, *args, **kwargs):
        """
        Spawn a function in the pool without return value access.

        Like ``spawn`` (including blocking when the pool is full), but
        no GreenThread handle is returned, so the result and any raised
        exception cannot be retrieved.

        Parameters:
        - func: callable to run in pool
        - *args: positional arguments for func
        - **kwargs: keyword arguments for func

        Returns:
        None
        """

    def waitall(self):
        """
        Block until all greenthreads spawned in the pool have completed.

        Returns:
        None
        """

    def resize(self, new_size):
        """
        Change the maximum size of the pool.

        (Presumably shrinking below the number of currently running
        greenthreads does not interrupt them and only limits new work —
        confirm against the eventlet implementation.)

        Parameters:
        - new_size: int, new maximum pool size

        Returns:
        None
        """

    def running(self):
        """
        Get the number of currently running greenthreads.

        Returns:
        int: number of active greenthreads
        """

    def free(self):
        """
        Get the number of available slots in the pool.

        Returns:
        int: number of free slots (size - running)
        """

    def starmap(self, func, iterable):
        """
        Apply func to each argument tuple in iterable, spawning the
        calls across the pool.  The green-thread analogue of
        ``itertools.starmap``.

        Parameters:
        - func: callable to apply to each item
        - iterable: iterable of argument tuples for func

        Returns:
        iterator of results
        """

    def imap(self, func, *iterables):
        """
        Apply func to items from iterables in parallel across the pool.
        The green-thread analogue of the builtin ``map`` (called
        ``itertools.imap`` in Python 2).

        Parameters:
        - func: callable to apply to items
        - *iterables: one or more iterables

        Returns:
        iterator of results in input order
        """

Green Pile

A data structure for managing I/O-related tasks that provides easy iteration over results as they become available.

class GreenPile:
    """
    A pile of green threads for I/O operations.

    Results of spawned tasks can be retrieved as they become available
    by iterating over the pile.
    """

    def __init__(self, size_or_pool=1000):
        """
        Create a green pile.

        Parameters:
        - size_or_pool: int (size of a new internal pool) or an existing
          GreenPool instance to share
        """

    def spawn(self, func, *args, **kwargs):
        """
        Spawn a function and add its result to the pile.

        Parameters:
        - func: callable to run
        - *args: positional arguments for func
        - **kwargs: keyword arguments for func

        Returns:
        GreenThread instance
        """

    def __iter__(self):
        """
        Iterate over results as they become available.

        Returns:
        iterator yielding results in completion order
        """

    def __next__(self):
        """
        Get the next available result.

        Python 3 spells the iterator-protocol method ``__next__``;
        without it, ``for result in pile`` could not advance.

        Returns:
        Next completed result

        Raises:
        StopIteration: when no more results available
        """

    # Python 2-style spelling kept as a backward-compatible alias
    # (eventlet's own GreenPile aliases it the same way).
    next = __next__

    def __len__(self):
        """
        Get the number of spawned tasks.

        Returns:
        int: number of tasks in the pile
        """

Usage Examples

Basic Pool Usage

import eventlet

def worker(task_id, duration):
    """Pretend to do *duration* seconds of work for task *task_id*."""
    print(f"Task {task_id} starting")
    eventlet.sleep(duration)
    print(f"Task {task_id} completed")
    return f"Result {task_id}"

# Cap concurrency at 5 greenthreads
pool = eventlet.GreenPool(5)

# Spawn ten tasks; spawn() blocks whenever the pool is full,
# so only five run at any moment.
results = [pool.spawn(worker, task, 1.0) for task in range(10)]

# Block until every greenthread has finished
pool.waitall()

# wait() on each greenthread yields its return value
final_results = [gt.wait() for gt in results]
print(f"All results: {final_results}")

Pool with Map Operations

import eventlet

def fetch_url(url):
    """Stand-in for a URL fetch (real code would use eventlet.green.urllib)."""
    eventlet.sleep(0.5)  # Simulate network delay
    return f"Content from {url}"

urls = [
    "http://example.com/page1",
    "http://example.com/page2",
    "http://example.com/page3",
    "http://example.com/page4",
    "http://example.com/page5",
]

pool = eventlet.GreenPool(3)  # Max 3 concurrent requests

# imap runs fetch_url across the pool and yields results in input order
for result in pool.imap(fetch_url, urls):
    print(f"Got: {result}")

def fetch_with_headers(url, headers):
    """Stand-in for a URL fetch with per-request headers."""
    eventlet.sleep(0.3)
    return f"Content from {url} with headers {headers}"

url_header_pairs = [
    ("http://api.example.com/users", {"Authorization": "Bearer token1"}),
    ("http://api.example.com/posts", {"Authorization": "Bearer token2"}),
    ("http://api.example.com/comments", {"Authorization": "Bearer token3"}),
]

# starmap unpacks each tuple into fetch_with_headers(url, headers)
for result in pool.starmap(fetch_with_headers, url_header_pairs):
    print(f"API result: {result}")

Green Pile Usage

import eventlet

def fetch_data(source):
    """Pretend to fetch data from *source*, taking a random amount of time."""
    import random
    delay = random.uniform(0.1, 2.0)  # Random delay
    eventlet.sleep(delay)
    return f"Data from {source} (took {delay:.2f}s)"

# A pile groups I/O greenthreads and hands back results via iteration
pile = eventlet.GreenPile(10)

# Kick off one fetch per source
for source in ("database", "api", "cache", "file", "network"):
    pile.spawn(fetch_data, source)

# Iterating the pile yields each result as soon as it is ready —
# completion order, not spawn order.
print("Processing results as they arrive:")
for result in pile:
    print(f"Received: {result}")

print("All operations completed")

Dynamic Pool Resizing

import eventlet

def monitoring_task():
    """Stand-in for a periodic monitoring check."""
    eventlet.sleep(1.0)
    return "Monitor check complete"

pool = eventlet.GreenPool(2)  # Start with small pool

# Fill the small pool with a few initial tasks
for _ in range(3):
    pool.spawn(monitoring_task)

print(f"Pool stats - Running: {pool.running()}, Free: {pool.free()}")

# Grow the pool when no free slots remain
if pool.free() == 0:
    print("Pool full, increasing size")
    pool.resize(5)
    print(f"After resize - Running: {pool.running()}, Free: {pool.free()}")

# With the larger pool, spawn the remaining tasks
for _ in range(3, 7):
    pool.spawn(monitoring_task)

# Block until everything has finished
pool.waitall()
print("All monitoring tasks completed")

Producer-Consumer with Pools

import eventlet

def producer(pool, queue, num_items):
    """Produce items using a pool of workers."""
    def create_item(item_id):
        eventlet.sleep(0.1)  # Simulate work
        return f"item_{item_id}"

    # Use pool to create items concurrently
    for i in range(num_items):
        gt = pool.spawn(create_item, i)
        # Put the greenthread in queue so consumer can wait for result
        queue.put(gt)

def consumer(queue, num_items):
    """Consume items as they're produced."""
    for _ in range(num_items):
        gt = queue.get()  # Get the greenthread
        item = gt.wait()  # Wait for the actual result
        print(f"Consumed: {item}")

# Set up producer-consumer with pool
production_pool = eventlet.GreenPool(5)
item_queue = eventlet.Queue()

# Start producer and consumer, keeping the greenthread handles
producer_gt = eventlet.spawn(producer, production_pool, item_queue, 10)
consumer_gt = eventlet.spawn(consumer, item_queue, 10)

# Join both greenthreads instead of sleeping for a fixed interval:
# a hard-coded sleep either wastes time or cuts the run short.
producer_gt.wait()
consumer_gt.wait()

Pool Management Best Practices

Choosing Pool Size

import eventlet

# I/O-bound work (network, disk) tolerates high concurrency
io_pool = eventlet.GreenPool(100)

# CPU-bound or resource-limited work: keep concurrency low
cpu_pool = eventlet.GreenPool(10)

# Database work: size the pool to match the DB connection pool
db_pool = eventlet.GreenPool(20)

Error Handling in Pools

import eventlet

def risky_task(task_id):
    """Return a success message, or raise ValueError for every third task id."""
    if task_id % 3:
        return f"Success {task_id}"
    raise ValueError(f"Task {task_id} failed!")

pool = eventlet.GreenPool(5)

# Spawn all tasks up front, keeping each greenthread handle
greenthreads = [pool.spawn(risky_task, i) for i in range(10)]

# Collect results; wait() re-raises any exception the task raised,
# so failures are handled here rather than lost inside the pool.
results = []
for gt in greenthreads:
    try:
        results.append(gt.wait())
    except Exception as e:
        print(f"Task failed: {e}")
        results.append(None)

print(f"Successful results: {[r for r in results if r is not None]}")

Install with Tessl CLI

npx tessl i tessl/pypi-eventlet

docs

core-concurrency.md

debugging.md

green-stdlib.md

index.md

monkey-patching.md

networking.md

resource-pooling.md

synchronization.md

thread-pools.md

web-server.md

tile.json