A coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of libev event loop
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Thread-safe synchronization objects adapted for cooperative greenlet concurrency. These primitives enable coordination between greenlets while maintaining the cooperative nature of gevent's event loop.
Binary state signaling between greenlets.
class Event:
"""
Event object for signaling between greenlets.
"""
def set(self):
"""
Set the event, notifying all waiting greenlets.
Returns:
None
"""
def clear(self):
"""
Clear the event.
Returns:
None
"""
def wait(self, timeout=None) -> bool:
"""
Wait for the event to be set.
Parameters:
- timeout: float, maximum time to wait in seconds
Returns:
bool, True if event was set, False if timeout occurred
"""
def is_set(self) -> bool:
"""
Check if the event is set.
Returns:
bool, True if event is set
"""
def isSet(self) -> bool:
"""
Alias for is_set() for threading compatibility.
Returns:
bool, True if event is set
"""Container for results that may not be available immediately.
class AsyncResult:
"""
Container for a result that may not be available yet.
"""
def set(self, value=None):
"""
Set the result value.
Parameters:
- value: any, the result value
Returns:
None
"""
def set_exception(self, exception):
"""
Set an exception as the result.
Parameters:
- exception: Exception object
Returns:
None
"""
def get(self, block=True, timeout=None):
"""
Get the result, blocking if necessary.
Parameters:
- block: bool, whether to block if result not ready
- timeout: float, maximum time to wait in seconds
Returns:
The result value
Raises:
The stored exception if one was set
"""
def ready(self) -> bool:
"""
Check if result is available.
Returns:
bool, True if result is ready
"""
def successful(self) -> bool:
"""
Check if result completed successfully.
Returns:
bool, True if completed without exception
"""
def wait(self, timeout=None) -> bool:
"""
Wait for result to be available.
Parameters:
- timeout: float, maximum time to wait in seconds
Returns:
bool, True if result became available
"""Counting synchronization primitive to limit resource access.
class Semaphore:
"""
Semaphore for limiting concurrent access to resources.
"""
def __init__(self, value=1):
"""
Initialize semaphore with given value.
Parameters:
- value: int, initial semaphore count
"""
def acquire(self, blocking=True, timeout=None) -> bool:
"""
Acquire the semaphore.
Parameters:
- blocking: bool, whether to block if semaphore unavailable
- timeout: float, maximum time to wait in seconds
Returns:
bool, True if acquired successfully
"""
def release(self):
"""
Release the semaphore.
Returns:
None
"""
def wait(self, timeout=None) -> bool:
"""
Wait for semaphore to become available.
Parameters:
- timeout: float, maximum time to wait in seconds
Returns:
bool, True if semaphore became available
"""
class BoundedSemaphore(Semaphore):
"""
Semaphore with maximum value bounds to prevent over-release.
"""
class DummySemaphore:
"""
No-op semaphore for testing or disabled synchronization.
"""Mutual exclusion locks for protecting critical sections.
class RLock:
"""
Reentrant lock that can be acquired multiple times by same greenlet.
"""
def acquire(self, blocking=True, timeout=None) -> bool:
"""
Acquire the lock.
Parameters:
- blocking: bool, whether to block if lock unavailable
- timeout: float, maximum time to wait in seconds
Returns:
bool, True if acquired successfully
"""
def release(self):
"""
Release the lock.
Returns:
None
"""Primitives for waiting on multiple objects.
def iwait(objects, timeout=None, count=None):
"""
Iteratively wait for objects to become ready.
Parameters:
- objects: iterable of objects to wait on
- timeout: float, maximum time to wait in seconds
- count: int, maximum number of objects to wait for
Yields:
Objects as they become ready
"""
def wait(objects, timeout=None, count=None) -> list:
"""
Wait for objects to become ready.
Parameters:
- objects: iterable of objects to wait on
- timeout: float, maximum time to wait in seconds
- count: int, maximum number of objects to wait for
Returns:
list of objects that became ready
"""import gevent
from gevent import event
# Create event
ready = event.Event()
def waiter(name):
print(f"{name} waiting for event")
ready.wait()
print(f"{name} proceeding")
def setter():
gevent.sleep(2)
print("Setting event")
ready.set()
# Start waiters and setter
gevent.joinall([
gevent.spawn(waiter, "Worker 1"),
gevent.spawn(waiter, "Worker 2"),
gevent.spawn(setter)
])import gevent
from gevent import event
def producer(result):
# Simulate work
gevent.sleep(1)
result.set("Produced data")
def consumer(result):
data = result.get() # Blocks until available
print(f"Consumed: {data}")
result = event.AsyncResult()
gevent.joinall([
gevent.spawn(producer, result),
gevent.spawn(consumer, result)
])import gevent
from gevent import lock
# Limit to 3 concurrent workers
pool = lock.Semaphore(3)
def worker(name):
with pool: # Context manager support
print(f"{name} acquired resource")
gevent.sleep(1) # Simulate work
print(f"{name} releasing resource")
# Start more workers than pool size
workers = [gevent.spawn(worker, f"Worker {i}") for i in range(10)]
gevent.joinall(workers)Install with Tessl CLI
npx tessl i tessl/pypi-gevent