CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-eventlet

Highly concurrent networking library

Pending
Overview
Eval results
Files

docs/synchronization.md

Synchronization

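Greenthread-aware synchronization primitives for coordinating between greenthreads, including events, semaphores, queues, and timeouts. These primitives cooperate through the eventlet hub; they are intended for use between greenthreads rather than as locks between OS threads.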

Capabilities

Event Synchronization

Events provide a way for greenthreads to wait for and signal the completion of operations.

class Event:
    """
    A one-shot rendezvous point between greenthreads.

    Any number of greenthreads may block in wait(); a single call to send()
    (or send_exception()) wakes all of them and hands over the result.
    """

    def __init__(self):
        """Create a new Event object in the unsent state."""

    def send(self, result=None, exc=None):
        """
        Signal the event and wake all waiting greenthreads.

        Parameters:
        - result: value delivered to every greenthread blocked in wait()
        - exc: optional exception (or tuple of raise-style arguments) to
          raise in the waiters instead of returning result

        Returns:
        None
        """

    def wait(self, timeout=None):
        """
        Block until another greenthread calls send().

        Returns immediately if the event has already been sent.

        Parameters:
        - timeout: float or None, maximum number of seconds to wait;
          None (the default) waits indefinitely

        Returns:
        The result value passed to send(), or None if the timeout expired
        before the event was sent

        Raises:
        The exception passed to send()/send_exception(), if any
        """

    def ready(self):
        """
        Check if the event has been sent.

        Returns:
        bool: True if send() has been called, False otherwise
        """

    def reset(self):
        """
        Reset the event to the unsent state so that it can be sent again.

        Returns:
        None
        """

    def has_exception(self):
        """
        Check if the event was sent with an exception.

        Returns:
        bool: True if event has an exception, False otherwise
        """

    def has_result(self):
        """
        Check if the event was sent with a result (not an exception).

        Returns:
        bool: True if event has a result value, False otherwise
        """

    def poll(self, notready=None):
        """
        Return the result if ready, otherwise return the notready value.

        Parameters:
        - notready: value to return if the event is not ready

        Returns:
        Event result if ready, notready value otherwise
        """

    def poll_exception(self, notready=None):
        """
        Return the exception if available, otherwise return the notready value.

        Parameters:
        - notready: value to return if no exception is available

        Returns:
        Exception if available, notready value otherwise
        """

    def poll_result(self, notready=None):
        """
        Return the result if available (and not an exception), otherwise
        return the notready value.

        Parameters:
        - notready: value to return if no result is available

        Returns:
        Result value if available, notready value otherwise
        """

    def send_exception(self, *args):
        """
        Signal the event with an exception to wake waiting greenthreads.

        Parameters:
        - *args: exception arguments (same as for the raise statement)

        Returns:
        None
        """

Semaphore Controls

Semaphores control access to resources by maintaining a counter that tracks available resources.

class Semaphore:
    """
    A semaphore manages a counter representing the number of release() calls
    minus the number of acquire() calls, plus an initial value.

    It can be used as a context manager: entering acquires the semaphore and
    leaving releases it.
    """

    def __init__(self, value=1):
        """
        Create a semaphore.

        Parameters:
        - value: int, initial counter value (default: 1); must not be negative

        Raises:
        ValueError: if value is negative
        """

    def acquire(self, blocking=True, timeout=None):
        """
        Acquire the semaphore, decrementing the counter.

        Parameters:
        - blocking: bool, whether to block if the counter is zero
        - timeout: float or None, maximum number of seconds to block;
          None (the default) blocks indefinitely. Only meaningful when
          blocking is True.

        Returns:
        bool: True if acquired, False if non-blocking and unavailable or if
        the timeout expired
        """

    def release(self, blocking=True):
        """
        Release the semaphore, incrementing the counter and waking one
        waiter if any greenthread is blocked in acquire().

        Parameters:
        - blocking: bool, accepted for signature compatibility with
          CappedSemaphore.release(); releasing an unbounded semaphore
          never blocks

        Returns:
        bool: True
        """

    def locked(self):
        """
        Check if a call to acquire() would block.

        Returns:
        bool: True if counter is zero, False otherwise
        """

    def bounded(self):
        """
        Check if a call to release() could block.

        Returns:
        bool: always False for a plain Semaphore
        """

    @property
    def balance(self):
        """
        Get the counter value adjusted for blocked waiters.

        A positive value is the number of acquire() calls that can succeed
        before the next one blocks; a negative value is the negated number of
        greenthreads currently blocked in acquire().

        Returns:
        int: counter value minus the number of waiting greenthreads
        """

    def __enter__(self):
        """Acquire the semaphore on entry to a ``with`` block."""

    def __exit__(self, typ, val, tb):
        """Release the semaphore on exit from a ``with`` block."""

class BoundedSemaphore(Semaphore):
    """
    A semaphore that refuses to let release() push the counter above the
    initial value, which catches unbalanced release() calls.
    """

    def __init__(self, value=1):
        """
        Create a bounded semaphore.

        Parameters:
        - value: int, initial and maximum counter value
        """

    def release(self, blocking=True):
        """
        Release the semaphore if the counter is below the initial value.

        Parameters:
        - blocking: bool, accepted for signature compatibility with the
          other semaphore classes; this call does not block

        Raises:
        ValueError: if the counter would exceed the initial value
        """

class CappedSemaphore:
    """
    A semaphore bounded in both directions that blocks instead of failing:
    acquire() waits at the lower bound and release() waits at the upper bound.

    NOTE(review): upstream eventlet appears to declare this constructor as
    ``CappedSemaphore(count, limit)``; confirm the parameter list below
    against the installed eventlet version before relying on it.
    """

    def __init__(self, balance, lower_bound=None, upper_bound=None):
        """
        Build a capped semaphore.

        Parameters:
        - balance: int, counter value to start from
        - lower_bound: int, smallest permitted counter value (default: 0)
        - upper_bound: int, largest permitted counter value (default: balance)
        """

    def acquire(self, blocking=True):
        """
        Take the semaphore; may block when the counter sits at the lower bound.

        Parameters:
        - blocking: bool, block while at the lower bound when True

        Returns:
        bool: True when acquired, False when non-blocking and at the bound
        """

    def release(self, blocking=True):
        """
        Give the semaphore back; may block when the counter sits at the
        upper bound.

        Parameters:
        - blocking: bool, block while at the upper bound when True

        Returns:
        bool: True when released, False when non-blocking and at the bound
        """

Queue Operations

Greenthread-aware queues for passing data between greenthreads with various ordering semantics.

class Queue:
    """
    Multi-producer multi-consumer queue for greenthreads with task tracking.
    """

    def __init__(self, maxsize=None):
        """
        Create a queue.

        Unlike the standard library queue, a maxsize of 0 does NOT mean
        "unlimited" here.

        Parameters:
        - maxsize: int or None, maximum queue size. None (the default) or a
          negative value means unlimited. 0 creates a channel: put() blocks
          until the item is handed to a consumer.
        """

    def put(self, item, block=True, timeout=None):
        """
        Put an item into the queue.

        Parameters:
        - item: object to put in queue
        - block: bool, whether to block if queue is full
        - timeout: float, maximum time to wait if blocking

        Raises:
        Full: if queue is full and non-blocking or timeout exceeded
        """

    def get(self, block=True, timeout=None):
        """
        Get an item from the queue.

        Parameters:
        - block: bool, whether to block if queue is empty
        - timeout: float, maximum time to wait if blocking

        Returns:
        Item from the queue

        Raises:
        Empty: if queue is empty and non-blocking or timeout exceeded
        """

    def put_nowait(self, item):
        """
        Put item without blocking.

        Raises:
        Full: if queue is full
        """

    def get_nowait(self):
        """
        Get item without blocking.

        Returns:
        Item from queue

        Raises:
        Empty: if queue is empty
        """

    def task_done(self):
        """
        Indicate that a formerly enqueued task is complete.

        Call once for every item obtained with get() so that join() can
        tell when all work has been processed.
        """

    def join(self):
        """
        Block until all items in the queue have been processed.

        Returns immediately when there are no unfinished tasks, so items
        must already have been put() before calling join().
        """

    def qsize(self):
        """
        Return approximate queue size.

        Returns:
        int: number of items in queue
        """

    def empty(self):
        """
        Check if queue is empty.

        Returns:
        bool: True if empty, False otherwise
        """

    def full(self):
        """
        Check if queue is full.

        Returns:
        bool: True if full, False otherwise (an unlimited queue is never full)
        """

class PriorityQueue(Queue):
    """
    Queue variant that hands out entries in priority order, lowest first.

    Entries must be mutually comparable; the usual form is a
    (priority, item) tuple.
    """

class LifoQueue(Queue):
    """
    Queue variant with Last-In-First-Out (LIFO) ordering: the most recently
    added entry is retrieved first.
    """

class LightQueue:
    """
    A faster queue implementation without task_done() and join() support.
    """

    def __init__(self, maxsize=None):
        """
        Create a light queue.

        Unlike the standard library queue, a maxsize of 0 does NOT mean
        "unlimited" here.

        Parameters:
        - maxsize: int or None, maximum queue size. None (the default) or a
          negative value means unlimited. 0 creates a channel: put() blocks
          until the item is handed to a consumer.
        """

    def put(self, item, block=True, timeout=None):
        """
        Put an item into the queue.

        Parameters:
        - item: object to put in queue
        - block: bool, whether to block if queue is full
        - timeout: float, maximum time to wait if blocking
        """

    def get(self, block=True, timeout=None):
        """
        Get an item from the queue.

        Parameters:
        - block: bool, whether to block if queue is empty
        - timeout: float, maximum time to wait if blocking

        Returns:
        Item from the queue
        """

Timeout Management

Context managers and utilities for implementing timeouts in greenthread operations.

class Timeout:
    """
    Context manager that raises an exception in the current greenthread once
    a delay has elapsed.
    """

    def __init__(self, seconds=None, exception=None):
        """
        Build a timeout.

        Parameters:
        - seconds: float, how long to wait before the timeout fires
        - exception: exception instance to raise when it fires
          (default: Timeout)
        """

    def __enter__(self):
        """
        Enter the timeout context, starting the timer.

        Returns:
        self
        """

    def __exit__(self, typ, value, tb):
        """
        Leave the timeout context, cancelling the timer if it is still active.

        Returns:
        bool: False to let exceptions propagate, True to suppress the timeout
        """

    def cancel(self):
        """
        Stop the timeout from firing, provided it has not fired already.
        """

def with_timeout(seconds, func, *args, **kwargs):
    """
    Run func under a timeout.

    Parameters:
    - seconds: float, how long func may run
    - func: callable to execute
    - *args: positional arguments forwarded to func
    - **kwargs: keyword arguments forwarded to func

    Returns:
    Whatever func returns

    Raises:
    Timeout: if func has not finished within the allotted time
    """

def is_timeout(obj):
    """
    Tell whether an object represents a timeout exception.

    Parameters:
    - obj: object to inspect

    Returns:
    bool: True if obj is a timeout-related exception
    """

def wrap_is_timeout(base_exception):
    """
    Mark exceptions produced by the given base with is_timeout=True.

    Parameters:
    - base_exception: exception class or instance

    Returns:
    Wrapped exception carrying the is_timeout attribute
    """

Usage Examples

import eventlet

# Event synchronization example
event = eventlet.Event()


def waiter():
    print("Waiting for event...")
    result = event.wait()
    print(f"Got result: {result}")


def sender():
    eventlet.sleep(2)  # Simulate work
    event.send("Hello from sender!")


waiter_gt = eventlet.spawn(waiter)
eventlet.spawn(sender)
# spawn() only schedules a greenthread; block here so both actually run.
waiter_gt.wait()

# Semaphore example
sem = eventlet.Semaphore(2)  # Allow 2 concurrent accesses


def worker(name):
    with sem:  # Acquire on enter, release on exit
        print(f"Worker {name} accessing resource")
        eventlet.sleep(1)
        print(f"Worker {name} done")


workers = [eventlet.spawn(worker, i) for i in range(5)]
for gt in workers:
    gt.wait()  # Wait for every worker to finish

# Queue example
q = eventlet.Queue()


def producer():
    for i in range(5):
        q.put(f"item {i}")
        eventlet.sleep(0.1)


def consumer():
    while True:
        item = q.get()
        print(f"Got: {item}")
        q.task_done()


producer_gt = eventlet.spawn(producer)
consumer_gt = eventlet.spawn(consumer)
# join() returns immediately when nothing has been enqueued yet, so make
# sure the producer has put every item before joining.
producer_gt.wait()
q.join()  # Wait for all tasks to complete
consumer_gt.kill()  # The consumer loops forever; stop it once the work is done

# Timeout example
try:
    with eventlet.Timeout(5.0):
        # This will timeout after 5 seconds
        eventlet.sleep(10)
except eventlet.Timeout:
    print("Operation timed out!")


# Function with timeout
def slow_function(delay, label):
    """Stand-in for a slow operation: sleep cooperatively, then return."""
    eventlet.sleep(delay)
    return label


try:
    result = eventlet.with_timeout(2.0, slow_function, 5, "finished")
except eventlet.Timeout:
    print("Function call timed out!")

Exception Types

class Full(Exception):
    """Signals that put() could not add an item: the queue was full and the
    call was non-blocking or its timeout was exceeded."""

class Empty(Exception):
    """Signals that get() could not return an item: the queue was empty and
    the call was non-blocking or its timeout was exceeded."""

Install with Tessl CLI

npx tessl i tessl/pypi-eventlet

docs

core-concurrency.md

debugging.md

green-stdlib.md

index.md

monkey-patching.md

networking.md

resource-pooling.md

synchronization.md

thread-pools.md

web-server.md

tile.json