or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-greenlets.mdindex.mdmonkey-patching.mdnetworking.mdpooling.mdqueues.mdservers.mdsynchronization.mdtimeouts.md

queues.mddocs/

0

# Queues

1

2

Message passing and data structures for greenlet communication including FIFO, LIFO, and priority queues with cooperative blocking behavior. These queues provide thread-safe communication channels between greenlets.

3

4

## Capabilities

5

6

### Standard Queue

7

8

FIFO queue with optional size limits and task tracking.

9

10

```python { .api }

11

class Queue:

12

"""

13

FIFO queue for passing data between greenlets.

14

"""

15

16

def __init__(self, maxsize=None, items=None, unfinished_tasks=None):

17

"""

18

Create a queue.

19

20

Parameters:

21

- maxsize: int, maximum queue size (None for unlimited)

22

- items: iterable, initial items for queue

23

- unfinished_tasks: int, initial unfinished task count

24

"""

25

26

def put(self, item, block=True, timeout=None):

27

"""

28

Put item into queue.

29

30

Parameters:

31

- item: object to add to queue

32

- block: bool, whether to block if queue is full

33

- timeout: float, maximum time to wait if blocking

34

35

Raises:

36

Full: if queue is full and block=False or timeout expires

37

"""

38

39

def get(self, block=True, timeout=None):

40

"""

41

Remove and return item from queue.

42

43

Parameters:

44

- block: bool, whether to block if queue is empty

45

- timeout: float, maximum time to wait if blocking

46

47

Returns:

48

Item from queue

49

50

Raises:

51

Empty: if queue is empty and block=False or timeout expires

52

"""

53

54

def put_nowait(self, item):

55

"""

56

Put item without blocking.

57

58

Parameters:

59

- item: object to add to queue

60

61

Raises:

62

Full: if queue is full

63

"""

64

65

def get_nowait(self):

66

"""

67

Get item without blocking.

68

69

Returns:

70

Item from queue

71

72

Raises:

73

Empty: if queue is empty

74

"""

75

76

def empty(self) -> bool:

77

"""

78

Check if queue is empty.

79

80

Returns:

81

bool, True if queue is empty

82

"""

83

84

def full(self) -> bool:

85

"""

86

Check if queue is full.

87

88

Returns:

89

bool, True if queue is full

90

"""

91

92

def qsize(self) -> int:

93

"""

94

Get approximate queue size.

95

96

Returns:

97

int, number of items in queue

98

"""

99

100

def task_done(self):

101

"""

102

Mark a task as done.

103

104

Returns:

105

None

106

107

Raises:

108

ValueError: if called more times than items were placed

109

"""

110

111

def join(self, timeout=None):

112

"""

113

Wait for all tasks to be marked done.

114

115

Parameters:

116

- timeout: float, maximum time to wait

117

118

Returns:

119

None

120

"""

121

122

# Legacy alias

123

JoinableQueue = Queue

124

```

125

126

### Simple Queue

127

128

Unbounded FIFO queue with simpler interface.

129

130

```python { .api }

131

class SimpleQueue:

132

"""

133

Unbounded FIFO queue with simple interface.

134

"""

135

136

def put(self, item, block=True, timeout=None):

137

"""

138

Put item into queue.

139

140

Parameters:

141

- item: object to add

142

- block: bool, ignored (always succeeds)

143

- timeout: float, ignored

144

145

Returns:

146

None

147

"""

148

149

def get(self, block=True, timeout=None):

150

"""

151

Get item from queue.

152

153

Parameters:

154

- block: bool, whether to block if empty

155

- timeout: float, maximum time to wait

156

157

Returns:

158

Item from queue

159

"""

160

161

def empty(self) -> bool:

162

"""

163

Check if queue is empty.

164

165

Returns:

166

bool, True if empty

167

"""

168

169

def qsize(self) -> int:

170

"""

171

Get queue size.

172

173

Returns:

174

int, number of items

175

"""

176

```

177

178

### Priority Queue

179

180

Queue where items are retrieved in priority order.

181

182

```python { .api }

183

class PriorityQueue(Queue):

184

"""

185

Priority queue where lowest valued entries are retrieved first.

186

"""

187

188

def put(self, item, block=True, timeout=None):

189

"""

190

Put item into priority queue.

191

192

Parameters:

193

- item: (priority, data) tuple or comparable object

194

- block: bool, whether to block if full

195

- timeout: float, maximum time to wait

196

197

Returns:

198

None

199

"""

200

201

def get(self, block=True, timeout=None):

202

"""

203

Get highest priority item.

204

205

Parameters:

206

- block: bool, whether to block if empty

207

- timeout: float, maximum time to wait

208

209

Returns:

210

Highest priority item

211

"""

212

```

213

214

### LIFO Queue

215

216

Last-in-first-out queue (stack).

217

218

```python { .api }

219

class LifoQueue(Queue):

220

"""

221

LIFO queue (stack) where last item put is first retrieved.

222

"""

223

224

def put(self, item, block=True, timeout=None):

225

"""

226

Put item onto stack.

227

228

Parameters:

229

- item: object to add

230

- block: bool, whether to block if full

231

- timeout: float, maximum time to wait

232

233

Returns:

234

None

235

"""

236

237

def get(self, block=True, timeout=None):

238

"""

239

Get most recent item from stack.

240

241

Parameters:

242

- block: bool, whether to block if empty

243

- timeout: float, maximum time to wait

244

245

Returns:

246

Most recently added item

247

"""

248

```

249

250

### Channel

251

252

Synchronous queue for CSP-style communication.

253

254

```python { .api }

255

class Channel:

256

"""

257

Synchronous channel for CSP-style communication.

258

"""

259

260

def put(self, item, block=True, timeout=None):

261

"""

262

Send item through channel.

263

264

Parameters:

265

- item: object to send

266

- block: bool, whether to block until received

267

- timeout: float, maximum time to wait

268

269

Returns:

270

None

271

"""

272

273

def get(self, block=True, timeout=None):

274

"""

275

Receive item from channel.

276

277

Parameters:

278

- block: bool, whether to block until available

279

- timeout: float, maximum time to wait

280

281

Returns:

282

Item from channel

283

"""

284

```

285

286

### Queue Exceptions

287

288

```python { .api }

289

class Empty(Exception):

290

"""Exception raised when queue is empty."""

291

292

class Full(Exception):

293

"""Exception raised when queue is full."""

294

295

class ShutDown(Exception):

296

"""Exception raised when queue is shut down."""

297

```

298

299

## Usage Examples

300

301

### Producer-Consumer Pattern

302

303

```python

304

import gevent

305

from gevent import queue

306

307

# Create a queue

308

work_queue = queue.Queue()

309

310

def producer(name, count):

311

for i in range(count):

312

item = f"{name}-item-{i}"

313

print(f"Producing {item}")

314

work_queue.put(item)

315

gevent.sleep(0.1) # Simulate work

316

317

print(f"Producer {name} finished")

318

319

def consumer(name):

320

while True:

321

try:

322

item = work_queue.get(timeout=2)

323

print(f"Consumer {name} processing {item}")

324

gevent.sleep(0.2) # Simulate processing

325

work_queue.task_done()

326

except queue.Empty:

327

print(f"Consumer {name} timed out, exiting")

328

break

329

330

# Start producers and consumers

331

greenlets = [

332

gevent.spawn(producer, "P1", 5),

333

gevent.spawn(producer, "P2", 3),

334

gevent.spawn(consumer, "C1"),

335

gevent.spawn(consumer, "C2"),

336

]

337

338

# Wait for producers to finish

339

gevent.joinall(greenlets[:2])

340

341

# Wait for all work to be processed

342

work_queue.join()

343

344

# Kill consumers

345

gevent.killall(greenlets[2:])

346

```

347

348

### Priority Queue Example

349

350

```python

351

import gevent

352

from gevent import queue

353

354

# Create priority queue

355

pq = queue.PriorityQueue()

356

357

def add_tasks():

358

# Add tasks with priorities (lower number = higher priority)

359

tasks = [

360

(1, "High priority task"),

361

(3, "Low priority task"),

362

(2, "Medium priority task"),

363

(1, "Another high priority task"),

364

]

365

366

for priority, task in tasks:

367

pq.put((priority, task))

368

print(f"Added: {task} (priority {priority})")

369

370

def process_tasks():

371

while not pq.empty():

372

priority, task = pq.get()

373

print(f"Processing: {task} (priority {priority})")

374

gevent.sleep(0.5)

375

376

gevent.joinall([

377

gevent.spawn(add_tasks),

378

gevent.spawn(process_tasks)

379

])

380

```

381

382

### Channel Communication

383

384

```python

385

import gevent

386

from gevent import queue

387

388

def sender(ch, values):

389

for value in values:

390

print(f"Sending: {value}")

391

ch.put(value) # Blocks until receiver gets it

392

print(f"Sent: {value}")

393

394

def receiver(ch, count):

395

for _ in range(count):

396

value = ch.get() # Blocks until sender puts something

397

print(f"Received: {value}")

398

gevent.sleep(0.5) # Simulate processing

399

400

# Create synchronous channel

401

channel = queue.Channel()

402

403

# Start sender and receiver

404

gevent.joinall([

405

gevent.spawn(sender, channel, ['A', 'B', 'C']),

406

gevent.spawn(receiver, channel, 3)

407

])

408

```

409

410

### Queue with Size Limits

411

412

```python

413

import gevent

414

from gevent import queue

415

416

# Create bounded queue

417

bounded_queue = queue.Queue(maxsize=2)

418

419

def fast_producer():

420

for i in range(10):

421

try:

422

item = f"item-{i}"

423

print(f"Trying to put {item}")

424

bounded_queue.put(item, timeout=1)

425

print(f"Put {item}")

426

except queue.Full:

427

print(f"Queue full, couldn't put item-{i}")

428

429

def slow_consumer():

430

while True:

431

try:

432

item = bounded_queue.get(timeout=3)

433

print(f"Got {item}")

434

gevent.sleep(2) # Slow processing

435

except queue.Empty:

436

print("No more items, consumer exiting")

437

break

438

439

gevent.joinall([

440

gevent.spawn(fast_producer),

441

gevent.spawn(slow_consumer)

442

])

443

```