0
# Janus
1
2
Mixed sync-async queue to interoperate between asyncio tasks and classic threads. Like the Roman god Janus with two faces, each queue instance provides both synchronous and asynchronous interfaces, enabling seamless communication between traditional threaded code and modern asyncio-based applications.
3
4
## Package Information
5
6
- **Package Name**: janus
7
- **Language**: Python
8
- **Installation**: `pip install janus`
9
- **Python Version**: 3.9+
10
11
### Python Version Compatibility
12
13
- **Python 3.9-3.12**: Full compatibility with custom exception classes
14
- **Python 3.13+**: Uses built-in `queue.ShutDown` and `asyncio.QueueShutDown` exceptions
15
- **Python 3.10.0**: Contains a specific workaround for an asyncio bug in this version
16
- **Behavior differences**: Event loop binding occurs at different times depending on Python version
17
18
## Core Imports
19
20
```python
21
import janus
22
```
23
24
Common patterns:
25
26
```python
27
from janus import Queue, PriorityQueue, LifoQueue
28
```
29
30
Alternative individual imports:
31
32
```python
33
import janus
34
# All exports available via janus.Queue, janus.SyncQueueEmpty, etc.
35
```
36
37
For type hints:
38
39
```python
40
from janus import SyncQueue, AsyncQueue, BaseQueue
41
```
42
43
For exceptions:
44
45
```python
46
from janus import (
47
SyncQueueEmpty, SyncQueueFull, SyncQueueShutDown,
48
AsyncQueueEmpty, AsyncQueueFull, AsyncQueueShutDown
49
)
50
```
51
52
## Basic Usage
53
54
```python
55
import asyncio
56
import janus
57
58
59
def threaded_producer(sync_q: janus.SyncQueue[int]) -> None:
60
"""Synchronous producer running in a thread"""
61
for i in range(100):
62
sync_q.put(i)
63
sync_q.join()
64
65
66
async def async_consumer(async_q: janus.AsyncQueue[int]) -> None:
67
"""Asynchronous consumer running in the event loop"""
68
for i in range(100):
69
value = await async_q.get()
70
print(f"Consumed: {value}")
71
async_q.task_done()
72
73
74
async def main() -> None:
75
# Create a mixed queue
76
queue: janus.Queue[int] = janus.Queue()
77
78
# Start threaded producer
79
loop = asyncio.get_running_loop()
80
producer_task = loop.run_in_executor(None, threaded_producer, queue.sync_q)
81
82
# Run async consumer
83
await async_consumer(queue.async_q)
84
85
# Wait for producer to complete
86
await producer_task
87
88
# Properly close the queue
89
await queue.aclose()
90
91
92
if __name__ == "__main__":
93
asyncio.run(main())
94
```
95
96
## Architecture
97
98
Janus implements a dual-interface design where each queue instance maintains both synchronous and asynchronous views:
99
100
- **Queue**: Main container with internal state and thread synchronization
101
- **SyncQueueProxy**: Synchronous interface compatible with Python's standard `queue` module
102
- **AsyncQueueProxy**: Asynchronous interface compatible with `asyncio.Queue`
103
- **Thread Safety**: Uses threading locks and asyncio primitives for safe cross-thread communication
104
- **Event Loop Binding**: Automatically binds to the running event loop for async operations
105
106
### Event Loop Binding
107
108
Queues automatically bind to the currently running event loop when first accessed:
109
110
- On Python 3.10+: Event loop binding occurs on first async operation
111
- On Python < 3.10: Event loop binding occurs during queue initialization
112
- **Important**: Once bound, a queue cannot be used with a different event loop - this will raise `RuntimeError`
113
- Each queue instance is tied to a specific event loop for its entire lifetime
114
115
### Thread Safety Model
116
117
The library uses a dual-locking mechanism:
118
119
- **Synchronous locks**: `threading.Lock` and `threading.Condition` for sync operations
120
- **Asynchronous locks**: `asyncio.Lock` and `asyncio.Condition` for async operations
121
- Cross-thread notifications use `loop.call_soon_threadsafe()` to safely communicate between threads and the event loop
122
123
## Capabilities
124
125
### Queue Creation
126
127
Create mixed sync-async queues with different ordering behaviors.
128
129
```python { .api }
130
class Queue(Generic[T]):
131
def __init__(self, maxsize: int = 0) -> None:
132
"""
133
Create a FIFO queue.
134
135
Args:
136
maxsize: Maximum queue size (0 = unlimited)
137
"""
138
139
class PriorityQueue(Queue[T]):
140
def __init__(self, maxsize: int = 0) -> None:
141
"""
142
Create a priority queue (lowest priority first).
143
144
Args:
145
maxsize: Maximum queue size (0 = unlimited)
146
147
Note:
148
Items should be tuples of (priority, data)
149
"""
150
151
class LifoQueue(Queue[T]):
152
def __init__(self, maxsize: int = 0) -> None:
153
"""
154
Create a LIFO (stack) queue.
155
156
Args:
157
maxsize: Maximum queue size (0 = unlimited)
158
"""
159
```
160
161
### Queue Properties
162
163
Access queue metadata and interfaces.
164
165
```python { .api }
166
@property
167
def maxsize(self) -> int:
168
"""Maximum queue size (0 = unlimited)"""
169
170
@property
171
def closed(self) -> bool:
172
"""Whether the queue is shutdown and all operations complete"""
173
174
@property
175
def sync_q(self) -> SyncQueue[T]:
176
"""Synchronous interface compatible with standard queue module"""
177
178
@property
179
def async_q(self) -> AsyncQueue[T]:
180
"""Asynchronous interface compatible with asyncio.Queue"""
181
```
182
183
### Queue Lifecycle
184
185
Manage queue shutdown and cleanup.
186
187
```python { .api }
188
def shutdown(self, immediate: bool = False) -> None:
189
"""
190
Shut down the queue, making gets and puts raise exceptions.
191
192
Args:
193
immediate: If True, immediately mark remaining items as done
194
"""
195
196
def close(self) -> None:
197
"""Close the queue (shortcut for shutdown(immediate=True))"""
198
199
async def aclose(self) -> None:
200
"""Async close and wait for all operations to complete"""
201
202
async def wait_closed(self) -> None:
203
"""Wait for all pending operations to complete"""
204
```
205
206
### Synchronous Interface
207
208
Thread-safe synchronous operations compatible with standard queue module.
209
210
```python { .api }
211
class SyncQueue(BaseQueue[T], Protocol[T]):
212
def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None:
213
"""
214
Put item into queue.
215
216
Args:
217
item: Item to put
218
block: Whether to block if queue is full
219
timeout: Maximum time to wait (None = forever)
220
221
Raises:
222
SyncQueueFull: If queue is full and block=False or timeout exceeded
223
SyncQueueShutDown: If queue is shutdown
224
"""
225
226
def get(self, block: bool = True, timeout: OptFloat = None) -> T:
227
"""
228
Remove and return item from queue.
229
230
Args:
231
block: Whether to block if queue is empty
232
timeout: Maximum time to wait (None = forever)
233
234
Returns:
235
Item from queue
236
237
Raises:
238
SyncQueueEmpty: If queue is empty and block=False or timeout exceeded
239
SyncQueueShutDown: If queue is shutdown
240
"""
241
242
def put_nowait(self, item: T) -> None:
243
"""
244
Put item without blocking.
245
246
Args:
247
item: Item to put
248
249
Raises:
250
SyncQueueFull: If queue is full
251
"""
252
253
def get_nowait(self) -> T:
254
"""
255
Get item without blocking.
256
257
Returns:
258
Item from queue
259
260
Raises:
261
SyncQueueEmpty: If queue is empty
262
"""
263
264
def join(self) -> None:
265
"""Block until all items have been processed (task_done called)"""
266
267
def task_done(self) -> None:
268
"""
269
Mark a task as done.
270
271
Raises:
272
ValueError: If called more times than items were put
273
"""
274
```
275
276
### Asynchronous Interface
277
278
Async/await operations compatible with asyncio.Queue.
279
280
```python { .api }
281
class AsyncQueue(BaseQueue[T], Protocol[T]):
282
async def put(self, item: T) -> None:
283
"""
284
Put item into queue (async).
285
286
Args:
287
item: Item to put
288
289
Raises:
290
AsyncQueueShutDown: If queue is shutdown
291
"""
292
293
async def get(self) -> T:
294
"""
295
Remove and return item from queue (async).
296
297
Returns:
298
Item from queue
299
300
Raises:
301
AsyncQueueShutDown: If queue is shutdown
302
"""
303
304
def put_nowait(self, item: T) -> None:
305
"""
306
Put item without blocking.
307
308
Args:
309
item: Item to put
310
311
Raises:
312
AsyncQueueFull: If queue is full
313
"""
314
315
def get_nowait(self) -> T:
316
"""
317
Get item without blocking.
318
319
Returns:
320
Item from queue
321
322
Raises:
323
AsyncQueueEmpty: If queue is empty
324
"""
325
326
async def join(self) -> None:
327
"""Wait until all items have been processed (task_done called)"""
328
329
def task_done(self) -> None:
330
"""
331
Mark a task as done.
332
333
Raises:
334
ValueError: If called more times than items were put
335
"""
336
```
337
338
### Common Operations
339
340
Operations available on both interfaces.
341
342
```python { .api }
343
class BaseQueue(Protocol[T]):
344
def qsize(self) -> int:
345
"""Return approximate queue size (not reliable due to threading)"""
346
347
def empty(self) -> bool:
348
"""Return True if queue appears empty (not reliable due to threading)"""
349
350
def full(self) -> bool:
351
"""Return True if queue appears full (not reliable due to threading)"""
352
353
@property
354
def unfinished_tasks(self) -> int:
355
"""Number of items that haven't had task_done() called"""
356
357
def shutdown(self, immediate: bool = False) -> None:
358
"""Shutdown the queue"""
359
```
360
361
## Exception Handling
362
363
### Synchronous Exceptions
364
365
```python { .api }
366
class SyncQueueEmpty(Exception):
367
"""Raised when sync queue get operations fail due to empty queue"""
368
369
class SyncQueueFull(Exception):
370
"""Raised when sync queue put operations fail due to full queue"""
371
372
class SyncQueueShutDown(Exception):
373
"""Raised when operations are attempted on shutdown sync queue
374
375
Note: On Python 3.13+, this is an alias to queue.ShutDown.
376
On earlier versions, this is a custom exception class.
377
"""
378
```
379
380
### Asynchronous Exceptions
381
382
```python { .api }
383
class AsyncQueueEmpty(Exception):
384
"""Raised when async queue get operations fail due to empty queue"""
385
386
class AsyncQueueFull(Exception):
387
"""Raised when async queue put operations fail due to full queue"""
388
389
class AsyncQueueShutDown(Exception):
390
"""Raised when operations are attempted on shutdown async queue
391
392
Note: On Python 3.13+, this is an alias to asyncio.QueueShutDown.
393
On earlier versions, this is a custom exception class.
394
"""
395
```
396
397
## Types
398
399
```python { .api }
400
from typing import Protocol, TypeVar, Optional, Generic
401
402
T = TypeVar('T')
403
OptFloat = Optional[float] # Type alias used throughout the API for optional timeout values
404
405
class BaseQueue(Protocol[T]):
406
"""Base protocol for all queue interfaces"""
407
...
408
409
class SyncQueue(BaseQueue[T], Protocol[T]):
410
"""Protocol for synchronous queue interface"""
411
...
412
413
class AsyncQueue(BaseQueue[T], Protocol[T]):
414
"""Protocol for asynchronous queue interface"""
415
...
416
```
417
418
## Usage Examples
419
420
### Producer-Consumer with Threading
421
422
```python
423
import asyncio
424
import threading
425
import janus
426
427
428
def sync_producer(q: janus.SyncQueue[str]) -> None:
429
for i in range(5):
430
message = f"Message {i}"
431
q.put(message)
432
print(f"Produced: {message}")
433
q.put(None) # Sentinel value
434
435
436
async def async_consumer(q: janus.AsyncQueue[str]) -> None:
437
while True:
438
message = await q.get()
439
if message is None:
440
q.task_done()
441
break
442
print(f"Consumed: {message}")
443
q.task_done()
444
445
446
async def main():
447
queue = janus.Queue[str]()
448
449
# Start producer in thread
450
producer_thread = threading.Thread(
451
target=sync_producer,
452
args=(queue.sync_q,)
453
)
454
producer_thread.start()
455
456
# Consume asynchronously
457
await async_consumer(queue.async_q)
458
459
# Wait for producer thread
460
producer_thread.join()
461
462
# Clean up
463
await queue.aclose()
464
465
asyncio.run(main())
466
```
467
468
### Priority Queue Usage
469
470
```python
471
import asyncio
472
import janus
473
474
475
async def priority_example():
476
pq = janus.PriorityQueue[tuple[int, str]]()
477
478
# Add items with priorities (lower number = higher priority)
479
await pq.async_q.put((3, "Low priority"))
480
await pq.async_q.put((1, "High priority"))
481
await pq.async_q.put((2, "Medium priority"))
482
483
# Items come out in priority order
484
while not pq.async_q.empty():
485
priority, message = await pq.async_q.get()
486
print(f"Priority {priority}: {message}")
487
pq.async_q.task_done()
488
489
await pq.aclose()
490
491
asyncio.run(priority_example())
492
```
493
494
### Error Handling
495
496
```python
497
import asyncio
498
import janus
499
500
501
async def error_handling_example():
502
queue = janus.Queue[int](maxsize=2)
503
504
try:
505
# Fill the queue
506
queue.async_q.put_nowait(1)
507
queue.async_q.put_nowait(2)
508
509
# This will raise AsyncQueueFull
510
queue.async_q.put_nowait(3)
511
except janus.AsyncQueueFull:
512
print("Queue is full!")
513
514
try:
515
# Empty the queue
516
queue.async_q.get_nowait()
517
queue.async_q.get_nowait()
518
519
# This will raise AsyncQueueEmpty
520
queue.async_q.get_nowait()
521
except janus.AsyncQueueEmpty:
522
print("Queue is empty!")
523
524
await queue.aclose()
525
526
asyncio.run(error_handling_example())
527
```