CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-fastq

Fast, in memory work queue

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

fastq

fastq provides a fast, in-memory work queue for Node.js with configurable concurrency. It supports both callback-based and promise-based APIs, enabling efficient task processing with minimal overhead through object reuse patterns and optimized performance.

Package Information

  • Package Name: fastq
  • Package Type: npm
  • Language: JavaScript with TypeScript definitions
  • Installation: npm install fastq

Core Imports

const fastq = require('fastq');

ES Modules:

import fastq from 'fastq';

TypeScript:

import * as fastq from "fastq";
import type { queue, queueAsPromised, done } from "fastq";

Basic Usage

Callback-based API

const fastq = require('fastq');

// Create queue with worker function and concurrency limit
const queue = fastq(worker, 1);

// Add tasks to queue
queue.push('hello', (err, result) => {
  if (err) throw err;
  console.log('Result:', result);
});

function worker(task, callback) {
  // Process task
  callback(null, `processed: ${task}`);
}

Promise-based API

const fastq = require('fastq');

// Create promise-based queue
const queue = fastq.promise(asyncWorker, 2);

// Add tasks with promises
const result = await queue.push('hello');
console.log('Result:', result);

async function asyncWorker(task) {
  // Process task asynchronously
  return `processed: ${task}`;
}

Context Binding

const context = { prefix: 'LOG: ' };
const queue = fastq(context, worker, 1);

function worker(task, callback) {
  console.log(this.prefix + task); // Access context via 'this'
  callback(null, `${this.prefix}processed: ${task}`);
}

TypeScript Usage

import * as fastq from "fastq";
import type { queue, queueAsPromised, done } from "fastq";

// Callback-based with types
type Task = { id: number };
const callbackQueue: queue<Task> = fastq(worker, 1);

function worker(arg: Task, cb: done) {
  console.log(arg.id);
  cb(null, `processed: ${arg.id}`);
}

// Promise-based with types
const promiseQueue: queueAsPromised<Task> = fastq.promise(asyncWorker, 1);

async function asyncWorker(arg: Task): Promise<string> {
  return `processed: ${arg.id}`;
}

Capabilities

Queue Creation

Create queue instances with worker functions and concurrency control.

/**
 * Create a callback-based queue
 * @param context - Optional context object bound to worker as 'this'
 * @param worker - Worker function to process tasks
 * @param concurrency - Number of concurrent tasks (minimum 1)
 * @returns queue instance
 */
function fastq(context, worker, concurrency);
function fastq(worker, concurrency);

/**
 * Create a promise-based queue
 * @param context - Optional context object bound to worker as 'this'  
 * @param worker - Async worker function returning Promise
 * @param concurrency - Number of concurrent tasks (minimum 1)
 * @returns queueAsPromised instance
 */
fastq.promise(context, worker, concurrency);
fastq.promise(worker, concurrency);

Task Scheduling

Add tasks to the queue for processing.

/**
 * Add task to end of queue (callback-based)
 * @param task - Task data to process
 * @param done - Completion callback (err, result) => void
 */
queue.push(task, done);

/**
 * Add task to beginning of queue (callback-based)
 * @param task - Task data to process
 * @param done - Completion callback (err, result) => void
 */
queue.unshift(task, done);

/**
 * Add task to end of queue (promise-based)
 * @param task - Task data to process
 * @returns {Promise} Promise that resolves with task result
 */
queue.push(task);

/**
 * Add task to beginning of queue (promise-based)
 * @param task - Task data to process
 * @returns {Promise} Promise that resolves with task result
 */
queue.unshift(task);

Queue Control

Control queue execution state.

/**
 * Pause task processing. Currently running tasks continue.
 */
queue.pause();

/**
 * Resume paused task processing
 */
queue.resume();

/**
 * Remove all waiting tasks and reset drain function
 */
queue.kill();

/**
 * Remove all waiting tasks but call drain first
 */
queue.killAndDrain();

Queue Inspection

Query queue status and contents.

/**
 * Get number of currently running tasks
 * @returns {number} number of running tasks
 */
queue.running();

/**
 * Check if queue is idle (no running or waiting tasks)
 * @returns {boolean} true if idle, false otherwise
 */
queue.idle();

/**
 * Get number of tasks waiting in queue
 * @returns {number} number of queued tasks
 */
queue.length();

/**
 * Get all tasks waiting in queue
 * @returns {Array} array of task values
 */
queue.getQueue();

Error Handling

Configure global error handling for task failures.

/**
 * Set global error handler for task failures
 * @param handler - Error handler function called only when tasks fail (err, task) => void
 */
queue.error(handler);

Promise-specific Methods

Additional methods available only on promise-based queues.

/**
 * Wait for queue to drain completely
 * @returns {Promise<void>} Promise that resolves when all tasks complete
 */
queue.drained();

Properties

Configuration Properties

/**
 * Current concurrency limit (readable/writable)
 * Setting this adjusts concurrent task limit at runtime
 * @type number (minimum 1)
 */
queue.concurrency;

/**
 * Current pause state (read-only)
 * Returns true when queue is paused, false otherwise
 * @type boolean
 */
queue.paused;

Event Hook Properties

/**
 * Function called when queue becomes empty and all tasks finish
 * Can be reassigned at runtime
 * @type () => void
 */
queue.drain;

/**
 * Function called when last task is assigned to worker
 * Can be reassigned at runtime  
 * @type () => void
 */
queue.empty;

/**
 * Function called when queue hits concurrency limit
 * Can be reassigned at runtime
 * @type () => void
 */
queue.saturated;

Types

/**
 * Callback-based worker function
 * @param task - Task data to process
 * @param callback - Completion callback (err, result) => void
 */
type Worker = (task, callback) => void;

/**
 * Promise-based worker function
 * @param task - Task data to process  
 * @returns Promise resolving to result
 */
type AsyncWorker = (task) => Promise;

/**
 * Task completion callback
 * @param err - Error if task failed, null if successful
 * @param result - Task result if successful
 */
type Done = (err, result) => void;

/**
 * Error handler for failed tasks
 * @param err - Error that occurred
 * @param task - Task that failed
 */
type ErrorHandler = (err, task) => void;

Performance Features

  • Object Pooling: Uses reusify library to minimize garbage collection through task wrapper object reuse via internal Task constructor reuse pattern
  • Direct Execution: Tasks run immediately when under concurrency limit, avoiding queue overhead
  • Minimal Allocation: Optimized linked-list implementation for efficient queue operations with head/tail pointers
  • Zero Overhead Series: For sequential processing, consider fastseries package from same author

Error Handling Patterns

// Global error handling - called only for task failures
queue.error((err, task) => {
  console.error(`Task failed:`, task, err);
});

// Per-task error handling (callback API)
queue.push(task, (err, result) => {
  if (err) {
    console.error('Task error:', err);
    return;
  }
  console.log('Success:', result);
});

// Per-task error handling (promise API)
try {
  const result = await queue.push(task);
  console.log('Success:', result);
} catch (err) {
  console.error('Task error:', err);
}

Common Usage Patterns

Rate-limited API Calls

const queue = fastq.promise(apiWorker, 3);

async function apiWorker(url) {
  const response = await fetch(url);
  return response.json();
}

// Process URLs with max 3 concurrent requests
const results = await Promise.all([
  queue.push('https://api.example.com/users/1'),
  queue.push('https://api.example.com/users/2'),
  queue.push('https://api.example.com/users/3'),
]);

Background Job Processing

const queue = fastq(jobWorker, 5);

queue.drain = () => {
  console.log('All jobs completed');
};

function jobWorker(job, callback) {
  processJob(job)
    .then(result => callback(null, result))
    .catch(err => callback(err));
}

// Add multiple jobs
jobs.forEach(job => queue.push(job));

Dynamic Concurrency Adjustment

const queue = fastq(worker, 1);

// Increase concurrency during high-load periods
if (isHighLoad()) {
  queue.concurrency = 5;
}

// Reduce for low-priority tasks
if (isLowPriority()) {
  queue.concurrency = 1;
}
Workspace
tessl
Visibility
Public
Created
Last updated
Describes
npmpkg:npm/fastq@1.19.x
Publish Source
CLI
Badge
tessl/npm-fastq badge