# Concurrency Control

Advanced concurrency management with configurable limits, interval-based throttling, and priority handling for fine-grained control over async operation execution.

## Capabilities

### Concurrency Management

#### concurrency Property

Controls the maximum number of tasks that can run simultaneously.

```typescript { .api }
/**
 * Get the current concurrency limit
 */
get concurrency(): number;

/**
 * Set a new concurrency limit. Changes take effect immediately.
 * @param newConcurrency - Number from 1 and up
 * @throws TypeError if not a number >= 1
 */
set concurrency(newConcurrency: number);
```

**Usage Examples:**

```typescript
import PQueue from "p-queue";

const queue = new PQueue({ concurrency: 2 });

// Check current concurrency
console.log(queue.concurrency); // 2

// Add some tasks
queue.add(async () => task1());
queue.add(async () => task2());
queue.add(async () => task3());
queue.add(async () => task4());

// Initially 2 tasks run, 2 are queued
console.log(queue.pending); // 2
console.log(queue.size); // 2

// Increase concurrency dynamically
queue.concurrency = 4;
// Now all 4 tasks can run simultaneously

// Decrease concurrency
queue.concurrency = 1;
// Tasks already running are not interrupted; queued tasks now start one at a time
```
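The way a concurrency limit interacts with `pending` and `size` can be pictured with a small stand-alone sketch. `MiniLimiter` below is a hypothetical class written for illustration only; it is not p-queue's source, and it assumes tasks are plain promise-returning functions.

```typescript
// Hypothetical MiniLimiter: an illustrative sketch, not p-queue's implementation.
type Task<T> = () => Promise<T>;

class MiniLimiter {
  private readonly waiting: Array<() => void> = [];
  private running = 0;
  private limit: number;

  constructor(limit: number) {
    this.limit = MiniLimiter.check(limit);
  }

  private static check(value: number): number {
    if (typeof value !== "number" || !(value >= 1)) {
      throw new TypeError(`Expected a number >= 1, got ${value}`);
    }
    return value;
  }

  get concurrency(): number {
    return this.limit;
  }

  set concurrency(value: number) {
    this.limit = MiniLimiter.check(value);
    this.drain(); // a raised limit starts waiting tasks right away
  }

  get pending(): number {
    return this.running; // tasks currently executing
  }

  get size(): number {
    return this.waiting.length; // tasks not yet started
  }

  add<T>(task: Task<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      this.waiting.push(() => {
        this.running++;
        void (async () => {
          try {
            resolve(await task());
          } catch (error) {
            reject(error);
          } finally {
            this.running--;
            this.drain(); // a finished task frees a slot
          }
        })();
      });
      this.drain();
    });
  }

  // Start waiting tasks while there is a free slot.
  private drain(): void {
    while (this.running < this.limit && this.waiting.length > 0) {
      this.waiting.shift()?.();
    }
  }
}

// Tasks that never settle, so the counters stay fixed for inspection.
const hang: Task<void> = () => new Promise<void>(() => {});
const limiter = new MiniLimiter(2);
for (let i = 0; i < 4; i++) void limiter.add(hang);

const initialState = { pending: limiter.pending, size: limiter.size }; // 2 running, 2 waiting
limiter.concurrency = 4;
const raisedState = { pending: limiter.pending, size: limiter.size }; // 4 running, 0 waiting
```

In this sketch, lowering the limit never interrupts running tasks; it only stops new ones from starting until enough slots have freed up.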

### Queue State Inspection

#### size Property

Number of queued items waiting to run.

```typescript { .api }
/**
 * Size of the queue, the number of queued items waiting to run.
 */
get size(): number;
```

#### pending Property

Number of running items (no longer in the queue).

```typescript { .api }
/**
 * Number of running items (no longer in the queue).
 */
get pending(): number;
```

#### isPaused Property

Whether the queue is currently paused.

```typescript { .api }
/**
 * Whether the queue is currently paused.
 */
get isPaused(): boolean;
```

**Usage Examples:**

```typescript
import PQueue from "p-queue";

// Helper that resolves after `ms` milliseconds
const delay = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

const queue = new PQueue({ concurrency: 3 });

// Monitor queue state
console.log(`Queued: ${queue.size}, Running: ${queue.pending}, Paused: ${queue.isPaused}`);

// Add tasks and monitor changes
queue.add(async () => delay(1000));
queue.add(async () => delay(1000));
queue.add(async () => delay(1000));
queue.add(async () => delay(1000));
queue.add(async () => delay(1000));

// Check state after adding tasks
console.log(`Queued: ${queue.size}, Running: ${queue.pending}`); // Queued: 2, Running: 3

// Pause and check state
queue.pause();
console.log(`Paused: ${queue.isPaused}`); // Paused: true
```

#### timeout Property

Per-operation timeout in milliseconds. Operations fulfill once timeout elapses if they haven't already. Applies to each future operation.

```typescript { .api }
/**
 * Per-operation timeout in milliseconds. Operations fulfill once timeout elapses if they haven't already.
 * Applies to each future operation.
 */
timeout?: number;
```

**Usage Examples:**

```typescript
import PQueue from "p-queue";

const queue = new PQueue({ concurrency: 2, timeout: 5000 });

// Set timeout for all future operations
queue.timeout = 3000;

// Add a task that will time out after 3 seconds
await queue.add(async () => {
  // If this takes longer than 3 seconds, the add() promise fulfills anyway
  // (throwOnTimeout defaults to false)
  await someSlowOperation();
});

// Individual tasks can override the queue timeout
await queue.add(async () => {
  return fastOperation();
}, { timeout: 1000 }); // This task has a 1 second timeout
```
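The "fulfill once the timeout elapses" behavior can be approximated with a timer that races against the task. The `withTimeout` helper below is a hypothetical sketch, not p-queue's internal code; its `throwOnTimeout` parameter only mirrors the option of the same name for illustration.

```typescript
// Hypothetical helper: settle after `ms` even if the task has not finished.
function withTimeout<T>(
  task: () => Promise<T>,
  ms: number,
  throwOnTimeout = false,
): Promise<T | undefined> {
  return new Promise<T | undefined>((resolve, reject) => {
    const timer = setTimeout(() => {
      if (throwOnTimeout) {
        reject(new Error(`Timed out after ${ms} ms`));
      } else {
        resolve(undefined); // fulfill with no value instead of throwing
      }
    }, ms);

    // Whichever settles first wins; later calls to resolve/reject are ignored.
    Promise.resolve()
      .then(task)
      .then(
        value => {
          clearTimeout(timer);
          resolve(value);
        },
        error => {
          clearTimeout(timer);
          reject(error);
        },
      );
  });
}

const stuck = () => new Promise<string>(() => {}); // never settles
const timedOut = withTimeout(stuck, 20); // fulfills with undefined after about 20 ms
const finished = withTimeout(async () => "done", 1000); // fulfills with "done"
```

Note that in this sketch the underlying work is not cancelled when the timer fires; the caller simply stops waiting for it.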

#### sizeBy Method

Get the size of the queue filtered by specific options.

```typescript { .api }
/**
 * Size of the queue, filtered by the given options.
 * For example, this can be used to find the number of items remaining in the queue with a specific priority level.
 * @param options - Filter options to match against queued items
 * @returns Number of matching items in the queue
 */
sizeBy(options: Readonly<Partial<EnqueueOptionsType>>): number;
```

**Usage Examples:**

```typescript
import PQueue from "p-queue";

// autoStart: false keeps every task queued, so the counts below are stable
const queue = new PQueue({ concurrency: 1, autoStart: false });

// Add tasks with different priorities (not awaited, so they stay in the queue)
queue.add(async () => task1(), { priority: 10 });
queue.add(async () => task2(), { priority: 5 });
queue.add(async () => task3(), { priority: 10 });
queue.add(async () => task4(), { priority: 0 });

// Check queue size by priority
console.log(queue.sizeBy({ priority: 10 })); // 2 tasks with priority 10
console.log(queue.sizeBy({ priority: 5 })); // 1 task with priority 5
console.log(queue.size); // 4 total tasks
```

### Priority Management

#### setPriority Method

Updates the priority of a queued task by its ID, affecting execution order.

```typescript { .api }
/**
 * Updates the priority of a promise function by its id, affecting its execution order.
 * Requires a defined concurrency limit to take effect.
 *
 * @param id - The unique identifier of the task
 * @param priority - The new priority level (higher numbers = higher priority)
 * @throws ReferenceError if no task with the given ID exists
 */
setPriority(id: string, priority: number): void;
```

**Usage Examples:**

```typescript
const queue = new PQueue({ concurrency: 1 });

// Add tasks with IDs and priorities
queue.add(async () => '🦄', { priority: 1 });
queue.add(async () => '🦀', { priority: 0, id: 'crab-task' });
queue.add(async () => '🦄', { priority: 1 });
queue.add(async () => '🦄', { priority: 1 });

// Before execution, increase priority of crab task
queue.setPriority('crab-task', 2);
// Now crab task will run second (after first unicorn task that's already running)

// Decrease priority example
const queue2 = new PQueue({ concurrency: 1 });

queue2.add(async () => '🦄', { priority: 1 });
queue2.add(async () => '🦀', { priority: 1, id: 'crab-task' });
queue2.add(async () => '🦄');
queue2.add(async () => '🦄', { priority: 0 });

queue2.setPriority('crab-task', -1);
// Now crab task will execute last
```
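The reordering in the two examples above can be traced with a small stand-alone model. `MiniPriorityList` is a hypothetical class for illustration, not p-queue's queue implementation; it assumes higher numbers run first, that entries with equal priority keep insertion order, and that changing a priority is equivalent to removing the entry and inserting it again.

```typescript
// Hypothetical MiniPriorityList: an illustrative model of priority ordering.
type Entry = { id: string; priority: number };

class MiniPriorityList {
  private readonly entries: Entry[] = [];

  // Insert after every entry whose priority is >= the new one (stable order).
  enqueue(id: string, priority = 0): void {
    const index = this.entries.findIndex(entry => entry.priority < priority);
    if (index === -1) {
      this.entries.push({ id, priority });
    } else {
      this.entries.splice(index, 0, { id, priority });
    }
  }

  // Take the next entry to run.
  dequeue(): string | undefined {
    return this.entries.shift()?.id;
  }

  // Remove the entry and re-insert it at its new position.
  setPriority(id: string, priority: number): void {
    const index = this.entries.findIndex(entry => entry.id === id);
    if (index === -1) {
      throw new ReferenceError(`No entry with id "${id}"`);
    }
    this.entries.splice(index, 1);
    this.enqueue(id, priority);
  }

  // Count queued entries with exactly this priority.
  sizeBy(priority: number): number {
    return this.entries.filter(entry => entry.priority === priority).length;
  }

  order(): string[] {
    return this.entries.map(entry => entry.id);
  }
}

// First example: raise the crab task's priority.
const raised = new MiniPriorityList();
raised.enqueue("unicorn-1", 1);
raised.enqueue("crab-task", 0);
raised.enqueue("unicorn-2", 1);
raised.enqueue("unicorn-3", 1);
const firstRunning = raised.dequeue(); // "unicorn-1" starts immediately (concurrency 1)
raised.setPriority("crab-task", 2);
const raisedOrder = raised.order(); // ["crab-task", "unicorn-2", "unicorn-3"]

// Second example: lower the crab task's priority.
const lowered = new MiniPriorityList();
lowered.enqueue("unicorn-1", 1);
lowered.enqueue("crab-task", 1);
lowered.enqueue("unicorn-2");
lowered.enqueue("unicorn-3", 0);
lowered.dequeue(); // "unicorn-1" starts immediately
lowered.setPriority("crab-task", -1);
const loweredOrder = lowered.order(); // ["unicorn-2", "unicorn-3", "crab-task"]
```

The model also shows why a task that is already running is unaffected: once dequeued, it is no longer part of the ordered list.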

### Interval-Based Throttling

p-queue supports interval-based rate limiting to prevent overwhelming external services.

#### Configuration Options

```typescript { .api }
type Options<QueueType, QueueOptions> = {
  readonly intervalCap?: number; // Max runs per interval (default: Infinity)
  readonly interval?: number; // Interval length in ms (default: 0)
  readonly carryoverConcurrencyCount?: boolean; // Carry over pending tasks (default: false)
  // ... other options
};
```

**Usage Examples:**

```typescript
// Rate limiting: Max 10 requests per 1 second
const apiQueue = new PQueue({
  concurrency: 5,
  intervalCap: 10,
  interval: 1000
});

// This will spread requests to comply with rate limits
for (let i = 0; i < 50; i++) {
  apiQueue.add(async () => {
    const response = await fetch(`https://api.example.com/data/${i}`);
    return response.json();
  });
}

// Advanced rate limiting with carryover
const restrictedQueue = new PQueue({
  concurrency: 2,
  intervalCap: 5,
  interval: 2000,
  carryoverConcurrencyCount: true // Tasks in progress count toward next interval
});

// Burst prevention: Max 3 operations per 5 seconds
const burstQueue = new PQueue({
  concurrency: 10, // High concurrency allowed
  intervalCap: 3, // But only 3 per interval
  interval: 5000 // Every 5 seconds
});
```
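A rough way to reason about these settings is to estimate when each task could start at the earliest. The `earliestStart` helper below is a hypothetical, idealized calculation, not part of p-queue: it assumes all tasks are queued at time 0, every task finishes within its interval, `concurrency` is not the bottleneck, and intervals are fixed windows counted from the first start.

```typescript
// Hypothetical helper: earliest start offset (in ms) for the task at a 0-based index.
function earliestStart(index: number, intervalCap: number, interval: number): number {
  // Each window admits `intervalCap` tasks; later tasks wait for later windows.
  return Math.floor(index / intervalCap) * interval;
}

// apiQueue above: 50 requests, 10 per 1000 ms
const apiFirstBatch = earliestStart(9, 10, 1000); // 0: still in the first window
const apiSecondBatch = earliestStart(10, 10, 1000); // 1000: first task of the second window
const apiLastBatch = earliestStart(49, 10, 1000); // 4000: the fifth and final window

// burstQueue above: 3 operations per 5000 ms, with 10 tasks queued
const burstFourth = earliestStart(3, 3, 5000); // 5000
const burstLast = earliestStart(9, 3, 5000); // 15000
```

Under these assumptions the 50 requests in the `apiQueue` example need five one-second windows, so the last batch starts about 4 seconds after the first. Slow tasks or a low `concurrency` would push the real times later.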

### Advanced Configuration

#### Queue Initialization Options

```typescript { .api }
type Options<QueueType extends Queue<RunFunction, QueueOptions>, QueueOptions extends QueueAddOptions> = {
  readonly concurrency?: number; // Concurrency limit (default: Infinity, min: 1)
  readonly autoStart?: boolean; // Auto-execute tasks when added (default: true)
  readonly queueClass?: new () => QueueType; // Custom queue implementation
  readonly intervalCap?: number; // Max runs in interval (default: Infinity, min: 1)
  readonly interval?: number; // Interval length in ms (default: 0, min: 0)
  readonly carryoverConcurrencyCount?: boolean; // Carry over tasks to next interval (default: false)
  timeout?: number; // Default timeout for all tasks
  throwOnTimeout?: boolean; // Whether timeout throws exception (default: false)
};
```

**Advanced Usage Examples:**

```typescript
import PQueue from "p-queue";

// Comprehensive configuration
const advancedQueue = new PQueue({
  concurrency: 3, // Run up to 3 tasks simultaneously
  autoStart: false, // Don't start automatically (manual control)
  intervalCap: 10, // Max 10 operations per interval
  interval: 60000, // 1 minute intervals
  carryoverConcurrencyCount: true, // Count running tasks toward next interval
  timeout: 30000, // 30 second default timeout
  throwOnTimeout: false // Resolve with undefined on timeout instead of throwing
});

// Add tasks, then start manually
advancedQueue.add(async () => heavyTask1());
advancedQueue.add(async () => heavyTask2());
advancedQueue.add(async () => heavyTask3());

// Start when ready
advancedQueue.start();

// Custom queue implementation: plain first-in, first-out order.
// The local types below are written out for this example.
type RunFunction = () => Promise<unknown>;
type QueueItem = { run: RunFunction; priority?: number; id?: string };

class FIFOQueue {
  private readonly queue: QueueItem[] = [];

  get size(): number {
    return this.queue.length;
  }

  // Append to the end; priority is stored but does not affect order
  enqueue(run: RunFunction, options?: { priority?: number; id?: string }): void {
    this.queue.push({ run, ...options });
  }

  // Take the oldest item
  dequeue(): RunFunction | undefined {
    return this.queue.shift()?.run;
  }

  // Used by sizeBy(): return the queued functions matching the given priority
  filter(options: { priority?: number }): RunFunction[] {
    return this.queue
      .filter(item => item.priority === options.priority)
      .map(item => item.run);
  }

  setPriority(id: string, priority: number): void {
    const item = this.queue.find(item => item.id === id);
    if (item) item.priority = priority;
  }
}

const fifoQueue = new PQueue({
  queueClass: FIFOQueue,
  concurrency: 2
});
```

### Performance Monitoring

```typescript
// Monitor queue performance
const queue = new PQueue({ concurrency: 5 });

function logQueueState() {
  console.log({
    size: queue.size,
    pending: queue.pending,
    isPaused: queue.isPaused,
    concurrency: queue.concurrency
  });
}

// Log state periodically
const monitor = setInterval(logQueueState, 1000);

// Stop monitoring when idle
queue.onIdle().then(() => {
  clearInterval(monitor);
  console.log('Queue processing complete');
});
```