0
# Queue Operations
1
2
FIFO queue implementation using promises for producer-consumer patterns and asynchronous data streaming.
3
4
## Capabilities
5
6
### Queue Constructor
7
8
Creates a FIFO queue that uses promises for asynchronous data handling.
9
10
```javascript { .api }
11
/**
12
* FIFO queue constructor using promises
13
* @returns New Queue instance
14
*/
15
function Queue();
16
17
// Queue instance methods
18
interface Queue {
19
put(value: any): void;
20
get(): Promise<any>;
21
}
22
```
23
24
**Usage Examples:**
25
26
```javascript
27
const Queue = require("./queue"); // Import from queue.js module (relative path)
28
// OR when installed as npm package:
29
// const Queue = require("q/queue");
30
31
// Create a new queue
32
const messageQueue = new Queue();

// Producer side: enqueue three messages, in order.
for (const text of ["First message", "Second message", "Third message"]) {
  messageQueue.put(text);
}

// Consumer side: each get() returns a promise for the next item, so the three
// reads below are fulfilled in insertion order (FIFO: First In, First Out).
const logReceived = (message) => console.log("Received:", message);

messageQueue.get().then(logReceived); // "First message"
messageQueue.get().then(logReceived); // "Second message"
messageQueue.get().then(logReceived); // "Third message"

// Every queued item has been claimed, so this promise stays pending until
// another value is put.
const emptyPromise = messageQueue.get();

emptyPromise.then((message) => {
  console.log("Finally received:", message); // "Delayed message"
});

// One second later a value arrives and fulfills emptyPromise.
setTimeout(() => messageQueue.put("Delayed message"), 1000);
61
```
62
63
### Producer-Consumer Patterns
64
65
Implementing producer-consumer patterns with promise-based queues.
66
67
```javascript
68
const Queue = require("./queue"); // or require("q/queue") when installed
69
const Q = require("q");
70
71
// Single producer, single consumer
72
/**
 * Demonstrates one producer feeding one consumer through a shared queue.
 *
 * The producer emits five items at 500 ms intervals and then a `null`
 * sentinel; the consumer reads items one at a time, pausing 200 ms after each,
 * and stops when it reads the sentinel.
 *
 * @returns Promise fulfilled once the consumer has read the sentinel
 */
function singleProducerConsumerExample() {
  const ITEM_LIMIT = 5;
  const queue = new Queue();

  // Timer-driven producer: one item per tick, then the end-of-stream marker.
  const startProducer = () => {
    let produced = 0;
    const timer = setInterval(() => {
      if (produced >= ITEM_LIMIT) {
        clearInterval(timer);
        queue.put(null); // Sentinel value to indicate end
        return;
      }

      queue.put(`Item ${produced + 1}`);
      console.log(`Produced: Item ${produced + 1}`);
      produced += 1;
    }, 500);
  };

  // Recursive consumer: each step waits for the next item before continuing.
  const consumeNext = () =>
    queue.get().then((item) => {
      if (item === null) {
        console.log("Consumer finished");
        return undefined;
      }

      console.log(`Consumed: ${item}`);
      // Simulate processing time before asking for the next item
      return Q.delay(200).then(() => consumeNext());
    });

  startProducer();
  return consumeNext();
}
111
112
// Multiple producers, single consumer
113
/**
 * Demonstrates several producers feeding a single consumer.
 *
 * Each producer puts its items on the shared queue after a random delay. The
 * last producer to finish puts a `null` sentinel, which tells the consumer to
 * stop.
 *
 * @returns Promise fulfilled (with no value) once every producer has finished
 *          and the consumer has read the sentinel; rejected if a producer or
 *          the consumer fails
 */
function multipleProducersExample() {
  const queue = new Queue();
  let activeProducers = 0;

  // Producer factory: starts immediately and returns a promise for its completion
  function createProducer(name, items) {
    activeProducers++;

    return Q.async(function* () {
      try {
        for (const item of items) {
          yield Q.delay(Math.random() * 1000); // Random delay
          queue.put(`${name}: ${item}`);
          console.log(`${name} produced: ${item}`);
        }
      } finally {
        // Runs even if this producer fails, so the consumer is never left
        // waiting for a sentinel that would otherwise not be sent.
        activeProducers--;
        if (activeProducers === 0) {
          queue.put(null); // Signal end when all producers done
        }
      }
    })();
  }

  // Consumer: reads until the sentinel arrives
  function consumer() {
    function processNext() {
      return queue.get().then(item => {
        if (item === null) {
          console.log("All producers finished");
          return undefined;
        }

        console.log(`Consumed: ${item}`);
        return processNext();
      });
    }

    return processNext();
  }

  // Start multiple producers and keep their promises so failures surface
  const producers = [
    createProducer("Producer A", ["Item 1", "Item 2", "Item 3"]),
    createProducer("Producer B", ["Data X", "Data Y"]),
    createProducer("Producer C", ["Message Alpha", "Message Beta", "Message Gamma"])
  ];

  return Q.all([consumer(), ...producers]).then(() => undefined);
}
159
160
// Usage
161
// Run the two demos back to back; the final handler reports a failure from
// either one instead of leaving the rejection unhandled.
singleProducerConsumerExample()
  .then(() => console.log("Single producer-consumer completed"))
  .then(() => multipleProducersExample())
  .then(() => console.log("Multiple producers example completed"))
  .catch(error => console.error("Producer-consumer demo error:", error));
165
```
166
167
### Queue-Based Data Processing
168
169
Advanced patterns for processing data streams using promise queues.
170
171
```javascript
172
const Queue = require("./queue"); // or require("q/queue") when installed
173
const Q = require("q");
174
175
// Batch processing queue
176
/**
 * Groups queued items into fixed-size batches and processes each batch.
 *
 * Items are read from an internal promise queue. A batch is dispatched as soon
 * as it reaches `batchSize`; a `null` item is the end-of-input sentinel and
 * flushes whatever partial batch remains. Because `null` is the sentinel, it
 * cannot be enqueued as a regular item.
 */
class BatchQueue {
  /**
   * @param batchSize Number of items that triggers a batch dispatch
   * @param processingDelay Milliseconds the default `processBatch` waits
   */
  constructor(batchSize = 5, processingDelay = 1000) {
    this.queue = new Queue();
    this.batchSize = batchSize;
    this.processingDelay = processingDelay;
    this.batch = [];
    this.isProcessing = false;
    // Promises for batches that have been dispatched but may still be running
    this.pendingBatches = [];

    // Settles once the sentinel has been read and every batch has finished
    this.completion = this.startBatchProcessor();
  }

  /**
   * Enqueues one item for batching.
   * @param item Value to add (must not be `null`, which marks end of input)
   */
  add(item) {
    this.queue.put(item);
  }

  /**
   * Starts the read loop that collects items into batches.
   * @returns Promise fulfilled when the sentinel has been read and all
   *          dispatched batches have settled; errors are logged, not rethrown
   */
  startBatchProcessor() {
    this.isProcessing = true;

    // Hand the current batch to processBatch and start a fresh one.
    const flush = () => {
      if (this.batch.length === 0) {
        return;
      }
      const items = this.batch;
      this.batch = [];
      // A failed batch is logged here so it cannot surface as an unhandled
      // rejection or prevent the remaining batches from being awaited.
      const work = Q.resolve(this.processBatch(items)).catch(error => {
        console.error("Batch processing error:", error);
      });
      this.pendingBatches.push(work);
    };

    const consumeNext = () =>
      this.queue.get().then(item => {
        if (item === null) {
          flush(); // Process final batch if not empty
          return undefined;
        }

        this.batch.push(item);
        if (this.batch.length >= this.batchSize) {
          flush();
        }

        return consumeNext();
      });

    return consumeNext()
      .then(() => Q.all(this.pendingBatches))
      .catch(error => {
        console.error("Batch processing error:", error);
      })
      .then(() => {
        this.isProcessing = false;
      });
  }

  /**
   * Processes one batch. The default implementation logs the batch, waits
   * `processingDelay` milliseconds, and logs again.
   * @param batch Array of items to process
   * @returns Promise fulfilled when the batch has been processed
   */
  processBatch(batch) {
    console.log(`Processing batch of ${batch.length} items:`, batch);

    return Q.delay(this.processingDelay).then(() => {
      console.log(`Batch processed: ${batch.join(", ")}`);
    });
  }

  /**
   * Signals end of input by enqueueing the `null` sentinel.
   * @returns Promise fulfilled once all batches, including the final partial
   *          one, have been processed
   */
  finish() {
    this.queue.put(null);
    return this.completion;
  }
}
231
232
// Priority queue implementation
233
/**
 * Priority queue built from one promise queue per priority level.
 *
 * Lower numbers are processed first; items with equal priority are processed
 * in insertion order. Processing starts on the first `put` and continues until
 * no items remain.
 */
class PriorityQueue {
  constructor() {
    this.queues = new Map();
    // Items put but not yet taken, per priority. A promise returned by
    // Queue#get is still pending at the moment it is returned, so it cannot be
    // inspected synchronously to learn whether an item is available; the
    // counts answer that question instead.
    this.pendingCounts = new Map();
    this.processing = false;
  }

  /**
   * Adds an item and starts processing if it is not already running.
   * @param item Value to enqueue
   * @param priority Numeric priority; lower values are processed first
   */
  put(item, priority = 0) {
    if (!this.queues.has(priority)) {
      this.queues.set(priority, new Queue());
    }

    this.queues.get(priority).put(item);
    this.pendingCounts.set(priority, (this.pendingCounts.get(priority) || 0) + 1);

    if (!this.processing) {
      this.processNext().catch(error => {
        // Reset the flag so a later put() can restart processing.
        this.processing = false;
        console.error("Priority queue processing error:", error);
      });
    }
  }

  /**
   * Processes the next available item with the lowest priority number, then
   * continues until every queue is empty.
   * @returns Promise fulfilled when no items remain
   */
  processNext() {
    this.processing = true;

    // Priorities that currently hold at least one item, lowest number first
    const ready = Array.from(this.queues.keys())
      .filter(priority => (this.pendingCounts.get(priority) || 0) > 0)
      .sort((a, b) => a - b);

    if (ready.length === 0) {
      this.processing = false;
      return Q.resolve();
    }

    const priority = ready[0];
    // Claim the item before the asynchronous get() so it is counted only once.
    this.pendingCounts.set(priority, this.pendingCounts.get(priority) - 1);

    return this.queues.get(priority).get()
      .then(item => {
        console.log(`Processing priority ${priority} item:`, item);
        return Q.delay(100);
      })
      .then(() => this.processNext());
  }
}
282
283
// Task queue with worker pool
284
/**
 * Task queue served by a fixed pool of workers.
 *
 * Each worker repeatedly takes a task from the shared queue, waits for the
 * task's `duration` (default 1000 ms), and logs progress. A `null` task tells
 * the worker that receives it to shut down.
 */
class WorkerQueue {
  /**
   * @param workerCount Number of workers started by the constructor
   */
  constructor(workerCount = 3) {
    this.queue = new Queue();
    this.workerCount = workerCount;
    this.activeWorkers = 0;
    // One promise per started worker, fulfilled when that worker shuts down
    this.workers = [];

    this.startWorkers();
  }

  /**
   * Enqueues a task.
   * @param task Object with a `name` and optional `duration` in milliseconds
   */
  addTask(task) {
    this.queue.put(task);
  }

  /** Starts `workerCount` workers named "Worker-1", "Worker-2", ... */
  startWorkers() {
    for (let i = 0; i < this.workerCount; i++) {
      this.startWorker(`Worker-${i + 1}`);
    }
  }

  /**
   * Starts a single worker loop.
   * @param name Label used in log output
   * @returns Promise fulfilled when the worker has received its shutdown signal
   */
  startWorker(name) {
    this.activeWorkers++;

    const processTask = () =>
      this.queue.get()
        .then(task => {
          if (task === null) {
            console.log(`${name} shutting down`);
            this.activeWorkers--;
            return undefined;
          }

          console.log(`${name} processing task:`, task.name);

          return Q.delay(task.duration || 1000).then(() => {
            console.log(`${name} completed task:`, task.name);
            return processTask();
          });
        })
        .catch(error => {
          // A failing task must not take the worker down; log and keep going.
          console.error(`${name} error:`, error);
          return processTask();
        });

    const worker = processTask();
    this.workers.push(worker);
    return worker;
  }

  /**
   * Sends one shutdown signal per pooled worker. Signals are queued behind any
   * tasks already added, so those tasks are still processed first.
   */
  shutdown() {
    for (let i = 0; i < this.workerCount; i++) {
      this.queue.put(null);
    }
  }

  /**
   * Waits for the workers to shut down.
   * @returns Promise fulfilled (with no value) once every started worker has
   *          received its shutdown signal
   */
  waitForCompletion() {
    // Await the worker promises directly rather than polling activeWorkers.
    return Q.all(this.workers).then(() => undefined);
  }
}
347
348
// Usage examples
349
/**
 * Runs the batch-queue demo, waits a fixed five seconds, then runs the
 * worker-queue demo.
 *
 * @returns Promise that settles when the worker queue reports completion
 */
function demonstrateQueuePatterns() {
  console.log("=== Batch Queue Demo ===");
  const batchQueue = new BatchQueue(3, 500);

  // Feed "Item 1" through "Item 10", then signal end of input.
  Array.from({ length: 10 }, (_, index) => `Item ${index + 1}`)
    .forEach((label) => batchQueue.add(label));
  batchQueue.finish();

  const runWorkerDemo = () => {
    console.log("\n=== Worker Queue Demo ===");
    const workerQueue = new WorkerQueue(2);

    // [name, duration in ms] pairs for the demo tasks
    const taskSpecs = [
      ["Task A", 800],
      ["Task B", 1200],
      ["Task C", 600],
      ["Task D", 1000],
      ["Task E", 500]
    ];
    for (const [name, duration] of taskSpecs) {
      workerQueue.addTask({ name, duration });
    }

    // Shutdown signals are queued shortly after the tasks
    setTimeout(() => workerQueue.shutdown(), 100);

    return workerQueue.waitForCompletion();
  };

  return Q.delay(5000).then(() => runWorkerDemo());
}

// Run demonstrations
const reportDemoSuccess = () => console.log("\nAll queue demonstrations completed");
const reportDemoFailure = (error) => console.error("Demo error:", error);

demonstrateQueuePatterns()
  .then(reportDemoSuccess)
  .catch(reportDemoFailure);
385
```
386
387
### Integration Patterns
388
389
Common patterns for integrating queues with other Q promise features.
390
391
```javascript
392
const Queue = require("./queue"); // or require("q/queue") when installed
393
const Q = require("q");
394
395
// Queue with error handling and retry logic
396
/**
 * Queue whose items are retried on failure and moved to a separate error
 * queue once the retry budget is exhausted.
 */
class RobustQueue {
  /**
   * @param maxRetries Number of retries allowed per item after its first failure
   */
  constructor(maxRetries = 3) {
    this.queue = new Queue();
    this.maxRetries = maxRetries;
    this.errorQueue = new Queue();
  }

  /**
   * Enqueues an item with a fresh retry counter.
   * @param item Value to process
   */
  put(item) {
    this.queue.put({ item, retries: 0 });
  }

  /**
   * Processes items one at a time, indefinitely, waiting whenever the queue is
   * empty.
   * @param processor Function called with each item; may return a value or a
   *                  promise, and may throw
   * @returns Promise that stays pending while the loop runs and is rejected
   *          only if the loop itself fails (processor failures are retried or
   *          routed to the error queue)
   */
  process(processor) {
    // Requeue a failed item, or route it to the error queue when retries run out.
    const handleFailure = (item, retries, error) => {
      // A processor may reject with null/undefined or a non-Error value, so
      // read `message` defensively instead of assuming an Error instance.
      const reason = error != null && error.message !== undefined ? error.message : error;
      console.error(`Processing failed for ${item}:`, reason);

      if (retries < this.maxRetries) {
        console.log(`Retrying ${item} (attempt ${retries + 1}/${this.maxRetries})`);
        this.queue.put({ item, retries: retries + 1 });
      } else {
        console.error(`Max retries exceeded for ${item}, moving to error queue`);
        this.errorQueue.put({ item, error, retries });
      }
    };

    const processNext = () =>
      this.queue.get()
        .then(({ item, retries }) =>
          // The rejection handler is attached to the processor call only, so
          // a failure while handling a later item is never blamed on this one.
          Q.try(() => processor(item)).then(
            () => {
              console.log("Successfully processed:", item);
            },
            error => handleFailure(item, retries, error)
          )
        )
        .then(() => processNext());

    return processNext();
  }

  /**
   * @returns Promise for the next `{ item, error, retries }` record from the
   *          error queue; pending until an item exhausts its retries
   */
  getErrors() {
    return this.errorQueue.get();
  }
}
438
439
// Usage
440
const robustQueue = new RobustQueue(2);

// Add items
robustQueue.put("good-item");
robustQueue.put("bad-item");
robustQueue.put("another-good-item");

// Process with a flaky processor: "bad-item" fails whenever the random draw
// exceeds 0.7. The processing loop runs indefinitely, so the returned promise
// only settles if the loop itself fails; report that instead of leaving the
// rejection unhandled.
robustQueue
  .process(item => {
    if (item === "bad-item" && Math.random() > 0.7) {
      throw new Error("Random processing failure");
    }
    return Q.delay(100).then(() => `Processed: ${item}`);
  })
  .catch(error => console.error("Queue processing stopped:", error));
454
```