A comprehensive promise library implementing CommonJS Promises/A,B,D specifications for JavaScript
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
FIFO queue implementation using promises for producer-consumer patterns and asynchronous data streaming.
Creates a FIFO queue that uses promises for asynchronous data handling.
/**
* FIFO queue constructor using promises.
*
* The instance exposes `put(value)` to add a value and `get()`, which returns
* a promise for the next value in first-in, first-out order.
* @returns New Queue instance
*/
function Queue();
// Queue instance methods
interface Queue {
put(value: any): void;
get(): Promise<any>;
}

Usage Examples:
const Queue = require("./queue"); // Import from queue.js module (relative path)
// OR when installed as an npm package:
// const Queue = require("q/queue");
// Set up an empty queue for the demo
const messageQueue = new Queue();
// Producer side: enqueue three messages, in order
["First message", "Second message", "Third message"].forEach((text) => {
  messageQueue.put(text);
});
// Consumer side: each get() returns a promise for the next queued value
const logReceived = (message) => console.log("Received:", message);
messageQueue.get().then(logReceived); // "First message"
messageQueue.get().then(logReceived); // "Second message"
// Values come out in the order they went in (First In, First Out)
messageQueue.get().then(logReceived); // "Third message"
// The queue is drained now, so this promise stays pending until the next put()
const emptyPromise = messageQueue.get(); // This will wait
const deliverLater = () => {
  messageQueue.put("Delayed message");
  // emptyPromise will now resolve with "Delayed message"
};
setTimeout(deliverLater, 1000);
emptyPromise.then(message => {
console.log("Finally received:", message); // "Delayed message"
});

Implementing producer-consumer patterns with promise-based queues.
const Queue = require("./queue"); // or require("q/queue") when installed
const Q = require("q");
// Single producer, single consumer
/**
 * Runs one timer-driven producer against one consumer on a shared queue.
 *
 * The producer emits "Item 1" .. "Item 5", one per 500 ms timer tick, and then
 * puts `null` as an end-of-stream sentinel. The consumer logs each item, waits
 * 200 ms to simulate work, and stops when it reads the sentinel.
 *
 * @returns A promise that settles once the consumer has read the sentinel.
 */
function singleProducerConsumerExample() {
  const channel = new Queue();

  // Producer: one item per tick, then the sentinel on the tick after the last item.
  const startProducer = () => {
    let produced = 0;
    const timer = setInterval(() => {
      if (produced >= 5) {
        clearInterval(timer);
        channel.put(null); // Sentinel value to indicate end
        return;
      }
      channel.put(`Item ${produced + 1}`);
      console.log(`Produced: Item ${produced + 1}`);
      produced += 1;
    }, 500);
  };

  // Consumer: take one entry, handle it, then schedule the next read.
  const drain = () =>
    channel.get().then((entry) => {
      if (entry === null) {
        console.log("Consumer finished");
        return undefined;
      }
      console.log(`Consumed: ${entry}`);
      // Simulate processing time before asking for the next entry.
      return Q.delay(200).then(() => drain());
    });

  startProducer();
  return drain();
}
// Multiple producers, single consumer
/**
 * Runs three concurrent producers feeding a single consumer.
 *
 * Each producer waits a random delay (under one second) before every put.
 * A shared counter tracks how many producers are still running; the producer
 * that brings it to zero puts `null` so the consumer knows the stream is over.
 *
 * @returns A promise that settles once the consumer has read the sentinel.
 */
function multipleProducersExample() {
  const channel = new Queue();
  let running = 0;

  // Registers a producer synchronously, then runs it as a Q.async coroutine.
  const launchProducer = (label, payloads) => {
    running += 1;
    const run = Q.async(function* () {
      for (const payload of payloads) {
        yield Q.delay(Math.random() * 1000); // Random delay
        channel.put(`${label}: ${payload}`);
        console.log(`${label} produced: ${payload}`);
      }
      running -= 1;
      if (running === 0) {
        channel.put(null); // Signal end when all producers done
      }
    });
    return run();
  };

  // Consumer: keep reading until the sentinel shows up.
  const drain = () =>
    channel.get().then((entry) => {
      if (entry !== null) {
        console.log(`Consumed: ${entry}`);
        return drain();
      }
      console.log("All producers finished");
      return undefined;
    });

  // All producers are registered before any of them can finish, so the
  // counter cannot reach zero early.
  const plan = [
    ["Producer A", ["Item 1", "Item 2", "Item 3"]],
    ["Producer B", ["Data X", "Data Y"]],
    ["Producer C", ["Message Alpha", "Message Beta", "Message Gamma"]],
  ];
  plan.forEach(([label, payloads]) => {
    launchProducer(label, payloads);
  });
  return drain();
}
// Usage
singleProducerConsumerExample()
.then(() => console.log("Single producer-consumer completed"))
.then(() => multipleProducersExample())
.then(() => console.log("Multiple producers example completed"));

Advanced patterns for processing data streams using promise queues.
const Queue = require("./queue"); // or require("q/queue") when installed
const Q = require("q");
// Batch processing queue
/**
 * Collects queued items into batches of `batchSize` and hands each batch to
 * `processBatch`. Putting `null` (see `finish`) flushes any partial batch and
 * stops the consumer loop.
 */
class BatchQueue {
  /**
   * @param {number} [batchSize=5] - Number of collected items that triggers a batch.
   * @param {number} [processingDelay=1000] - Milliseconds passed to Q.delay for each batch.
   */
  constructor(batchSize = 5, processingDelay = 1000) {
    this.queue = new Queue();
    this.batchSize = batchSize;
    this.processingDelay = processingDelay;
    this.batch = [];
    // True while the consumer loop is still reading from the queue.
    this.isProcessing = false;
    this.startBatchProcessor();
  }

  /**
   * Queues one item for batching.
   * @param {*} item - Any value except `null`, which is reserved as the end marker.
   */
  add(item) {
    this.queue.put(item);
  }

  /**
   * Starts the consumer loop: read an item, append it to the current batch,
   * and dispatch the batch once it is full or the end marker arrives.
   */
  startBatchProcessor() {
    const reportError = (error) => {
      console.error("Batch processing error:", error);
    };

    // Hands the collected items to processBatch without waiting for it, so
    // reading continues while the batch is processed. The outcome is still
    // observed: a failing batch is logged instead of surfacing as an
    // unhandled rejection.
    const dispatchBatch = () => {
      const items = this.batch;
      this.batch = [];
      Q.try(() => this.processBatch(items)).catch(reportError);
    };

    const consumeNext = () =>
      this.queue.get().then((item) => {
        if (item === null) {
          // End marker: process the final batch if it is not empty.
          if (this.batch.length > 0) {
            dispatchBatch();
          }
          this.isProcessing = false;
          return undefined;
        }
        this.batch.push(item);
        if (this.batch.length >= this.batchSize) {
          dispatchBatch();
        }
        return consumeNext();
      });

    this.isProcessing = true;
    consumeNext().catch((error) => {
      // A failure while reading the queue ends the loop.
      this.isProcessing = false;
      reportError(error);
    });
  }

  /**
   * Processes one batch: logs it, waits `processingDelay` ms, logs completion.
   * @param {Array} batch - Items collected for this batch.
   * @returns A promise that settles after the simulated processing delay.
   */
  processBatch(batch) {
    console.log(`Processing batch of ${batch.length} items:`, batch);
    return Q.delay(this.processingDelay).then(() => {
      console.log(`Batch processed: ${batch.join(", ")}`);
    });
  }

  /**
   * Signals the end of input by queueing the `null` end marker.
   */
  finish() {
    this.queue.put(null);
  }
}
// Priority queue implementation
/**
 * Processes items in priority order (lowest number first), keeping one
 * promise Queue per priority level so items of equal priority stay FIFO.
 */
class PriorityQueue {
  constructor() {
    this.queues = new Map();
    this.processing = false;
    // Items put but not yet taken, per priority. A promise queue cannot be
    // probed safely: each get() claims the next value, so calling it on an
    // empty queue would hand a later item to a promise nobody is waiting on.
    // Counting here lets us call get() only when an item is known to be there.
    this.pendingCounts = new Map();
  }

  /**
   * Adds an item and starts the processing loop if it is idle.
   * @param {*} item - Value to process.
   * @param {number} [priority=0] - Lower numbers are processed first.
   */
  put(item, priority = 0) {
    if (!this.queues.has(priority)) {
      this.queues.set(priority, new Queue());
    }
    this.queues.get(priority).put(item);
    this.pendingCounts.set(priority, (this.pendingCounts.get(priority) || 0) + 1);
    if (!this.processing) {
      this.processNext();
    }
  }

  /**
   * Takes one item from the highest-priority non-empty level, logs it, waits
   * 100 ms, then repeats until every level is empty.
   * @returns A promise that settles once no items are left to process.
   */
  processNext() {
    this.processing = true;
    // Get highest priority (lowest number) that still has items waiting.
    const ready = Array.from(this.pendingCounts.keys())
      .filter((priority) => this.pendingCounts.get(priority) > 0)
      .sort((a, b) => a - b);
    if (ready.length === 0) {
      this.processing = false;
      return Q.resolve();
    }
    const priority = ready[0];
    // Claim the item before the asynchronous get() so a put() arriving in
    // the meantime cannot make the count disagree with the queue.
    this.pendingCounts.set(priority, this.pendingCounts.get(priority) - 1);
    return this.queues
      .get(priority)
      .get()
      .then((item) => {
        console.log(`Processing priority ${priority} item:`, item);
        return Q.delay(100);
      })
      .then(() => this.processNext());
  }
}
// Task queue with worker pool
/**
 * A shared task queue drained by a fixed number of named workers.
 *
 * Each worker repeatedly takes a task, logs it, waits `task.duration`
 * milliseconds (1000 when the duration is missing or falsy) and logs
 * completion. A `null` task tells the worker that receives it to stop.
 */
class WorkerQueue {
  /**
   * @param {number} [workerCount=3] - Number of workers started immediately.
   */
  constructor(workerCount = 3) {
    this.queue = new Queue();
    this.workerCount = workerCount;
    this.activeWorkers = 0;
    this.startWorkers();
  }

  /**
   * Queues a task for the next free worker.
   * @param {{name: string, duration?: number}|null} task - Task description.
   */
  addTask(task) {
    this.queue.put(task);
  }

  /** Starts `workerCount` workers named "Worker-1", "Worker-2", ... */
  startWorkers() {
    let started = 0;
    while (started < this.workerCount) {
      started += 1;
      this.startWorker(`Worker-${started}`);
    }
  }

  /**
   * Starts one worker loop and counts it as active until it reads `null`.
   * @param {string} name - Label used in the worker's log lines.
   */
  startWorker(name) {
    this.activeWorkers += 1;

    // Runs a single task, then goes back to waiting on the queue.
    const runTask = (task) => {
      console.log(`${name} processing task:`, task.name);
      return Q.delay(task.duration || 1000).then(() => {
        console.log(`${name} completed task:`, task.name);
        return takeNext();
      });
    };

    const takeNext = () =>
      this.queue
        .get()
        .then((task) => {
          if (task !== null) {
            return runTask(task);
          }
          console.log(`${name} shutting down`);
          this.activeWorkers -= 1;
          return undefined;
        })
        .catch((error) => {
          // A failed task is logged and the worker carries on with the next one.
          console.error(`${name} error:`, error);
          return takeNext();
        });

    takeNext();
  }

  /** Queues one `null` per worker so that each worker stops after its current backlog. */
  shutdown() {
    // Send shutdown signal to all workers
    let sent = 0;
    while (sent < this.workerCount) {
      this.queue.put(null);
      sent += 1;
    }
  }

  /**
   * Polls every 100 ms until no worker is active.
   * @returns A promise that settles once `activeWorkers` is 0.
   */
  waitForCompletion() {
    const poll = () =>
      this.activeWorkers === 0 ? Q.resolve() : Q.delay(100).then(poll);
    return poll();
  }
}
// Usage examples
/**
 * Runs the batch-queue demo, waits five seconds, then runs the worker-queue demo.
 *
 * @returns A promise that settles once every worker in the second demo has shut down.
 */
function demonstrateQueuePatterns() {
  console.log("=== Batch Queue Demo ===");
  const batcher = new BatchQueue(3, 500);
  // Feed "Item 1" .. "Item 10" into the batch queue, then mark the end of input.
  Array.from({ length: 10 }, (_, index) => index + 1).forEach((n) => {
    batcher.add(`Item ${n}`);
  });
  batcher.finish();

  const runWorkerDemo = () => {
    console.log("\n=== Worker Queue Demo ===");
    const pool = new WorkerQueue(2);
    // Task name paired with its simulated duration in milliseconds.
    const specs = [
      ["Task A", 800],
      ["Task B", 1200],
      ["Task C", 600],
      ["Task D", 1000],
      ["Task E", 500],
    ];
    for (const [name, duration] of specs) {
      pool.addTask({ name, duration });
    }
    // The shutdown markers are queued behind the tasks added above.
    setTimeout(() => pool.shutdown(), 100);
    return pool.waitForCompletion();
  };

  return Q.delay(5000).then(() => runWorkerDemo());
}
// Run demonstrations
demonstrateQueuePatterns()
.then(() => console.log("\nAll queue demonstrations completed"))
.catch(error => console.error("Demo error:", error));

Common patterns for integrating queues with other Q promise features.
const Queue = require("./queue"); // or require("q/queue") when installed
const Q = require("q");
// Queue with error handling and retry logic
/**
 * Queue whose items are retried when processing fails. An item that has
 * failed after `maxRetries` retries is moved to a separate error queue.
 */
class RobustQueue {
  /**
   * @param {number} [maxRetries=3] - Retries allowed per item after its first failure.
   */
  constructor(maxRetries = 3) {
    this.queue = new Queue();
    this.maxRetries = maxRetries;
    this.errorQueue = new Queue();
  }

  /**
   * Queues an item with a fresh retry counter.
   * @param {*} item - Value handed to the processor.
   */
  put(item) {
    this.queue.put({ item, retries: 0 });
  }

  /**
   * Processes queued items one at a time, indefinitely.
   *
   * @param {Function} processor - Called with each item; may return a value or
   *   a promise. Throwing or rejecting counts as a failure of that item.
   * @returns A promise for the processing loop. The loop keeps waiting for new
   *   items, so the promise is not expected to fulfil.
   */
  process(processor) {
    // Re-queues the item while retries remain, otherwise records it as failed.
    const handleFailure = (item, retries, error) => {
      // Rejection reasons are not always Error objects (they may even be
      // null), so read `.message` defensively.
      const reason =
        error != null && error.message !== undefined ? error.message : error;
      console.error(`Processing failed for ${item}:`, reason);
      if (retries < this.maxRetries) {
        console.log(`Retrying ${item} (attempt ${retries + 1}/${this.maxRetries})`);
        this.queue.put({ item, retries: retries + 1 });
      } else {
        console.error(`Max retries exceeded for ${item}, moving to error queue`);
        this.errorQueue.put({ item, error, retries });
      }
    };

    const processNext = () =>
      this.queue.get().then(({ item, retries }) =>
        Q.try(() => processor(item))
          // Both handlers are attached to the processor's own outcome. The
          // loop continues in a later step, so a failure raised further down
          // the chain can never be attributed to this item and re-queue it.
          .then(
            () => {
              console.log("Successfully processed:", item);
            },
            (error) => handleFailure(item, retries, error)
          )
          .then(() => processNext())
      );

    return processNext();
  }

  /**
   * @returns A promise for the next record in the error queue, shaped
   *   `{ item, error, retries }`.
   */
  getErrors() {
    return this.errorQueue.get();
  }
}
// Usage
// Allow up to two retries per item
const robustQueue = new RobustQueue(2);
// Enqueue the sample items in order
for (const sample of ["good-item", "bad-item", "another-good-item"]) {
  robustQueue.put(sample);
}
// Process with flaky processor
robustQueue.process(item => {
if (item === "bad-item" && Math.random() > 0.7) {
throw new Error("Random processing failure");
}
return Q.delay(100).then(() => `Processed: ${item}`);
});

Install with Tessl CLI
npx tessl i tessl/npm-q