or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

index.mddocs/

fastq

fastq provides a fast, in-memory work queue for Node.js with configurable concurrency. It supports both callback-based and promise-based APIs, enabling efficient task processing with minimal overhead through object reuse patterns and optimized performance.

Package Information

  • Package Name: fastq
  • Package Type: npm
  • Language: JavaScript with TypeScript definitions
  • Installation: npm install fastq

Core Imports

const fastq = require('fastq');

ES Modules:

import fastq from 'fastq';

TypeScript:

import * as fastq from "fastq";
import type { queue, queueAsPromised, done } from "fastq";

Basic Usage

Callback-based API

const fastq = require('fastq');

// Create queue with worker function and concurrency limit
const queue = fastq(worker, 1);

// Add tasks to queue
queue.push('hello', (err, result) => {
  if (err) throw err;
  console.log('Result:', result);
});

function worker(task, callback) {
  // Process task
  callback(null, `processed: ${task}`);
}

Promise-based API

const fastq = require('fastq');

// Create promise-based queue
const queue = fastq.promise(asyncWorker, 2);

// Add tasks with promises
const result = await queue.push('hello');
console.log('Result:', result);

async function asyncWorker(task) {
  // Process task asynchronously
  return `processed: ${task}`;
}

Context Binding

const context = { prefix: 'LOG: ' };
const queue = fastq(context, worker, 1);

function worker(task, callback) {
  console.log(this.prefix + task); // Access context via 'this'
  callback(null, `${this.prefix}processed: ${task}`);
}

TypeScript Usage

import * as fastq from "fastq";
import type { queue, queueAsPromised, done } from "fastq";

// Callback-based with types
type Task = { id: number };
const callbackQueue: queue<Task> = fastq(worker, 1);

function worker(arg: Task, cb: done) {
  console.log(arg.id);
  cb(null, `processed: ${arg.id}`);
}

// Promise-based with types
const promiseQueue: queueAsPromised<Task> = fastq.promise(asyncWorker, 1);

async function asyncWorker(arg: Task): Promise<string> {
  return `processed: ${arg.id}`;
}

Capabilities

Queue Creation

Create queue instances with worker functions and concurrency control.

/**
 * Create a callback-based queue
 * @param context - Optional context object bound to worker as 'this'
 * @param worker - Worker function to process tasks
 * @param concurrency - Number of concurrent tasks (minimum 1)
 * @returns queue instance
 */
function fastq(context, worker, concurrency);
function fastq(worker, concurrency);

/**
 * Create a promise-based queue
 * @param context - Optional context object bound to worker as 'this'  
 * @param worker - Async worker function returning Promise
 * @param concurrency - Number of concurrent tasks (minimum 1)
 * @returns queueAsPromised instance
 */
fastq.promise(context, worker, concurrency);
fastq.promise(worker, concurrency);

Task Scheduling

Add tasks to the queue for processing.

/**
 * Add task to end of queue (callback-based)
 * @param task - Task data to process
 * @param done - Completion callback (err, result) => void
 */
queue.push(task, done);

/**
 * Add task to beginning of queue (callback-based)
 * @param task - Task data to process
 * @param done - Completion callback (err, result) => void
 */
queue.unshift(task, done);

/**
 * Add task to end of queue (promise-based)
 * @param task - Task data to process
 * @returns {Promise} Promise that resolves with task result
 */
queue.push(task);

/**
 * Add task to beginning of queue (promise-based)
 * @param task - Task data to process
 * @returns {Promise} Promise that resolves with task result
 */
queue.unshift(task);

Queue Control

Control queue execution state.

/**
 * Pause task processing. Currently running tasks continue.
 */
queue.pause();

/**
 * Resume paused task processing
 */
queue.resume();

/**
 * Remove all waiting tasks and reset drain function
 */
queue.kill();

/**
 * Remove all waiting tasks but call drain first
 */
queue.killAndDrain();

Queue Inspection

Query queue status and contents.

/**
 * Get number of currently running tasks
 * @returns {number} number of running tasks
 */
queue.running();

/**
 * Check if queue is idle (no running or waiting tasks)
 * @returns {boolean} true if idle, false otherwise
 */
queue.idle();

/**
 * Get number of tasks waiting in queue
 * @returns {number} number of queued tasks
 */
queue.length();

/**
 * Get all tasks waiting in queue
 * @returns {Array} array of task values
 */
queue.getQueue();

Error Handling

Configure global error handling for task failures.

/**
 * Set global error handler for task failures
 * @param handler - Error handler function called only when tasks fail (err, task) => void
 */
queue.error(handler);

Promise-specific Methods

Additional methods available only on promise-based queues.

/**
 * Wait for queue to drain completely
 * @returns {Promise<void>} Promise that resolves when all tasks complete
 */
queue.drained();

Properties

Configuration Properties

/**
 * Current concurrency limit (readable/writable)
 * Setting this adjusts concurrent task limit at runtime
 * @type number (minimum 1)
 */
queue.concurrency;

/**
 * Current pause state (read-only)
 * Returns true when queue is paused, false otherwise
 * @type boolean
 */
queue.paused;

Event Hook Properties

/**
 * Function called when queue becomes empty and all tasks finish
 * Can be reassigned at runtime
 * @type () => void
 */
queue.drain;

/**
 * Function called when last task is assigned to worker
 * Can be reassigned at runtime  
 * @type () => void
 */
queue.empty;

/**
 * Function called when queue hits concurrency limit
 * Can be reassigned at runtime
 * @type () => void
 */
queue.saturated;

Types

/**
 * Callback-based worker function
 * @param task - Task data to process
 * @param callback - Completion callback (err, result) => void
 */
type Worker = (task, callback) => void;

/**
 * Promise-based worker function
 * @param task - Task data to process  
 * @returns Promise resolving to result
 */
type AsyncWorker = (task) => Promise;

/**
 * Task completion callback
 * @param err - Error if task failed, null if successful
 * @param result - Task result if successful
 */
type Done = (err, result) => void;

/**
 * Error handler for failed tasks
 * @param err - Error that occurred
 * @param task - Task that failed
 */
type ErrorHandler = (err, task) => void;

Performance Features

  • Object Pooling: Uses reusify library to minimize garbage collection through task wrapper object reuse via internal Task constructor reuse pattern
  • Direct Execution: Tasks run immediately when under concurrency limit, avoiding queue overhead
  • Minimal Allocation: Optimized linked-list implementation for efficient queue operations with head/tail pointers
  • Zero Overhead Series: For sequential processing, consider fastseries package from same author

Error Handling Patterns

// Global error handling - called only for task failures
queue.error((err, task) => {
  console.error(`Task failed:`, task, err);
});

// Per-task error handling (callback API)
queue.push(task, (err, result) => {
  if (err) {
    console.error('Task error:', err);
    return;
  }
  console.log('Success:', result);
});

// Per-task error handling (promise API)
try {
  const result = await queue.push(task);
  console.log('Success:', result);
} catch (err) {
  console.error('Task error:', err);
}

Common Usage Patterns

Rate-limited API Calls

const queue = fastq.promise(apiWorker, 3);

async function apiWorker(url) {
  const response = await fetch(url);
  return response.json();
}

// Process URLs with max 3 concurrent requests
const results = await Promise.all([
  queue.push('https://api.example.com/users/1'),
  queue.push('https://api.example.com/users/2'),
  queue.push('https://api.example.com/users/3'),
]);

Background Job Processing

const queue = fastq(jobWorker, 5);

queue.drain = () => {
  console.log('All jobs completed');
};

function jobWorker(job, callback) {
  processJob(job)
    .then(result => callback(null, result))
    .catch(err => callback(err));
}

// Add multiple jobs
jobs.forEach(job => queue.push(job));

Dynamic Concurrency Adjustment

const queue = fastq(worker, 1);

// Increase concurrency during high-load periods
if (isHighLoad()) {
  queue.concurrency = 5;
}

// Reduce for low-priority tasks
if (isLowPriority()) {
  queue.concurrency = 1;
}