Highly concurrent networking library
—
Thread-safe synchronization primitives for coordinating between greenthreads, including events, semaphores, queues, and timeouts.
Events provide a way for greenthreads to wait for and signal the completion of operations.
class Event:
"""
A synchronization primitive that allows greenthreads to wait for an event
to occur and to signal that event to waiting greenthreads.
"""
def __init__(self):
"""Create a new Event object."""
def send(self, result=None):
"""
Signal the event and wake all waiting greenthreads.
Parameters:
- result: value to send to waiting greenthreads
Returns:
None
"""
def wait(self):
"""
Block until another greenthread calls send().
Returns:
The result value passed to send(), if any
"""
def ready(self):
"""
Check if the event has been sent.
Returns:
bool: True if send() has been called, False otherwise
"""
def reset(self):
"""
Reset the event to unsent state.
Returns:
None
"""
def has_exception(self):
"""
Check if the event was sent with an exception.
Returns:
bool: True if event has an exception, False otherwise
"""
def has_result(self):
"""
Check if the event was sent with a result (not an exception).
Returns:
bool: True if event has a result value, False otherwise
"""
def poll(self, notready=None):
"""
Return the result if ready, otherwise return notready value.
Parameters:
- notready: value to return if event is not ready
Returns:
Event result if ready, notready value otherwise
"""
def poll_exception(self, notready=None):
"""
Return the exception if available, otherwise return notready value.
Parameters:
- notready: value to return if no exception is available
Returns:
Exception if available, notready value otherwise
"""
def poll_result(self, notready=None):
"""
Return the result if available (and not an exception), otherwise return notready value.
Parameters:
- notready: value to return if no result is available
Returns:
Result value if available, notready value otherwise
"""
def send_exception(self, *args):
"""
Signal the event with an exception to wake waiting greenthreads.
Parameters:
- *args: exception arguments (same as for raise statement)
Returns:
None
"""Semaphores control access to resources by maintaining a counter that tracks available resources.
class Semaphore:
"""
A semaphore manages a counter representing number of release() calls
minus the number of acquire() calls, plus an initial value.
"""
def __init__(self, value=1):
"""
Create a semaphore.
Parameters:
- value: int, initial counter value (default: 1)
"""
def acquire(self, blocking=True):
"""
Acquire the semaphore, decrementing the counter.
Parameters:
- blocking: bool, whether to block if counter is zero
Returns:
bool: True if acquired, False if non-blocking and unavailable
"""
def release(self):
"""
Release the semaphore, incrementing the counter.
Returns:
None
"""
def locked(self):
"""
Check if the semaphore counter is zero.
Returns:
bool: True if counter is zero, False otherwise
"""
@property
def balance(self):
"""
Get the current counter value.
Returns:
int: current semaphore counter value
"""
class BoundedSemaphore(Semaphore):
"""
A semaphore that prevents release() from incrementing counter above
the initial value.
"""
def __init__(self, value=1):
"""
Create a bounded semaphore.
Parameters:
- value: int, initial and maximum counter value
"""
def release(self):
"""
Release the semaphore if counter is below initial value.
Raises:
ValueError: if counter would exceed initial value
"""
class CappedSemaphore:
"""
A blockingly bounded semaphore with configurable upper and lower bounds.
"""
def __init__(self, balance, lower_bound=None, upper_bound=None):
"""
Create a capped semaphore.
Parameters:
- balance: int, initial counter value
- lower_bound: int, minimum counter value (default: 0)
- upper_bound: int, maximum counter value (default: balance)
"""
def acquire(self, blocking=True):
"""
Acquire the semaphore, potentially blocking on lower bound.
Parameters:
- blocking: bool, whether to block if at lower bound
Returns:
bool: True if acquired, False if non-blocking and at bound
"""
def release(self, blocking=True):
"""
Release the semaphore, potentially blocking on upper bound.
Parameters:
- blocking: bool, whether to block if at upper bound
Returns:
bool: True if released, False if non-blocking and at bound
"""Thread-safe queues for passing data between greenthreads with various ordering semantics.
class Queue:
"""
Multi-producer multi-consumer queue for greenthreads with task tracking.
"""
def __init__(self, maxsize=0):
"""
Create a queue.
Parameters:
- maxsize: int, maximum queue size (0 = unlimited)
"""
def put(self, item, block=True, timeout=None):
"""
Put an item into the queue.
Parameters:
- item: object to put in queue
- block: bool, whether to block if queue is full
- timeout: float, maximum time to wait if blocking
Raises:
Full: if queue is full and non-blocking or timeout exceeded
"""
def get(self, block=True, timeout=None):
"""
Get an item from the queue.
Parameters:
- block: bool, whether to block if queue is empty
- timeout: float, maximum time to wait if blocking
Returns:
Item from the queue
Raises:
Empty: if queue is empty and non-blocking or timeout exceeded
"""
def put_nowait(self, item):
"""
Put item without blocking.
Raises:
Full: if queue is full
"""
def get_nowait(self):
"""
Get item without blocking.
Returns:
Item from queue
Raises:
Empty: if queue is empty
"""
def task_done(self):
"""
Indicate that a formerly enqueued task is complete.
"""
def join(self):
"""
Block until all items in the queue have been processed.
"""
def qsize(self):
"""
Return approximate queue size.
Returns:
int: number of items in queue
"""
def empty(self):
"""
Check if queue is empty.
Returns:
bool: True if empty, False otherwise
"""
def full(self):
"""
Check if queue is full.
Returns:
bool: True if full, False otherwise
"""
class PriorityQueue(Queue):
"""
A queue where items are ordered by priority (lowest first).
Items should be comparable or tuples of (priority, item).
"""
class LifoQueue(Queue):
"""
A Last-In-First-Out (LIFO) queue implementation.
"""
class LightQueue:
"""
A faster queue implementation without task_done() and join() support.
"""
def __init__(self, maxsize=0):
"""
Create a light queue.
Parameters:
- maxsize: int, maximum queue size (0 = unlimited)
"""
def put(self, item, block=True, timeout=None):
"""Put an item into the queue."""
def get(self, block=True, timeout=None):
"""Get an item from the queue."""Context managers and utilities for implementing timeouts in greenthread operations.
class Timeout:
"""
A timeout context manager that raises an exception after a delay.
"""
def __init__(self, seconds=None, exception=None):
"""
Create a timeout.
Parameters:
- seconds: float, timeout duration in seconds
- exception: exception instance to raise (default: Timeout)
"""
def __enter__(self):
"""
Start the timeout context.
Returns:
self
"""
def __exit__(self, typ, value, tb):
"""
Exit the timeout context and cancel timeout if active.
Returns:
bool: False to propagate exceptions, True to suppress timeout
"""
def cancel(self):
"""
Cancel the timeout if it hasn't triggered yet.
"""
def with_timeout(seconds, func, *args, **kwargs):
"""
Call func with timeout protection.
Parameters:
- seconds: float, timeout duration
- func: callable to invoke
- *args: positional arguments for func
- **kwargs: keyword arguments for func
Returns:
Return value of func
Raises:
Timeout: if func doesn't complete within timeout
"""
def is_timeout(obj):
"""
Check if an object is a timeout exception.
Parameters:
- obj: object to check
Returns:
bool: True if obj is timeout-related exception
"""
def wrap_is_timeout(base_exception):
"""
Add is_timeout=True attribute to exceptions created by base function.
Parameters:
- base_exception: exception class or instance
Returns:
Wrapped exception with is_timeout attribute
"""import eventlet
# Event synchronization example
event = eventlet.Event()
def waiter():
print("Waiting for event...")
result = event.wait()
print(f"Got result: {result}")
def sender():
eventlet.sleep(2) # Simulate work
event.send("Hello from sender!")
eventlet.spawn(waiter)
eventlet.spawn(sender)
# Semaphore example
sem = eventlet.Semaphore(2) # Allow 2 concurrent accesses
def worker(name):
with sem: # Acquire on enter, release on exit
print(f"Worker {name} accessing resource")
eventlet.sleep(1)
print(f"Worker {name} done")
for i in range(5):
eventlet.spawn(worker, i)
# Queue example
q = eventlet.Queue()
def producer():
for i in range(5):
q.put(f"item {i}")
eventlet.sleep(0.1)
def consumer():
while True:
item = q.get()
print(f"Got: {item}")
q.task_done()
eventlet.spawn(producer)
eventlet.spawn(consumer)
q.join() # Wait for all tasks to complete
# Timeout example
try:
with eventlet.Timeout(5.0):
# This will timeout after 5 seconds
eventlet.sleep(10)
except eventlet.Timeout:
print("Operation timed out!")
# Function with timeout
try:
result = eventlet.with_timeout(2.0, slow_function, arg1, arg2)
except eventlet.Timeout:
print("Function call timed out!")class Full(Exception):
"""Raised when a queue is full and put() is called with block=False."""
pass
class Empty(Exception):
"""Raised when a queue is empty and get() is called with block=False."""
passInstall with Tessl CLI
npx tessl i tessl/pypi-eventlet