or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced.md collections.md core-promises.md flow-control.md functional.md index.md nodejs.md promise-chains.md property-access.md queue.md state-inspection.md

queue.mddocs/

0

# Queue Operations

1

2

FIFO queue implementation using promises for producer-consumer patterns and asynchronous data streaming.

3

4

## Capabilities

5

6

### Queue Constructor

7

8

Creates a FIFO queue that uses promises for asynchronous data handling.

9

10

```javascript { .api }

11

/**

12

* FIFO queue constructor using promises

13

* @returns {Queue} New Queue instance

14

*/

15

function Queue();

16

17

// Queue instance methods

18

interface Queue {

19

put(value: any): void;

20

get(): Promise<any>;

21

}

22

```

23

24

**Usage Examples:**

25

26

```javascript

27

const Queue = require("./queue"); // Import from queue.js module (relative path)

28

// OR when installed as npm package:

29

// const Queue = require("q/queue");

30

31

// Build the queue shared by the snippets below.
const messageQueue = new Queue();

// Producer side: enqueue three messages, in order.
["First message", "Second message", "Third message"].forEach((text) => {
  messageQueue.put(text);
});

// Consumer side: every get() returns a promise for the next item, so the
// three handlers see the messages in insertion (FIFO) order.
const logReceived = (message) => console.log("Received:", message);

messageQueue.get().then(logReceived); // "First message"
messageQueue.get().then(logReceived); // "Second message"
messageQueue.get().then(logReceived); // "Third message"

// All three items have been claimed, so this promise stays pending until
// the next put().
const emptyPromise = messageQueue.get();

setTimeout(() => {
  // Supplying a value settles the promise requested above.
  messageQueue.put("Delayed message");
}, 1000);

emptyPromise.then((message) => {
  console.log("Finally received:", message); // "Delayed message"
});

61

```

62

63

### Producer-Consumer Patterns

64

65

Implementing producer-consumer patterns with promise-based queues.

66

67

```javascript

68

const Queue = require("./queue"); // or require("q/queue") when installed

69

const Q = require("q");

70

71

// Single producer, single consumer

72

/**
 * Runs one timer-driven producer against one consumer on a shared queue.
 *
 * The producer emits "Item 1" through "Item 5" at 500 ms intervals and then
 * a `null` sentinel. The consumer logs each item, waits 200 ms to simulate
 * work, and stops as soon as it reads the sentinel.
 *
 * @returns {Promise} Promise that settles once the consumer has read the sentinel.
 */
function singleProducerConsumerExample() {
  const queue = new Queue();

  // Producer: one tick every 500 ms; the sixth tick ends the stream.
  const startProducing = () => {
    let produced = 0;
    const timer = setInterval(() => {
      if (produced >= 5) {
        clearInterval(timer);
        queue.put(null); // Sentinel value to indicate end
        return;
      }

      const label = `Item ${produced + 1}`;
      queue.put(label);
      console.log(`Produced: ${label}`);
      produced += 1;
    }, 500);
  };

  // Consumer: pull, log, pause, repeat until the sentinel shows up.
  const drain = () =>
    queue.get().then((item) => {
      if (item === null) {
        console.log("Consumer finished");
        return undefined;
      }

      console.log(`Consumed: ${item}`);
      return Q.delay(200).then(drain); // Simulate processing time
    });

  // Start producer and consumer
  startProducing();
  return drain();
}

111

112

// Multiple producers, single consumer

113

/**
 * Runs several independent producers against a single consumer.
 *
 * Each producer puts its items on a shared queue after a random delay. When
 * every producer has settled (successfully or not) a single `null` sentinel
 * is queued, which tells the consumer to stop.
 *
 * The end-of-stream signal is derived from the producer promises rather than
 * from a shared counter: a counter can hit zero too early (a producer with no
 * items completes synchronously, before its siblings have been started) and
 * never hits zero if a producer throws.
 *
 * @param {Array<{name: string, items: Array}>} [producerSpecs] Producers to
 *   run; defaults to the three demo producers.
 * @returns {Promise} Promise that settles once the consumer has read the sentinel.
 */
function multipleProducersExample(
  producerSpecs = [
    { name: "Producer A", items: ["Item 1", "Item 2", "Item 3"] },
    { name: "Producer B", items: ["Data X", "Data Y"] },
    { name: "Producer C", items: ["Message Alpha", "Message Beta", "Message Gamma"] },
  ]
) {
  const queue = new Queue();

  // Producer factory: returns a promise that settles when the producer is done.
  const createProducer = Q.async(function* (name, items) {
    for (const item of items) {
      yield Q.delay(Math.random() * 1000); // Random delay
      queue.put(`${name}: ${item}`);
      console.log(`${name} produced: ${item}`);
    }
  });

  // Consumer: drain the queue until the sentinel arrives.
  function consumer() {
    function processNext() {
      return queue.get().then((item) => {
        if (item === null) {
          console.log("All producers finished");
          return undefined;
        }

        console.log(`Consumed: ${item}`);
        return processNext();
      });
    }

    return processNext();
  }

  // Start every producer before deciding when the stream ends.
  const producers = producerSpecs.map(({ name, items }) => createProducer(name, items));

  // allSettled (not all) so that one failing producer neither hides the
  // others' output nor leaves the consumer waiting forever.
  Q.allSettled(producers).then((outcomes) => {
    outcomes
      .filter((outcome) => outcome.state === "rejected")
      .forEach((outcome) => console.error("Producer failed:", outcome.reason));
    queue.put(null); // Signal end when all producers are done
  });

  return consumer();
}

159

160

// Usage: run the two demos back to back. The trailing catch reports a failure
// from either demo instead of leaving the rejection unhandled.
singleProducerConsumerExample()
  .then(() => console.log("Single producer-consumer completed"))
  .then(() => multipleProducersExample())
  .then(() => console.log("Multiple producers example completed"))
  .catch((error) => console.error("Producer-consumer demo failed:", error));

165

```

166

167

### Queue-Based Data Processing

168

169

Advanced patterns for processing data streams using promise queues.

170

171

```javascript

172

const Queue = require("./queue"); // or require("q/queue") when installed

173

const Q = require("q");

174

175

// Batch processing queue

176

/**
 * Groups queued items into fixed-size batches and processes each batch.
 *
 * Items are read one at a time from an internal promise queue. Whenever
 * `batchSize` items have accumulated they are handed to `processBatch()`;
 * a `null` item (queued by `finish()`) flushes the remaining partial batch
 * and stops the reader. Batches are started without waiting for earlier
 * batches to complete, so several may be in flight at once.
 */
class BatchQueue {
  /**
   * @param {number} [batchSize=5] Number of items per batch.
   * @param {number} [processingDelay=1000] Simulated processing time per batch, in ms.
   */
  constructor(batchSize = 5, processingDelay = 1000) {
    this.queue = new Queue();
    this.batchSize = batchSize;
    this.processingDelay = processingDelay;
    this.batch = [];
    this.isProcessing = false;
    // Promises for batches that have been started, so completion can be awaited.
    this.inFlight = [];

    // Settles once the sentinel has been read and every batch has finished.
    this.completion = this.startBatchProcessor();
  }

  /**
   * Queues one item for batching.
   * @param {*} item Any value except `null`, which is reserved as the end marker.
   */
  add(item) {
    this.queue.put(item);
  }

  /**
   * Starts the reader loop that assembles and dispatches batches.
   * @returns {Promise} Promise that settles when the loop has stopped and all
   *   dispatched batches have finished; it never rejects (errors are logged).
   */
  startBatchProcessor() {
    this.isProcessing = true;

    // Hand the accumulated items to processBatch() and start a fresh batch.
    const flush = () => {
      if (this.batch.length === 0) {
        return;
      }
      const items = this.batch;
      this.batch = [];
      this.inFlight.push(
        Q.resolve(this.processBatch(items)).catch((error) => {
          console.error("Batch processing error:", error);
        })
      );
    };

    const consumeNext = () =>
      this.queue.get().then((item) => {
        if (item === null) {
          // End marker: process the final partial batch, then wait for everything.
          flush();
          return Q.all(this.inFlight);
        }

        this.batch.push(item);
        if (this.batch.length >= this.batchSize) {
          flush();
        }
        return consumeNext();
      });

    return consumeNext()
      .catch((error) => {
        console.error("Batch processing error:", error);
      })
      .then(() => {
        this.isProcessing = false;
      });
  }

  /**
   * Processes one batch; here this only logs and waits `processingDelay` ms.
   * @param {Array} batch Items belonging to the batch.
   * @returns {Promise} Promise that settles when the batch is done.
   */
  processBatch(batch) {
    console.log(`Processing batch of ${batch.length} items:`, batch);

    return Q.delay(this.processingDelay).then(() => {
      console.log(`Batch processed: ${batch.join(", ")}`);
    });
  }

  /**
   * Marks the end of input.
   * @returns {Promise} Promise that settles once all queued items have been
   *   batched and every batch has finished processing.
   */
  finish() {
    this.queue.put(null);
    return this.completion;
  }
}

231

232

// Priority queue implementation

233

/**
 * Priority queue built from one promise queue per priority level.
 *
 * Lower numbers are served first. A promise queue cannot be probed for
 * emptiness: every `get()` claims a slot (even on an empty queue, where it
 * claims the next value to be put) and the promise it returns is not settled
 * synchronously. This class therefore keeps its own count of buffered items
 * per priority and only calls `get()` on a queue it knows to be non-empty.
 */
class PriorityQueue {
  constructor() {
    this.queues = new Map(); // priority -> Queue
    this.counts = new Map(); // priority -> number of items not yet dequeued
    this.processing = false;
  }

  /**
   * Adds an item and starts processing if the queue is idle.
   *
   * Because processing starts immediately, an item put while the queue is
   * idle is taken right away; priorities decide the order of items that are
   * waiting while another item is being processed.
   *
   * @param {*} item Value to process.
   * @param {number} [priority=0] Priority level; lower numbers run first.
   */
  put(item, priority = 0) {
    if (!this.queues.has(priority)) {
      this.queues.set(priority, new Queue());
      this.counts.set(priority, 0);
    }

    this.queues.get(priority).put(item);
    this.counts.set(priority, this.counts.get(priority) + 1);

    if (!this.processing) {
      this.processNext().catch((error) => {
        // Leave the queue restartable instead of stuck in the "processing" state.
        this.processing = false;
        console.error("Priority queue processing error:", error);
      });
    }
  }

  /**
   * Processes buffered items one at a time, highest priority first, until
   * nothing is left.
   * @returns {Promise} Promise that settles when the queue has gone idle.
   */
  processNext() {
    this.processing = true;

    // Get highest priority (lowest number) among the levels that hold items.
    const ready = Array.from(this.counts)
      .filter(([, count]) => count > 0)
      .map(([priority]) => priority)
      .sort((a, b) => a - b);

    if (ready.length === 0) {
      this.processing = false;
      return Q.resolve();
    }

    const priority = ready[0];
    this.counts.set(priority, this.counts.get(priority) - 1);

    return this.queues.get(priority).get().then((item) => {
      console.log(`Processing priority ${priority} item:`, item);
      return Q.delay(100).then(() => this.processNext());
    });
  }
}

282

283

// Task queue with worker pool

284

/**
 * Task queue served by a fixed pool of workers.
 *
 * Every worker repeatedly takes the next task from a shared promise queue,
 * logs it, and waits for the task's `duration` to simulate work. A `null`
 * task tells the worker that receives it to shut down.
 */
class WorkerQueue {
  /**
   * @param {number} [workerCount=3] Number of workers to start.
   */
  constructor(workerCount = 3) {
    this.queue = new Queue();
    this.workerCount = workerCount;
    this.activeWorkers = 0;
    // One promise per started worker; each settles when that worker shuts down.
    this.workers = [];

    this.startWorkers();
  }

  /**
   * Queues a task for the next free worker.
   * @param {{name: *, duration: (number|undefined)}} task Task descriptor;
   *   `null` is reserved as the shutdown signal.
   */
  addTask(task) {
    this.queue.put(task);
  }

  /** Starts `workerCount` workers named "Worker-1", "Worker-2", ... */
  startWorkers() {
    for (let i = 0; i < this.workerCount; i++) {
      this.startWorker(`Worker-${i + 1}`);
    }
  }

  /**
   * Starts a single worker loop.
   * @param {string} name Label used in log output.
   * @returns {Promise} Promise that settles when this worker has shut down.
   */
  startWorker(name) {
    this.activeWorkers++;

    // Resolves to true when the worker should take another task.
    const runOne = () =>
      this.queue.get().then((task) => {
        if (task === null) {
          console.log(`${name} shutting down`);
          this.activeWorkers--;
          return false;
        }

        console.log(`${name} processing task:`, task.name);

        // An explicit 0 means "no simulated work"; only a missing duration
        // falls back to 1000 ms.
        const duration = task.duration === 0 ? 0 : task.duration || 1000;

        return Q.delay(duration).then(() => {
          console.log(`${name} completed task:`, task.name);
          return true;
        });
      });

    // The error handler covers exactly one task, so a failure is logged once
    // and the worker moves on to the next task.
    const processTask = () =>
      runOne()
        .catch((error) => {
          console.error(`${name} error:`, error);
          return true;
        })
        .then((keepGoing) => (keepGoing ? processTask() : undefined));

    const finished = processTask();
    this.workers.push(finished);
    return finished;
  }

  /** Sends one shutdown signal per worker; tasks queued earlier still run first. */
  shutdown() {
    // Send shutdown signal to all workers
    for (let i = 0; i < this.workerCount; i++) {
      this.queue.put(null);
    }
  }

  /**
   * Waits for the workers to shut down by joining their promises (no polling).
   * @returns {Promise} Promise that settles once no worker is active.
   */
  waitForCompletion() {
    return Q.all(this.workers.slice()).then(() =>
      // A worker started while we were waiting is joined on the next pass.
      this.activeWorkers === 0 ? undefined : this.waitForCompletion()
    );
  }
}

347

348

// Usage examples

349

/**
 * Demo driver: exercises BatchQueue first, then WorkerQueue.
 *
 * The batch demo feeds ten items into batches of three. After a fixed
 * five-second pause the worker demo hands five tasks to two workers and
 * requests shutdown 100 ms later.
 *
 * @returns {Promise} Promise that settles when the worker demo reports completion.
 */
function demonstrateQueuePatterns() {
  // Second half of the demo; runs after the pause below.
  const runWorkerDemo = () => {
    console.log("\n=== Worker Queue Demo ===");
    const workerQueue = new WorkerQueue(2);

    // Add tasks to worker queue
    const tasks = [
      { name: "Task A", duration: 800 },
      { name: "Task B", duration: 1200 },
      { name: "Task C", duration: 600 },
      { name: "Task D", duration: 1000 },
      { name: "Task E", duration: 500 }
    ];
    for (const task of tasks) {
      workerQueue.addTask(task);
    }

    // Shutdown after delay
    setTimeout(() => {
      workerQueue.shutdown();
    }, 100);

    return workerQueue.waitForCompletion();
  };

  console.log("=== Batch Queue Demo ===");
  const batchQueue = new BatchQueue(3, 500);

  // Add items to batch queue
  let itemNumber = 1;
  while (itemNumber <= 10) {
    batchQueue.add(`Item ${itemNumber}`);
    itemNumber += 1;
  }
  batchQueue.finish();

  // Fixed pause before the second demo starts.
  return Q.delay(5000).then(() => runWorkerDemo());
}

// Run demonstrations
const reportDone = () => console.log("\nAll queue demonstrations completed");
const reportFailure = (error) => console.error("Demo error:", error);

demonstrateQueuePatterns()
  .then(() => reportDone())
  .catch(reportFailure);

385

```

386

387

### Integration Patterns

388

389

Common patterns for integrating queues with other Q promise features.

390

391

```javascript

392

const Queue = require("./queue"); // or require("q/queue") when installed

393

const Q = require("q");

394

395

// Queue with error handling and retry logic

396

/**
 * Queue whose consumer retries failed items and parks items that keep
 * failing on a separate error queue.
 *
 * Intended for a single `process()` loop. Without `close()` the loop waits
 * for further items indefinitely; after `close()` it stops once every item
 * that was put has either succeeded or been moved to the error queue.
 * Putting items after `close()` is not supported.
 */
class RobustQueue {
  /**
   * @param {number} [maxRetries=3] Retries allowed per item after its first attempt.
   */
  constructor(maxRetries = 3) {
    this.queue = new Queue();
    this.maxRetries = maxRetries;
    this.errorQueue = new Queue();
    // Items that have been put but have neither succeeded nor been given up on.
    this.outstanding = 0;
    this.closed = false;
  }

  /**
   * Queues an item for processing.
   * @param {*} item Value handed to the processor.
   */
  put(item) {
    this.outstanding++;
    this.queue.put({ item, retries: 0 });
  }

  /**
   * Declares that no more items will be put, which lets `process()` finish.
   */
  close() {
    this.closed = true;
    if (this.outstanding === 0) {
      // Nothing left to settle: wake a loop that may be waiting on an empty queue.
      this.queue.put(null);
    }
  }

  /**
   * Consumes the queue, calling `processor` for each item.
   *
   * A failing item is re-queued until it has been retried `maxRetries`
   * times; after that it is put on the error queue as
   * `{ item, error, retries }`.
   *
   * @param {function(*): *} processor Called with each item; may return a
   *   promise and may throw.
   * @returns {Promise} Promise that settles when the loop stops (see `close()`).
   */
  process(processor) {
    const describe = (error) => (error && error.message ? error.message : error);

    const handleFailure = (item, retries, error) => {
      console.error(`Processing failed for ${item}:`, describe(error));

      if (retries < this.maxRetries) {
        console.log(`Retrying ${item} (attempt ${retries + 1}/${this.maxRetries})`);
        this.queue.put({ item, retries: retries + 1 });
      } else {
        console.error(`Max retries exceeded for ${item}, moving to error queue`);
        this.errorQueue.put({ item, error, retries });
        this.outstanding--;
      }
    };

    const processNext = () =>
      this.queue.get().then((entry) => {
        if (entry === null) {
          return undefined; // wake-up marker queued by close()
        }

        const { item, retries } = entry;

        // The rejection handler is passed alongside the success handler so it
        // only sees failures of this item's processor, never failures that
        // happen later in the loop.
        return Q.try(() => processor(item))
          .then(
            () => {
              console.log("Successfully processed:", item);
              this.outstanding--;
            },
            (error) => handleFailure(item, retries, error)
          )
          .then(() =>
            this.closed && this.outstanding === 0 ? undefined : processNext()
          );
      });

    return processNext();
  }

  /**
   * @returns {Promise} Promise for the next `{ item, error, retries }` record
   *   from the error queue; pending until one is available.
   */
  getErrors() {
    return this.errorQueue.get();
  }
}

438

439

// Usage
const robustQueue = new RobustQueue(2);

// Add items
robustQueue.put("good-item");
robustQueue.put("bad-item");
robustQueue.put("another-good-item");

// Process with a flaky processor: each attempt at "bad-item" fails roughly
// 30% of the time. The trailing catch reports a failure of the processing
// loop itself instead of leaving the rejection unhandled.
robustQueue
  .process((item) => {
    if (item === "bad-item" && Math.random() > 0.7) {
      throw new Error("Random processing failure");
    }
    return Q.delay(100).then(() => `Processed: ${item}`);
  })
  .catch((error) => console.error("Queue processing stopped:", error));

454

```