or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/npm-d3-queue

Evaluate asynchronous tasks with configurable concurrency.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
npmpkg:npm/d3-queue@3.0.x

To install, run

npx @tessl/cli install tessl/npm-d3-queue@3.0.0

index.mddocs/

d3-queue

d3-queue provides a lightweight asynchronous task queue management system that enables developers to control the concurrency of parallel operations. It offers a simple yet powerful API for deferring tasks with configurable concurrency limits, supporting both infinite concurrency (parallel execution) and limited concurrency (controlled parallel or series execution). The library features task abortion capabilities, error handling with fail-fast behavior, and seamless integration with Node.js callback patterns and web APIs.

Package Information

  • Package Name: d3-queue
  • Package Type: npm
  • Language: JavaScript
  • Installation: npm install d3-queue

Core Imports

CommonJS:

const { queue } = require("d3-queue");

ES Modules:

import { queue } from "d3-queue";

UMD/Browser Global:

<script src="https://d3js.org/d3-queue.v3.min.js"></script>
<script>
var q = d3.queue();
</script>

Basic Usage

const { queue } = require("d3-queue");

// Create a queue with unlimited concurrency
const q = queue();

// Add tasks to the queue
q.defer(function(callback) {
  setTimeout(() => {
    console.log("Task 1 complete");
    callback(null, "result1");
  }, 100);
});

q.defer(function(callback) {
  setTimeout(() => {
    console.log("Task 2 complete");
    callback(null, "result2");
  }, 50);
});

// Execute all tasks and handle results
q.await(function(error, result1, result2) {
  if (error) throw error;
  console.log("All tasks complete:", result1, result2);
});

Architecture

d3-queue is built around a simple yet powerful design:

  • Queue Instance: Manages task execution with configurable concurrency limits
  • Task Deferral: Tasks are queued but not immediately executed, allowing for batching
  • Concurrency Control: Limits number of simultaneously executing tasks (1 = serial, Infinity = parallel)
  • Callback Pattern: Uses Node.js-style error-first callbacks for task completion
  • Error Handling: Fail-fast behavior where first error stops remaining tasks from starting
  • Abort Support: Tasks can provide cleanup via abort() method for cancellation

Capabilities

Queue Creation

Creates a new queue instance with configurable concurrency control.

function queue(concurrency?: number): Queue;

Parameters:

  • concurrency (optional): Maximum number of concurrent tasks. Defaults to Infinity for unlimited concurrency. Must be >= 1.

Returns: Queue instance for method chaining

Usage Examples:

// Unlimited concurrency (all tasks run in parallel)
const parallelQueue = queue();

// Limited concurrency (at most 3 tasks run concurrently)
const limitedQueue = queue(3);

// Serial execution (tasks run one at a time)
const serialQueue = queue(1);

Task Deferral

Adds asynchronous tasks to the queue with optional arguments.

queue.defer(task: TaskFunction, ...arguments: any[]): Queue;

interface TaskFunction {
  (callback: TaskCallback, ...args: any[]): void | AbortHandle;
}

interface TaskCallback {
  (error: any, result?: any): void;
}

interface AbortHandle {
  abort(): void;
}

Parameters:

  • task: Asynchronous function that receives arguments plus a callback as the last parameter
  • ...arguments: Additional arguments passed to the task function before the callback

Returns: Queue instance for method chaining

Task Requirements:

  • Must call the provided callback when complete
  • Callback signature: callback(error, result)
  • First argument should be null for success, or error for failure
  • Can optionally return an object with abort() method for cleanup

Usage Examples:

// Simple task
q.defer(function(callback) {
  setTimeout(() => callback(null, "done"), 100);
});

// Task with arguments
q.defer(function(name, delay, callback) {
  setTimeout(() => callback(null, `Hello ${name}`), delay);
}, "Alice", 250);

// Node.js API compatibility
q.defer(fs.stat, "/path/to/file");

// Abortable task
q.defer(function(callback) {
  const timeoutId = setTimeout(() => callback(null, "completed"), 1000);
  return {
    abort() {
      clearTimeout(timeoutId);
    }
  };
});

Result Handling - Individual Results

Sets callback to be invoked when all tasks complete, passing individual results as separate arguments.

queue.await(callback: AwaitCallback): Queue;

interface AwaitCallback {
  (error: any, ...results: any[]): void;
}

Parameters:

  • callback: Function called with error as first argument, followed by individual task results

Returns: Queue instance

Callback Arguments:

  • First argument: error (null if no error occurred)
  • Subsequent arguments: Individual task results in order of deferral

Usage Examples:

q.defer(task1);
q.defer(task2);
q.defer(task3);

q.await(function(error, result1, result2, result3) {
  if (error) {
    console.error("Task failed:", error);
    return;
  }
  console.log("Results:", result1, result2, result3);
});

Result Handling - Array Results

Sets callback to be invoked when all tasks complete, passing results as an array.

queue.awaitAll(callback: AwaitAllCallback): Queue;

interface AwaitAllCallback {
  (error: any, results?: any[]): void;
}

Parameters:

  • callback: Function called with error as first argument and results array as second

Returns: Queue instance

Callback Arguments:

  • First argument: error (null if no error occurred)
  • Second argument: Array of task results in order of deferral

Usage Examples:

const tasks = [task1, task2, task3];
const q = queue();

tasks.forEach(task => q.defer(task));

q.awaitAll(function(error, results) {
  if (error) {
    console.error("Task failed:", error);
    return;
  }
  console.log("All results:", results);
  results.forEach((result, index) => {
    console.log(`Task ${index + 1}:`, result);
  });
});

Queue Abortion

Aborts all active and pending tasks, immediately invoking the await callback with an abort error.

queue.abort(): Queue;

Returns: Queue instance

Behavior:

  • Calls abort() method on active tasks that provide it
  • Prevents pending tasks from starting
  • Immediately invokes await/awaitAll callback with Error("abort")
  • Safe to call multiple times

Usage Examples:

const q = queue()
  .defer(longRunningTask)
  .defer(anotherTask)
  .awaitAll(function(error, results) {
    if (error && error.message === "abort") {
      console.log("Queue was aborted");
      return;
    }
    console.log("Tasks completed:", results);
  });

// Abort after 5 seconds
setTimeout(() => {
  q.abort();
}, 5000);

Advanced Usage Patterns

Concurrency Control

// Process files in batches of 5
const fileQueue = queue(5);
const filePaths = ["file1.txt", "file2.txt", /* ... many files ... */];

filePaths.forEach(path => {
  fileQueue.defer(fs.readFile, path, "utf8");
});

fileQueue.awaitAll((error, fileContents) => {
  if (error) throw error;
  console.log(`Processed ${fileContents.length} files`);
});

Error Handling

queue()
  .defer(function(callback) {
    // This task will fail
    setTimeout(() => callback(new Error("Task failed")), 100);
  })
  .defer(function(callback) {
    // This task won't run due to the error above
    setTimeout(() => callback(null, "success"), 50);
  })
  .await(function(error, result1, result2) {
    console.log("Error:", error.message); // "Task failed"
    console.log("Results:", result1, result2); // undefined, undefined
  });

Integration with Promises

function promisifyQueue(tasks) {
  return new Promise((resolve, reject) => {
    const q = queue();
    
    tasks.forEach(task => q.defer(task));
    
    q.awaitAll((error, results) => {
      if (error) reject(error);
      else resolve(results);
    });
  });
}

// Usage
promisifyQueue([task1, task2, task3])
  .then(results => console.log("Success:", results))
  .catch(error => console.error("Failed:", error));

Types

interface Queue {
  defer(task: TaskFunction, ...args: any[]): Queue;
  await(callback: AwaitCallback): Queue;
  awaitAll(callback: AwaitAllCallback): Queue;
  abort(): Queue;
}

interface TaskFunction {
  (callback: TaskCallback, ...args: any[]): void | AbortHandle;
}

interface TaskCallback {
  (error: any, result?: any): void;
}

interface AwaitCallback {
  (error: any, ...results: any[]): void;
}

interface AwaitAllCallback {
  (error: any, results?: any[]): void;
}

interface AbortHandle {
  abort(): void;
}

Error Conditions

  • queue(concurrency) throws Error with message "invalid concurrency" if concurrency is not >= 1
  • queue.defer() throws Error with message "invalid callback" if task is not a function
  • queue.defer() throws Error with message "defer after await" if called after await/awaitAll
  • queue.await() throws Error with message "invalid callback" if callback is not a function
  • queue.await() throws Error with message "multiple await" if called multiple times or after awaitAll
  • queue.awaitAll() throws Error with message "invalid callback" if callback is not a function
  • queue.awaitAll() throws Error with message "multiple await" if called multiple times or after await