CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-gevent

A coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/queues.md

Queues

Message-passing data structures for communication between greenlets, including FIFO, LIFO, and priority queues with cooperative blocking behavior. These queues provide thread-safe communication channels between greenlets.

Capabilities

Standard Queue

FIFO queue with optional size limits and task tracking.

class Queue:
    """
    FIFO queue for passing data between greenlets.

    Supports an optional size bound and task tracking: items placed on the
    queue count as unfinished tasks until a consumer calls task_done(), and
    join() waits for that count to drop to zero.
    """

    def __init__(self, maxsize=None, items=None, unfinished_tasks=None):
        """
        Create a queue.

        Parameters:
        - maxsize: int, maximum queue size (None, or a value <= 0, for unlimited)
        - items: iterable, initial items for queue
        - unfinished_tasks: int, initial unfinished task count (defaults to
          the number of initial items)
        """

    def put(self, item, block=True, timeout=None):
        """
        Put item into queue.

        Parameters:
        - item: object to add to queue
        - block: bool, whether to block if queue is full
        - timeout: float, maximum time to wait if blocking (None waits
          without a time limit)

        Returns:
        None

        Raises:
        Full: if queue is full and block=False or timeout expires
        """

    def get(self, block=True, timeout=None):
        """
        Remove and return item from queue.

        Parameters:
        - block: bool, whether to block if queue is empty
        - timeout: float, maximum time to wait if blocking (None waits
          without a time limit)

        Returns:
        Item from queue

        Raises:
        Empty: if queue is empty and block=False or timeout expires
        """

    def put_nowait(self, item):
        """
        Put item without blocking.

        Equivalent to put(item, block=False).

        Parameters:
        - item: object to add to queue

        Raises:
        Full: if queue is full
        """

    def get_nowait(self):
        """
        Get item without blocking.

        Equivalent to get(block=False).

        Returns:
        Item from queue

        Raises:
        Empty: if queue is empty
        """

    def empty(self) -> bool:
        """
        Check if queue is empty.

        Returns:
        bool, True if queue is empty
        """

    def full(self) -> bool:
        """
        Check if queue is full.

        Returns:
        bool, True if queue is full
        """

    def qsize(self) -> int:
        """
        Get approximate queue size.

        Returns:
        int, number of items in queue
        """

    def task_done(self) -> None:
        """
        Mark a task as done.

        Call once for every item obtained with get() after the work on that
        item is complete, so that join() can tell when all work is finished.

        Returns:
        None

        Raises:
        ValueError: if called more times than items were placed
        """

    def join(self, timeout=None) -> bool:
        """
        Wait for all tasks to be marked done.

        Parameters:
        - timeout: float, maximum time to wait (None waits until every task
          has been marked done)

        Returns:
        bool, True if all tasks have finished; False if timeout was given
        and expired before all tasks finished
        """

# Legacy alias: JoinableQueue is the same class as Queue.
JoinableQueue = Queue

Simple Queue

Unbounded FIFO queue with a simpler interface.

class SimpleQueue:
    """
    Unbounded FIFO queue with simple interface.

    Only put(), get(), empty() and qsize() are listed for this class; unlike
    Queue as documented on this page, no full(), put_nowait(), get_nowait(),
    task_done() or join() is shown.
    """
    
    def put(self, item, block=True, timeout=None):
        """
        Put item into queue.
        
        Parameters:
        - item: object to add
        - block: bool, accepted but ignored (the queue is unbounded, so put
          always succeeds)
        - timeout: float, accepted but ignored
        
        Returns:
        None
        """
    
    def get(self, block=True, timeout=None):
        """
        Remove and return the next item from queue.
        
        Parameters:
        - block: bool, whether to block if empty
        - timeout: float, maximum time to wait
        
        Returns:
        Item from queue
        """
    
    def empty(self) -> bool:
        """
        Check if queue is empty.
        
        Returns:
        bool, True if empty
        """
    
    def qsize(self) -> int:
        """
        Get queue size.
        
        Returns:
        int, number of items
        """

Priority Queue

Queue where items are retrieved in priority order (lowest valued entries first).

class PriorityQueue(Queue):
    """
    Priority queue where lowest valued entries are retrieved first.

    Entries are typically (priority, data) tuples, so a lower priority
    number means the entry is retrieved earlier.
    """
    
    def put(self, item, block=True, timeout=None):
        """
        Put item into priority queue.
        
        Parameters:
        - item: (priority, data) tuple or comparable object
        - block: bool, whether to block if full
        - timeout: float, maximum time to wait
        
        Returns:
        None
        """
    
    def get(self, block=True, timeout=None):
        """
        Get highest priority item, i.e. the lowest valued entry.
        
        Parameters:
        - block: bool, whether to block if empty
        - timeout: float, maximum time to wait
        
        Returns:
        Highest priority (lowest valued) item
        """

LIFO Queue

Last-in-first-out queue (stack).

class LifoQueue(Queue):
    """
    LIFO queue (stack) where last item put is first retrieved.

    Declared as a subclass of Queue; only put() and get() are listed here.
    """
    
    def put(self, item, block=True, timeout=None):
        """
        Put item onto stack.
        
        Parameters:
        - item: object to add
        - block: bool, whether to block if full
        - timeout: float, maximum time to wait
        
        Returns:
        None
        """
    
    def get(self, block=True, timeout=None):
        """
        Remove and return the most recent item from stack.
        
        Parameters:
        - block: bool, whether to block if empty
        - timeout: float, maximum time to wait
        
        Returns:
        Most recently added item
        """

Channel

Synchronous queue for CSP-style communication.

class Channel:
    """
    Synchronous channel for CSP-style communication.

    A put() is paired with a get(): by default the sender blocks until a
    receiver takes the item, and the receiver blocks until an item is sent.
    """
    
    def put(self, item, block=True, timeout=None):
        """
        Send item through channel.
        
        Parameters:
        - item: object to send
        - block: bool, whether to block until received
        - timeout: float, maximum time to wait
        
        Returns:
        None
        """
    
    def get(self, block=True, timeout=None):
        """
        Receive item from channel.
        
        Parameters:
        - block: bool, whether to block until available
        - timeout: float, maximum time to wait
        
        Returns:
        Item from channel
        """

Queue Exceptions

class Empty(Exception):
    """Exception raised by get() or get_nowait() when queue is empty and the call does not block or its timeout expires."""

class Full(Exception):
    """Exception raised by put() or put_nowait() when queue is full and the call does not block or its timeout expires."""

# NOTE(review): no method documented on this page lists ShutDown under
# "Raises" -- confirm which queue operations raise it.
class ShutDown(Exception):
    """Exception raised when queue is shut down."""

Usage Examples

Producer-Consumer Pattern

import gevent
from gevent import queue

# Shared queue: producers put work items on it, consumers take them off.
work_queue = queue.Queue()

def producer(name, count):
    """Put `count` items labelled with `name` on the shared queue."""
    for i in range(count):
        item = f"{name}-item-{i}"
        print(f"Producing {item}")
        work_queue.put(item)
        gevent.sleep(0.1)  # Simulate work
    
    print(f"Producer {name} finished")

def consumer(name):
    """Process items from the shared queue until a get() times out."""
    while True:
        # Keep the try body minimal: only the get() can raise queue.Empty.
        try:
            item = work_queue.get(timeout=2)
        except queue.Empty:
            print(f"Consumer {name} timed out, exiting")
            break

        try:
            print(f"Consumer {name} processing {item}")
            gevent.sleep(0.2)  # Simulate processing
        finally:
            # Always balance a successful get() with task_done(); if
            # processing failed without it, work_queue.join() below could
            # never complete.
            work_queue.task_done()

# Start producers and consumers
greenlets = [
    gevent.spawn(producer, "P1", 5),
    gevent.spawn(producer, "P2", 3),
    gevent.spawn(consumer, "C1"),
    gevent.spawn(consumer, "C2"),
]

# Wait for producers to finish
gevent.joinall(greenlets[:2])

# Wait for all work to be processed
work_queue.join()

# Kill consumers
gevent.killall(greenlets[2:])

Priority Queue Example

import gevent
from gevent import queue

# Create priority queue
pq = queue.PriorityQueue()

def add_tasks():
    """Put a fixed set of (priority, description) tasks on the queue."""
    # Add tasks with priorities (lower number = higher priority)
    tasks = [
        (1, "High priority task"),
        (3, "Low priority task"),
        (2, "Medium priority task"),
        (1, "Another high priority task"),
    ]
    
    for priority, task in tasks:
        pq.put((priority, task))
        print(f"Added: {task} (priority {priority})")

def process_tasks():
    """Take tasks in priority order until no task arrives within the timeout."""
    while True:
        try:
            # Wait for an item instead of testing pq.empty() first: an
            # empty() check would end this loop immediately if the consumer
            # were scheduled before the producer had added anything.
            priority, task = pq.get(timeout=1)
        except queue.Empty:
            break
        print(f"Processing: {task} (priority {priority})")
        gevent.sleep(0.5)

gevent.joinall([
    gevent.spawn(add_tasks),
    gevent.spawn(process_tasks)
])

Channel Communication

import gevent
from gevent import queue

def sender(ch, values):
    """Send each value through the channel `ch`, printing before and after each put."""
    for value in values:
        print(f"Sending: {value}")
        ch.put(value)  # Blocks until receiver gets it
        print(f"Sent: {value}")

def receiver(ch, count):
    """Receive `count` values from the channel `ch`, pausing after each one."""
    for _ in range(count):
        value = ch.get()  # Blocks until sender puts something
        print(f"Received: {value}")
        gevent.sleep(0.5)  # Simulate processing

# Create synchronous channel
channel = queue.Channel()

# Start sender and receiver
# The receiver is asked for 3 values, matching the 3 values the sender puts.
gevent.joinall([
    gevent.spawn(sender, channel, ['A', 'B', 'C']),
    gevent.spawn(receiver, channel, 3)
])

Queue with Size Limits

import gevent
from gevent import queue

# Create bounded queue
bounded_queue = queue.Queue(maxsize=2)

def fast_producer():
    """Try to put 10 items on the bounded queue, giving up on any put that times out."""
    for i in range(10):
        try:
            item = f"item-{i}"
            print(f"Trying to put {item}")
            # Blocks while the queue is full; raises queue.Full if the
            # timeout expires first.
            bounded_queue.put(item, timeout=1)
            print(f"Put {item}")
        except queue.Full:
            # The item is dropped; the loop moves on to the next one.
            print(f"Queue full, couldn't put item-{i}")

def slow_consumer():
    """Take items off the bounded queue until a get() times out."""
    while True:
        try:
            item = bounded_queue.get(timeout=3)
            print(f"Got {item}")
            # The pause per item is longer than the producer's put timeout,
            # so some puts are expected to fail with queue.Full.
            gevent.sleep(2)  # Slow processing
        except queue.Empty:
            print("No more items, consumer exiting")
            break

gevent.joinall([
    gevent.spawn(fast_producer),
    gevent.spawn(slow_consumer)
])

Install with Tessl CLI

npx tessl i tessl/pypi-gevent

docs

core-greenlets.md

index.md

monkey-patching.md

networking.md

pooling.md

queues.md

servers.md

synchronization.md

timeouts.md

tile.json