Fast, in-memory work queue
npx @tessl/cli install tessl/npm-fastq@1.19.0

fastq provides a fast, in-memory work queue for Node.js with configurable concurrency. It supports both callback-based and promise-based APIs and keeps per-task overhead low by reusing internal objects.
npm install fastq

CommonJS:
const fastq = require('fastq');

ES Modules:
import fastq from 'fastq';

TypeScript:
import * as fastq from "fastq";
import type { queue, queueAsPromised, done } from "fastq";

const fastq = require('fastq');
// Create queue with worker function and concurrency limit
const queue = fastq(worker, 1);
// Add tasks to queue
queue.push('hello', (err, result) => {
if (err) throw err;
console.log('Result:', result);
});
function worker(task, callback) {
// Process task
callback(null, `processed: ${task}`);
}

const fastq = require('fastq');
// Create promise-based queue
const queue = fastq.promise(asyncWorker, 2);
// Add tasks with promises
const result = await queue.push('hello');
console.log('Result:', result);
async function asyncWorker(task) {
// Process task asynchronously
return `processed: ${task}`;
}

const context = { prefix: 'LOG: ' };
const queue = fastq(context, worker, 1);
function worker(task, callback) {
console.log(this.prefix + task); // Access context via 'this'
callback(null, `${this.prefix}processed: ${task}`);
}

import * as fastq from "fastq";
import type { queue, queueAsPromised, done } from "fastq";
// Callback-based with types
type Task = { id: number };
const callbackQueue: queue<Task> = fastq(worker, 1);
function worker(arg: Task, cb: done) {
console.log(arg.id);
cb(null, `processed: ${arg.id}`);
}
// Promise-based with types
const promiseQueue: queueAsPromised<Task> = fastq.promise(asyncWorker, 1);
async function asyncWorker(arg: Task): Promise<string> {
return `processed: ${arg.id}`;
}

Create queue instances with worker functions and concurrency control.
/**
* Create a callback-based queue
* @param context - Optional context object bound to worker as 'this'
* @param worker - Worker function to process tasks
* @param concurrency - Number of concurrent tasks (minimum 1)
* @returns queue instance
*/
function fastq(context, worker, concurrency);
function fastq(worker, concurrency);
/**
* Create a promise-based queue
* @param context - Optional context object bound to worker as 'this'
* @param worker - Async worker function returning Promise
* @param concurrency - Number of concurrent tasks (minimum 1)
* @returns queueAsPromised instance
*/
fastq.promise(context, worker, concurrency);
fastq.promise(worker, concurrency);

Add tasks to the queue for processing.
/**
* Add task to end of queue (callback-based)
* @param task - Task data to process
* @param done - Completion callback (err, result) => void
*/
queue.push(task, done);
/**
* Add task to beginning of queue (callback-based)
* @param task - Task data to process
* @param done - Completion callback (err, result) => void
*/
queue.unshift(task, done);
/**
* Add task to end of queue (promise-based)
* @param task - Task data to process
* @returns {Promise} Promise that resolves with task result
*/
queue.push(task);
/**
* Add task to beginning of queue (promise-based)
* @param task - Task data to process
* @returns {Promise} Promise that resolves with task result
*/
queue.unshift(task);

Control queue execution state.
/**
* Pause task processing. Currently running tasks continue.
*/
queue.pause();
/**
* Resume paused task processing
*/
queue.resume();
/**
* Remove all waiting tasks and reset drain function
*/
queue.kill();
/**
* Remove all waiting tasks but call drain first
*/
queue.killAndDrain();

Query queue status and contents.
/**
* Get number of currently running tasks
* @returns {number} number of running tasks
*/
queue.running();
/**
* Check if queue is idle (no running or waiting tasks)
* @returns {boolean} true if idle, false otherwise
*/
queue.idle();
/**
* Get number of tasks waiting in queue
* @returns {number} number of queued tasks
*/
queue.length();
/**
* Get all tasks waiting in queue
* @returns {Array} array of task values
*/
queue.getQueue();

Configure global error handling for task failures.
/**
* Set global error handler for task failures
* @param handler - Handler function (err, task) => void, called each time a task completes; err is non-null only when the task failed
*/
queue.error(handler);

Additional methods available only on promise-based queues.
/**
* Wait for queue to drain completely
* @returns {Promise<void>} Promise that resolves when all tasks complete
*/
queue.drained();

/**
* Current concurrency limit (readable/writable)
* Setting this adjusts concurrent task limit at runtime
* @type number (minimum 1)
*/
queue.concurrency;
/**
* Current pause state (read-only)
* Returns true when queue is paused, false otherwise
* @type boolean
*/
queue.paused;

/**
* Function called when queue becomes empty and all tasks finish
* Can be reassigned at runtime
* @type () => void
*/
queue.drain;
/**
* Function called when last task is assigned to worker
* Can be reassigned at runtime
* @type () => void
*/
queue.empty;
/**
* Function called when queue hits concurrency limit
* Can be reassigned at runtime
* @type () => void
*/
queue.saturated;

/**
* Callback-based worker function
* @param task - Task data to process
* @param callback - Completion callback (err, result) => void
*/
type Worker = (task, callback) => void;
/**
* Promise-based worker function
* @param task - Task data to process
* @returns Promise resolving to result
*/
type AsyncWorker = (task) => Promise;
/**
* Task completion callback
* @param err - Error if task failed, null if successful
* @param result - Task result if successful
*/
type Done = (err, result) => void;
/**
* Error handler for failed tasks
* @param err - Error that occurred
* @param task - Task that failed
*/
type ErrorHandler = (err, task) => void;

fastq uses the reusify library to minimize garbage collection by reusing task wrapper objects (an internal Task constructor reuse pattern). Related: the fastseries package from the same author.

// Global error handling - the handler runs each time a task completes; err is null on success
queue.error((err, task) => {
if (err) {
console.error(`Task failed:`, task, err);
}
});
// Per-task error handling (callback API)
queue.push(task, (err, result) => {
if (err) {
console.error('Task error:', err);
return;
}
console.log('Success:', result);
});
// Per-task error handling (promise API)
try {
const result = await queue.push(task);
console.log('Success:', result);
} catch (err) {
console.error('Task error:', err);
}

const queue = fastq.promise(apiWorker, 3);
async function apiWorker(url) {
const response = await fetch(url);
return response.json();
}
// Process URLs with max 3 concurrent requests
const results = await Promise.all([
queue.push('https://api.example.com/users/1'),
queue.push('https://api.example.com/users/2'),
queue.push('https://api.example.com/users/3'),
]);

const queue = fastq(jobWorker, 5);
queue.drain = () => {
console.log('All jobs completed');
};
function jobWorker(job, callback) {
processJob(job)
.then(result => callback(null, result))
.catch(err => callback(err));
}
// Add multiple jobs
jobs.forEach(job => queue.push(job));

const queue = fastq(worker, 1);
// Increase concurrency during high-load periods
if (isHighLoad()) {
queue.concurrency = 5;
}
// Reduce for low-priority tasks
if (isLowPriority()) {
queue.concurrency = 1;
}