Mixed sync-async queue to interoperate between asyncio tasks and classic threads
npx @tessl/cli install tessl/pypi-janus@2.0.0Mixed sync-async queue to interoperate between asyncio tasks and classic threads. Like the Roman god Janus with two faces, each queue instance provides both synchronous and asynchronous interfaces, enabling seamless communication between traditional threaded code and modern asyncio-based applications.
pip install janusqueue.ShutDown and asyncio.QueueShutDown exceptionsimport janusCommon patterns:
from janus import Queue, PriorityQueue, LifoQueueAlternative individual imports:
import janus
# All exports available via janus.Queue, janus.SyncQueueEmpty, etc.For type hints:
from janus import SyncQueue, AsyncQueue, BaseQueueFor exceptions:
from janus import (
SyncQueueEmpty, SyncQueueFull, SyncQueueShutDown,
AsyncQueueEmpty, AsyncQueueFull, AsyncQueueShutDown
)import asyncio
import janus
def threaded_producer(sync_q: janus.SyncQueue[int]) -> None:
"""Synchronous producer running in a thread"""
for i in range(100):
sync_q.put(i)
sync_q.join()
async def async_consumer(async_q: janus.AsyncQueue[int]) -> None:
"""Asynchronous consumer running in the event loop"""
for i in range(100):
value = await async_q.get()
print(f"Consumed: {value}")
async_q.task_done()
async def main() -> None:
# Create a mixed queue
queue: janus.Queue[int] = janus.Queue()
# Start threaded producer
loop = asyncio.get_running_loop()
producer_task = loop.run_in_executor(None, threaded_producer, queue.sync_q)
# Run async consumer
await async_consumer(queue.async_q)
# Wait for producer to complete
await producer_task
# Properly close the queue
await queue.aclose()
if __name__ == "__main__":
asyncio.run(main())Janus implements a dual-interface design where each queue instance maintains both synchronous and asynchronous views:
queue moduleasyncio.QueueQueues automatically bind to the currently running event loop when first accessed:
RuntimeErrorThe library uses a dual-locking mechanism:
threading.Lock and threading.Condition for sync operationsasyncio.Lock and asyncio.Condition for async operationsloop.call_soon_threadsafe() to safely communicate between threads and the event loopCreate mixed sync-async queues with different ordering behaviors.
class Queue(Generic[T]):
def __init__(self, maxsize: int = 0) -> None:
"""
Create a FIFO queue.
Args:
maxsize: Maximum queue size (0 = unlimited)
"""
class PriorityQueue(Queue[T]):
def __init__(self, maxsize: int = 0) -> None:
"""
Create a priority queue (lowest priority first).
Args:
maxsize: Maximum queue size (0 = unlimited)
Note:
Items should be tuples of (priority, data)
"""
class LifoQueue(Queue[T]):
def __init__(self, maxsize: int = 0) -> None:
"""
Create a LIFO (stack) queue.
Args:
maxsize: Maximum queue size (0 = unlimited)
"""Access queue metadata and interfaces.
@property
def maxsize(self) -> int:
"""Maximum queue size (0 = unlimited)"""
@property
def closed(self) -> bool:
"""Whether the queue is shutdown and all operations complete"""
@property
def sync_q(self) -> SyncQueue[T]:
"""Synchronous interface compatible with standard queue module"""
@property
def async_q(self) -> AsyncQueue[T]:
"""Asynchronous interface compatible with asyncio.Queue"""Manage queue shutdown and cleanup.
def shutdown(self, immediate: bool = False) -> None:
"""
Shut down the queue, making gets and puts raise exceptions.
Args:
immediate: If True, immediately mark remaining items as done
"""
def close(self) -> None:
"""Close the queue (shortcut for shutdown(immediate=True))"""
async def aclose(self) -> None:
"""Async close and wait for all operations to complete"""
async def wait_closed(self) -> None:
"""Wait for all pending operations to complete"""Thread-safe synchronous operations compatible with standard queue module.
class SyncQueue(BaseQueue[T], Protocol[T]):
def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None:
"""
Put item into queue.
Args:
item: Item to put
block: Whether to block if queue is full
timeout: Maximum time to wait (None = forever)
Raises:
SyncQueueFull: If queue is full and block=False or timeout exceeded
SyncQueueShutDown: If queue is shutdown
"""
def get(self, block: bool = True, timeout: OptFloat = None) -> T:
"""
Remove and return item from queue.
Args:
block: Whether to block if queue is empty
timeout: Maximum time to wait (None = forever)
Returns:
Item from queue
Raises:
SyncQueueEmpty: If queue is empty and block=False or timeout exceeded
SyncQueueShutDown: If queue is shutdown
"""
def put_nowait(self, item: T) -> None:
"""
Put item without blocking.
Args:
item: Item to put
Raises:
SyncQueueFull: If queue is full
"""
def get_nowait(self) -> T:
"""
Get item without blocking.
Returns:
Item from queue
Raises:
SyncQueueEmpty: If queue is empty
"""
def join(self) -> None:
"""Block until all items have been processed (task_done called)"""
def task_done(self) -> None:
"""
Mark a task as done.
Raises:
ValueError: If called more times than items were put
"""Async/await operations compatible with asyncio.Queue.
class AsyncQueue(BaseQueue[T], Protocol[T]):
async def put(self, item: T) -> None:
"""
Put item into queue (async).
Args:
item: Item to put
Raises:
AsyncQueueShutDown: If queue is shutdown
"""
async def get(self) -> T:
"""
Remove and return item from queue (async).
Returns:
Item from queue
Raises:
AsyncQueueShutDown: If queue is shutdown
"""
def put_nowait(self, item: T) -> None:
"""
Put item without blocking.
Args:
item: Item to put
Raises:
AsyncQueueFull: If queue is full
"""
def get_nowait(self) -> T:
"""
Get item without blocking.
Returns:
Item from queue
Raises:
AsyncQueueEmpty: If queue is empty
"""
async def join(self) -> None:
"""Wait until all items have been processed (task_done called)"""
def task_done(self) -> None:
"""
Mark a task as done.
Raises:
ValueError: If called more times than items were put
"""Operations available on both interfaces.
class BaseQueue(Protocol[T]):
def qsize(self) -> int:
"""Return approximate queue size (not reliable due to threading)"""
def empty(self) -> bool:
"""Return True if queue appears empty (not reliable due to threading)"""
def full(self) -> bool:
"""Return True if queue appears full (not reliable due to threading)"""
@property
def unfinished_tasks(self) -> int:
"""Number of items that haven't had task_done() called"""
def shutdown(self, immediate: bool = False) -> None:
"""Shutdown the queue"""class SyncQueueEmpty(Exception):
"""Raised when sync queue get operations fail due to empty queue"""
class SyncQueueFull(Exception):
"""Raised when sync queue put operations fail due to full queue"""
class SyncQueueShutDown(Exception):
"""Raised when operations are attempted on shutdown sync queue
Note: On Python 3.13+, this is an alias to queue.ShutDown.
On earlier versions, this is a custom exception class.
"""class AsyncQueueEmpty(Exception):
"""Raised when async queue get operations fail due to empty queue"""
class AsyncQueueFull(Exception):
"""Raised when async queue put operations fail due to full queue"""
class AsyncQueueShutDown(Exception):
"""Raised when operations are attempted on shutdown async queue
Note: On Python 3.13+, this is an alias to asyncio.QueueShutDown.
On earlier versions, this is a custom exception class.
"""from typing import Protocol, TypeVar, Optional, Generic
T = TypeVar('T')
OptFloat = Optional[float] # Type alias used throughout the API for optional timeout values
class BaseQueue(Protocol[T]):
"""Base protocol for all queue interfaces"""
...
class SyncQueue(BaseQueue[T], Protocol[T]):
"""Protocol for synchronous queue interface"""
...
class AsyncQueue(BaseQueue[T], Protocol[T]):
"""Protocol for asynchronous queue interface"""
...import asyncio
import threading
import janus
def sync_producer(q: janus.SyncQueue[str]) -> None:
for i in range(5):
message = f"Message {i}"
q.put(message)
print(f"Produced: {message}")
q.put(None) # Sentinel value
async def async_consumer(q: janus.AsyncQueue[str]) -> None:
while True:
message = await q.get()
if message is None:
q.task_done()
break
print(f"Consumed: {message}")
q.task_done()
async def main():
queue = janus.Queue[str]()
# Start producer in thread
producer_thread = threading.Thread(
target=sync_producer,
args=(queue.sync_q,)
)
producer_thread.start()
# Consume asynchronously
await async_consumer(queue.async_q)
# Wait for producer thread
producer_thread.join()
# Clean up
await queue.aclose()
asyncio.run(main())import asyncio
import janus
async def priority_example():
pq = janus.PriorityQueue[tuple[int, str]]()
# Add items with priorities (lower number = higher priority)
await pq.async_q.put((3, "Low priority"))
await pq.async_q.put((1, "High priority"))
await pq.async_q.put((2, "Medium priority"))
# Items come out in priority order
while not pq.async_q.empty():
priority, message = await pq.async_q.get()
print(f"Priority {priority}: {message}")
pq.async_q.task_done()
await pq.aclose()
asyncio.run(priority_example())import asyncio
import janus
async def error_handling_example():
queue = janus.Queue[int](maxsize=2)
try:
# Fill the queue
queue.async_q.put_nowait(1)
queue.async_q.put_nowait(2)
# This will raise AsyncQueueFull
queue.async_q.put_nowait(3)
except janus.AsyncQueueFull:
print("Queue is full!")
try:
# Empty the queue
queue.async_q.get_nowait()
queue.async_q.get_nowait()
# This will raise AsyncQueueEmpty
queue.async_q.get_nowait()
except janus.AsyncQueueEmpty:
print("Queue is empty!")
await queue.aclose()
asyncio.run(error_handling_example())