A coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of libev event loop
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Message passing and data structures for greenlet communication including FIFO, LIFO, and priority queues with cooperative blocking behavior. These queues provide thread-safe communication channels between greenlets.
FIFO queue with optional size limits and task tracking.
class Queue:
"""
FIFO queue for passing data between greenlets.
"""
def __init__(self, maxsize=None, items=None, unfinished_tasks=None):
"""
Create a queue.
Parameters:
- maxsize: int, maximum queue size (None for unlimited)
- items: iterable, initial items for queue
- unfinished_tasks: int, initial unfinished task count
"""
def put(self, item, block=True, timeout=None):
"""
Put item into queue.
Parameters:
- item: object to add to queue
- block: bool, whether to block if queue is full
- timeout: float, maximum time to wait if blocking
Raises:
Full: if queue is full and block=False or timeout expires
"""
def get(self, block=True, timeout=None):
"""
Remove and return item from queue.
Parameters:
- block: bool, whether to block if queue is empty
- timeout: float, maximum time to wait if blocking
Returns:
Item from queue
Raises:
Empty: if queue is empty and block=False or timeout expires
"""
def put_nowait(self, item):
"""
Put item without blocking.
Parameters:
- item: object to add to queue
Raises:
Full: if queue is full
"""
def get_nowait(self):
"""
Get item without blocking.
Returns:
Item from queue
Raises:
Empty: if queue is empty
"""
def empty(self) -> bool:
"""
Check if queue is empty.
Returns:
bool, True if queue is empty
"""
def full(self) -> bool:
"""
Check if queue is full.
Returns:
bool, True if queue is full
"""
def qsize(self) -> int:
"""
Get approximate queue size.
Returns:
int, number of items in queue
"""
def task_done(self):
"""
Mark a task as done.
Returns:
None
Raises:
ValueError: if called more times than items were placed
"""
def join(self, timeout=None):
"""
Wait for all tasks to be marked done.
Parameters:
- timeout: float, maximum time to wait
Returns:
None
"""
# Legacy alias
JoinableQueue = QueueUnbounded FIFO queue with simpler interface.
class SimpleQueue:
"""
Unbounded FIFO queue with simple interface.
"""
def put(self, item, block=True, timeout=None):
"""
Put item into queue.
Parameters:
- item: object to add
- block: bool, ignored (always succeeds)
- timeout: float, ignored
Returns:
None
"""
def get(self, block=True, timeout=None):
"""
Get item from queue.
Parameters:
- block: bool, whether to block if empty
- timeout: float, maximum time to wait
Returns:
Item from queue
"""
def empty(self) -> bool:
"""
Check if queue is empty.
Returns:
bool, True if empty
"""
def qsize(self) -> int:
"""
Get queue size.
Returns:
int, number of items
"""Queue where items are retrieved in priority order.
class PriorityQueue(Queue):
"""
Priority queue where lowest valued entries are retrieved first.
"""
def put(self, item, block=True, timeout=None):
"""
Put item into priority queue.
Parameters:
- item: (priority, data) tuple or comparable object
- block: bool, whether to block if full
- timeout: float, maximum time to wait
Returns:
None
"""
def get(self, block=True, timeout=None):
"""
Get highest priority item.
Parameters:
- block: bool, whether to block if empty
- timeout: float, maximum time to wait
Returns:
Highest priority item
"""Last-in-first-out queue (stack).
class LifoQueue(Queue):
"""
LIFO queue (stack) where last item put is first retrieved.
"""
def put(self, item, block=True, timeout=None):
"""
Put item onto stack.
Parameters:
- item: object to add
- block: bool, whether to block if full
- timeout: float, maximum time to wait
Returns:
None
"""
def get(self, block=True, timeout=None):
"""
Get most recent item from stack.
Parameters:
- block: bool, whether to block if empty
- timeout: float, maximum time to wait
Returns:
Most recently added item
"""Synchronous queue for CSP-style communication.
class Channel:
"""
Synchronous channel for CSP-style communication.
"""
def put(self, item, block=True, timeout=None):
"""
Send item through channel.
Parameters:
- item: object to send
- block: bool, whether to block until received
- timeout: float, maximum time to wait
Returns:
None
"""
def get(self, block=True, timeout=None):
"""
Receive item from channel.
Parameters:
- block: bool, whether to block until available
- timeout: float, maximum time to wait
Returns:
Item from channel
"""class Empty(Exception):
"""Exception raised when queue is empty."""
class Full(Exception):
"""Exception raised when queue is full."""
class ShutDown(Exception):
"""Exception raised when queue is shut down."""import gevent
from gevent import queue
# Create a queue
work_queue = queue.Queue()
def producer(name, count):
for i in range(count):
item = f"{name}-item-{i}"
print(f"Producing {item}")
work_queue.put(item)
gevent.sleep(0.1) # Simulate work
print(f"Producer {name} finished")
def consumer(name):
while True:
try:
item = work_queue.get(timeout=2)
print(f"Consumer {name} processing {item}")
gevent.sleep(0.2) # Simulate processing
work_queue.task_done()
except queue.Empty:
print(f"Consumer {name} timed out, exiting")
break
# Start producers and consumers
greenlets = [
gevent.spawn(producer, "P1", 5),
gevent.spawn(producer, "P2", 3),
gevent.spawn(consumer, "C1"),
gevent.spawn(consumer, "C2"),
]
# Wait for producers to finish
gevent.joinall(greenlets[:2])
# Wait for all work to be processed
work_queue.join()
# Kill consumers
gevent.killall(greenlets[2:])import gevent
from gevent import queue
# Create priority queue
pq = queue.PriorityQueue()
def add_tasks():
# Add tasks with priorities (lower number = higher priority)
tasks = [
(1, "High priority task"),
(3, "Low priority task"),
(2, "Medium priority task"),
(1, "Another high priority task"),
]
for priority, task in tasks:
pq.put((priority, task))
print(f"Added: {task} (priority {priority})")
def process_tasks():
while not pq.empty():
priority, task = pq.get()
print(f"Processing: {task} (priority {priority})")
gevent.sleep(0.5)
gevent.joinall([
gevent.spawn(add_tasks),
gevent.spawn(process_tasks)
])import gevent
from gevent import queue
def sender(ch, values):
for value in values:
print(f"Sending: {value}")
ch.put(value) # Blocks until receiver gets it
print(f"Sent: {value}")
def receiver(ch, count):
for _ in range(count):
value = ch.get() # Blocks until sender puts something
print(f"Received: {value}")
gevent.sleep(0.5) # Simulate processing
# Create synchronous channel
channel = queue.Channel()
# Start sender and receiver
gevent.joinall([
gevent.spawn(sender, channel, ['A', 'B', 'C']),
gevent.spawn(receiver, channel, 3)
])import gevent
from gevent import queue
# Create bounded queue
bounded_queue = queue.Queue(maxsize=2)
def fast_producer():
for i in range(10):
try:
item = f"item-{i}"
print(f"Trying to put {item}")
bounded_queue.put(item, timeout=1)
print(f"Put {item}")
except queue.Full:
print(f"Queue full, couldn't put item-{i}")
def slow_consumer():
while True:
try:
item = bounded_queue.get(timeout=3)
print(f"Got {item}")
gevent.sleep(2) # Slow processing
except queue.Empty:
print("No more items, consumer exiting")
break
gevent.joinall([
gevent.spawn(fast_producer),
gevent.spawn(slow_consumer)
])Install with Tessl CLI
npx tessl i tessl/pypi-gevent