CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-gevent

A coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of libev event loop

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

pooling.mddocs/

Pooling

Tools for managing groups of greenlets including pools with size limits, groups for coordination, and thread pools for CPU-bound operations. These provide structured concurrency management and resource control.

Capabilities

Greenlet Pools

Limited-size pools of greenlets for controlling concurrency.

class Pool:
    """
    Pool of greenlets with maximum size limit.
    """
    
    def __init__(self, size=None, greenlet_class=None):
        """
        Create a pool.
        
        Parameters:
        - size: int, maximum pool size (None for unlimited)
        - greenlet_class: class, greenlet class to use
        """
    
    def spawn(self, function, *args, **kwargs) -> Greenlet:
        """
        Spawn function in pool, blocking if pool is full.
        
        Parameters:
        - function: callable to execute
        - *args: positional arguments
        - **kwargs: keyword arguments
        
        Returns:
        Greenlet object
        """
    
    def map(self, func, iterable, maxsize=None):
        """
        Map function over iterable using pool.
        
        Parameters:
        - func: function to apply
        - iterable: sequence to process
        - maxsize: int, buffer size for results
        
        Returns:
        list of results in order
        """
    
    def imap(self, func, iterable, maxsize=None):
        """
        Lazy map function over iterable using pool.
        
        Parameters:
        - func: function to apply
        - iterable: sequence to process
        - maxsize: int, buffer size for results
        
        Yields:
        Results as they become available
        """
    
    def imap_unordered(self, func, iterable, maxsize=None):
        """
        Lazy map returning results in completion order.
        
        Parameters:
        - func: function to apply
        - iterable: sequence to process
        - maxsize: int, buffer size for results
        
        Yields:
        Results as they complete (unordered)
        """
    
    def apply(self, func, args=(), kwds={}):
        """
        Synchronously apply function with arguments.
        
        Parameters:
        - func: function to call
        - args: tuple, positional arguments
        - kwds: dict, keyword arguments
        
        Returns:
        Function result
        """
    
    def apply_async(self, func, args=(), kwds={}, callback=None):
        """
        Asynchronously apply function with arguments.
        
        Parameters:
        - func: function to call
        - args: tuple, positional arguments
        - kwds: dict, keyword arguments
        - callback: callable, called with result
        
        Returns:
        AsyncResult object
        """
    
    def join(self, timeout=None, raise_error=False):
        """
        Wait for all greenlets in pool to finish.
        
        Parameters:
        - timeout: float, maximum time to wait
        - raise_error: bool, raise exception if any failed
        
        Returns:
        None
        """
    
    def kill(self, exception=GreenletExit, block=True, timeout=None):
        """
        Kill all greenlets in pool.
        
        Parameters:
        - exception: exception to raise in greenlets
        - block: bool, wait for greenlets to die
        - timeout: float, maximum time to wait
        
        Returns:
        None
        """
    
    @property
    def size(self) -> int:
        """Current number of greenlets in pool."""
    
    @property
    def free_count(self) -> int:
        """Number of available slots in pool."""

class PoolFull(Exception):
    """Exception raised when pool is full and cannot accept more greenlets."""

Greenlet Groups

Unbound collections of greenlets for coordination.

class Group:
    """
    Collection of greenlets for coordination without size limits.
    """
    
    def add(self, greenlet):
        """
        Add greenlet to group.
        
        Parameters:
        - greenlet: Greenlet object to add
        
        Returns:
        None
        """
    
    def discard(self, greenlet):
        """
        Remove greenlet from group.
        
        Parameters:
        - greenlet: Greenlet object to remove
        
        Returns:
        None
        """
    
    def spawn(self, function, *args, **kwargs) -> Greenlet:
        """
        Spawn greenlet and add to group.
        
        Parameters:
        - function: callable to execute
        - *args: positional arguments
        - **kwargs: keyword arguments
        
        Returns:
        Greenlet object
        """
    
    def map(self, func, iterable):
        """
        Map function over iterable using group greenlets.
        
        Parameters:
        - func: function to apply
        - iterable: sequence to process
        
        Returns:
        list of results in order
        """
    
    def imap(self, func, iterable):
        """
        Lazy map function over iterable.
        
        Parameters:
        - func: function to apply
        - iterable: sequence to process
        
        Yields:
        Results as they become available
        """
    
    def join(self, timeout=None, raise_error=False):
        """
        Wait for all greenlets in group to finish.
        
        Parameters:
        - timeout: float, maximum time to wait
        - raise_error: bool, raise exception if any failed
        
        Returns:
        None
        """
    
    def kill(self, exception=GreenletExit, block=True, timeout=None):
        """
        Kill all greenlets in group.
        
        Parameters:
        - exception: exception to raise in greenlets
        - block: bool, wait for greenlets to die
        - timeout: float, maximum time to wait
        
        Returns:
        None
        """
    
    def killone(self, greenlet, exception=GreenletExit, block=True, timeout=None):
        """
        Kill one greenlet in group.
        
        Parameters:
        - greenlet: Greenlet object to kill
        - exception: exception to raise
        - block: bool, wait for greenlet to die
        - timeout: float, maximum time to wait
        
        Returns:
        None
        """

Thread Pools

Thread pools for CPU-bound operations that need true parallelism.

class ThreadPool:
    """
    Pool of OS threads for CPU-bound operations.
    """
    
    def __init__(self, maxsize, hub=None):
        """
        Create thread pool.
        
        Parameters:
        - maxsize: int, maximum number of threads
        - hub: Hub, hub to use for coordination
        """
    
    def spawn(self, func, *args, **kwargs) -> ThreadResult:
        """
        Execute function in thread pool.
        
        Parameters:
        - func: function to execute
        - *args: positional arguments
        - **kwargs: keyword arguments
        
        Returns:
        ThreadResult object
        """
    
    def apply(self, func, args=(), kwds={}):
        """
        Synchronously execute function in thread.
        
        Parameters:
        - func: function to execute
        - args: tuple, positional arguments
        - kwds: dict, keyword arguments
        
        Returns:
        Function result
        """
    
    def apply_async(self, func, args=(), kwds={}, callback=None):
        """
        Asynchronously execute function in thread.
        
        Parameters:
        - func: function to execute
        - args: tuple, positional arguments
        - kwds: dict, keyword arguments
        - callback: callable, called with result
        
        Returns:
        AsyncResult object
        """
    
    def kill(self):
        """
        Kill all threads in pool.
        
        Returns:
        None
        """

class ThreadResult:
    """
    Result object for thread pool operations.
    """
    
    def get(self, timeout=None):
        """
        Get the result, blocking if necessary.
        
        Parameters:
        - timeout: float, maximum time to wait
        
        Returns:
        Function result
        """
    
    def ready(self) -> bool:
        """
        Check if result is available.
        
        Returns:
        bool, True if result is ready
        """

Usage Examples

Pool for Concurrent Requests

import gevent
from gevent import pool
from gevent import socket
import time

def fetch_url(url):
    # Simulate HTTP request
    print(f"Fetching {url}")
    gevent.sleep(1)  # Simulate network delay
    return f"Content from {url}"

# Create pool with max 5 concurrent operations
request_pool = pool.Pool(5)

urls = [f"http://example.com/page{i}" for i in range(20)]

# Process URLs with pool
start_time = time.time()
results = request_pool.map(fetch_url, urls)
end_time = time.time()

print(f"Processed {len(urls)} URLs in {end_time - start_time:.2f} seconds")
print(f"First result: {results[0]}")

Group for Related Tasks

import gevent
from gevent import pool

def worker(task_id, duration):
    print(f"Task {task_id} starting")
    gevent.sleep(duration)
    print(f"Task {task_id} completed")
    return f"Result {task_id}"

# Create group for related tasks
task_group = pool.Group()

# Add tasks to group
for i in range(5):
    task_group.spawn(worker, i, i * 0.5)

# Wait for all tasks
print("Waiting for all tasks to complete...")
task_group.join()
print("All tasks completed!")

Thread Pool for CPU-Bound Work

import gevent
from gevent import threadpool
import math

def cpu_intensive_task(n):
    """CPU-intensive task that benefits from real threads."""
    result = 0
    for i in range(n):
        result += math.sqrt(i)
    return result

# Create thread pool
thread_pool = threadpool.ThreadPool(4)

# Execute CPU-bound tasks
tasks = [100000, 200000, 300000, 400000]
results = []

for task in tasks:
    result = thread_pool.spawn(cpu_intensive_task, task)
    results.append(result)

# Get results
for i, result in enumerate(results):
    value = result.get()  # Blocks until complete
    print(f"Task {i}: {value:.2f}")

thread_pool.kill()

Pool with Error Handling

import gevent
from gevent import pool
import random

def unreliable_task(task_id):
    gevent.sleep(random.uniform(0.1, 0.5))
    if random.random() < 0.3:  # 30% chance of failure
        raise Exception(f"Task {task_id} failed")
    return f"Success {task_id}"

# Create pool
work_pool = pool.Pool(3)

# Process tasks with error handling
task_ids = range(10)
results = []

for task_id in task_ids:
    greenlet = work_pool.spawn(unreliable_task, task_id)
    results.append(greenlet)

# Check results
for i, greenlet in enumerate(results):
    try:
        result = greenlet.get()
        print(f"Task {i}: {result}")
    except Exception as e:
        print(f"Task {i} failed: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-gevent

docs

core-greenlets.md

index.md

monkey-patching.md

networking.md

pooling.md

queues.md

servers.md

synchronization.md

timeouts.md

tile.json