Collection of persistent (disk-based) and non-persistent (memory-based) queues
npx @tessl/cli install tessl/pypi-queuelib@1.8.0
A Python library that implements object collections which are stored in memory or persisted to disk, providing a simple API and fast performance. Queuelib provides collections for FIFO queues, LIFO stacks, priority queues, and round-robin queues with support for both memory-based and disk-based storage.
pip install queuelib
Main queue implementations:
from queuelib import FifoDiskQueue, LifoDiskQueue, PriorityQueue, RoundRobinQueue
Additional queue implementations (from submodules):
from queuelib.queue import FifoMemoryQueue, LifoMemoryQueue, FifoSQLiteQueue, LifoSQLiteQueue, BaseQueue
from queuelib import FifoDiskQueue, PriorityQueue
# Basic FIFO disk queue usage
queue = FifoDiskQueue("./queue-data")
queue.push(b'hello')
queue.push(b'world')
item = queue.pop() # Returns b'hello'
queue.close()
# Priority queue with disk-based storage
def queue_factory(priority):
return FifoDiskQueue(f"./priority-queue-{priority}")
pq = PriorityQueue(queue_factory)
pq.push(b'low priority', 5)
pq.push(b'high priority', 1)
pq.push(b'medium priority', 3)
item = pq.pop() # Returns b'high priority' (priority 1)
pq.close()
Queuelib implements a modular queue architecture:
All persistent queues automatically handle cleanup when empty and provide crash-safe storage through proper file management and metadata tracking.
Persistent first-in-first-out queue stored on disk with automatic chunk management for efficient large queue handling.
class FifoDiskQueue:
def __init__(self, path: str | os.PathLike[str], chunksize: int = 100000):
"""
Create a persistent FIFO queue.
Parameters:
- path: Directory path for queue storage
- chunksize: Number of items per chunk file (default 100000)
"""
def push(self, string: bytes) -> None:
"""
Add bytes object to the end of the queue.
Parameters:
- string: Bytes object to add to queue
Raises:
- TypeError: If string is not bytes
"""
def pop(self) -> bytes | None:
"""
Remove and return bytes object from front of queue.
Returns:
- bytes: Object from front of queue, None if empty
"""
def peek(self) -> bytes | None:
"""
Return bytes object from front of queue without removing.
Returns:
- bytes: Object from front of queue, None if empty
"""
def close(self) -> None:
"""Close queue and save state, cleanup if empty."""
def __len__(self) -> int:
"""Return number of items in queue."""
Persistent last-in-first-out queue (stack) stored on disk in a single file.
class LifoDiskQueue:
def __init__(self, path: str | os.PathLike[str]):
"""
Create a persistent LIFO queue.
Parameters:
- path: File path for queue storage
"""
def push(self, string: bytes) -> None:
"""
Add bytes object to the end of the queue.
Parameters:
- string: Bytes object to add to queue
Raises:
- TypeError: If string is not bytes
"""
def pop(self) -> bytes | None:
"""
Remove and return bytes object from end of queue.
Returns:
- bytes: Object from end of queue, None if empty
"""
def peek(self) -> bytes | None:
"""
Return bytes object from end of queue without removing.
Returns:
- bytes: Object from end of queue, None if empty
"""
def close(self) -> None:
"""Close queue, cleanup if empty."""
def __len__(self) -> int:
"""Return number of items in queue."""
A priority queue implemented using multiple internal queues, one per priority level. Lower numbers indicate higher priorities.
class PriorityQueue:
def __init__(self, qfactory: Callable[[int], BaseQueue], startprios: Iterable[int] = ()):
"""
Create a priority queue using internal queues.
Parameters:
- qfactory: Callable that creates queue instance for given priority
- startprios: Sequence of priorities to start with (for queue restoration)
"""
def push(self, obj: Any, priority: int = 0) -> None:
"""
Add object with specified priority.
Parameters:
- obj: Object to add to queue
- priority: Priority level (lower numbers = higher priority, default 0)
"""
def pop(self) -> Any | None:
"""
Remove and return highest priority object.
Returns:
- Any: Highest priority object, None if empty
"""
def peek(self) -> Any | None:
"""
Return highest priority object without removing.
Returns:
- Any: Highest priority object, None if empty
"""
def close(self) -> list[int]:
"""
Close all internal queues.
Returns:
- list[int]: List of priorities that had non-empty queues
"""
def __len__(self) -> int:
"""Return total number of items across all priorities."""
A round-robin queue that cycles through keys when popping items, ensuring fair distribution of processing across different keys.
class RoundRobinQueue:
def __init__(self, qfactory: Callable[[Hashable], BaseQueue], start_domains: Iterable[Hashable] = ()):
"""
Create a round-robin queue using internal queues.
Parameters:
- qfactory: Callable that creates queue instance for given key
- start_domains: Sequence of domains/keys to start with (for queue restoration)
"""
def push(self, obj: Any, key: Hashable) -> None:
"""
Add object associated with specified key.
Parameters:
- obj: Object to add to queue
- key: Hashable key to associate object with
"""
def pop(self) -> Any | None:
"""
Remove and return object using round-robin key selection.
Returns:
- Any: Object from next key in rotation, None if empty
"""
def peek(self) -> Any | None:
"""
Return object from current key without removing.
Returns:
- Any: Object from current key, None if empty
"""
def close(self) -> list[Hashable]:
"""
Close all internal queues.
Returns:
- list[Hashable]: List of keys that had non-empty queues
"""
def __len__(self) -> int:
"""Return total number of items across all keys."""
Fast in-memory queue implementations for temporary storage needs.
class FifoMemoryQueue:
def __init__(self):
"""Create an in-memory FIFO queue."""
def push(self, obj: Any) -> None:
"""Add object to end of queue."""
def pop(self) -> Any | None:
"""Remove and return object from front of queue."""
def peek(self) -> Any | None:
"""Return object from front without removing."""
def close(self) -> None:
"""Close queue (no-op for memory queue)."""
def __len__(self) -> int:
"""Return number of items in queue."""
class LifoMemoryQueue(FifoMemoryQueue):
def pop(self) -> Any | None:
"""Remove and return object from end of queue."""
def peek(self) -> Any | None:
"""Return object from end without removing."""
Database-backed persistent queues using SQLite for storage.
class FifoSQLiteQueue:
def __init__(self, path: str | os.PathLike[str]):
"""
Create a persistent FIFO queue using SQLite.
Parameters:
- path: File path for SQLite database
"""
def push(self, item: bytes) -> None:
"""
Add bytes object to queue.
Parameters:
- item: Bytes object to add
Raises:
- TypeError: If item is not bytes
"""
def pop(self) -> bytes | None:
"""Remove and return bytes object from front of queue."""
def peek(self) -> bytes | None:
"""Return bytes object from front without removing."""
def close(self) -> None:
"""Close database connection, cleanup if empty."""
def __len__(self) -> int:
"""Return number of items in queue."""
class LifoSQLiteQueue(FifoSQLiteQueue):
"""
LIFO SQLite queue that inherits from FifoSQLiteQueue but overrides
the SQL query to implement last-in-first-out ordering.
Overrides:
- _sql_pop: Uses "ORDER BY id DESC" to get the most recently added item
"""
Abstract base class defining the standard queue interface.
class BaseQueue:
def push(self, obj: Any) -> None:
"""Add object to queue."""
def pop(self) -> Any | None:
"""Remove and return object from queue."""
def peek(self) -> Any | None:
"""Return object without removing."""
def __len__(self) -> int:
"""Return queue size."""
def close(self) -> None:
"""Close queue and cleanup resources."""
from typing import Any, Callable, Hashable, Iterable
import os
# Type aliases for clarity
BaseQueue = Any # Queue implementing the base interface
from queuelib import PriorityQueue, FifoDiskQueue
def create_disk_queue(priority):
return FifoDiskQueue(f"./queue-priority-{priority}")
pq = PriorityQueue(create_disk_queue)
# Add items with different priorities
pq.push(b'urgent task', 1)
pq.push(b'normal task', 5)
pq.push(b'high priority task', 2)
# Process items by priority
while len(pq) > 0:
task = pq.pop()
print(f"Processing: {task}")
# Get remaining priorities when closing
remaining_priorities = pq.close()
from queuelib import RoundRobinQueue, FifoMemoryQueue
def create_memory_queue(domain):
return FifoMemoryQueue()
rr = RoundRobinQueue(create_memory_queue)
# Add requests for different domains
rr.push(b'request1', 'domain1.com')
rr.push(b'request2', 'domain1.com')
rr.push(b'request3', 'domain2.com')
rr.push(b'request4', 'domain2.com')
# Process requests in round-robin fashion
while len(rr) > 0:
request = rr.pop()
print(f"Processing: {request}")
# Output alternates between domains for fair processing
remaining_domains = rr.close()
from queuelib import FifoDiskQueue
import os
try:
queue = FifoDiskQueue("./data/queue")
# Only bytes objects are accepted
queue.push(b'valid data')
# This will raise TypeError
try:
queue.push('string data') # TypeError: Unsupported type: str
except TypeError as e:
print(f"Error: {e}")
# Safe queue operations
item = queue.pop()
if item is not None:
print(f"Got item: {item}")
# Peek without removing
next_item = queue.peek()
if next_item is not None:
print(f"Next item: {next_item}")
finally:
queue.close() # Always close to save state and clean up
Common exceptions and error conditions:
Important: Queuelib collections are not thread-safe. Use external synchronization mechanisms when accessing queues from multiple threads.