d3-queue provides a lightweight asynchronous task queue management system that enables developers to control the concurrency of parallel operations. It offers a simple yet powerful API for deferring tasks with configurable concurrency limits, supporting both infinite concurrency (parallel execution) and limited concurrency (controlled parallel or series execution). The library features task abortion capabilities, error handling with fail-fast behavior, and seamless integration with Node.js callback patterns and web APIs.
npm install d3-queueCommonJS:
const { queue } = require("d3-queue");ES Modules:
import { queue } from "d3-queue";UMD/Browser Global:
<script src="https://d3js.org/d3-queue.v3.min.js"></script>
<script>
var q = d3.queue();
</script>const { queue } = require("d3-queue");
// Create a queue with unlimited concurrency
const q = queue();
// Add tasks to the queue
q.defer(function(callback) {
setTimeout(() => {
console.log("Task 1 complete");
callback(null, "result1");
}, 100);
});
q.defer(function(callback) {
setTimeout(() => {
console.log("Task 2 complete");
callback(null, "result2");
}, 50);
});
// Execute all tasks and handle results
q.await(function(error, result1, result2) {
if (error) throw error;
console.log("All tasks complete:", result1, result2);
});d3-queue is built around a simple yet powerful design:
Creates a new queue instance with configurable concurrency control.
function queue(concurrency?: number): Queue;Parameters:
concurrency (optional): Maximum number of concurrent tasks. Defaults to Infinity for unlimited concurrency. Must be >= 1.Returns: Queue instance for method chaining
Usage Examples:
// Unlimited concurrency (all tasks run in parallel)
const parallelQueue = queue();
// Limited concurrency (at most 3 tasks run concurrently)
const limitedQueue = queue(3);
// Serial execution (tasks run one at a time)
const serialQueue = queue(1);Adds asynchronous tasks to the queue with optional arguments.
queue.defer(task: TaskFunction, ...arguments: any[]): Queue;
interface TaskFunction {
(callback: TaskCallback, ...args: any[]): void | AbortHandle;
}
interface TaskCallback {
(error: any, result?: any): void;
}
interface AbortHandle {
abort(): void;
}Parameters:
task: Asynchronous function that receives arguments plus a callback as the last parameter...arguments: Additional arguments passed to the task function before the callbackReturns: Queue instance for method chaining
Task Requirements:
callback(error, result)null for success, or error for failureabort() method for cleanupUsage Examples:
// Simple task
q.defer(function(callback) {
setTimeout(() => callback(null, "done"), 100);
});
// Task with arguments
q.defer(function(name, delay, callback) {
setTimeout(() => callback(null, `Hello ${name}`), delay);
}, "Alice", 250);
// Node.js API compatibility
q.defer(fs.stat, "/path/to/file");
// Abortable task
q.defer(function(callback) {
const timeoutId = setTimeout(() => callback(null, "completed"), 1000);
return {
abort() {
clearTimeout(timeoutId);
}
};
});Sets callback to be invoked when all tasks complete, passing individual results as separate arguments.
queue.await(callback: AwaitCallback): Queue;
interface AwaitCallback {
(error: any, ...results: any[]): void;
}Parameters:
callback: Function called with error as first argument, followed by individual task resultsReturns: Queue instance
Callback Arguments:
error (null if no error occurred)Usage Examples:
q.defer(task1);
q.defer(task2);
q.defer(task3);
q.await(function(error, result1, result2, result3) {
if (error) {
console.error("Task failed:", error);
return;
}
console.log("Results:", result1, result2, result3);
});Sets callback to be invoked when all tasks complete, passing results as an array.
queue.awaitAll(callback: AwaitAllCallback): Queue;
interface AwaitAllCallback {
(error: any, results?: any[]): void;
}Parameters:
callback: Function called with error as first argument and results array as secondReturns: Queue instance
Callback Arguments:
error (null if no error occurred)Usage Examples:
const tasks = [task1, task2, task3];
const q = queue();
tasks.forEach(task => q.defer(task));
q.awaitAll(function(error, results) {
if (error) {
console.error("Task failed:", error);
return;
}
console.log("All results:", results);
results.forEach((result, index) => {
console.log(`Task ${index + 1}:`, result);
});
});Aborts all active and pending tasks, immediately invoking the await callback with an abort error.
queue.abort(): Queue;Returns: Queue instance
Behavior:
abort() method on active tasks that provide itError("abort")Usage Examples:
const q = queue()
.defer(longRunningTask)
.defer(anotherTask)
.awaitAll(function(error, results) {
if (error && error.message === "abort") {
console.log("Queue was aborted");
return;
}
console.log("Tasks completed:", results);
});
// Abort after 5 seconds
setTimeout(() => {
q.abort();
}, 5000);// Process files in batches of 5
const fileQueue = queue(5);
const filePaths = ["file1.txt", "file2.txt", /* ... many files ... */];
filePaths.forEach(path => {
fileQueue.defer(fs.readFile, path, "utf8");
});
fileQueue.awaitAll((error, fileContents) => {
if (error) throw error;
console.log(`Processed ${fileContents.length} files`);
});queue()
.defer(function(callback) {
// This task will fail
setTimeout(() => callback(new Error("Task failed")), 100);
})
.defer(function(callback) {
// This task won't run due to the error above
setTimeout(() => callback(null, "success"), 50);
})
.await(function(error, result1, result2) {
console.log("Error:", error.message); // "Task failed"
console.log("Results:", result1, result2); // undefined, undefined
});function promisifyQueue(tasks) {
return new Promise((resolve, reject) => {
const q = queue();
tasks.forEach(task => q.defer(task));
q.awaitAll((error, results) => {
if (error) reject(error);
else resolve(results);
});
});
}
// Usage
promisifyQueue([task1, task2, task3])
.then(results => console.log("Success:", results))
.catch(error => console.error("Failed:", error));interface Queue {
defer(task: TaskFunction, ...args: any[]): Queue;
await(callback: AwaitCallback): Queue;
awaitAll(callback: AwaitAllCallback): Queue;
abort(): Queue;
}
interface TaskFunction {
(callback: TaskCallback, ...args: any[]): void | AbortHandle;
}
interface TaskCallback {
(error: any, result?: any): void;
}
interface AwaitCallback {
(error: any, ...results: any[]): void;
}
interface AwaitAllCallback {
(error: any, results?: any[]): void;
}
interface AbortHandle {
abort(): void;
}queue(concurrency) throws Error with message "invalid concurrency" if concurrency is not >= 1queue.defer() throws Error with message "invalid callback" if task is not a functionqueue.defer() throws Error with message "defer after await" if called after await/awaitAllqueue.await() throws Error with message "invalid callback" if callback is not a functionqueue.await() throws Error with message "multiple await" if called multiple times or after awaitAllqueue.awaitAll() throws Error with message "invalid callback" if callback is not a functionqueue.awaitAll() throws Error with message "multiple await" if called multiple times or after await