or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md

index.mddocs/

0

# Queuelib

1

2

A Python library that implements object collections which are stored in memory or persisted to disk, providing a simple API and fast performance. Queuelib provides collections for FIFO queues, LIFO stacks, priority queues, and round-robin queues with support for both memory-based and disk-based storage.

3

4

## Package Information

5

6

- **Package Name**: queuelib

7

- **Language**: Python

8

- **Installation**: `pip install queuelib`

9

- **Python Version**: 3.9+

10

- **Dependencies**: None (zero dependencies)

11

12

## Core Imports

13

14

Main queue implementations:

15

16

```python

17

from queuelib import FifoDiskQueue, LifoDiskQueue, PriorityQueue, RoundRobinQueue

18

```

19

20

Additional queue implementations (from submodules):

21

22

```python

23

from queuelib.queue import FifoMemoryQueue, LifoMemoryQueue, FifoSQLiteQueue, LifoSQLiteQueue, BaseQueue

24

```

25

26

## Basic Usage

27

28

```python

29

from queuelib import FifoDiskQueue, PriorityQueue

30

31

# Basic FIFO disk queue usage

32

queue = FifoDiskQueue("./queue-data")

33

queue.push(b'hello')

34

queue.push(b'world')

35

item = queue.pop() # Returns b'hello'

36

queue.close()

37

38

# Priority queue with disk-based storage

39

def queue_factory(priority):

40

return FifoDiskQueue(f"./priority-queue-{priority}")

41

42

pq = PriorityQueue(queue_factory)

43

pq.push(b'low priority', 5)

44

pq.push(b'high priority', 1)

45

pq.push(b'medium priority', 3)

46

47

item = pq.pop() # Returns b'high priority' (priority 1)

48

pq.close()

49

```

50

51

## Architecture

52

53

Queuelib implements a modular queue architecture:

54

55

- **Base Interface**: All queues implement push/pop/peek/close/__len__ methods

56

- **Memory Queues**: Fast in-memory implementations using Python deque

57

- **Disk Queues**: Persistent file-based storage with chunk management

58

- **SQLite Queues**: Database-backed persistent storage

59

- **Composite Queues**: Priority and round-robin queues built on internal queue instances

60

61

All persistent queues automatically handle cleanup when empty and provide crash-safe storage through proper file management and metadata tracking.

62

63

## Capabilities

64

65

### FIFO Disk Queue

66

67

Persistent first-in-first-out queue stored on disk with automatic chunk management for efficient large queue handling.

68

69

```python { .api }

70

class FifoDiskQueue:

71

def __init__(self, path: str | os.PathLike[str], chunksize: int = 100000):

72

"""

73

Create a persistent FIFO queue.

74

75

Parameters:

76

- path: Directory path for queue storage

77

- chunksize: Number of items per chunk file (default 100000)

78

"""

79

80

def push(self, string: bytes) -> None:

81

"""

82

Add bytes object to the end of the queue.

83

84

Parameters:

85

- string: Bytes object to add to queue

86

87

Raises:

88

- TypeError: If string is not bytes

89

"""

90

91

def pop(self) -> bytes | None:

92

"""

93

Remove and return bytes object from front of queue.

94

95

Returns:

96

- bytes: Object from front of queue, None if empty

97

"""

98

99

def peek(self) -> bytes | None:

100

"""

101

Return bytes object from front of queue without removing.

102

103

Returns:

104

- bytes: Object from front of queue, None if empty

105

"""

106

107

def close(self) -> None:

108

"""Close queue and save state, cleanup if empty."""

109

110

def __len__(self) -> int:

111

"""Return number of items in queue."""

112

```

113

114

### LIFO Disk Queue

115

116

Persistent last-in-first-out queue (stack) stored on disk in a single file.

117

118

```python { .api }

119

class LifoDiskQueue:

120

def __init__(self, path: str | os.PathLike[str]):

121

"""

122

Create a persistent LIFO queue.

123

124

Parameters:

125

- path: File path for queue storage

126

"""

127

128

def push(self, string: bytes) -> None:

129

"""

130

Add bytes object to the end of the queue.

131

132

Parameters:

133

- string: Bytes object to add to queue

134

135

Raises:

136

- TypeError: If string is not bytes

137

"""

138

139

def pop(self) -> bytes | None:

140

"""

141

Remove and return bytes object from end of queue.

142

143

Returns:

144

- bytes: Object from end of queue, None if empty

145

"""

146

147

def peek(self) -> bytes | None:

148

"""

149

Return bytes object from end of queue without removing.

150

151

Returns:

152

- bytes: Object from end of queue, None if empty

153

"""

154

155

def close(self) -> None:

156

"""Close queue, cleanup if empty."""

157

158

def __len__(self) -> int:

159

"""Return number of items in queue."""

160

```

161

162

### Priority Queue

163

164

A priority queue implemented using multiple internal queues, one per priority level. Lower numbers indicate higher priorities.

165

166

```python { .api }

167

class PriorityQueue:

168

def __init__(self, qfactory: Callable[[int], BaseQueue], startprios: Iterable[int] = ()):

169

"""

170

Create a priority queue using internal queues.

171

172

Parameters:

173

- qfactory: Callable that creates queue instance for given priority

174

- startprios: Sequence of priorities to start with (for queue restoration)

175

"""

176

177

def push(self, obj: Any, priority: int = 0) -> None:

178

"""

179

Add object with specified priority.

180

181

Parameters:

182

- obj: Object to add to queue

183

- priority: Priority level (lower numbers = higher priority, default 0)

184

"""

185

186

def pop(self) -> Any | None:

187

"""

188

Remove and return highest priority object.

189

190

Returns:

191

- Any: Highest priority object, None if empty

192

"""

193

194

def peek(self) -> Any | None:

195

"""

196

Return highest priority object without removing.

197

198

Returns:

199

- Any: Highest priority object, None if empty

200

"""

201

202

def close(self) -> list[int]:

203

"""

204

Close all internal queues.

205

206

Returns:

207

- list[int]: List of priorities that had non-empty queues

208

"""

209

210

def __len__(self) -> int:

211

"""Return total number of items across all priorities."""

212

```

213

214

### Round Robin Queue

215

216

A round-robin queue that cycles through keys when popping items, ensuring fair distribution of processing across different keys.

217

218

```python { .api }

219

class RoundRobinQueue:

220

def __init__(self, qfactory: Callable[[Hashable], BaseQueue], start_domains: Iterable[Hashable] = ()):

221

"""

222

Create a round-robin queue using internal queues.

223

224

Parameters:

225

- qfactory: Callable that creates queue instance for given key

226

- start_domains: Sequence of domains/keys to start with (for queue restoration)

227

"""

228

229

def push(self, obj: Any, key: Hashable) -> None:

230

"""

231

Add object associated with specified key.

232

233

Parameters:

234

- obj: Object to add to queue

235

- key: Hashable key to associate object with

236

"""

237

238

def pop(self) -> Any | None:

239

"""

240

Remove and return object using round-robin key selection.

241

242

Returns:

243

- Any: Object from next key in rotation, None if empty

244

"""

245

246

def peek(self) -> Any | None:

247

"""

248

Return object from current key without removing.

249

250

Returns:

251

- Any: Object from current key, None if empty

252

"""

253

254

def close(self) -> list[Hashable]:

255

"""

256

Close all internal queues.

257

258

Returns:

259

- list[Hashable]: List of keys that had non-empty queues

260

"""

261

262

def __len__(self) -> int:

263

"""Return total number of items across all keys."""

264

```

265

266

### Memory Queues

267

268

Fast in-memory queue implementations for temporary storage needs.

269

270

```python { .api }

271

class FifoMemoryQueue:

272

def __init__(self):

273

"""Create an in-memory FIFO queue."""

274

275

def push(self, obj: Any) -> None:

276

"""Add object to end of queue."""

277

278

def pop(self) -> Any | None:

279

"""Remove and return object from front of queue."""

280

281

def peek(self) -> Any | None:

282

"""Return object from front without removing."""

283

284

def close(self) -> None:

285

"""Close queue (no-op for memory queue)."""

286

287

def __len__(self) -> int:

288

"""Return number of items in queue."""

289

290

class LifoMemoryQueue(FifoMemoryQueue):

291

def pop(self) -> Any | None:

292

"""Remove and return object from end of queue."""

293

294

def peek(self) -> Any | None:

295

"""Return object from end without removing."""

296

```

297

298

### SQLite Queues

299

300

Database-backed persistent queues using SQLite for storage.

301

302

```python { .api }

303

class FifoSQLiteQueue:

304

def __init__(self, path: str | os.PathLike[str]):

305

"""

306

Create a persistent FIFO queue using SQLite.

307

308

Parameters:

309

- path: File path for SQLite database

310

"""

311

312

def push(self, item: bytes) -> None:

313

"""

314

Add bytes object to queue.

315

316

Parameters:

317

- item: Bytes object to add

318

319

Raises:

320

- TypeError: If item is not bytes

321

"""

322

323

def pop(self) -> bytes | None:

324

"""Remove and return bytes object from front of queue."""

325

326

def peek(self) -> bytes | None:

327

"""Return bytes object from front without removing."""

328

329

def close(self) -> None:

330

"""Close database connection, cleanup if empty."""

331

332

def __len__(self) -> int:

333

"""Return number of items in queue."""

334

335

class LifoSQLiteQueue(FifoSQLiteQueue):

336

"""

337

LIFO SQLite queue that inherits from FifoSQLiteQueue but overrides

338

the SQL query to implement last-in-first-out ordering.

339

340

Overrides:

341

- _sql_pop: Uses "ORDER BY id DESC" to get the most recently added item

342

"""

343

```

344

345

## Base Queue Interface

346

347

Abstract base class defining the standard queue interface.

348

349

```python { .api }

350

class BaseQueue:

351

def push(self, obj: Any) -> None:

352

"""Add object to queue."""

353

354

def pop(self) -> Any | None:

355

"""Remove and return object from queue."""

356

357

def peek(self) -> Any | None:

358

"""Return object without removing."""

359

360

def __len__(self) -> int:

361

"""Return queue size."""

362

363

def close(self) -> None:

364

"""Close queue and cleanup resources."""

365

```

366

367

## Types

368

369

```python { .api }

370

from typing import Any, Callable, Hashable, Iterable

371

import os

372

373

# Type aliases for clarity

374

BaseQueue = Any # Queue implementing the base interface

375

```

376

377

## Usage Examples

378

379

### Priority Queue with Disk Storage

380

381

```python

382

from queuelib import PriorityQueue, FifoDiskQueue

383

384

def create_disk_queue(priority):

385

return FifoDiskQueue(f"./queue-priority-{priority}")

386

387

pq = PriorityQueue(create_disk_queue)

388

389

# Add items with different priorities

390

pq.push(b'urgent task', 1)

391

pq.push(b'normal task', 5)

392

pq.push(b'high priority task', 2)

393

394

# Process items by priority

395

while len(pq) > 0:

396

task = pq.pop()

397

print(f"Processing: {task}")

398

399

# Get remaining priorities when closing

400

remaining_priorities = pq.close()

401

```

402

403

### Round Robin Processing

404

405

```python

406

from queuelib import RoundRobinQueue, FifoMemoryQueue

407

408

def create_memory_queue(domain):

409

return FifoMemoryQueue()

410

411

rr = RoundRobinQueue(create_memory_queue)

412

413

# Add requests for different domains

414

rr.push(b'request1', 'domain1.com')

415

rr.push(b'request2', 'domain1.com')

416

rr.push(b'request3', 'domain2.com')

417

rr.push(b'request4', 'domain2.com')

418

419

# Process requests in round-robin fashion

420

while len(rr) > 0:

421

request = rr.pop()

422

print(f"Processing: {request}")

423

# Output alternates between domains for fair processing

424

425

remaining_domains = rr.close()

426

```

427

428

### Persistent Queue with Error Handling

429

430

```python

431

from queuelib import FifoDiskQueue

432

import os

433

434

try:

435

queue = FifoDiskQueue("./data/queue")

436

437

# Only bytes objects are accepted

438

queue.push(b'valid data')

439

440

# This will raise TypeError

441

try:

442

queue.push('string data') # TypeError: Unsupported type: str

443

except TypeError as e:

444

print(f"Error: {e}")

445

446

# Safe queue operations

447

item = queue.pop()

448

if item is not None:

449

print(f"Got item: {item}")

450

451

# Peek without removing

452

next_item = queue.peek()

453

if next_item is not None:

454

print(f"Next item: {next_item}")

455

456

finally:

457

queue.close() # Always close to save state and cleanup

458

```

459

460

## Error Handling

461

462

Common exceptions and error conditions:

463

464

- **TypeError**: Raised when pushing non-bytes objects to disk or SQLite queues

465

- **OSError/IOError**: May occur with disk-based queues during filesystem operations

466

- **sqlite3.Error**: May occur with SQLite queues during database operations

467

468

## Thread Safety

469

470

**Important**: Queuelib collections are **not thread-safe**. Use external synchronization mechanisms when accessing queues from multiple threads.

471

472

## Performance Characteristics

473

474

- **Memory queues**: Fastest, O(1) operations, no persistence

475

- **Disk queues**: Slower, persistent, efficient for large datasets with chunk management

476

- **SQLite queues**: Moderate speed, persistent, ACID guarantees

477

- **Priority/Round-robin queues**: Performance depends on internal queue implementation