or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/pypi-queuelib

Collection of persistent (disk-based) and non-persistent (memory-based) queues

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/queuelib@1.8.x

To install, run

npx @tessl/cli install tessl/pypi-queuelib@1.8.0

index.mddocs/

Queuelib

A Python library that implements object collections which are stored in memory or persisted to disk, providing a simple API and fast performance. Queuelib provides collections for FIFO queues, LIFO stacks, priority queues, and round-robin queues with support for both memory-based and disk-based storage.

Package Information

  • Package Name: queuelib
  • Language: Python
  • Installation: pip install queuelib
  • Python Version: 3.9+
  • Dependencies: None (zero dependencies)

Core Imports

Main queue implementations:

from queuelib import FifoDiskQueue, LifoDiskQueue, PriorityQueue, RoundRobinQueue

Additional queue implementations (from submodules):

from queuelib.queue import FifoMemoryQueue, LifoMemoryQueue, FifoSQLiteQueue, LifoSQLiteQueue, BaseQueue

Basic Usage

from queuelib import FifoDiskQueue, PriorityQueue

# Basic FIFO disk queue usage
queue = FifoDiskQueue("./queue-data")
queue.push(b'hello')
queue.push(b'world')
item = queue.pop()  # Returns b'hello'
queue.close()

# Priority queue with disk-based storage
def queue_factory(priority):
    return FifoDiskQueue(f"./priority-queue-{priority}")

pq = PriorityQueue(queue_factory)
pq.push(b'low priority', 5)
pq.push(b'high priority', 1)
pq.push(b'medium priority', 3)

item = pq.pop()  # Returns b'high priority' (priority 1)
pq.close()

Architecture

Queuelib implements a modular queue architecture:

  • Base Interface: All queues implement push/pop/peek/close/len methods
  • Memory Queues: Fast in-memory implementations using Python deque
  • Disk Queues: Persistent file-based storage with chunk management
  • SQLite Queues: Database-backed persistent storage
  • Composite Queues: Priority and round-robin queues built on internal queue instances

All persistent queues automatically handle cleanup when empty and provide crash-safe storage through proper file management and metadata tracking.

Capabilities

FIFO Disk Queue

Persistent first-in-first-out queue stored on disk with automatic chunk management for efficient large queue handling.

class FifoDiskQueue:
    def __init__(self, path: str | os.PathLike[str], chunksize: int = 100000):
        """
        Create a persistent FIFO queue.
        
        Parameters:
        - path: Directory path for queue storage
        - chunksize: Number of items per chunk file (default 100000)
        """
    
    def push(self, string: bytes) -> None:
        """
        Add bytes object to the end of the queue.
        
        Parameters:
        - string: Bytes object to add to queue
        
        Raises:
        - TypeError: If string is not bytes
        """
    
    def pop(self) -> bytes | None:
        """
        Remove and return bytes object from front of queue.
        
        Returns:
        - bytes: Object from front of queue, None if empty
        """
    
    def peek(self) -> bytes | None:
        """
        Return bytes object from front of queue without removing.
        
        Returns:
        - bytes: Object from front of queue, None if empty
        """
    
    def close(self) -> None:
        """Close queue and save state, cleanup if empty."""
    
    def __len__(self) -> int:
        """Return number of items in queue."""

LIFO Disk Queue

Persistent last-in-first-out queue (stack) stored on disk in a single file.

class LifoDiskQueue:
    def __init__(self, path: str | os.PathLike[str]):
        """
        Create a persistent LIFO queue.
        
        Parameters:
        - path: File path for queue storage
        """
    
    def push(self, string: bytes) -> None:
        """
        Add bytes object to the end of the queue.
        
        Parameters:
        - string: Bytes object to add to queue
        
        Raises:
        - TypeError: If string is not bytes
        """
    
    def pop(self) -> bytes | None:
        """
        Remove and return bytes object from end of queue.
        
        Returns:
        - bytes: Object from end of queue, None if empty
        """
    
    def peek(self) -> bytes | None:
        """
        Return bytes object from end of queue without removing.
        
        Returns:
        - bytes: Object from end of queue, None if empty
        """
    
    def close(self) -> None:
        """Close queue, cleanup if empty."""
    
    def __len__(self) -> int:
        """Return number of items in queue."""

Priority Queue

A priority queue implemented using multiple internal queues, one per priority level. Lower numbers indicate higher priorities.

class PriorityQueue:
    def __init__(self, qfactory: Callable[[int], BaseQueue], startprios: Iterable[int] = ()):
        """
        Create a priority queue using internal queues.
        
        Parameters:
        - qfactory: Callable that creates queue instance for given priority
        - startprios: Sequence of priorities to start with (for queue restoration)
        """
    
    def push(self, obj: Any, priority: int = 0) -> None:
        """
        Add object with specified priority.
        
        Parameters:
        - obj: Object to add to queue
        - priority: Priority level (lower numbers = higher priority, default 0)
        """
    
    def pop(self) -> Any | None:
        """
        Remove and return highest priority object.
        
        Returns:
        - Any: Highest priority object, None if empty
        """
    
    def peek(self) -> Any | None:
        """
        Return highest priority object without removing.
        
        Returns:
        - Any: Highest priority object, None if empty
        """
    
    def close(self) -> list[int]:
        """
        Close all internal queues.
        
        Returns:
        - list[int]: List of priorities that had non-empty queues
        """
    
    def __len__(self) -> int:
        """Return total number of items across all priorities."""

Round Robin Queue

A round-robin queue that cycles through keys when popping items, ensuring fair distribution of processing across different keys.

class RoundRobinQueue:
    def __init__(self, qfactory: Callable[[Hashable], BaseQueue], start_domains: Iterable[Hashable] = ()):
        """
        Create a round-robin queue using internal queues.
        
        Parameters:
        - qfactory: Callable that creates queue instance for given key
        - start_domains: Sequence of domains/keys to start with (for queue restoration)
        """
    
    def push(self, obj: Any, key: Hashable) -> None:
        """
        Add object associated with specified key.
        
        Parameters:
        - obj: Object to add to queue
        - key: Hashable key to associate object with
        """
    
    def pop(self) -> Any | None:
        """
        Remove and return object using round-robin key selection.
        
        Returns:
        - Any: Object from next key in rotation, None if empty
        """
    
    def peek(self) -> Any | None:
        """
        Return object from current key without removing.
        
        Returns:
        - Any: Object from current key, None if empty
        """
    
    def close(self) -> list[Hashable]:
        """
        Close all internal queues.
        
        Returns:
        - list[Hashable]: List of keys that had non-empty queues
        """
    
    def __len__(self) -> int:
        """Return total number of items across all keys."""

Memory Queues

Fast in-memory queue implementations for temporary storage needs.

class FifoMemoryQueue:
    def __init__(self):
        """Create an in-memory FIFO queue."""
    
    def push(self, obj: Any) -> None:
        """Add object to end of queue."""
    
    def pop(self) -> Any | None:
        """Remove and return object from front of queue."""
    
    def peek(self) -> Any | None:
        """Return object from front without removing."""
    
    def close(self) -> None:
        """Close queue (no-op for memory queue)."""
    
    def __len__(self) -> int:
        """Return number of items in queue."""

class LifoMemoryQueue(FifoMemoryQueue):
    def pop(self) -> Any | None:
        """Remove and return object from end of queue."""
    
    def peek(self) -> Any | None:
        """Return object from end without removing."""

SQLite Queues

Database-backed persistent queues using SQLite for storage.

class FifoSQLiteQueue:
    def __init__(self, path: str | os.PathLike[str]):
        """
        Create a persistent FIFO queue using SQLite.
        
        Parameters:
        - path: File path for SQLite database
        """
    
    def push(self, item: bytes) -> None:
        """
        Add bytes object to queue.
        
        Parameters:
        - item: Bytes object to add
        
        Raises:
        - TypeError: If item is not bytes
        """
    
    def pop(self) -> bytes | None:
        """Remove and return bytes object from front of queue."""
    
    def peek(self) -> bytes | None:
        """Return bytes object from front without removing."""
    
    def close(self) -> None:
        """Close database connection, cleanup if empty."""
    
    def __len__(self) -> int:
        """Return number of items in queue."""

class LifoSQLiteQueue(FifoSQLiteQueue):
    """
    LIFO SQLite queue that inherits from FifoSQLiteQueue but overrides
    the SQL query to implement last-in-first-out ordering.
    
    Overrides:
    - _sql_pop: Uses "ORDER BY id DESC" to get the most recently added item
    """

Base Queue Interface

Abstract base class defining the standard queue interface.

class BaseQueue:
    def push(self, obj: Any) -> None:
        """Add object to queue."""
    
    def pop(self) -> Any | None:
        """Remove and return object from queue."""
    
    def peek(self) -> Any | None:
        """Return object without removing."""
    
    def __len__(self) -> int:
        """Return queue size."""
    
    def close(self) -> None:
        """Close queue and cleanup resources."""

Types

from typing import Any, Callable, Hashable, Iterable
import os

# Type aliases for clarity
BaseQueue = Any  # Queue implementing the base interface

Usage Examples

Priority Queue with Disk Storage

from queuelib import PriorityQueue, FifoDiskQueue

def create_disk_queue(priority):
    return FifoDiskQueue(f"./queue-priority-{priority}")

pq = PriorityQueue(create_disk_queue)

# Add items with different priorities
pq.push(b'urgent task', 1)
pq.push(b'normal task', 5)
pq.push(b'high priority task', 2)

# Process items by priority
while len(pq) > 0:
    task = pq.pop()
    print(f"Processing: {task}")

# Get remaining priorities when closing
remaining_priorities = pq.close()

Round Robin Processing

from queuelib import RoundRobinQueue, FifoMemoryQueue

def create_memory_queue(domain):
    return FifoMemoryQueue()

rr = RoundRobinQueue(create_memory_queue)

# Add requests for different domains
rr.push(b'request1', 'domain1.com')
rr.push(b'request2', 'domain1.com')
rr.push(b'request3', 'domain2.com')
rr.push(b'request4', 'domain2.com')

# Process requests in round-robin fashion
while len(rr) > 0:
    request = rr.pop()
    print(f"Processing: {request}")
    # Output alternates between domains for fair processing

remaining_domains = rr.close()

Persistent Queue with Error Handling

from queuelib import FifoDiskQueue
import os

try:
    queue = FifoDiskQueue("./data/queue")
    
    # Only bytes objects are accepted
    queue.push(b'valid data')
    
    # This will raise TypeError
    try:
        queue.push('string data')  # TypeError: Unsupported type: str
    except TypeError as e:
        print(f"Error: {e}")
    
    # Safe queue operations
    item = queue.pop()
    if item is not None:
        print(f"Got item: {item}")
    
    # Peek without removing
    next_item = queue.peek()
    if next_item is not None:
        print(f"Next item: {next_item}")
    
finally:
    queue.close()  # Always close to save state and cleanup

Error Handling

Common exceptions and error conditions:

  • TypeError: Raised when pushing non-bytes objects to disk or SQLite queues
  • OSError/IOError: May occur with disk-based queues during filesystem operations
  • sqlite3.Error: May occur with SQLite queues during database operations

Thread Safety

Important: Queuelib collections are not thread-safe. Use external synchronization mechanisms when accessing queues from multiple threads.

Performance Characteristics

  • Memory queues: Fastest, O(1) operations, no persistence
  • Disk queues: Slower, persistent, efficient for large datasets with chunk management
  • SQLite queues: Moderate speed, persistent, ACID guarantees
  • Priority/Round-robin queues: Performance depends on internal queue implementation