CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-q

A comprehensive promise library implementing CommonJS Promises/A,B,D specifications for JavaScript

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/queue.md

Queue Operations

FIFO queue implementation using promises for producer-consumer patterns and asynchronous data streaming.

Capabilities

Queue Constructor

Creates a FIFO queue that uses promises for asynchronous data handling.

/**
 * FIFO queue constructor using promises.
 * Values are handed out by `get()` in the same order they were passed to `put()`.
 * @returns New Queue instance
 */
function Queue();

// Queue instance methods
interface Queue {
  // Producer side: appends a value to the end of the queue.
  put(value: any): void;
  // Consumer side: returns a promise for the next value in FIFO order.
  // If the queue is currently empty, the promise stays pending until a
  // later put() supplies a value.
  get(): Promise<any>;
}

Usage Examples:

const Queue = require("./queue"); // Import from queue.js module (relative path)
// OR, when Q is installed as an npm package:
// const Queue = require("q/queue");

// Create a new queue
const messageQueue = new Queue();

// Shared consumer callback: logs every value taken from the queue.
const logReceived = message => console.log("Received:", message);

// Producer: Add items to queue
messageQueue.put("First message");
messageQueue.put("Second message");
messageQueue.put("Third message");

// Consumer: Get items from queue
messageQueue.get().then(logReceived); // "First message"

messageQueue.get().then(logReceived); // "Second message"

// Queue is FIFO (First In, First Out)
messageQueue.get().then(logReceived); // "Third message"

// Getting from empty queue returns pending promise
const emptyPromise = messageQueue.get(); // This will wait

// The pending get() above is satisfied by the next put(), one second later.
setTimeout(() => {
  messageQueue.put("Delayed message");
  // emptyPromise will now resolve with "Delayed message"
}, 1000);

emptyPromise.then(message => {
  console.log("Finally received:", message); // "Delayed message"
});

Producer-Consumer Patterns

Implementing producer-consumer patterns with promise-based queues.

const Queue = require("./queue"); // or require("q/queue") when installed
const Q = require("q");

// Single producer, single consumer
/**
 * Runs one timer-driven producer against one promise-driven consumer.
 *
 * The producer emits "Item 1" .. "Item 5" at 500 ms intervals and then puts
 * `null` as an end-of-stream sentinel. The consumer takes items one at a
 * time, pausing 200 ms after each, and stops when it reads the sentinel.
 *
 * @returns Promise that settles once the consumer has read the sentinel.
 */
function singleProducerConsumerExample() {
  const queue = new Queue();

  // Producer: one item per tick, sentinel on the tick after the last item.
  const startProducing = () => {
    let produced = 0;
    const timer = setInterval(() => {
      if (produced >= 5) {
        clearInterval(timer);
        queue.put(null); // Sentinel value to indicate end
        return;
      }

      queue.put(`Item ${produced + 1}`);
      console.log(`Produced: Item ${produced + 1}`);
      produced += 1;
    }, 500);
  };

  // Consumer: recursive promise loop that ends on the sentinel.
  const consumeNext = () =>
    queue.get().then(item => {
      if (item === null) {
        console.log("Consumer finished");
        return undefined;
      }

      console.log(`Consumed: ${item}`);
      // Simulate processing time before asking for the next item.
      return Q.delay(200).then(() => consumeNext());
    });

  // Start producer and consumer
  startProducing();
  return consumeNext();
}

// Multiple producers, single consumer
/**
 * Runs three independent producers against a single consumer.
 *
 * Each producer is a Q.async coroutine that puts its items on the shared
 * queue after a random delay. When every producer has settled, a `null`
 * sentinel is put on the queue so the consumer knows the stream has ended.
 *
 * End-of-stream is derived from the producer promises themselves rather
 * than from a shared counter. This keeps the sentinel correct when a
 * producer has no items (it completes synchronously) and when a producer
 * fails (the consumer would otherwise wait forever).
 *
 * @returns Promise that settles once the consumer has read the sentinel.
 */
function multipleProducersExample() {
  const queue = new Queue();

  // Producer factory: returns a promise for the producer's completion.
  function createProducer(name, items) {
    return Q.async(function* () {
      for (const item of items) {
        yield Q.delay(Math.random() * 1000); // Random delay
        queue.put(`${name}: ${item}`);
        console.log(`${name} produced: ${item}`);
      }
    })();
  }

  // Consumer
  function consumer() {
    function processNext() {
      return queue.get().then(item => {
        if (item === null) {
          console.log("All producers finished");
          return;
        }

        console.log(`Consumed: ${item}`);
        return processNext();
      });
    }

    return processNext();
  }

  // Start multiple producers
  const producers = [
    createProducer("Producer A", ["Item 1", "Item 2", "Item 3"]),
    createProducer("Producer B", ["Data X", "Data Y"]),
    createProducer("Producer C", ["Message Alpha", "Message Beta", "Message Gamma"])
  ];

  // Signal end when all producers are done, whether they succeeded or not.
  Q.allSettled(producers).then(results => {
    results
      .filter(result => result.state === "rejected")
      .forEach(result => console.error("Producer failed:", result.reason));
    queue.put(null);
  });

  return consumer();
}

// Usage
// Run the two examples one after the other; the second starts only after the
// first has finished. The trailing catch reports a failure from either
// example instead of leaving the rejection unhandled.
singleProducerConsumerExample()
  .then(() => console.log("Single producer-consumer completed"))
  .then(() => multipleProducersExample())
  .then(() => console.log("Multiple producers example completed"))
  .catch(error => console.error("Producer-consumer example error:", error));

Queue-Based Data Processing

Advanced patterns for processing data streams using promise queues.

const Queue = require("./queue"); // or require("q/queue") when installed
const Q = require("q");

// Batch processing queue
/**
 * Groups queued items into fixed-size batches and processes each batch.
 *
 * Items are added with add(); a background consumer loop collects them into
 * `this.batch` and hands a batch to processBatch() whenever `batchSize`
 * items have accumulated. finish() puts a `null` sentinel on the queue,
 * which flushes any partial batch and stops the loop.
 */
class BatchQueue {
  /**
   * @param batchSize Number of items per batch (default 5).
   * @param processingDelay Simulated processing time per batch in ms (default 1000).
   */
  constructor(batchSize = 5, processingDelay = 1000) {
    this.queue = new Queue();
    this.batchSize = batchSize;
    this.processingDelay = processingDelay;
    this.batch = [];
    this.isProcessing = false;

    this.startBatchProcessor();
  }

  /** Enqueues one item for batching. */
  add(item) {
    this.queue.put(item);
  }

  /**
   * Starts the consumer loop. The loop's promise is stored in
   * `this.completion`; it resolves after the sentinel has been read and
   * every started batch has settled.
   */
  startBatchProcessor() {
    // Promises for batches that have been started; awaited on shutdown.
    const inFlight = [];

    // Hands the current batch (if any) to processBatch and starts a new one.
    const flush = () => {
      if (this.batch.length === 0) {
        return;
      }
      const batch = this.batch;
      this.batch = [];
      // Handle the rejection here so a failing batch is reported instead of
      // becoming an unhandled rejection, and does not stop later batches.
      inFlight.push(
        Q.resolve(this.processBatch(batch)).catch(error => {
          console.error("Batch processing error:", error);
        })
      );
    };

    const consumeNext = () =>
      this.queue.get().then(item => {
        if (item === null) {
          // Process final batch if not empty, then wait for all batches.
          flush();
          return Q.all(inFlight);
        }

        this.batch.push(item);

        if (this.batch.length >= this.batchSize) {
          flush();
        }

        return consumeNext();
      });

    this.completion = consumeNext()
      .then(() => undefined)
      .catch(error => {
        console.error("Batch processing error:", error);
      });
  }

  /**
   * Processes one batch: logs it, waits `processingDelay` ms, logs completion.
   * @param batch Array of items to process.
   * @returns Promise resolved when the simulated processing is done.
   */
  processBatch(batch) {
    console.log(`Processing batch of ${batch.length} items:`, batch);

    return Q.delay(this.processingDelay).then(() => {
      console.log(`Batch processed: ${batch.join(", ")}`);
    });
  }

  /**
   * Signals that no more items will be added.
   * @returns Promise resolved once all batches, including the final partial
   *          one, have been processed.
   */
  finish() {
    this.queue.put(null);
    return this.completion;
  }
}

// Priority queue implementation
/**
 * Priority queue built from one promise Queue per priority level.
 *
 * Lower numbers mean higher priority. Items are processed one at a time;
 * before each step the highest-priority level that has buffered items is
 * chosen.
 *
 * A promise queue cannot be probed for emptiness: get() on an empty queue
 * returns a pending promise that will swallow the next put(). This class
 * therefore keeps its own count of buffered items per priority and only
 * calls get() on a queue that is known to hold an item.
 */
class PriorityQueue {
  constructor() {
    this.queues = new Map();   // priority -> Queue
    this.buffered = new Map(); // priority -> number of items not yet taken
    this.processing = false;
  }

  /**
   * Adds an item and starts processing if the queue is idle.
   * @param item Value to enqueue.
   * @param priority Numeric priority, lower is processed first (default 0).
   */
  put(item, priority = 0) {
    if (!this.queues.has(priority)) {
      this.queues.set(priority, new Queue());
      this.buffered.set(priority, 0);
    }

    this.queues.get(priority).put(item);
    this.buffered.set(priority, this.buffered.get(priority) + 1);

    if (!this.processing) {
      this.processNext().catch(error => {
        // Leave the queue restartable after a failure.
        this.processing = false;
        console.error("Priority queue processing error:", error);
      });
    }
  }

  /**
   * Processes buffered items, highest priority first, until none remain.
   * @returns Promise resolved when every buffered item has been processed.
   */
  processNext() {
    this.processing = true;

    // Get highest priority (lowest number) that actually has items.
    const ready = Array.from(this.queues.keys())
      .filter(priority => this.buffered.get(priority) > 0)
      .sort((a, b) => a - b);

    if (ready.length === 0) {
      this.processing = false;
      return Q.resolve();
    }

    const priority = ready[0];
    // Reserve the item before the asynchronous get() so that a re-entrant
    // call cannot pick the same slot.
    this.buffered.set(priority, this.buffered.get(priority) - 1);

    return this.queues.get(priority).get()
      .then(item => {
        console.log(`Processing priority ${priority} item:`, item);
        return Q.delay(100);
      })
      .then(() => this.processNext());
  }
}

// Task queue with worker pool
/**
 * Task queue served by a fixed pool of workers.
 *
 * Every worker repeatedly takes a task from the shared queue, waits for the
 * task's `duration` (1000 ms when absent or falsy) and then takes the next
 * one. A `null` task tells the worker that receives it to shut down.
 */
class WorkerQueue {
  /** @param workerCount Number of workers to start (default 3). */
  constructor(workerCount = 3) {
    this.queue = new Queue();
    this.workerCount = workerCount;
    this.activeWorkers = 0;

    this.startWorkers();
  }

  /** Enqueues a task object ({ name, duration }). */
  addTask(task) {
    this.queue.put(task);
  }

  /** Starts `workerCount` workers named Worker-1, Worker-2, ... */
  startWorkers() {
    let started = 0;
    while (started < this.workerCount) {
      this.startWorker(`Worker-${started + 1}`);
      started++;
    }
  }

  /**
   * Starts one worker loop and counts it as active until it reads `null`.
   * @param name Label used in the worker's log output.
   */
  startWorker(name) {
    this.activeWorkers += 1;

    // Handles one dequeued task, or the shutdown signal.
    const handle = task => {
      if (task === null) {
        console.log(`${name} shutting down`);
        this.activeWorkers -= 1;
        return undefined;
      }

      console.log(`${name} processing task:`, task.name);

      return Q.delay(task.duration || 1000).then(() => {
        console.log(`${name} completed task:`, task.name);
        return workLoop();
      });
    };

    // A failing task is logged and the worker keeps going.
    const recover = error => {
      console.error(`${name} error:`, error);
      return workLoop();
    };

    const workLoop = () => this.queue.get().then(handle).catch(recover);

    workLoop();
  }

  /** Sends one shutdown signal per worker. */
  shutdown() {
    for (let remaining = this.workerCount; remaining > 0; remaining--) {
      this.queue.put(null);
    }
  }

  /**
   * Polls every 100 ms until no worker is active.
   * @returns Promise resolved once `activeWorkers` reaches zero.
   */
  waitForCompletion() {
    const poll = () =>
      (this.activeWorkers === 0 ? Q.resolve() : Q.delay(100).then(() => poll()));

    return poll();
  }
}

// Usage examples
/**
 * Runs the batch-queue demo, waits five seconds, then runs the worker-queue
 * demo.
 *
 * @returns Promise for the result of the worker queue's waitForCompletion().
 */
function demonstrateQueuePatterns() {
  console.log("=== Batch Queue Demo ===");
  const batchQueue = new BatchQueue(3, 500);

  // Add items to batch queue
  let itemNumber = 1;
  while (itemNumber <= 10) {
    batchQueue.add(`Item ${itemNumber}`);
    itemNumber++;
  }
  batchQueue.finish();

  const runWorkerDemo = () => {
    console.log("\n=== Worker Queue Demo ===");
    const workerQueue = new WorkerQueue(2);

    // Add tasks to worker queue
    const tasks = [
      { name: "Task A", duration: 800 },
      { name: "Task B", duration: 1200 },
      { name: "Task C", duration: 600 },
      { name: "Task D", duration: 1000 },
      { name: "Task E", duration: 500 }
    ];

    for (const task of tasks) {
      workerQueue.addTask(task);
    }

    // Shutdown after delay; the signals queue up behind the tasks above.
    setTimeout(() => workerQueue.shutdown(), 100);

    return workerQueue.waitForCompletion();
  };

  // Fixed pause that gives the batch demo time to finish its output.
  return Q.delay(5000).then(() => runWorkerDemo());
}

// Run demonstrations
// Outcome reporters for the demo run: one for success, one for any failure.
const reportDemoSuccess = () => console.log("\nAll queue demonstrations completed");
const reportDemoFailure = error => console.error("Demo error:", error);

demonstrateQueuePatterns()
  .then(reportDemoSuccess)
  .catch(reportDemoFailure);

Integration Patterns

Common patterns for integrating queues with other Q promise features.

const Queue = require("./queue"); // or require("q/queue") when installed
const Q = require("q");

// Queue with error handling and retry logic
/**
 * Queue whose items are processed with automatic retries.
 *
 * Every item is stored together with its retry count. When the processor
 * fails for an item, the item is re-queued until `maxRetries` retries have
 * been used; after that it is moved to a separate error queue.
 */
class RobustQueue {
  /** @param maxRetries Retries allowed per item after its first failure (default 3). */
  constructor(maxRetries = 3) {
    this.queue = new Queue();
    this.maxRetries = maxRetries;
    this.errorQueue = new Queue();
  }

  /** Enqueues an item with a retry count of zero. */
  put(item) {
    this.queue.put({ item, retries: 0 });
  }

  /**
   * Processes queued items one at a time, indefinitely.
   *
   * @param processor Function called with each item; it may return a value
   *        or a promise, and may throw.
   * @returns Promise that stays pending while the loop runs. It rejects only
   *          when the queue itself fails (e.g. get() rejects); processor
   *          failures are handled by the retry logic and never reject it.
   */
  process(processor) {
    // Re-queues a failed item, or parks it in the error queue once the
    // retry budget is exhausted.
    const handleFailure = (item, retries, error) => {
      console.error(`Processing failed for ${item}:`, error.message);

      if (retries < this.maxRetries) {
        console.log(`Retrying ${item} (attempt ${retries + 1}/${this.maxRetries})`);
        this.queue.put({ item, retries: retries + 1 });
      } else {
        console.error(`Max retries exceeded for ${item}, moving to error queue`);
        this.errorQueue.put({ item, error, retries });
      }
    };

    const processNext = () => {
      return this.queue.get().then(({ item, retries }) => {
        return Q.try(() => processor(item))
          .then(
            () => {
              console.log("Successfully processed:", item);
            },
            error => handleFailure(item, retries, error)
          )
          // Continue only after this item's outcome is settled. Keeping the
          // recursion outside the rejection handler stops a failure in a
          // later step from being blamed on (and re-queuing) this item.
          .then(() => processNext());
      });
    };

    return processNext();
  }

  /** @returns Promise for the next entry ({ item, error, retries }) of the error queue. */
  getErrors() {
    return this.errorQueue.get();
  }
}

// Usage
const robustQueue = new RobustQueue(2);

// Add items
robustQueue.put("good-item");
robustQueue.put("bad-item");
robustQueue.put("another-good-item");

// Process with flaky processor: "bad-item" fails on roughly 30% of attempts.
// The processing loop runs indefinitely, so its promise is not awaited; the
// catch reports a failure of the loop itself instead of leaving the
// rejection unhandled.
robustQueue
  .process(item => {
    if (item === "bad-item" && Math.random() > 0.7) {
      throw new Error("Random processing failure");
    }
    return Q.delay(100).then(() => `Processed: ${item}`);
  })
  .catch(error => console.error("Queue processing stopped:", error));

Install with Tessl CLI

npx tessl i tessl/npm-q

docs

advanced.md

collections.md

core-promises.md

flow-control.md

functional.md

index.md

nodejs.md

promise-chains.md

property-access.md

queue.md

state-inspection.md

tile.json