A coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Tools for managing groups of greenlets including pools with size limits, groups for coordination, and thread pools for CPU-bound operations. These provide structured concurrency management and resource control.
Limited-size pools of greenlets for controlling concurrency.
class Pool:
    """
    Pool of greenlets with maximum size limit.
    """

    def __init__(self, size=None, greenlet_class=None):
        """
        Create a pool.

        Parameters:
        - size: int, maximum pool size (None for unlimited)
        - greenlet_class: class, greenlet class to use
        """

    def spawn(self, function, *args, **kwargs) -> Greenlet:
        """
        Spawn function in pool, blocking if pool is full.

        Parameters:
        - function: callable to execute
        - *args: positional arguments
        - **kwargs: keyword arguments

        Returns:
        Greenlet object
        """

    def map(self, func, iterable, maxsize=None):
        """
        Map function over iterable using pool.

        Parameters:
        - func: function to apply
        - iterable: sequence to process
        - maxsize: int, buffer size for results

        Returns:
        list of results in order
        """

    def imap(self, func, iterable, maxsize=None):
        """
        Lazy map function over iterable using pool.

        Parameters:
        - func: function to apply
        - iterable: sequence to process
        - maxsize: int, buffer size for results

        Yields:
        Results as they become available
        """

    def imap_unordered(self, func, iterable, maxsize=None):
        """
        Lazy map returning results in completion order.

        Parameters:
        - func: function to apply
        - iterable: sequence to process
        - maxsize: int, buffer size for results

        Yields:
        Results as they complete (unordered)
        """

    def apply(self, func, args=(), kwds=None):
        """
        Synchronously apply function with arguments.

        Parameters:
        - func: function to call
        - args: tuple, positional arguments
        - kwds: dict, keyword arguments (None means no keyword arguments)

        Returns:
        Function result
        """

    def apply_async(self, func, args=(), kwds=None, callback=None):
        """
        Asynchronously apply function with arguments.

        Parameters:
        - func: function to call
        - args: tuple, positional arguments
        - kwds: dict, keyword arguments (None means no keyword arguments)
        - callback: callable, called with result

        Returns:
        Greenlet object running the call
        """

    def join(self, timeout=None, raise_error=False):
        """
        Wait for all greenlets in pool to finish.

        Parameters:
        - timeout: float, maximum time to wait
        - raise_error: bool, raise exception if any failed

        Returns:
        None
        """

    def kill(self, exception=GreenletExit, block=True, timeout=None):
        """
        Kill all greenlets in pool.

        Parameters:
        - exception: exception to raise in greenlets
        - block: bool, wait for greenlets to die
        - timeout: float, maximum time to wait

        Returns:
        None
        """

    @property
    def size(self) -> int:
        """
        Maximum number of greenlets allowed in the pool, as passed to the
        constructor (None when unlimited). Use len(pool) for the number of
        greenlets currently in the pool.
        """

    # NOTE(review): gevent appears to define free_count as a regular method
    # (pool.free_count()) rather than a property -- confirm before relying
    # on attribute-style access.
    @property
    def free_count(self) -> int:
        """Number of available slots in pool."""
class PoolFull(Exception):
    """Exception raised when pool is full and cannot accept more greenlets."""


# Unbound collections of greenlets for coordination.
class Group:
    """
    Collection of greenlets for coordination without size limits.
    """

    def add(self, greenlet):
        """
        Add greenlet to group.

        Parameters:
        - greenlet: Greenlet object to add

        Returns:
        None
        """

    def discard(self, greenlet):
        """
        Remove greenlet from group.

        Parameters:
        - greenlet: Greenlet object to remove

        Returns:
        None
        """

    def spawn(self, function, *args, **kwargs) -> Greenlet:
        """
        Spawn greenlet and add to group.

        Parameters:
        - function: callable to execute
        - *args: positional arguments
        - **kwargs: keyword arguments

        Returns:
        Greenlet object
        """

    def map(self, func, iterable):
        """
        Map function over iterable using group greenlets.

        Parameters:
        - func: function to apply
        - iterable: sequence to process

        Returns:
        list of results in order
        """

    def imap(self, func, iterable):
        """
        Lazy map function over iterable.

        Parameters:
        - func: function to apply
        - iterable: sequence to process

        Yields:
        Results as they become available
        """

    def join(self, timeout=None, raise_error=False):
        """
        Wait for all greenlets in group to finish.

        Parameters:
        - timeout: float, maximum time to wait
        - raise_error: bool, raise exception if any failed

        Returns:
        None
        """

    def kill(self, exception=GreenletExit, block=True, timeout=None):
        """
        Kill all greenlets in group.

        Parameters:
        - exception: exception to raise in greenlets
        - block: bool, wait for greenlets to die
        - timeout: float, maximum time to wait

        Returns:
        None
        """

    def killone(self, greenlet, exception=GreenletExit, block=True, timeout=None):
        """
        Kill one greenlet in group.

        Parameters:
        - greenlet: Greenlet object to kill
        - exception: exception to raise
        - block: bool, wait for greenlet to die
        - timeout: float, maximum time to wait

        Returns:
        None
        """


# Thread pools for CPU-bound operations that need true parallelism.
class ThreadPool:
    """
    Pool of OS threads for CPU-bound operations.

    NOTE: under CPython's global interpreter lock, threads executing
    pure-Python bytecode do not run in parallel; a thread pool mainly helps
    with blocking calls and with C code that releases the GIL.
    """

    def __init__(self, maxsize, hub=None):
        """
        Create thread pool.

        Parameters:
        - maxsize: int, maximum number of threads
        - hub: Hub, hub to use for coordination
        """

    def spawn(self, func, *args, **kwargs) -> ThreadResult:
        """
        Execute function in thread pool.

        Parameters:
        - func: function to execute
        - *args: positional arguments
        - **kwargs: keyword arguments

        Returns:
        ThreadResult object
        """

    def apply(self, func, args=(), kwds=None):
        """
        Synchronously execute function in thread.

        Parameters:
        - func: function to execute
        - args: tuple, positional arguments
        - kwds: dict, keyword arguments (None means no keyword arguments)

        Returns:
        Function result
        """

    def apply_async(self, func, args=(), kwds=None, callback=None):
        """
        Asynchronously execute function in thread.

        Parameters:
        - func: function to execute
        - args: tuple, positional arguments
        - kwds: dict, keyword arguments (None means no keyword arguments)
        - callback: callable, called with result

        Returns:
        AsyncResult object
        """

    def kill(self):
        """
        Kill all threads in pool.

        Returns:
        None
        """
class ThreadResult:
    """
    Result object for thread pool operations.
    """

    def get(self, timeout=None):
        """
        Get the result, blocking if necessary.

        Parameters:
        - timeout: float, maximum time to wait

        Returns:
        Function result
        """

    def ready(self) -> bool:
        """
        Check if result is available.

        Returns:
        bool, True if result is ready
        """


import gevent
from gevent import pool
from gevent import socket
import time
def fetch_url(url):
    """Pretend to fetch *url* and return a placeholder body string for it."""
    # Simulate HTTP request
    print(f"Fetching {url}")
    # Simulate network delay with a one-second gevent sleep.
    gevent.sleep(1)
    content = f"Content from {url}"
    return content
# Create pool with max 5 concurrent operations
request_pool = pool.Pool(5)
urls = [f"http://example.com/page{i}" for i in range(20)]

# Process URLs with pool, timing the whole batch
start_time = time.time()
results = request_pool.map(fetch_url, urls)
end_time = time.time()

print(f"Processed {len(urls)} URLs in {end_time - start_time:.2f} seconds")
print(f"First result: {results[0]}")

import gevent
from gevent import pool
def worker(task_id, duration):
    """Announce start, sleep for *duration* seconds, announce completion.

    Returns the string "Result <task_id>".
    """
    print(f"Task {task_id} starting")
    gevent.sleep(duration)
    print(f"Task {task_id} completed")
    outcome = f"Result {task_id}"
    return outcome
# Create group for related tasks
task_group = pool.Group()

# Add tasks to group; task i sleeps for i * 0.5 seconds
for i in range(5):
    task_group.spawn(worker, i, i * 0.5)

# Wait for all tasks
print("Waiting for all tasks to complete...")
task_group.join()
print("All tasks completed!")

import gevent
from gevent import threadpool
import math
def cpu_intensive_task(n):
    """Return the sum of math.sqrt(i) for every i in range(n).

    Serves as a stand-in CPU-bound workload; the result is 0 when the range
    is empty.
    """
    total = 0
    for root in map(math.sqrt, range(n)):
        total += root
    return total
# Create thread pool
thread_pool = threadpool.ThreadPool(4)

try:
    # Execute CPU-bound tasks; each spawn returns a result handle
    tasks = [100000, 200000, 300000, 400000]
    results = [thread_pool.spawn(cpu_intensive_task, task) for task in tasks]

    # Get results
    for i, result in enumerate(results):
        value = result.get()  # Blocks until complete
        print(f"Task {i}: {value:.2f}")
finally:
    # Release the worker threads even if a task raised.
    thread_pool.kill()

import gevent
from gevent import pool
import random
def unreliable_task(task_id):
    """Sleep for a random 0.1-0.5 s, then fail about 30% of the time.

    Returns "Success <task_id>" on success; raises Exception with the
    message "Task <task_id> failed" otherwise.
    """
    delay = random.uniform(0.1, 0.5)
    gevent.sleep(delay)
    failed = random.random() < 0.3  # 30% chance of failure
    if failed:
        raise Exception(f"Task {task_id} failed")
    return f"Success {task_id}"
# Create pool
work_pool = pool.Pool(3)

# Process tasks with error handling
task_ids = range(10)
results = [work_pool.spawn(unreliable_task, task_id) for task_id in task_ids]

# Check results
for i, greenlet in enumerate(results):
    try:
        # get() hands back the task's return value or re-raises its exception
        result = greenlet.get()
    except Exception as e:
        print(f"Task {i} failed: {e}")
    else:
        print(f"Task {i}: {result}")

# Install with Tessl CLI
npx tessl i tessl/pypi-gevent