or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/pypi-janus

Mixed sync-async queue to interoperate between asyncio tasks and classic threads

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/janus@2.0.x

To install, run

npx @tessl/cli install tessl/pypi-janus@2.0.0

index.mddocs/

Janus

Mixed sync-async queue to interoperate between asyncio tasks and classic threads. Like the Roman god Janus with two faces, each queue instance provides both synchronous and asynchronous interfaces, enabling seamless communication between traditional threaded code and modern asyncio-based applications.

Package Information

  • Package Name: janus
  • Language: Python
  • Installation: pip install janus
  • Python Version: 3.9+

Python Version Compatibility

  • Python 3.9-3.12: Full compatibility with custom exception classes
  • Python 3.13+: Uses built-in queue.ShutDown and asyncio.QueueShutDown exceptions
  • Python 3.10.0: Contains a specific workaround for an asyncio bug in this version
  • Behavior differences: Event loop binding occurs at different times depending on Python version

Core Imports

import janus

Common patterns:

from janus import Queue, PriorityQueue, LifoQueue

Alternative individual imports:

import janus
# All exports available via janus.Queue, janus.SyncQueueEmpty, etc.

For type hints:

from janus import SyncQueue, AsyncQueue, BaseQueue

For exceptions:

from janus import (
    SyncQueueEmpty, SyncQueueFull, SyncQueueShutDown,
    AsyncQueueEmpty, AsyncQueueFull, AsyncQueueShutDown
)

Basic Usage

import asyncio
import janus


def threaded_producer(sync_q: janus.SyncQueue[int]) -> None:
    """Synchronous producer running in a thread"""
    for i in range(100):
        sync_q.put(i)
    sync_q.join()


async def async_consumer(async_q: janus.AsyncQueue[int]) -> None:
    """Asynchronous consumer running in the event loop"""
    for i in range(100):
        value = await async_q.get()
        print(f"Consumed: {value}")
        async_q.task_done()


async def main() -> None:
    # Create a mixed queue
    queue: janus.Queue[int] = janus.Queue()
    
    # Start threaded producer
    loop = asyncio.get_running_loop()
    producer_task = loop.run_in_executor(None, threaded_producer, queue.sync_q)
    
    # Run async consumer
    await async_consumer(queue.async_q)
    
    # Wait for producer to complete
    await producer_task
    
    # Properly close the queue
    await queue.aclose()


if __name__ == "__main__":
    asyncio.run(main())

Architecture

Janus implements a dual-interface design where each queue instance maintains both synchronous and asynchronous views:

  • Queue: Main container with internal state and thread synchronization
  • SyncQueueProxy: Synchronous interface compatible with Python's standard queue module
  • AsyncQueueProxy: Asynchronous interface compatible with asyncio.Queue
  • Thread Safety: Uses threading locks and asyncio primitives for safe cross-thread communication
  • Event Loop Binding: Automatically binds to the running event loop for async operations

Event Loop Binding

Queues automatically bind to the currently running event loop when first accessed:

  • On Python 3.10+: Event loop binding occurs on first async operation
  • On Python < 3.10: Event loop binding occurs during queue initialization
  • Important: Once bound, a queue cannot be used with a different event loop - this will raise RuntimeError
  • Each queue instance is tied to a specific event loop for its entire lifetime

Thread Safety Model

The library uses a dual-locking mechanism:

  • Synchronous locks: threading.Lock and threading.Condition for sync operations
  • Asynchronous locks: asyncio.Lock and asyncio.Condition for async operations
  • Cross-thread notifications use loop.call_soon_threadsafe() to safely communicate between threads and the event loop

Capabilities

Queue Creation

Create mixed sync-async queues with different ordering behaviors.

class Queue(Generic[T]):
    def __init__(self, maxsize: int = 0) -> None:
        """
        Create a FIFO queue.
        
        Args:
            maxsize: Maximum queue size (0 = unlimited)
        """

class PriorityQueue(Queue[T]):
    def __init__(self, maxsize: int = 0) -> None:
        """
        Create a priority queue (lowest priority first).
        
        Args:
            maxsize: Maximum queue size (0 = unlimited)
        
        Note:
            Items should be tuples of (priority, data)
        """

class LifoQueue(Queue[T]):
    def __init__(self, maxsize: int = 0) -> None:
        """
        Create a LIFO (stack) queue.
        
        Args:
            maxsize: Maximum queue size (0 = unlimited)
        """

Queue Properties

Access queue metadata and interfaces.

@property
def maxsize(self) -> int:
    """Maximum queue size (0 = unlimited)"""

@property
def closed(self) -> bool:
    """Whether the queue is shutdown and all operations complete"""

@property
def sync_q(self) -> SyncQueue[T]:
    """Synchronous interface compatible with standard queue module"""

@property
def async_q(self) -> AsyncQueue[T]:
    """Asynchronous interface compatible with asyncio.Queue"""

Queue Lifecycle

Manage queue shutdown and cleanup.

def shutdown(self, immediate: bool = False) -> None:
    """
    Shut down the queue, making gets and puts raise exceptions.
    
    Args:
        immediate: If True, immediately mark remaining items as done
    """

def close(self) -> None:
    """Close the queue (shortcut for shutdown(immediate=True))"""

async def aclose(self) -> None:
    """Async close and wait for all operations to complete"""

async def wait_closed(self) -> None:
    """Wait for all pending operations to complete"""

Synchronous Interface

Thread-safe synchronous operations compatible with standard queue module.

class SyncQueue(BaseQueue[T], Protocol[T]):
    def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None:
        """
        Put item into queue.
        
        Args:
            item: Item to put
            block: Whether to block if queue is full
            timeout: Maximum time to wait (None = forever)
            
        Raises:
            SyncQueueFull: If queue is full and block=False or timeout exceeded
            SyncQueueShutDown: If queue is shutdown
        """
    
    def get(self, block: bool = True, timeout: OptFloat = None) -> T:
        """
        Remove and return item from queue.
        
        Args:
            block: Whether to block if queue is empty
            timeout: Maximum time to wait (None = forever)
            
        Returns:
            Item from queue
            
        Raises:
            SyncQueueEmpty: If queue is empty and block=False or timeout exceeded
            SyncQueueShutDown: If queue is shutdown
        """
    
    def put_nowait(self, item: T) -> None:
        """
        Put item without blocking.
        
        Args:
            item: Item to put
            
        Raises:
            SyncQueueFull: If queue is full
        """
    
    def get_nowait(self) -> T:
        """
        Get item without blocking.
        
        Returns:
            Item from queue
            
        Raises:
            SyncQueueEmpty: If queue is empty
        """
    
    def join(self) -> None:
        """Block until all items have been processed (task_done called)"""
    
    def task_done(self) -> None:
        """
        Mark a task as done.
        
        Raises:
            ValueError: If called more times than items were put
        """

Asynchronous Interface

Async/await operations compatible with asyncio.Queue.

class AsyncQueue(BaseQueue[T], Protocol[T]):
    async def put(self, item: T) -> None:
        """
        Put item into queue (async).
        
        Args:
            item: Item to put
            
        Raises:
            AsyncQueueShutDown: If queue is shutdown
        """
    
    async def get(self) -> T:
        """
        Remove and return item from queue (async).
        
        Returns:
            Item from queue
            
        Raises:
            AsyncQueueShutDown: If queue is shutdown
        """
    
    def put_nowait(self, item: T) -> None:
        """
        Put item without blocking.
        
        Args:
            item: Item to put
            
        Raises:
            AsyncQueueFull: If queue is full
        """
    
    def get_nowait(self) -> T:
        """
        Get item without blocking.
        
        Returns:
            Item from queue
            
        Raises:
            AsyncQueueEmpty: If queue is empty
        """
    
    async def join(self) -> None:
        """Wait until all items have been processed (task_done called)"""
    
    def task_done(self) -> None:
        """
        Mark a task as done.
        
        Raises:
            ValueError: If called more times than items were put
        """

Common Operations

Operations available on both interfaces.

class BaseQueue(Protocol[T]):
    def qsize(self) -> int:
        """Return approximate queue size (not reliable due to threading)"""
    
    def empty(self) -> bool:
        """Return True if queue appears empty (not reliable due to threading)"""
    
    def full(self) -> bool:
        """Return True if queue appears full (not reliable due to threading)"""
    
    @property
    def unfinished_tasks(self) -> int:
        """Number of items that haven't had task_done() called"""
    
    def shutdown(self, immediate: bool = False) -> None:
        """Shutdown the queue"""

Exception Handling

Synchronous Exceptions

class SyncQueueEmpty(Exception):
    """Raised when sync queue get operations fail due to empty queue"""

class SyncQueueFull(Exception):
    """Raised when sync queue put operations fail due to full queue"""

class SyncQueueShutDown(Exception):
    """Raised when operations are attempted on shutdown sync queue
    
    Note: On Python 3.13+, this is an alias to queue.ShutDown.
    On earlier versions, this is a custom exception class.
    """

Asynchronous Exceptions

class AsyncQueueEmpty(Exception):
    """Raised when async queue get operations fail due to empty queue"""

class AsyncQueueFull(Exception):
    """Raised when async queue put operations fail due to full queue"""

class AsyncQueueShutDown(Exception):
    """Raised when operations are attempted on shutdown async queue
    
    Note: On Python 3.13+, this is an alias to asyncio.QueueShutDown.
    On earlier versions, this is a custom exception class.
    """

Types

from typing import Protocol, TypeVar, Optional, Generic

T = TypeVar('T')
OptFloat = Optional[float]  # Type alias used throughout the API for optional timeout values

class BaseQueue(Protocol[T]):
    """Base protocol for all queue interfaces"""
    ...

class SyncQueue(BaseQueue[T], Protocol[T]):
    """Protocol for synchronous queue interface"""
    ...

class AsyncQueue(BaseQueue[T], Protocol[T]):
    """Protocol for asynchronous queue interface"""
    ...

Usage Examples

Producer-Consumer with Threading

import asyncio
import threading
import janus


def sync_producer(q: janus.SyncQueue[str]) -> None:
    for i in range(5):
        message = f"Message {i}"
        q.put(message)
        print(f"Produced: {message}")
    q.put(None)  # Sentinel value


async def async_consumer(q: janus.AsyncQueue[str]) -> None:
    while True:
        message = await q.get()
        if message is None:
            q.task_done()
            break
        print(f"Consumed: {message}")
        q.task_done()


async def main():
    queue = janus.Queue[str]()
    
    # Start producer in thread
    producer_thread = threading.Thread(
        target=sync_producer, 
        args=(queue.sync_q,)
    )
    producer_thread.start()
    
    # Consume asynchronously
    await async_consumer(queue.async_q)
    
    # Wait for producer thread
    producer_thread.join()
    
    # Clean up
    await queue.aclose()

asyncio.run(main())

Priority Queue Usage

import asyncio
import janus


async def priority_example():
    pq = janus.PriorityQueue[tuple[int, str]]()
    
    # Add items with priorities (lower number = higher priority)
    await pq.async_q.put((3, "Low priority"))
    await pq.async_q.put((1, "High priority"))
    await pq.async_q.put((2, "Medium priority"))
    
    # Items come out in priority order
    while not pq.async_q.empty():
        priority, message = await pq.async_q.get()
        print(f"Priority {priority}: {message}")
        pq.async_q.task_done()
    
    await pq.aclose()

asyncio.run(priority_example())

Error Handling

import asyncio
import janus


async def error_handling_example():
    queue = janus.Queue[int](maxsize=2)
    
    try:
        # Fill the queue
        queue.async_q.put_nowait(1)
        queue.async_q.put_nowait(2)
        
        # This will raise AsyncQueueFull
        queue.async_q.put_nowait(3)
    except janus.AsyncQueueFull:
        print("Queue is full!")
    
    try:
        # Empty the queue
        queue.async_q.get_nowait()
        queue.async_q.get_nowait()
        
        # This will raise AsyncQueueEmpty
        queue.async_q.get_nowait()
    except janus.AsyncQueueEmpty:
        print("Queue is empty!")
    
    await queue.aclose()

asyncio.run(error_handling_example())