or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

communication.md context-management.md index.md managers.md process-management.md process-pools.md queues.md shared-memory.md synchronization.md

queues.mddocs/

0

# Queues

1

2

Process- and thread-safe queues for inter-process communication, built on pipes. The module provides a general-purpose bounded queue, a variant with task-completion tracking, and a minimal-overhead simple queue.

3

4

## Capabilities

5

6

### Standard Queue

7

8

Thread-safe FIFO queue implementation using pipes for robust inter-process communication.

9

10

```python { .api }

11

class Queue:

12

"""

13

A thread-safe queue implementation using pipes.

14

"""

15

def __init__(self, maxsize=0, ctx=None):

16

"""

17

Create a queue.

18

19

Parameters:

20

- maxsize: maximum size (0 for unlimited)

21

- ctx: multiprocessing context

22

"""

23

24

def put(self, obj, block=True, timeout=None):

25

"""

26

Put an item into the queue.

27

28

Parameters:

29

- obj: object to put in queue

30

- block: whether to block if queue is full

31

- timeout: timeout in seconds (None for no timeout)

32

33

Raises:

34

- Full: if queue is full and block=False or timeout exceeded

35

"""

36

37

def get(self, block=True, timeout=None):

38

"""

39

Remove and return an item from the queue.

40

41

Parameters:

42

- block: whether to block if queue is empty

43

- timeout: timeout in seconds (None for no timeout)

44

45

Returns:

46

Item from queue

47

48

Raises:

49

- Empty: if queue is empty and block=False or timeout exceeded

50

"""

51

52

def put_nowait(self, obj):

53

"""

54

Equivalent to put(obj, block=False).

55

56

Parameters:

57

- obj: object to put in queue

58

59

Raises:

60

- Full: if queue is full

61

"""

62

63

def get_nowait(self):

64

"""

65

Equivalent to get(block=False).

66

67

Returns:

68

Item from queue

69

70

Raises:

71

- Empty: if queue is empty

72

"""

73

74

def qsize(self) -> int:

75

"""

76

Return approximate size of queue.

77

78

Returns:

79

Approximate number of items in queue

80

"""

81

82

def empty(self) -> bool:

83

"""

84

Return True if queue is empty.

85

86

Returns:

87

True if queue appears empty

88

"""

89

90

def full(self) -> bool:

91

"""

92

Return True if queue is full.

93

94

Returns:

95

True if queue appears full

96

"""

97

98

def close(self):

99

"""

100

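Indicate that the current process will put no more data on this queue; the background thread exits once buffered data has been flushed to the pipe.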

101

"""

102

103

def join_thread(self):

104

"""

105

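Join the background thread, blocking until it has flushed all buffered data to the pipe. Only usable after close() has been called.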

106

"""

107

108

def cancel_join_thread(self):

109

"""

110

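Prevent join_thread() from blocking, so the process can exit without waiting for the background thread to flush; buffered data may be lost.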

111

"""

112

```

113

114

Usage example:

115

116

```python

117

from billiard import Process, Queue

118

import time

119

import queue

120

121

def producer(q, items):

122

"""Producer process that puts items in queue"""

123

for item in items:

124

print(f"Producing: {item}")

125

q.put(item)

126

time.sleep(0.1)

127

q.put(None) # Sentinel to signal completion

128

129

def consumer(q, consumer_id):

130

"""Consumer process that gets items from queue"""

131

while True:

132

try:

133

item = q.get(timeout=1)

134

if item is None:

135

q.put(None) # Re-queue sentinel for other consumers

136

break

137

print(f"Consumer {consumer_id} consumed: {item}")

138

time.sleep(0.2)

139

except queue.Empty:

140

print(f"Consumer {consumer_id} timed out")

141

break

142

143

if __name__ == '__main__':

144

# Create queue with max size

145

q = Queue(maxsize=5)

146

147

# Check initial state

148

print(f"Queue size: {q.qsize()}")

149

print(f"Queue empty: {q.empty()}")

150

print(f"Queue full: {q.full()}")

151

152

# Start producer and consumers

153

items = list(range(10))

154

processes = [

155

Process(target=producer, args=(q, items)),

156

Process(target=consumer, args=(q, 1)),

157

Process(target=consumer, args=(q, 2))

158

]

159

160

for p in processes:

161

p.start()

162

163

for p in processes:

164

p.join()

165

166

# Clean up

167

q.close()

168

```

169

170

### Joinable Queue

171

172

Queue with task completion tracking, allowing producers to wait for all tasks to be processed.

173

174

```python { .api }

175

class JoinableQueue(Queue):

176

"""

177

A Queue subclass that additionally tracks unfinished tasks.

178

"""

179

def task_done(self):

180

"""

181

Indicate that a formerly enqueued task is complete.

182

183

Used by queue consumers. For each get() used to fetch a task,

184

a subsequent call to task_done() tells the queue that processing

185

is complete.

186

187

Raises:

188

- ValueError: if called more times than there were items in queue

189

"""

190

191

def join(self):

192

"""

193

Block until all items in queue have been gotten and processed.

194

195

The count of unfinished tasks goes up whenever an item is added

196

to the queue and goes down whenever task_done() is called.

197

"""

198

```

199

200

Usage example:

201

202

```python

203

from billiard import Process, JoinableQueue

204

import time

205

206

def worker(q):

207

"""Worker that processes tasks from queue"""

208

while True:

209

item = q.get()

210

if item is None:

211

break

212

213

print(f"Processing {item}...")

214

time.sleep(0.5) # Simulate work

215

print(f"Completed {item}")

216

217

q.task_done() # Mark task as done

218

219

def producer_with_join(q, items):

220

"""Producer that waits for all tasks to complete"""

221

# Add tasks to queue

222

for item in items:

223

print(f"Adding task: {item}")

224

q.put(item)

225

226

# Wait for all tasks to be processed

227

print("Waiting for all tasks to complete...")

228

q.join()

229

print("All tasks completed!")

230

231

# Signal workers to stop

232

q.put(None)

233

q.put(None)

234

235

if __name__ == '__main__':

236

# Create joinable queue

237

q = JoinableQueue()

238

239

# Start workers

240

workers = [

241

Process(target=worker, args=(q,)),

242

Process(target=worker, args=(q,))

243

]

244

245

for w in workers:

246

w.start()

247

248

# Start producer

249

tasks = ['task1', 'task2', 'task3', 'task4', 'task5']

250

producer_process = Process(target=producer_with_join, args=(q, tasks))

251

producer_process.start()

252

producer_process.join()

253

254

# Stop workers

255

for w in workers:

256

w.join()

257

```

258

259

### Simple Queue

260

261

Simplified queue implementation with basic put/get operations and minimal overhead.

262

263

```python { .api }

264

class SimpleQueue:

265

"""

266

A simplified queue implementation.

267

"""

268

def get(self):

269

"""

270

Remove and return an item from the queue (blocks until available).

271

272

Returns:

273

Item from queue

274

"""

275

276

def put(self, obj):

277

"""

278

Put an item into the queue.

279

280

Parameters:

281

- obj: object to put in queue

282

"""

283

284

def empty(self) -> bool:

285

"""

286

Return True if queue is empty.

287

288

Returns:

289

True if queue appears empty

290

"""

291

292

def close(self):

293

"""

294

Close the queue.

295

"""

296

```

297

298

Usage example:

299

300

```python

301

from billiard import Process, SimpleQueue

302

import time

303

304

def simple_worker(q, worker_id):

305

"""Simple worker using SimpleQueue"""

306

while True:

307

if q.empty():

308

time.sleep(0.1)

309

continue

310

311

try:

312

item = q.get()

313

if item is None:

314

break

315

316

print(f"Worker {worker_id} got: {item}")

317

time.sleep(0.2)

318

except:

319

break

320

321

def simple_producer(q, items):

322

"""Simple producer"""

323

for item in items:

324

q.put(item)

325

time.sleep(0.1)

326

327

# Signal completion

328

q.put(None)

329

q.put(None)

330

331

if __name__ == '__main__':

332

# Create simple queue

333

q = SimpleQueue()

334

335

# Start processes

336

processes = [

337

Process(target=simple_producer, args=(q, ['A', 'B', 'C', 'D'])),

338

Process(target=simple_worker, args=(q, 1)),

339

Process(target=simple_worker, args=(q, 2))

340

]

341

342

for p in processes:

343

p.start()

344

345

for p in processes:

346

p.join()

347

348

q.close()

349

```

350

351

### Queue Exceptions

352

353

Exception classes for queue operations.

354

355

```python { .api }

356

class Empty(Exception):

357

"""

358

Exception raised by Queue.get() / Queue.get_nowait() when the queue is empty and the call is non-blocking or its timeout expires.

359

"""

360

361

class Full(Exception):

362

"""

363

Exception raised by Queue.put() / Queue.put_nowait() when the queue is full and the call is non-blocking or its timeout expires.

364

"""

365

```

366

367

Usage example:

368

369

```python

370

from billiard import Queue

371

from billiard.queues import Empty, Full

372

import time

373

374

def handle_queue_exceptions():

375

"""Demonstrate queue exception handling"""

376

q = Queue(maxsize=2)

377

378

try:

379

# Fill the queue

380

q.put("item1")

381

q.put("item2")

382

print("Queue filled")

383

384

# Try to put more (will raise Full)

385

q.put_nowait("item3")

386

except Full:

387

print("Queue is full!")

388

389

try:

390

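# Empty the queue (put() hands items to a background feeder thread, so get_nowait() immediately after put() can raise Empty before the data reaches the pipe)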

391

print("Item:", q.get_nowait())

392

print("Item:", q.get_nowait())

393

print("Queue emptied")

394

395

# Try to get more (will raise Empty)

396

q.get_nowait()

397

except Empty:

398

print("Queue is empty!")

399

400

# With timeouts

401

try:

402

q.put("timeout_test", timeout=0.1)

403

result = q.get(timeout=0.1)

404

print("Got with timeout:", result)

405

406

# This will timeout

407

q.get(timeout=0.1)

408

except Empty:

409

print("Get operation timed out")

410

411

if __name__ == '__main__':

412

handle_queue_exceptions()

413

```

414

415

## Queue Selection Guidelines

416

417

- **Queue**: Use for general-purpose inter-process communication with flow control (maxsize)

418

- **JoinableQueue**: Use when you need to wait for all queued tasks to be processed

419

- **SimpleQueue**: Use for minimal overhead scenarios where you don't need size limits or advanced features

420

421

All queue types are process-safe and thread-safe, making them suitable for complex producer-consumer scenarios in multiprocessing applications.