0
# Queue Management
1
2
Core queue operations for adding, controlling, and monitoring promise-based tasks. Provides methods for adding individual tasks, batch operations, and comprehensive queue state management.
3
4
## Capabilities
5
6
### Task Addition
7
8
#### add Method
9
10
Adds a sync or async task to the queue and returns a promise that resolves with the task result.
11
12
```typescript { .api }
13
/**
14
* Adds a sync or async task to the queue. Always returns a promise.
15
* @param function_ - The task function to execute
16
* @param options - Optional configuration for the task
17
* @returns Promise that resolves with task result or void
18
*/
19
add<TaskResultType>(function_: Task<TaskResultType>, options?: Partial<EnqueueOptionsType>): Promise<TaskResultType | void>;
20
21
/**
22
* Adds a task with throwOnTimeout enabled, guaranteeing a result
23
* @param function_ - The task function to execute
24
* @param options - Configuration with throwOnTimeout: true
25
* @returns Promise that resolves with task result (never void)
26
*/
27
add<TaskResultType>(function_: Task<TaskResultType>, options: {throwOnTimeout: true} & Exclude<EnqueueOptionsType, 'throwOnTimeout'>): Promise<TaskResultType>;
28
```
29
30
**Usage Examples:**
31
32
```typescript
33
import PQueue from "p-queue";
34
35
const queue = new PQueue({ concurrency: 2 });
36
37
// Basic task addition
38
const result = await queue.add(async () => {
39
const response = await fetch('https://api.example.com/users');
40
return response.json();
41
});
42
43
// Task with priority and ID
44
await queue.add(
45
async ({ signal }) => {
46
// Task can check for abort signal
47
if (signal?.aborted) throw signal.reason;
48
return processImportantData();
49
},
50
{
51
priority: 10,
52
id: 'important-task',
53
timeout: 5000
54
}
55
);
56
57
// Task with abort signal
58
const controller = new AbortController();
59
const taskPromise = queue.add(
60
async ({ signal }) => {
61
// Handle abortion within task
62
signal?.addEventListener('abort', () => cleanup());
63
return longRunningOperation();
64
},
65
{ signal: controller.signal }
66
);
67
68
// Can abort the task
69
setTimeout(() => controller.abort(), 1000);
70
```
71
72
#### addAll Method
73
74
Adds multiple sync or async tasks to the queue simultaneously.
75
76
```typescript { .api }
77
/**
78
* Same as .add(), but accepts an array of sync or async functions
79
* @param functions - Array of task functions to execute
80
* @param options - Optional configuration applied to all tasks
81
* @returns Promise that resolves when all functions are resolved
82
*/
83
addAll<TaskResultsType>(
84
functions: ReadonlyArray<Task<TaskResultsType>>,
85
options?: Partial<EnqueueOptionsType>
86
): Promise<Array<TaskResultsType | void>>;
87
88
/**
89
* Adds multiple tasks with throwOnTimeout enabled
90
* @param functions - Array of task functions to execute
91
* @param options - Configuration with throwOnTimeout: true
92
* @returns Promise with guaranteed results (no void values)
93
*/
94
addAll<TaskResultsType>(
95
functions: ReadonlyArray<Task<TaskResultsType>>,
96
options?: {throwOnTimeout: true} & Partial<Exclude<EnqueueOptionsType, 'throwOnTimeout'>>
97
): Promise<TaskResultsType[]>;
98
```
99
100
**Usage Examples:**
101
102
```typescript
103
// Process multiple files
104
const files = ['file1.txt', 'file2.txt', 'file3.txt'];
105
const results = await queue.addAll(
106
files.map(filename => async () => {
107
const data = await readFile(filename);
108
return processFileData(data);
109
}),
110
{ priority: 5 }
111
);
112
113
// Batch API requests with error handling
114
const urls = ['api/users', 'api/posts', 'api/comments'];
115
const responses = await queue.addAll(
116
urls.map(url => async ({ signal }) => {
117
try {
118
const response = await fetch(`https://api.example.com/${url}`, {
119
signal
120
});
121
return response.json();
122
} catch (error) {
123
console.error(`Failed to fetch ${url}:`, error);
124
return null;
125
}
126
})
127
);
128
```
129
130
### Queue Control
131
132
#### start Method
133
134
Starts or resumes executing enqueued tasks within concurrency limit.
135
136
```typescript { .api }
137
/**
138
* Start (or resume) executing enqueued tasks within concurrency limit.
139
* No need to call this if queue is not paused (via options.autoStart = false or by .pause() method.)
140
* @returns The queue instance for chaining
141
*/
142
start(): this;
143
```
144
145
**Usage Examples:**
146
147
```typescript
148
// Create paused queue
149
const queue = new PQueue({
150
concurrency: 2,
151
autoStart: false
152
});
153
154
// Add tasks while paused
155
queue.add(async () => task1());
156
queue.add(async () => task2());
157
queue.add(async () => task3());
158
159
console.log(queue.size); // 3 (tasks are queued but not running)
160
console.log(queue.pending); // 0 (no tasks executing)
161
162
// Start processing
163
queue.start();
164
165
console.log(queue.pending); // 2 (up to concurrency limit)
166
```
167
168
#### pause Method
169
170
Puts queue execution on hold, preventing new tasks from starting.
171
172
```typescript { .api }
173
/**
174
* Put queue execution on hold.
175
*/
176
pause(): void;
177
```
178
179
**Usage Examples:**
180
181
```typescript
182
const queue = new PQueue({ concurrency: 3 });
183
184
// Add some tasks
185
queue.add(async () => longTask1());
186
queue.add(async () => longTask2());
187
queue.add(async () => longTask3());
188
189
// Pause after 1 second
190
setTimeout(() => {
191
queue.pause();
192
console.log('Queue paused');
193
194
// Resume after another 2 seconds
195
setTimeout(() => {
196
queue.start();
197
console.log('Queue resumed');
198
}, 2000);
199
}, 1000);
200
```
201
202
#### clear Method
203
204
Clears all queued tasks that haven't started executing yet.
205
206
```typescript { .api }
207
/**
208
* Clear the queue.
209
*/
210
clear(): void;
211
```
212
213
**Usage Examples:**
214
215
```typescript
216
const queue = new PQueue({ concurrency: 1 });
217
218
// Add tasks
219
queue.add(async () => task1());
220
queue.add(async () => task2());
221
queue.add(async () => task3());
222
223
console.log(queue.size); // 2 (one task running, two queued)
224
225
// Clear queued tasks
226
queue.clear();
227
228
console.log(queue.size); // 0 (queued tasks cleared)
229
console.log(queue.pending); // 1 (running task continues)
230
```
231
232
### State Monitoring
233
234
#### onEmpty Method
235
236
Returns a promise that resolves when the queue becomes empty (no more tasks waiting).
237
238
```typescript { .api }
239
/**
240
* Can be called multiple times. Useful if you for example add additional items at a later time.
241
* @returns A promise that settles when the queue becomes empty.
242
*/
243
onEmpty(): Promise<void>;
244
```
245
246
#### onIdle Method
247
248
Returns a promise that resolves when the queue becomes empty and all promises have completed.
249
250
```typescript { .api }
251
/**
252
* The difference with .onEmpty is that .onIdle guarantees that all work from the queue has finished.
253
* .onEmpty merely signals that the queue is empty, but it could mean that some promises haven't completed yet.
254
* @returns A promise that settles when the queue becomes empty, and all promises have completed; queue.size === 0 && queue.pending === 0.
255
*/
256
onIdle(): Promise<void>;
257
```
258
259
#### onSizeLessThan Method
260
261
Returns a promise that resolves when the queue size is less than the given limit.
262
263
```typescript { .api }
264
/**
265
* @param limit - The size limit to wait for
266
* @returns A promise that settles when the queue size is less than the given limit: queue.size < limit.
267
*
268
* If you want to avoid having the queue grow beyond a certain size you can await queue.onSizeLessThan() before adding a new item.
269
*
270
* Note that this only limits the number of items waiting to start. There could still be up to concurrency jobs already running that this call does not include in its calculation.
271
*/
272
onSizeLessThan(limit: number): Promise<void>;
273
```
274
275
**Usage Examples:**
276
277
```typescript
278
const queue = new PQueue({ concurrency: 2 });
279
280
// Monitor queue states
281
queue.add(async () => {
282
await delay(1000);
283
return 'task1';
284
});
285
286
queue.add(async () => {
287
await delay(2000);
288
return 'task2';
289
});
290
291
// Wait for queue to be empty (no more tasks waiting)
292
await queue.onEmpty();
293
console.log('No more tasks in queue');
294
295
// Wait for all work to complete
296
await queue.onIdle();
297
console.log('All tasks completed');
298
299
// Limit queue growth
300
async function addTaskSafely(taskFn) {
301
// Wait if queue has too many items
302
await queue.onSizeLessThan(10);
303
return queue.add(taskFn);
304
}
305
```
306
307
### Task Configuration
308
309
Tasks added to the queue can be configured with various options:
310
311
```typescript { .api }
312
type QueueAddOptions = {
313
readonly priority?: number;
314
id?: string;
315
readonly signal?: AbortSignal;
316
timeout?: number;
317
throwOnTimeout?: boolean;
318
};
319
320
type TaskOptions = {
321
readonly signal?: AbortSignal;
322
};
323
324
type Task<TaskResultType> =
325
| ((options: TaskOptions) => PromiseLike<TaskResultType>)
326
| ((options: TaskOptions) => TaskResultType);
327
```
328
329
**Configuration Examples:**
330
331
```typescript
332
// Priority-based execution (higher numbers = higher priority)
333
await queue.add(async () => criticalTask(), { priority: 10 });
334
await queue.add(async () => normalTask(), { priority: 0 });
335
await queue.add(async () => lowPriorityTask(), { priority: -5 });
336
337
// Task with timeout
338
await queue.add(
339
async () => {
340
// This task will timeout after 5 seconds
341
await slowOperation();
342
return result;
343
},
344
{
345
timeout: 5000,
346
throwOnTimeout: true // Will throw TimeoutError instead of returning void
347
}
348
);
349
350
// Task with unique ID for later priority updates
351
await queue.add(
352
async () => updateUserProfile(userId),
353
{ id: `profile-update-${userId}`, priority: 5 }
354
);
355
356
// Later, increase priority of specific task
357
queue.setPriority(`profile-update-${userId}`, 15);
358
```