tessl/pypi-janus

Mixed sync-async queue to interoperate between asyncio tasks and classic threads

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: pypipkg:pypi/janus@2.0.x

To install, run

npx @tessl/cli install tessl/pypi-janus@2.0.0

# Janus

Janus is a mixed sync-async queue that lets asyncio tasks and classic threads interoperate. Like the two-faced Roman god Janus, each queue instance exposes both a synchronous and an asynchronous interface, so threaded code and asyncio-based code can communicate through the same queue.

## Package Information

- **Package Name**: janus
- **Language**: Python
- **Installation**: `pip install janus`
- **Python Version**: 3.9+

### Python Version Compatibility

- **Python 3.9-3.12**: Fully supported; `SyncQueueShutDown` and `AsyncQueueShutDown` are custom exception classes
- **Python 3.13+**: `SyncQueueShutDown` and `AsyncQueueShutDown` are aliases of the built-in `queue.ShutDown` and `asyncio.QueueShutDown` exceptions
- **Python 3.10.0**: The library contains a workaround for an asyncio bug specific to this release
- **Behavior differences**: Event loop binding occurs at different times depending on the Python version (see Event Loop Binding)

## Core Imports

```python
import janus
```

Common patterns:

```python
from janus import Queue, PriorityQueue, LifoQueue
```

Alternatively, access everything through the module namespace:

```python
import janus
# All exports available via janus.Queue, janus.SyncQueueEmpty, etc.
```

For type hints:

```python
from janus import SyncQueue, AsyncQueue, BaseQueue
```

For exceptions:

```python
from janus import (
    SyncQueueEmpty, SyncQueueFull, SyncQueueShutDown,
    AsyncQueueEmpty, AsyncQueueFull, AsyncQueueShutDown
)
```

## Basic Usage

```python
import asyncio
import janus


def threaded_producer(sync_q: janus.SyncQueue[int]) -> None:
    """Synchronous producer running in a thread"""
    for i in range(100):
        sync_q.put(i)
    sync_q.join()


async def async_consumer(async_q: janus.AsyncQueue[int]) -> None:
    """Asynchronous consumer running in the event loop"""
    for i in range(100):
        value = await async_q.get()
        print(f"Consumed: {value}")
        async_q.task_done()


async def main() -> None:
    # Create a mixed queue
    queue: janus.Queue[int] = janus.Queue()

    # Start threaded producer
    loop = asyncio.get_running_loop()
    producer_task = loop.run_in_executor(None, threaded_producer, queue.sync_q)

    # Run async consumer
    await async_consumer(queue.async_q)

    # Wait for producer to complete
    await producer_task

    # Properly close the queue
    await queue.aclose()


if __name__ == "__main__":
    asyncio.run(main())
```

## Architecture

Janus implements a dual-interface design where each queue instance maintains both synchronous and asynchronous views:

- **Queue**: Main container with internal state and thread synchronization
- **SyncQueueProxy**: Synchronous interface compatible with Python's standard `queue` module
- **AsyncQueueProxy**: Asynchronous interface compatible with `asyncio.Queue`
- **Thread Safety**: Uses threading locks and asyncio primitives for safe cross-thread communication
- **Event Loop Binding**: Automatically binds to the running event loop for async operations
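The dual-view idea can be illustrated with a toy sketch that uses only the standard library. It is not the library's implementation: `DualQueue`, `_SyncView`, and `_AsyncView` are hypothetical names, and the sketch only shows two proxy objects sharing one container and one lock, without any blocking or awaiting.

```python
import threading
from collections import deque
from typing import Any, Deque


class _SyncView:
    """Hypothetical synchronous view over the parent's shared state."""

    def __init__(self, parent: "DualQueue") -> None:
        self._parent = parent

    def put_nowait(self, item: Any) -> None:
        with self._parent._mutex:
            self._parent._items.append(item)


class _AsyncView:
    """Hypothetical asynchronous-side view over the same shared state."""

    def __init__(self, parent: "DualQueue") -> None:
        self._parent = parent

    def get_nowait(self) -> Any:
        with self._parent._mutex:
            return self._parent._items.popleft()


class DualQueue:
    """Toy container: one deque, one lock, two views."""

    def __init__(self) -> None:
        self._items: Deque[Any] = deque()
        self._mutex = threading.Lock()
        self.sync_q = _SyncView(self)
        self.async_q = _AsyncView(self)


q = DualQueue()
q.sync_q.put_nowait("hello")   # written through the sync view
item = q.async_q.get_nowait()  # read through the async view
```

Because both views hold a reference to the same parent, an item written through one view is immediately visible through the other.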

### Event Loop Binding

Each queue binds to the running event loop. When the binding happens depends on the Python version:

- On Python 3.10+: Event loop binding occurs on first async operation
- On Python < 3.10: Event loop binding occurs during queue initialization
- **Important**: Once bound, a queue cannot be used with a different event loop; attempting to do so raises `RuntimeError`
- Each queue instance is tied to a specific event loop for its entire lifetime
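The lazy-binding behavior described for Python 3.10+ can be sketched with the standard library alone. `LoopBound` and `touch` are hypothetical names used for illustration; this is an assumption about the general pattern, not the library's source.

```python
import asyncio
from typing import Optional


class LoopBound:
    """Hypothetical helper that binds to the first event loop that uses it."""

    def __init__(self) -> None:
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    def _get_loop(self) -> asyncio.AbstractEventLoop:
        loop = asyncio.get_running_loop()
        if self._loop is None:
            self._loop = loop  # bind lazily, on first async use
        elif self._loop is not loop:
            raise RuntimeError("object is bound to a different event loop")
        return loop

    async def touch(self) -> None:
        # Stand-in for any async operation that needs the bound loop.
        self._get_loop()


bound = LoopBound()
asyncio.run(bound.touch())  # first loop: binding happens here

try:
    asyncio.run(bound.touch())  # asyncio.run() creates a second, different loop
    rebound = True
except RuntimeError:
    rebound = False
```

The practical consequence is that a queue should be created and used within a single `asyncio.run()` call rather than shared across several.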

### Thread Safety Model

The library uses a dual-locking mechanism:

- **Synchronous locks**: `threading.Lock` and `threading.Condition` for sync operations
- **Asynchronous locks**: `asyncio.Lock` and `asyncio.Condition` for async operations
- Cross-thread notifications use `loop.call_soon_threadsafe()` to safely communicate between threads and the event loop
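As a rough illustration of the cross-thread notification step, the following standard-library sketch has a worker thread hand values to the event loop through `loop.call_soon_threadsafe()`. The names `on_item` and `worker` are hypothetical, and the sketch shows only the notification mechanism, not the locking.

```python
import asyncio
import threading
from typing import List


async def main() -> List[int]:
    loop = asyncio.get_running_loop()
    received: List[int] = []
    done = asyncio.Event()

    def on_item(item: int) -> None:
        # Runs inside the event loop thread, so asyncio objects are safe here.
        received.append(item)
        if item == 2:
            done.set()

    def worker() -> None:
        # Runs in a plain thread; it only schedules callbacks on the loop.
        for i in range(3):
            loop.call_soon_threadsafe(on_item, i)

    thread = threading.Thread(target=worker)
    thread.start()
    await done.wait()
    thread.join()
    return received


result = asyncio.run(main())
```

Callbacks scheduled this way run in the order they were submitted, which is why the worker never needs to touch `received` or `done` directly.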

## Capabilities

### Queue Creation

Create mixed sync-async queues with different ordering behaviors.

```python { .api }
class Queue(Generic[T]):
    def __init__(self, maxsize: int = 0) -> None:
        """
        Create a FIFO queue.

        Args:
            maxsize: Maximum queue size (0 = unlimited)
        """

class PriorityQueue(Queue[T]):
    def __init__(self, maxsize: int = 0) -> None:
        """
        Create a priority queue (lowest priority value first).

        Args:
            maxsize: Maximum queue size (0 = unlimited)

        Note:
            Items should be tuples of (priority, data)
        """

class LifoQueue(Queue[T]):
    def __init__(self, maxsize: int = 0) -> None:
        """
        Create a LIFO (stack) queue.

        Args:
            maxsize: Maximum queue size (0 = unlimited)
        """
```
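To make the three orderings concrete, the sketch below uses the standard library's `queue.Queue`, `queue.PriorityQueue`, and `queue.LifoQueue` as stand-ins, on the assumption that the janus classes order items the same way as their standard-library namesakes. The `drain` helper is hypothetical.

```python
import queue
from typing import List, Tuple

items: List[Tuple[int, str]] = [(3, "low"), (1, "high"), (2, "medium")]


def drain(q: queue.Queue) -> List[Tuple[int, str]]:
    """Put all items, then read them back in the queue's own order."""
    for item in items:
        q.put_nowait(item)
    out = []
    while True:
        try:
            out.append(q.get_nowait())
        except queue.Empty:
            return out


fifo_order = drain(queue.Queue())              # insertion order
priority_order = drain(queue.PriorityQueue())  # smallest tuple first
lifo_order = drain(queue.LifoQueue())          # reverse insertion order
```

Tuples compare element by element, so `(priority, data)` items are ordered by the priority number first.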

### Queue Properties

Access queue metadata and interfaces.

```python { .api }
@property
def maxsize(self) -> int:
    """Maximum queue size (0 = unlimited)"""

@property
def closed(self) -> bool:
    """Whether the queue is shutdown and all operations complete"""

@property
def sync_q(self) -> SyncQueue[T]:
    """Synchronous interface compatible with standard queue module"""

@property
def async_q(self) -> AsyncQueue[T]:
    """Asynchronous interface compatible with asyncio.Queue"""
```

### Queue Lifecycle

Manage queue shutdown and cleanup.

```python { .api }
def shutdown(self, immediate: bool = False) -> None:
    """
    Shut down the queue, making gets and puts raise exceptions.

    Args:
        immediate: If True, immediately mark remaining items as done
    """

def close(self) -> None:
    """Close the queue (shortcut for shutdown(immediate=True))"""

async def aclose(self) -> None:
    """Async close and wait for all operations to complete"""

async def wait_closed(self) -> None:
    """Wait for all pending operations to complete"""
```

### Synchronous Interface

Thread-safe synchronous operations compatible with the standard `queue` module.

```python { .api }
class SyncQueue(BaseQueue[T], Protocol[T]):
    def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None:
        """
        Put item into queue.

        Args:
            item: Item to put
            block: Whether to block if queue is full
            timeout: Maximum time to wait (None = forever)

        Raises:
            SyncQueueFull: If queue is full and block=False or timeout exceeded
            SyncQueueShutDown: If queue is shutdown
        """

    def get(self, block: bool = True, timeout: OptFloat = None) -> T:
        """
        Remove and return item from queue.

        Args:
            block: Whether to block if queue is empty
            timeout: Maximum time to wait (None = forever)

        Returns:
            Item from queue

        Raises:
            SyncQueueEmpty: If queue is empty and block=False or timeout exceeded
            SyncQueueShutDown: If queue is shutdown
        """

    def put_nowait(self, item: T) -> None:
        """
        Put item without blocking.

        Args:
            item: Item to put

        Raises:
            SyncQueueFull: If queue is full
        """

    def get_nowait(self) -> T:
        """
        Get item without blocking.

        Returns:
            Item from queue

        Raises:
            SyncQueueEmpty: If queue is empty
        """

    def join(self) -> None:
        """Block until all items have been processed (task_done called)"""

    def task_done(self) -> None:
        """
        Mark a task as done.

        Raises:
            ValueError: If called more times than items were put
        """
```
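The `block` and `timeout` semantics above mirror the standard `queue` module, so they can be tried out with `queue.Queue` as a stand-in. This is a sketch under that compatibility assumption; with a janus `sync_q`, the signatures above document `SyncQueueFull` and `SyncQueueEmpty` in place of `queue.Full` and `queue.Empty`.

```python
import queue

q: "queue.Queue[int]" = queue.Queue(maxsize=1)
q.put(1)  # fills the queue

try:
    q.put(2, block=False)  # full queue, non-blocking: fails immediately
    put_failed = False
except queue.Full:
    put_failed = True

first = q.get()  # returns the only item

try:
    q.get(timeout=0.05)  # empty queue: waits up to 50 ms, then gives up
    get_timed_out = False
except queue.Empty:
    get_timed_out = True
```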

### Asynchronous Interface

Async/await operations compatible with `asyncio.Queue`.

```python { .api }
class AsyncQueue(BaseQueue[T], Protocol[T]):
    async def put(self, item: T) -> None:
        """
        Put item into queue (async).

        Args:
            item: Item to put

        Raises:
            AsyncQueueShutDown: If queue is shutdown
        """

    async def get(self) -> T:
        """
        Remove and return item from queue (async).

        Returns:
            Item from queue

        Raises:
            AsyncQueueShutDown: If queue is shutdown
        """

    def put_nowait(self, item: T) -> None:
        """
        Put item without blocking.

        Args:
            item: Item to put

        Raises:
            AsyncQueueFull: If queue is full
        """

    def get_nowait(self) -> T:
        """
        Get item without blocking.

        Returns:
            Item from queue

        Raises:
            AsyncQueueEmpty: If queue is empty
        """

    async def join(self) -> None:
        """Wait until all items have been processed (task_done called)"""

    def task_done(self) -> None:
        """
        Mark a task as done.

        Raises:
            ValueError: If called more times than items were put
        """
```
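Unlike the synchronous `get`, the async `get` shown above takes no `timeout` argument. One common way to bound the wait is `asyncio.wait_for`. The sketch below uses `asyncio.Queue` as a stand-in, assuming the async interface behaves compatibly; `get_with_timeout` is a hypothetical helper, not part of the library.

```python
import asyncio
from typing import Optional, Tuple


async def get_with_timeout(q: "asyncio.Queue[int]", timeout: float) -> Optional[int]:
    """Return the next item, or None if nothing arrives within `timeout` seconds."""
    try:
        return await asyncio.wait_for(q.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def main() -> Tuple[Optional[int], Optional[int]]:
    q: "asyncio.Queue[int]" = asyncio.Queue()
    await q.put(7)
    got = await get_with_timeout(q, 0.05)     # item is available
    missed = await get_with_timeout(q, 0.05)  # queue is empty, so this times out
    return got, missed


result = asyncio.run(main())
```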

### Common Operations

Operations available on both interfaces.

```python { .api }
class BaseQueue(Protocol[T]):
    def qsize(self) -> int:
        """Return approximate queue size (not reliable due to threading)"""

    def empty(self) -> bool:
        """Return True if queue appears empty (not reliable due to threading)"""

    def full(self) -> bool:
        """Return True if queue appears full (not reliable due to threading)"""

    @property
    def unfinished_tasks(self) -> int:
        """Number of items that haven't had task_done() called"""

    def shutdown(self, immediate: bool = False) -> None:
        """Shutdown the queue"""
```
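Because `qsize()`, `empty()`, and `full()` are only snapshots, checking them before acting can race with another thread. A more robust pattern is to attempt the operation and handle the exception. The sketch below shows this with the standard `queue.Queue` as a stand-in; `try_get` is a hypothetical helper.

```python
import queue
from typing import Optional


def try_get(q: "queue.Queue[int]") -> Optional[int]:
    """Attempt a get and handle emptiness, instead of checking empty() first."""
    try:
        return q.get_nowait()
    except queue.Empty:
        return None


q: "queue.Queue[int]" = queue.Queue()
q.put(1)
first = try_get(q)   # an item was available
second = try_get(q)  # nothing left; no exception escapes
```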

## Exception Handling

### Synchronous Exceptions

```python { .api }
class SyncQueueEmpty(Exception):
    """Raised when sync queue get operations fail due to empty queue"""

class SyncQueueFull(Exception):
    """Raised when sync queue put operations fail due to full queue"""

class SyncQueueShutDown(Exception):
    """Raised when operations are attempted on shutdown sync queue

    Note: On Python 3.13+, this is an alias to queue.ShutDown.
    On earlier versions, this is a custom exception class.
    """
```

### Asynchronous Exceptions

```python { .api }
class AsyncQueueEmpty(Exception):
    """Raised when async queue get operations fail due to empty queue"""

class AsyncQueueFull(Exception):
    """Raised when async queue put operations fail due to full queue"""

class AsyncQueueShutDown(Exception):
    """Raised when operations are attempted on shutdown async queue

    Note: On Python 3.13+, this is an alias to asyncio.QueueShutDown.
    On earlier versions, this is a custom exception class.
    """
```
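The version-dependent aliasing mentioned in the notes above could be arranged along the following lines. This is a sketch of the general pattern, not the library's source; `ShutDownError` is a hypothetical name.

```python
import queue
import sys

if sys.version_info >= (3, 13):
    # Reuse the built-in exception so `except queue.ShutDown` also catches it.
    ShutDownError = queue.ShutDown
else:
    class ShutDownError(Exception):
        """Fallback for interpreters without queue.ShutDown."""


# True only on interpreters where the built-in exception exists.
is_builtin_alias = ShutDownError is getattr(queue, "ShutDown", None)
```

One consequence of aliasing is that, on Python 3.13+, code catching the standard-library exception and code catching the package-level name handle the same error.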

## Types

```python { .api }
from typing import Protocol, TypeVar, Optional, Generic

T = TypeVar('T')
OptFloat = Optional[float]  # Type alias used throughout the API for optional timeout values

class BaseQueue(Protocol[T]):
    """Base protocol for all queue interfaces"""
    ...

class SyncQueue(BaseQueue[T], Protocol[T]):
    """Protocol for synchronous queue interface"""
    ...

class AsyncQueue(BaseQueue[T], Protocol[T]):
    """Protocol for asynchronous queue interface"""
    ...
```

## Usage Examples

### Producer-Consumer with Threading

```python
import asyncio
import threading
from typing import Optional

import janus


def sync_producer(q: janus.SyncQueue[Optional[str]]) -> None:
    for i in range(5):
        message = f"Message {i}"
        q.put(message)
        print(f"Produced: {message}")
    q.put(None)  # Sentinel value tells the consumer to stop


async def async_consumer(q: janus.AsyncQueue[Optional[str]]) -> None:
    while True:
        message = await q.get()
        if message is None:
            q.task_done()
            break
        print(f"Consumed: {message}")
        q.task_done()


async def main() -> None:
    # Optional[str] because the sentinel None travels through the same queue
    queue = janus.Queue[Optional[str]]()

    # Start producer in thread
    producer_thread = threading.Thread(
        target=sync_producer,
        args=(queue.sync_q,)
    )
    producer_thread.start()

    # Consume asynchronously
    await async_consumer(queue.async_q)

    # Wait for producer thread
    producer_thread.join()

    # Clean up
    await queue.aclose()


asyncio.run(main())
```

### Priority Queue Usage

```python
import asyncio
import janus


async def priority_example():
    pq = janus.PriorityQueue[tuple[int, str]]()

    # Add items with priorities (lower number = higher priority)
    await pq.async_q.put((3, "Low priority"))
    await pq.async_q.put((1, "High priority"))
    await pq.async_q.put((2, "Medium priority"))

    # Items come out in priority order
    while not pq.async_q.empty():
        priority, message = await pq.async_q.get()
        print(f"Priority {priority}: {message}")
        pq.async_q.task_done()

    await pq.aclose()


asyncio.run(priority_example())
```

### Error Handling

```python
import asyncio
import janus


async def error_handling_example():
    queue = janus.Queue[int](maxsize=2)

    try:
        # Fill the queue
        queue.async_q.put_nowait(1)
        queue.async_q.put_nowait(2)

        # This will raise AsyncQueueFull
        queue.async_q.put_nowait(3)
    except janus.AsyncQueueFull:
        print("Queue is full!")

    try:
        # Empty the queue
        queue.async_q.get_nowait()
        queue.async_q.get_nowait()

        # This will raise AsyncQueueEmpty
        queue.async_q.get_nowait()
    except janus.AsyncQueueEmpty:
        print("Queue is empty!")

    await queue.aclose()


asyncio.run(error_handling_example())
```