0
# Queuelib
1
2
A Python library that implements object collections which are stored in memory or persisted to disk, providing a simple API and fast performance. Queuelib provides collections for FIFO queues, LIFO stacks, priority queues, and round-robin queues with support for both memory-based and disk-based storage.
3
4
## Package Information
5
6
- **Package Name**: queuelib
7
- **Language**: Python
8
- **Installation**: `pip install queuelib`
9
- **Python Version**: 3.9+
10
- **Dependencies**: None (zero dependencies)
11
12
## Core Imports
13
14
Main queue implementations:
15
16
```python
17
from queuelib import FifoDiskQueue, LifoDiskQueue, PriorityQueue, RoundRobinQueue
18
```
19
20
Additional queue implementations (from submodules):
21
22
```python
23
from queuelib.queue import FifoMemoryQueue, LifoMemoryQueue, FifoSQLiteQueue, LifoSQLiteQueue, BaseQueue
24
```
25
26
## Basic Usage
27
28
```python
29
from queuelib import FifoDiskQueue, PriorityQueue
30
31
# Basic FIFO disk queue usage
32
queue = FifoDiskQueue("./queue-data")
33
queue.push(b'hello')
34
queue.push(b'world')
35
item = queue.pop() # Returns b'hello'
36
queue.close()
37
38
# Priority queue with disk-based storage
39
def queue_factory(priority):
40
return FifoDiskQueue(f"./priority-queue-{priority}")
41
42
pq = PriorityQueue(queue_factory)
43
pq.push(b'low priority', 5)
44
pq.push(b'high priority', 1)
45
pq.push(b'medium priority', 3)
46
47
item = pq.pop() # Returns b'high priority' (priority 1)
48
pq.close()
49
```
50
51
## Architecture
52
53
Queuelib implements a modular queue architecture:
54
55
- **Base Interface**: All queues implement push/pop/peek/close/__len__ methods
56
- **Memory Queues**: Fast in-memory implementations using Python deque
57
- **Disk Queues**: Persistent file-based storage with chunk management
58
- **SQLite Queues**: Database-backed persistent storage
59
- **Composite Queues**: Priority and round-robin queues built on internal queue instances
60
61
All persistent queues automatically handle cleanup when empty and provide crash-safe storage through proper file management and metadata tracking.
62
63
## Capabilities
64
65
### FIFO Disk Queue
66
67
Persistent first-in-first-out queue stored on disk with automatic chunk management for efficient large queue handling.
68
69
```python { .api }
70
class FifoDiskQueue:
71
def __init__(self, path: str | os.PathLike[str], chunksize: int = 100000):
72
"""
73
Create a persistent FIFO queue.
74
75
Parameters:
76
- path: Directory path for queue storage
77
- chunksize: Number of items per chunk file (default 100000)
78
"""
79
80
def push(self, string: bytes) -> None:
81
"""
82
Add bytes object to the end of the queue.
83
84
Parameters:
85
- string: Bytes object to add to queue
86
87
Raises:
88
- TypeError: If string is not bytes
89
"""
90
91
def pop(self) -> bytes | None:
92
"""
93
Remove and return bytes object from front of queue.
94
95
Returns:
96
- bytes: Object from front of queue, None if empty
97
"""
98
99
def peek(self) -> bytes | None:
100
"""
101
Return bytes object from front of queue without removing.
102
103
Returns:
104
- bytes: Object from front of queue, None if empty
105
"""
106
107
def close(self) -> None:
108
"""Close queue and save state, cleanup if empty."""
109
110
def __len__(self) -> int:
111
"""Return number of items in queue."""
112
```
113
114
### LIFO Disk Queue
115
116
Persistent last-in-first-out queue (stack) stored on disk in a single file.
117
118
```python { .api }
119
class LifoDiskQueue:
120
def __init__(self, path: str | os.PathLike[str]):
121
"""
122
Create a persistent LIFO queue.
123
124
Parameters:
125
- path: File path for queue storage
126
"""
127
128
def push(self, string: bytes) -> None:
129
"""
130
Add bytes object to the end of the queue.
131
132
Parameters:
133
- string: Bytes object to add to queue
134
135
Raises:
136
- TypeError: If string is not bytes
137
"""
138
139
def pop(self) -> bytes | None:
140
"""
141
Remove and return bytes object from end of queue.
142
143
Returns:
144
- bytes: Object from end of queue, None if empty
145
"""
146
147
def peek(self) -> bytes | None:
148
"""
149
Return bytes object from end of queue without removing.
150
151
Returns:
152
- bytes: Object from end of queue, None if empty
153
"""
154
155
def close(self) -> None:
156
"""Close queue, cleanup if empty."""
157
158
def __len__(self) -> int:
159
"""Return number of items in queue."""
160
```
161
162
### Priority Queue
163
164
A priority queue implemented using multiple internal queues, one per priority level. Lower numbers indicate higher priorities.
165
166
```python { .api }
167
class PriorityQueue:
168
def __init__(self, qfactory: Callable[[int], BaseQueue], startprios: Iterable[int] = ()):
169
"""
170
Create a priority queue using internal queues.
171
172
Parameters:
173
- qfactory: Callable that creates queue instance for given priority
174
- startprios: Sequence of priorities to start with (for queue restoration)
175
"""
176
177
def push(self, obj: Any, priority: int = 0) -> None:
178
"""
179
Add object with specified priority.
180
181
Parameters:
182
- obj: Object to add to queue
183
- priority: Priority level (lower numbers = higher priority, default 0)
184
"""
185
186
def pop(self) -> Any | None:
187
"""
188
Remove and return highest priority object.
189
190
Returns:
191
- Any: Highest priority object, None if empty
192
"""
193
194
def peek(self) -> Any | None:
195
"""
196
Return highest priority object without removing.
197
198
Returns:
199
- Any: Highest priority object, None if empty
200
"""
201
202
def close(self) -> list[int]:
203
"""
204
Close all internal queues.
205
206
Returns:
207
- list[int]: List of priorities that had non-empty queues
208
"""
209
210
def __len__(self) -> int:
211
"""Return total number of items across all priorities."""
212
```
213
214
### Round Robin Queue
215
216
A round-robin queue that cycles through keys when popping items, ensuring fair distribution of processing across different keys.
217
218
```python { .api }
219
class RoundRobinQueue:
220
def __init__(self, qfactory: Callable[[Hashable], BaseQueue], start_domains: Iterable[Hashable] = ()):
221
"""
222
Create a round-robin queue using internal queues.
223
224
Parameters:
225
- qfactory: Callable that creates queue instance for given key
226
- start_domains: Sequence of domains/keys to start with (for queue restoration)
227
"""
228
229
def push(self, obj: Any, key: Hashable) -> None:
230
"""
231
Add object associated with specified key.
232
233
Parameters:
234
- obj: Object to add to queue
235
- key: Hashable key to associate object with
236
"""
237
238
def pop(self) -> Any | None:
239
"""
240
Remove and return object using round-robin key selection.
241
242
Returns:
243
- Any: Object from next key in rotation, None if empty
244
"""
245
246
def peek(self) -> Any | None:
247
"""
248
Return object from current key without removing.
249
250
Returns:
251
- Any: Object from current key, None if empty
252
"""
253
254
def close(self) -> list[Hashable]:
255
"""
256
Close all internal queues.
257
258
Returns:
259
- list[Hashable]: List of keys that had non-empty queues
260
"""
261
262
def __len__(self) -> int:
263
"""Return total number of items across all keys."""
264
```
265
266
### Memory Queues
267
268
Fast in-memory queue implementations for temporary storage needs.
269
270
```python { .api }
271
class FifoMemoryQueue:
272
def __init__(self):
273
"""Create an in-memory FIFO queue."""
274
275
def push(self, obj: Any) -> None:
276
"""Add object to end of queue."""
277
278
def pop(self) -> Any | None:
279
"""Remove and return object from front of queue."""
280
281
def peek(self) -> Any | None:
282
"""Return object from front without removing."""
283
284
def close(self) -> None:
285
"""Close queue (no-op for memory queue)."""
286
287
def __len__(self) -> int:
288
"""Return number of items in queue."""
289
290
class LifoMemoryQueue(FifoMemoryQueue):
291
def pop(self) -> Any | None:
292
"""Remove and return object from end of queue."""
293
294
def peek(self) -> Any | None:
295
"""Return object from end without removing."""
296
```
297
298
### SQLite Queues
299
300
Database-backed persistent queues using SQLite for storage.
301
302
```python { .api }
303
class FifoSQLiteQueue:
304
def __init__(self, path: str | os.PathLike[str]):
305
"""
306
Create a persistent FIFO queue using SQLite.
307
308
Parameters:
309
- path: File path for SQLite database
310
"""
311
312
def push(self, item: bytes) -> None:
313
"""
314
Add bytes object to queue.
315
316
Parameters:
317
- item: Bytes object to add
318
319
Raises:
320
- TypeError: If item is not bytes
321
"""
322
323
def pop(self) -> bytes | None:
324
"""Remove and return bytes object from front of queue."""
325
326
def peek(self) -> bytes | None:
327
"""Return bytes object from front without removing."""
328
329
def close(self) -> None:
330
"""Close database connection, cleanup if empty."""
331
332
def __len__(self) -> int:
333
"""Return number of items in queue."""
334
335
class LifoSQLiteQueue(FifoSQLiteQueue):
336
"""
337
LIFO SQLite queue that inherits from FifoSQLiteQueue but overrides
338
the SQL query to implement last-in-first-out ordering.
339
340
Overrides:
341
- _sql_pop: Uses "ORDER BY id DESC" to get the most recently added item
342
"""
343
```
344
345
## Base Queue Interface
346
347
Abstract base class defining the standard queue interface.
348
349
```python { .api }
350
class BaseQueue:
351
def push(self, obj: Any) -> None:
352
"""Add object to queue."""
353
354
def pop(self) -> Any | None:
355
"""Remove and return object from queue."""
356
357
def peek(self) -> Any | None:
358
"""Return object without removing."""
359
360
def __len__(self) -> int:
361
"""Return queue size."""
362
363
def close(self) -> None:
364
"""Close queue and cleanup resources."""
365
```
366
367
## Types
368
369
```python { .api }
370
from typing import Any, Callable, Hashable, Iterable
371
import os
372
373
# Type aliases for clarity
374
BaseQueue = Any # Queue implementing the base interface
375
```
376
377
## Usage Examples
378
379
### Priority Queue with Disk Storage
380
381
```python
382
from queuelib import PriorityQueue, FifoDiskQueue
383
384
def create_disk_queue(priority):
385
return FifoDiskQueue(f"./queue-priority-{priority}")
386
387
pq = PriorityQueue(create_disk_queue)
388
389
# Add items with different priorities
390
pq.push(b'urgent task', 1)
391
pq.push(b'normal task', 5)
392
pq.push(b'high priority task', 2)
393
394
# Process items by priority
395
while len(pq) > 0:
396
task = pq.pop()
397
print(f"Processing: {task}")
398
399
# Get remaining priorities when closing
400
remaining_priorities = pq.close()
401
```
402
403
### Round Robin Processing
404
405
```python
406
from queuelib import RoundRobinQueue, FifoMemoryQueue
407
408
def create_memory_queue(domain):
409
return FifoMemoryQueue()
410
411
rr = RoundRobinQueue(create_memory_queue)
412
413
# Add requests for different domains
414
rr.push(b'request1', 'domain1.com')
415
rr.push(b'request2', 'domain1.com')
416
rr.push(b'request3', 'domain2.com')
417
rr.push(b'request4', 'domain2.com')
418
419
# Process requests in round-robin fashion
420
while len(rr) > 0:
421
request = rr.pop()
422
print(f"Processing: {request}")
423
# Output alternates between domains for fair processing
424
425
remaining_domains = rr.close()
426
```
427
428
### Persistent Queue with Error Handling
429
430
```python
431
from queuelib import FifoDiskQueue
432
import os
433
434
try:
435
queue = FifoDiskQueue("./data/queue")
436
437
# Only bytes objects are accepted
438
queue.push(b'valid data')
439
440
# This will raise TypeError
441
try:
442
queue.push('string data') # TypeError: Unsupported type: str
443
except TypeError as e:
444
print(f"Error: {e}")
445
446
# Safe queue operations
447
item = queue.pop()
448
if item is not None:
449
print(f"Got item: {item}")
450
451
# Peek without removing
452
next_item = queue.peek()
453
if next_item is not None:
454
print(f"Next item: {next_item}")
455
456
finally:
457
queue.close() # Always close to save state and cleanup
458
```
459
460
## Error Handling
461
462
Common exceptions and error conditions:
463
464
- **TypeError**: Raised when pushing non-bytes objects to disk or SQLite queues
465
- **OSError/IOError**: May occur with disk-based queues during filesystem operations
466
- **sqlite3.Error**: May occur with SQLite queues during database operations
467
468
## Thread Safety
469
470
**Important**: Queuelib collections are **not thread-safe**. Use external synchronization mechanisms when accessing queues from multiple threads.
471
472
## Performance Characteristics
473
474
- **Memory queues**: Fastest, O(1) operations, no persistence
475
- **Disk queues**: Slower, persistent, efficient for large datasets with chunk management
476
- **SQLite queues**: Moderate speed, persistent, ACID guarantees
477
- **Priority/Round-robin queues**: Performance depends on internal queue implementation