0
# Queues
1
2
Message passing and data structures for greenlet communication, including FIFO, LIFO, and priority queues with cooperative blocking behavior. These queues provide thread-safe communication channels between greenlets.
3
4
## Capabilities
5
6
### Standard Queue
7
8
FIFO queue with optional size limits and task tracking.
9
10
```python { .api }
11
class Queue:
12
"""
13
FIFO queue for passing data between greenlets.
14
"""
15
16
def __init__(self, maxsize=None, items=None, unfinished_tasks=None):
17
"""
18
Create a queue.
19
20
Parameters:
21
- maxsize: int, maximum queue size (None for unlimited)
22
- items: iterable, initial items for queue
23
- unfinished_tasks: int, initial unfinished task count
24
"""
25
26
def put(self, item, block=True, timeout=None):
27
"""
28
Put item into queue.
29
30
Parameters:
31
- item: object to add to queue
32
- block: bool, whether to block if queue is full
33
- timeout: float, maximum time to wait if blocking
34
35
Raises:
36
Full: if queue is full and either block=False or the timeout expires
37
"""
38
39
def get(self, block=True, timeout=None):
40
"""
41
Remove and return item from queue.
42
43
Parameters:
44
- block: bool, whether to block if queue is empty
45
- timeout: float, maximum time to wait if blocking
46
47
Returns:
48
Item from queue
49
50
Raises:
51
Empty: if queue is empty and either block=False or the timeout expires
52
"""
53
54
def put_nowait(self, item):
55
"""
56
Put item without blocking.
57
58
Parameters:
59
- item: object to add to queue
60
61
Raises:
62
Full: if queue is full
63
"""
64
65
def get_nowait(self):
66
"""
67
Get item without blocking.
68
69
Returns:
70
Item from queue
71
72
Raises:
73
Empty: if queue is empty
74
"""
75
76
def empty(self) -> bool:
77
"""
78
Check if queue is empty.
79
80
Returns:
81
bool, True if queue is empty
82
"""
83
84
def full(self) -> bool:
85
"""
86
Check if queue is full.
87
88
Returns:
89
bool, True if queue is full
90
"""
91
92
def qsize(self) -> int:
93
"""
94
Get approximate queue size.
95
96
Returns:
97
int, number of items in queue
98
"""
99
100
def task_done(self):
101
"""
102
Mark a task as done.
103
104
Returns:
105
None
106
107
Raises:
108
ValueError: if called more times than there were items placed in the queue
109
"""
110
111
def join(self, timeout=None):
112
"""
113
Wait for all tasks to be marked done.
114
115
Parameters:
116
- timeout: float, maximum time to wait
117
118
Returns:
119
bool, True if all tasks finished; False if the timeout expired first
120
"""
121
122
# Legacy alias
123
JoinableQueue = Queue
124
```
125
126
### Simple Queue
127
128
Unbounded FIFO queue with a simpler interface.
129
130
```python { .api }
131
class SimpleQueue:
132
"""
133
Unbounded FIFO queue with a simple interface.
134
"""
135
136
def put(self, item, block=True, timeout=None):
137
"""
138
Put item into queue.
139
140
Parameters:
141
- item: object to add
142
- block: bool, ignored (the queue is unbounded, so put always succeeds)
143
- timeout: float, ignored
144
145
Returns:
146
None
147
"""
148
149
def get(self, block=True, timeout=None):
150
"""
151
Get item from queue.
152
153
Parameters:
154
- block: bool, whether to block if empty
155
- timeout: float, maximum time to wait
156
157
Returns:
158
Item from queue
159
"""
160
161
def empty(self) -> bool:
162
"""
163
Check if queue is empty.
164
165
Returns:
166
bool, True if empty
167
"""
168
169
def qsize(self) -> int:
170
"""
171
Get queue size.
172
173
Returns:
174
int, number of items
175
"""
176
```
177
178
### Priority Queue
179
180
Queue where items are retrieved in priority order.
181
182
```python { .api }
183
class PriorityQueue(Queue):
184
"""
185
Priority queue where lowest valued entries are retrieved first.
186
"""
187
188
def put(self, item, block=True, timeout=None):
189
"""
190
Put item into priority queue.
191
192
Parameters:
193
- item: (priority, data) tuple or comparable object
194
- block: bool, whether to block if full
195
- timeout: float, maximum time to wait
196
197
Returns:
198
None
199
"""
200
201
def get(self, block=True, timeout=None):
202
"""
203
Get the highest-priority item (the lowest-valued entry).
204
205
Parameters:
206
- block: bool, whether to block if empty
207
- timeout: float, maximum time to wait
208
209
Returns:
210
Highest-priority (lowest-valued) item
211
"""
212
```
213
214
### LIFO Queue
215
216
Last-in-first-out queue (stack).
217
218
```python { .api }
219
class LifoQueue(Queue):
220
"""
221
LIFO queue (stack) where last item put is first retrieved.
222
"""
223
224
def put(self, item, block=True, timeout=None):
225
"""
226
Put item onto stack.
227
228
Parameters:
229
- item: object to add
230
- block: bool, whether to block if full
231
- timeout: float, maximum time to wait
232
233
Returns:
234
None
235
"""
236
237
def get(self, block=True, timeout=None):
238
"""
239
Get most recent item from stack.
240
241
Parameters:
242
- block: bool, whether to block if empty
243
- timeout: float, maximum time to wait
244
245
Returns:
246
Most recently added item
247
"""
248
```
249
250
### Channel
251
252
Synchronous queue for CSP-style communication.
253
254
```python { .api }
255
class Channel:
256
"""
257
Synchronous channel for CSP-style communication.
258
"""
259
260
def put(self, item, block=True, timeout=None):
261
"""
262
Send item through channel.
263
264
Parameters:
265
- item: object to send
266
- block: bool, whether to block until received
267
- timeout: float, maximum time to wait
268
269
Returns:
270
None
271
"""
272
273
def get(self, block=True, timeout=None):
274
"""
275
Receive item from channel.
276
277
Parameters:
278
- block: bool, whether to block until available
279
- timeout: float, maximum time to wait
280
281
Returns:
282
Item from channel
283
"""
284
```
285
286
### Queue Exceptions
287
288
```python { .api }
289
class Empty(Exception):
290
"""Exception raised when queue is empty."""
291
292
class Full(Exception):
293
"""Exception raised when queue is full."""
294
295
class ShutDown(Exception):
296
"""Exception raised when queue is shut down."""
297
```
298
299
## Usage Examples
300
301
### Producer-Consumer Pattern
302
303
```python
304
import gevent
305
from gevent import queue
306
307
# Create a queue
308
work_queue = queue.Queue()
309
310
def producer(name, count):
311
for i in range(count):
312
item = f"{name}-item-{i}"
313
print(f"Producing {item}")
314
work_queue.put(item)
315
gevent.sleep(0.1) # Simulate work
316
317
print(f"Producer {name} finished")
318
319
def consumer(name):
320
while True:
321
try:
322
item = work_queue.get(timeout=2)
323
print(f"Consumer {name} processing {item}")
324
gevent.sleep(0.2) # Simulate processing
325
work_queue.task_done()
326
except queue.Empty:
327
print(f"Consumer {name} timed out, exiting")
328
break
329
330
# Start producers and consumers
331
greenlets = [
332
gevent.spawn(producer, "P1", 5),
333
gevent.spawn(producer, "P2", 3),
334
gevent.spawn(consumer, "C1"),
335
gevent.spawn(consumer, "C2"),
336
]
337
338
# Wait for producers to finish
339
gevent.joinall(greenlets[:2])
340
341
# Wait for all work to be processed
342
work_queue.join()
343
344
# Kill consumers
345
gevent.killall(greenlets[2:])
346
```
347
348
### Priority Queue Example
349
350
```python
351
import gevent
352
from gevent import queue
353
354
# Create priority queue
355
pq = queue.PriorityQueue()
356
357
def add_tasks():
358
# Add tasks with priorities (lower number = higher priority)
359
tasks = [
360
(1, "High priority task"),
361
(3, "Low priority task"),
362
(2, "Medium priority task"),
363
(1, "Another high priority task"),
364
]
365
366
for priority, task in tasks:
367
pq.put((priority, task))
368
print(f"Added: {task} (priority {priority})")
369
370
def process_tasks():
371
while not pq.empty():
372
priority, task = pq.get()
373
print(f"Processing: {task} (priority {priority})")
374
gevent.sleep(0.5)
375
376
gevent.joinall([
377
gevent.spawn(add_tasks),
378
gevent.spawn(process_tasks)
379
])
380
```
381
382
### Channel Communication
383
384
```python
385
import gevent
386
from gevent import queue
387
388
def sender(ch, values):
389
for value in values:
390
print(f"Sending: {value}")
391
ch.put(value) # Blocks until receiver gets it
392
print(f"Sent: {value}")
393
394
def receiver(ch, count):
395
for _ in range(count):
396
value = ch.get() # Blocks until sender puts something
397
print(f"Received: {value}")
398
gevent.sleep(0.5) # Simulate processing
399
400
# Create synchronous channel
401
channel = queue.Channel()
402
403
# Start sender and receiver
404
gevent.joinall([
405
gevent.spawn(sender, channel, ['A', 'B', 'C']),
406
gevent.spawn(receiver, channel, 3)
407
])
408
```
409
410
### Queue with Size Limits
411
412
```python
413
import gevent
414
from gevent import queue
415
416
# Create bounded queue
417
bounded_queue = queue.Queue(maxsize=2)
418
419
def fast_producer():
420
for i in range(10):
421
try:
422
item = f"item-{i}"
423
print(f"Trying to put {item}")
424
bounded_queue.put(item, timeout=1)
425
print(f"Put {item}")
426
except queue.Full:
427
print(f"Queue full, couldn't put item-{i}")
428
429
def slow_consumer():
430
while True:
431
try:
432
item = bounded_queue.get(timeout=3)
433
print(f"Got {item}")
434
gevent.sleep(2) # Slow processing
435
except queue.Empty:
436
print("No more items, consumer exiting")
437
break
438
439
gevent.joinall([
440
gevent.spawn(fast_producer),
441
gevent.spawn(slow_consumer)
442
])
443
```