0
# Queues
1
2
Process- and thread-safe queues for inter-process communication, built on pipes. Three implementations are provided: a general-purpose queue with an optional size limit, a joinable queue that tracks task completion, and a minimal simple queue.
3
4
## Capabilities
5
6
### Standard Queue
7
8
Process- and thread-safe FIFO queue built on a pipe, with an optional size limit (`maxsize`) and blocking, non-blocking, and timed `put`/`get` operations.
9
10
```python { .api }
11
class Queue:
    """
    A thread-safe queue implementation using pipes.

    This is an API stub: method bodies are omitted and only the
    signatures and contracts are documented.
    """

    def __init__(self, maxsize=0, ctx=None):
        """
        Create a queue.

        Parameters:
        - maxsize: maximum number of items the queue may hold
          (0 for unlimited)
        - ctx: multiprocessing context
        """

    def put(self, obj, block=True, timeout=None):
        """
        Put an item into the queue.

        Parameters:
        - obj: object to put in queue
        - block: whether to block while the queue is full
        - timeout: maximum time to wait in seconds; only used when
          block=True (None for no timeout)

        Raises:
        - Full: if queue is full and block=False or timeout exceeded
        """

    def get(self, block=True, timeout=None):
        """
        Remove and return an item from the queue.

        Parameters:
        - block: whether to block while the queue is empty
        - timeout: maximum time to wait in seconds; only used when
          block=True (None for no timeout)

        Returns:
        Item from queue

        Raises:
        - Empty: if queue is empty and block=False or timeout exceeded
        """

    def put_nowait(self, obj):
        """
        Equivalent to put(obj, block=False).

        Parameters:
        - obj: object to put in queue

        Raises:
        - Full: if queue is full
        """

    def get_nowait(self):
        """
        Equivalent to get(block=False).

        Returns:
        Item from queue

        Raises:
        - Empty: if queue is empty
        """

    def qsize(self) -> int:
        """
        Return approximate size of queue.

        The value is only a snapshot: other processes or threads may add
        or remove items at any time.

        Returns:
        Approximate number of items in queue
        """

    def empty(self) -> bool:
        """
        Return True if queue is empty.

        Like qsize(), the answer may be stale by the time it is used.

        Returns:
        True if queue appears empty
        """

    def full(self) -> bool:
        """
        Return True if queue is full.

        Like qsize(), the answer may be stale by the time it is used.

        Returns:
        True if queue appears full
        """

    def close(self):
        """
        Close the queue and prevent further use.
        """

    def join_thread(self):
        """
        Join the background thread.

        Intended to be called after close(); it waits for the background
        thread to finish so that buffered items have been written to the
        pipe.
        """

    def cancel_join_thread(self):
        """
        Cancel join_thread().

        Prevents join_thread() from blocking, which also means the
        process will not wait for buffered items to be flushed on exit,
        so queued data may be lost.
        """
112
```
113
114
Usage example:
115
116
```python
117
from billiard import Process, Queue
118
import time
119
import queue
120
121
def producer(q, items):
    """Producer process that puts items in queue.

    Parameters:
    - q: queue to write to
    - items: iterable of items to enqueue, in order

    After the last item a single ``None`` sentinel is enqueued. One
    sentinel is enough for any number of consumers only because each
    consumer is expected to put the sentinel back before it exits.
    """
    for item in items:
        print(f"Producing: {item}")
        q.put(item)
        time.sleep(0.1)  # Simulate the time it takes to produce an item
    q.put(None)  # Sentinel to signal completion
128
129
def consumer(q, consumer_id):
    """Consumer process that gets items from queue.

    Parameters:
    - q: queue to read from
    - consumer_id: label used in the printed messages

    The loop ends when the ``None`` sentinel is received or when no item
    arrives within one second.
    """
    while True:
        # Only the get() can raise queue.Empty, so keep the try minimal.
        try:
            item = q.get(timeout=1)
        except queue.Empty:
            print(f"Consumer {consumer_id} timed out")
            break

        if item is None:
            q.put(None)  # Re-queue sentinel for other consumers
            break

        print(f"Consumer {consumer_id} consumed: {item}")
        time.sleep(0.2)  # Simulate work
142
143
if __name__ == '__main__':
    # Create queue with max size
    q = Queue(maxsize=5)

    # Check initial state
    # NOTE(review): the stdlib multiprocessing equivalent of qsize() raises
    # NotImplementedError on macOS -- confirm billiard's behaviour there.
    print(f"Queue size: {q.qsize()}")
    print(f"Queue empty: {q.empty()}")
    print(f"Queue full: {q.full()}")

    # Start producer and consumers
    items = list(range(10))
    processes = [
        Process(target=producer, args=(q, items)),
        Process(target=consumer, args=(q, 1)),
        Process(target=consumer, args=(q, 2)),
    ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    # Clean up
    q.close()
168
```
169
170
### Joinable Queue
171
172
Queue with task-completion tracking: consumers call `task_done()` once for each item they finish processing, and producers can call `join()` to block until every queued item has been processed.
173
174
```python { .api }
175
class JoinableQueue(Queue):
    """
    A Queue subclass that additionally tracks unfinished tasks.

    This is an API stub: method bodies are omitted and only the
    signatures and contracts are documented.
    """

    def task_done(self):
        """
        Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that processing
        is complete.

        Raises:
        - ValueError: if called more times than there were items in queue
        """

    def join(self):
        """
        Block until all items in queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added
        to the queue and goes down whenever task_done() is called. join()
        returns once that count reaches zero.
        """
198
```
199
200
Usage example:
201
202
```python
203
from billiard import Process, JoinableQueue
204
import time
205
206
def worker(q):
    """Worker that processes tasks from queue.

    Parameters:
    - q: joinable queue to read tasks from

    Runs until a ``None`` sentinel is received. task_done() is called
    exactly once for every item fetched with get() -- including the
    sentinel and items whose processing raises -- so the queue's count of
    unfinished tasks can always return to zero and join() cannot hang.
    """
    while True:
        item = q.get()
        try:
            if item is None:
                break

            print(f"Processing {item}...")
            time.sleep(0.5)  # Simulate work
            print(f"Completed {item}")
        finally:
            # Runs on normal completion, on break, and on error.
            q.task_done()  # Mark task as done
218
219
def producer_with_join(q, items, num_workers=2):
    """Producer that waits for all tasks to complete.

    Parameters:
    - q: joinable queue shared with the workers
    - items: iterable of tasks to enqueue
    - num_workers: number of workers reading from ``q``; one ``None``
      sentinel is enqueued per worker (defaults to 2, matching the
      example below)
    """
    # Add tasks to queue
    for item in items:
        print(f"Adding task: {item}")
        q.put(item)

    # Wait for all tasks to be processed
    print("Waiting for all tasks to complete...")
    q.join()
    print("All tasks completed!")

    # Signal workers to stop: each worker consumes exactly one sentinel.
    for _ in range(num_workers):
        q.put(None)
234
235
if __name__ == '__main__':
    # Create joinable queue
    q = JoinableQueue()

    # Start workers
    workers = [
        Process(target=worker, args=(q,)),
        Process(target=worker, args=(q,)),
    ]

    for w in workers:
        w.start()

    # Start producer
    tasks = ['task1', 'task2', 'task3', 'task4', 'task5']
    producer_process = Process(target=producer_with_join, args=(q, tasks))
    producer_process.start()
    producer_process.join()

    # Stop workers: the producer has already queued the sentinels, so just
    # wait for each worker to pick one up and exit.
    for w in workers:
        w.join()
257
```
258
259
### Simple Queue
260
261
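Simplified queue implementation with basic put/get operations and minimal overhead. `get()` always blocks until an item is available; there are no size limits, timeouts, or non-blocking variants.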
262
263
```python { .api }
264
class SimpleQueue:
    """
    A simplified queue implementation.

    This is an API stub: method bodies are omitted and only the
    signatures and contracts are documented.
    """

    def get(self):
        """
        Remove and return an item from the queue (blocks until available).

        Returns:
        Item from queue
        """

    def put(self, obj):
        """
        Put an item into the queue.

        Parameters:
        - obj: object to put in queue
        """

    def empty(self) -> bool:
        """
        Return True if queue is empty.

        The answer may be stale by the time it is used, because other
        processes can add or remove items concurrently.

        Returns:
        True if queue appears empty
        """

    def close(self):
        """
        Close the queue.
        """
296
```
297
298
Usage example:
299
300
```python
301
from billiard import Process, SimpleQueue
302
import time
303
304
def simple_worker(q, worker_id):
    """Simple worker using SimpleQueue.

    Parameters:
    - q: queue to read from
    - worker_id: label used in the printed messages

    Runs until a ``None`` sentinel is received or reading from the queue
    fails.
    """
    while True:
        # get() already blocks until an item is available, so there is no
        # need to poll empty() first; polling is also racy because another
        # worker can take the item between the check and the get().
        try:
            item = q.get()
        except Exception as exc:
            # Best effort: stop this worker if the queue can no longer be
            # read, but say why. KeyboardInterrupt and SystemExit are not
            # caught here and still propagate.
            print(f"Worker {worker_id} stopping: {exc!r}")
            break

        if item is None:
            break

        print(f"Worker {worker_id} got: {item}")
        time.sleep(0.2)  # Simulate work
320
321
def simple_producer(q, items, num_workers=2):
    """Simple producer.

    Parameters:
    - q: queue to write to
    - items: iterable of items to enqueue, in order
    - num_workers: number of workers reading from ``q``; one ``None``
      sentinel is enqueued per worker (defaults to 2, matching the
      example below)
    """
    for item in items:
        q.put(item)
        time.sleep(0.1)  # Simulate the time it takes to produce an item

    # Signal completion: each worker consumes exactly one sentinel.
    for _ in range(num_workers):
        q.put(None)
330
331
if __name__ == '__main__':
    # Create simple queue
    q = SimpleQueue()

    # Start processes: one producer and two workers sharing the queue
    processes = [
        Process(target=simple_producer, args=(q, ['A', 'B', 'C', 'D'])),
        Process(target=simple_worker, args=(q, 1)),
        Process(target=simple_worker, args=(q, 2)),
    ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    q.close()
349
```
350
351
### Queue Exceptions
352
353
Exceptions raised by `Queue.put()` and `Queue.get()` when a non-blocking call cannot proceed or a timeout expires.
354
355
```python { .api }
356
class Empty(Exception):
    """
    Exception raised by Queue.get() when the queue is empty and either
    block=False (including get_nowait()) or the timeout was exceeded.
    """
360
361
class Full(Exception):
    """
    Exception raised by Queue.put() when the queue is full and either
    block=False (including put_nowait()) or the timeout was exceeded.
    """
365
```
366
367
Usage example:
368
369
```python
370
from billiard import Queue
371
from billiard.queues import Empty, Full
372
import time
373
374
def handle_queue_exceptions():
    """Demonstrate queue exception handling.

    Walks through the cases in which a bounded queue raises Full or Empty:
    a non-blocking put on a full queue, a non-blocking get on an empty
    queue, and a get whose timeout expires. Each try block wraps only the
    call that is expected to fail, and the queue is closed on the way out.
    """
    q = Queue(maxsize=2)

    try:
        # Fill the queue
        q.put("item1")
        q.put("item2")
        print("Queue filled")

        try:
            # Try to put more (will raise Full)
            q.put_nowait("item3")
        except Full:
            print("Queue is full!")

        # Empty the queue. Items are written to the pipe by a background
        # thread, so a get_nowait() issued right after put() can raise
        # Empty even though items were queued; a short timeout avoids
        # that race.
        print("Item:", q.get(timeout=1))
        print("Item:", q.get(timeout=1))
        print("Queue emptied")

        try:
            # Try to get more (will raise Empty)
            q.get_nowait()
        except Empty:
            print("Queue is empty!")

        # With timeouts
        q.put("timeout_test", timeout=0.1)
        result = q.get(timeout=1)
        print("Got with timeout:", result)

        try:
            # This will timeout
            q.get(timeout=0.1)
        except Empty:
            print("Get operation timed out")
    finally:
        # Clean up, as the other examples do.
        q.close()
410
411
if __name__ == '__main__':
    # Run the demonstration only when executed as a script.
    handle_queue_exceptions()
413
```
414
415
## Queue Selection Guidelines
416
417
- **Queue**: Use for general-purpose inter-process communication with flow control (maxsize)
418
- **JoinableQueue**: Use when you need to wait for all queued tasks to be processed
419
- **SimpleQueue**: Use for minimal overhead scenarios where you don't need size limits or advanced features
420
421
All queue types are process-safe and thread-safe, making them suitable for complex producer-consumer scenarios in multiprocessing applications.