# Task Queues

Configurable task queuing strategies with built-in FIFO and fixed-size circular buffer implementations for optimal performance in different scenarios.

## Capabilities

### TaskQueue Interface

Base interface that all task queue implementations must satisfy.

```typescript { .api }
/**
 * Interface for task queue implementations
 */
interface TaskQueue {
  /** Current number of tasks in the queue */
  readonly size: number;

  /**
   * Remove and return the first task from the queue
   * @returns First task or null if queue is empty
   */
  shift(): Task | null;

  /**
   * Remove a specific task from the queue
   * @param task - Task to remove
   */
  remove(task: Task): void;

  /**
   * Add a task to the end of the queue
   * @param task - Task to add
   */
  push(task: Task): void;
}

/**
 * Base task interface
 */
interface Task {
  readonly [queueOptionsSymbol]: object | null;
}
```
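
To make the contract concrete, the sketch below drives a queue by hand the way a consumer might: it pushes tasks, cancels one with `remove`, and drains the rest with `shift` until it returns `null`. The `SketchTask`, `SketchQueue`, `ListQueue`, and `drain` names are hypothetical stand-ins defined locally so the snippet runs without Piscina; they are not part of the library.

```typescript
// Local stand-ins for the Task/TaskQueue shapes described above (assumptions, not Piscina exports)
interface SketchTask {
  readonly label: string;
}

interface SketchQueue {
  readonly size: number;
  shift(): SketchTask | null;
  remove(task: SketchTask): void;
  push(task: SketchTask): void;
}

// Minimal FIFO implementation used only to exercise the contract
class ListQueue implements SketchQueue {
  private items: SketchTask[] = [];

  get size(): number {
    return this.items.length;
  }

  shift(): SketchTask | null {
    return this.items.shift() ?? null; // null, not undefined, signals "empty"
  }

  remove(task: SketchTask): void {
    const index = this.items.indexOf(task);
    if (index !== -1) this.items.splice(index, 1);
  }

  push(task: SketchTask): void {
    this.items.push(task);
  }
}

// Drain the queue in order, stopping when shift() reports it is empty
function drain(queue: SketchQueue): string[] {
  const order: string[] = [];
  let task: SketchTask | null;
  while ((task = queue.shift()) !== null) {
    order.push(task.label);
  }
  return order;
}

const demoQueue = new ListQueue();
const a = { label: "a" };
const b = { label: "b" };
const c = { label: "c" };
demoQueue.push(a);
demoQueue.push(b);
demoQueue.push(c);
demoQueue.remove(b); // e.g. the task was cancelled while still waiting

const drained = drain(demoQueue);
console.log(drained, demoQueue.size);
```

Returning `null` from an empty queue, instead of throwing, lets the consumer loop end without extra size checks.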

### ArrayTaskQueue

Simple array-based FIFO task queue implementation suitable for most use cases.

```typescript { .api }
/**
 * Simple array-based FIFO task queue implementation
 * Best for: General purpose, variable queue sizes, simple scenarios
 * Performance: O(n) for shift operations due to array shifting
 */
class ArrayTaskQueue implements TaskQueue {
  readonly size: number;

  /**
   * Remove and return the first task (FIFO)
   * @returns First task or null if empty
   */
  shift(): Task | null;

  /**
   * Add task to end of queue
   * @param task - Task to enqueue
   */
  push(task: Task): void;

  /**
   * Remove specific task from queue
   * @param task - Task to remove
   */
  remove(task: Task): void;
}
```

**Usage Examples:**

```typescript
import { Piscina, ArrayTaskQueue } from "piscina";

// Use ArrayTaskQueue explicitly
const pool = new Piscina({
  filename: "worker.js",
  taskQueue: new ArrayTaskQueue()
});

// The queue used when `taskQueue` is omitted depends on the Piscina version
// (older releases fall back to ArrayTaskQueue), so pass it explicitly as above
// whenever array-based behavior is required.
const defaultPool = new Piscina({
  filename: "worker.js"
});
```
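
The O(n) cost of `shift` comes from how removing the head of an array works conceptually: every remaining element moves one slot toward the front. The sketch below makes that visible by counting the moves in a hand-written shift. `shiftCountingMoves` is a hypothetical helper written for illustration only, and real engines may optimize `Array.prototype.shift` internally.

```typescript
// Remove the first element by hand and report how many elements had to move
function shiftCountingMoves<T>(items: T[]): { head: T | null; moves: number } {
  if (items.length === 0) return { head: null, moves: 0 };

  const head = items[0];
  let moves = 0;
  for (let i = 1; i < items.length; i++) {
    items[i - 1] = items[i]; // slide each remaining element one slot forward
    moves++;
  }
  items.length -= 1; // drop the now-duplicated last slot

  return { head, moves };
}

const pending = ["t1", "t2", "t3", "t4", "t5"];
const result = shiftCountingMoves(pending);
console.log(result.head, result.moves, pending);
```

With five queued items, one dequeue moves the four that remain, so the work grows linearly with queue length.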

### FixedQueue

High-performance queue implementation based on fixed-size circular buffers and optimized for the V8 engine used by Node.js.

```typescript { .api }
/**
 * High-performance fixed-size circular buffer queue implementation
 * Best for: High-throughput scenarios, predictable memory usage
 * Performance: O(1) for push and shift; remove has to search for the task
 * Based on Node.js internal implementation, optimized for V8
 */
class FixedQueue implements TaskQueue {
  readonly size: number;

  /**
   * Remove and return the first task (FIFO)
   * @returns First task or null if empty
   */
  shift(): Task | null;

  /**
   * Add task to end of queue
   * @param task - Task to enqueue
   */
  push(task: Task): void;

  /**
   * Remove specific task from queue
   * @param task - Task to remove
   */
  remove(task: Task): void;
}
```

**Usage Examples:**

```typescript
import { Piscina, FixedQueue } from "piscina";

// Use FixedQueue for high-performance scenarios
const highThroughputPool = new Piscina({
  filename: "worker.js",
  taskQueue: new FixedQueue(),
  maxThreads: 16,
  concurrentTasksPerWorker: 4
});

// FixedQueue is the default when not specified
const pool = new Piscina({
  filename: "worker.js"
  // Uses FixedQueue by default
});
```
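
A circular buffer gets O(1) enqueue and dequeue by keeping head and tail indices that wrap around a fixed-size array, so no elements are ever moved. The sketch below is a simplified single-buffer illustration of that idea, not Piscina's actual `FixedQueue` code; `RingBuffer`, the capacity of 8, and the string tasks are assumptions made for the example.

```typescript
// Simplified fixed-capacity ring buffer (illustrative; capacity must be a power of two)
class RingBuffer<T> {
  private readonly slots: (T | undefined)[];
  private readonly mask: number;
  private head = 0; // index of the next item to read
  private tail = 0; // index of the next free slot
  private count = 0;

  constructor(capacity: number) {
    if (capacity < 1 || (capacity & (capacity - 1)) !== 0) {
      throw new RangeError("capacity must be a power of two");
    }
    this.slots = new Array<T | undefined>(capacity).fill(undefined);
    this.mask = capacity - 1;
  }

  get size(): number {
    return this.count;
  }

  // O(1): write at tail, then advance tail with wrap-around
  push(item: T): boolean {
    if (this.count === this.slots.length) return false; // buffer is full
    this.slots[this.tail] = item;
    this.tail = (this.tail + 1) & this.mask;
    this.count++;
    return true;
  }

  // O(1): read at head, clear the slot, then advance head with wrap-around
  shift(): T | null {
    if (this.count === 0) return null;
    const item = this.slots[this.head] as T;
    this.slots[this.head] = undefined;
    this.head = (this.head + 1) & this.mask;
    this.count--;
    return item;
  }
}

const ring = new RingBuffer<string>(8);
for (let i = 0; i < 8; i++) ring.push(`task-${i}`);
const rejected = ring.push("overflow"); // the buffer is full, so this is refused
const first = ring.shift();             // oldest item comes out first
ring.push("task-8");                    // reuses the slot freed by shift()
console.log(first, rejected, ring.size);
```

A single buffer like this has to refuse work once it is full. A production queue needs a strategy for that case, for example chaining additional buffers so the queue can keep growing.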

### Queue Validation

Utility function to validate task queue implementations.

```typescript { .api }
/**
 * Validates that an object implements the TaskQueue interface
 * @param value - Object to validate
 * @returns True if object is a valid TaskQueue
 */
function isTaskQueue(value: TaskQueue): boolean;
```

**Usage Examples:**

```typescript
import { isTaskQueue, ArrayTaskQueue, FixedQueue } from "piscina";

const arrayQueue = new ArrayTaskQueue();
const fixedQueue = new FixedQueue();
const invalidQueue = { size: 0 }; // Missing required methods

console.log(isTaskQueue(arrayQueue)); // true
console.log(isTaskQueue(fixedQueue)); // true
console.log(isTaskQueue(invalidQueue)); // false

// Custom queue validation
class CustomQueue implements TaskQueue {
  readonly size = 0;
  shift() { return null; }
  push(task: Task) { }
  remove(task: Task) { }
}

console.log(isTaskQueue(new CustomQueue())); // true
```
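
A structural check of this kind can be sketched as a TypeScript type guard that looks for a `size` property and the three methods. This is an assumption about how such validation might work, not Piscina's source; `looksLikeTaskQueue` and `QueueShape` are hypothetical names.

```typescript
// Hypothetical shape mirroring the TaskQueue members documented above
interface QueueShape {
  readonly size: number;
  shift(): unknown;
  push(task: unknown): void;
  remove(task: unknown): void;
}

// Duck-typing guard: accepts any object exposing the expected members
function looksLikeTaskQueue(value: unknown): value is QueueShape {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    "size" in candidate &&
    typeof candidate.shift === "function" &&
    typeof candidate.push === "function" &&
    typeof candidate.remove === "function"
  );
}

const complete = { size: 0, shift: () => null, push: () => {}, remove: () => {} };
const missingMethods = { size: 0 };

const checks = [
  looksLikeTaskQueue(complete),
  looksLikeTaskQueue(missingMethods),
  looksLikeTaskQueue(null),
];
console.log(checks);
```

A check like this only confirms that the members exist. It cannot tell whether `shift` really returns `null` on an empty queue, so behavioral tests are still worth writing for a custom queue.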

### Custom Queue Implementation

You can implement custom task queues for specialized scenarios.

```typescript { .api }
import { Piscina } from "piscina";

/**
 * Example custom priority queue implementation.
 * Priority is read from the options attached to each task under
 * Piscina.queueOptionsSymbol; tasks without options get priority 0.
 */
class PriorityTaskQueue implements TaskQueue {
  private tasks: Task[] = [];

  get size(): number {
    return this.tasks.length;
  }

  /** Read the priority from the task's queue options */
  private priorityOf(task: Task): number {
    const options = (task as any)[Piscina.queueOptionsSymbol] as
      { priority?: number } | null | undefined;
    return options?.priority ?? 0;
  }

  shift(): Task | null {
    if (this.tasks.length === 0) return null;

    // Find the highest-priority task; ties keep FIFO order
    let highestIndex = 0;
    for (let i = 1; i < this.tasks.length; i++) {
      if (this.priorityOf(this.tasks[i]) > this.priorityOf(this.tasks[highestIndex])) {
        highestIndex = i;
      }
    }

    return this.tasks.splice(highestIndex, 1)[0];
  }

  push(task: Task): void {
    this.tasks.push(task);
  }

  remove(task: Task): void {
    const index = this.tasks.indexOf(task);
    if (index !== -1) {
      this.tasks.splice(index, 1);
    }
  }
}
```

**Usage Examples:**

```typescript
import { Piscina } from "piscina";

// Custom priority queue usage
const priorityPool = new Piscina({
  filename: "worker.js",
  taskQueue: new PriorityTaskQueue()
});

// Attach the priority to the task payload via Piscina.queueOptionsSymbol.
// Priority only affects tasks that are waiting in the queue, so submit
// them together instead of awaiting each one in turn.
await Promise.all([
  priorityPool.run({
    data: "urgent task",
    [Piscina.queueOptionsSymbol]: { priority: 10 }
  }),
  priorityPool.run({
    data: "normal task",
    [Piscina.queueOptionsSymbol]: { priority: 1 }
  })
]);
```
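
A priority queue that scans the whole list on every dequeue costs O(n) per `shift`. For long queues, a binary heap is a common alternative that brings both enqueue and dequeue down to O(log n). The sketch below swaps in that technique. It is self-contained, so `HeapTask`, `HeapEntry`, `HeapPriorityQueue`, and the plain numeric `priority` field are assumptions standing in for however a real queue would read priorities from its tasks.

```typescript
// Stand-in task type for the sketch (not a Piscina type)
interface HeapTask {
  readonly name: string;
  readonly priority: number;
}

interface HeapEntry {
  task: HeapTask;
  seq: number; // insertion order, keeps FIFO order among equal priorities
}

class HeapPriorityQueue {
  private heap: HeapEntry[] = [];
  private nextSeq = 0;

  get size(): number {
    return this.heap.length;
  }

  push(task: HeapTask): void {
    this.heap.push({ task, seq: this.nextSeq++ });
    this.siftUp(this.heap.length - 1);
  }

  shift(): HeapTask | null {
    if (this.heap.length === 0) return null;
    return this.removeAt(0);
  }

  remove(task: HeapTask): void {
    // Finding an arbitrary task is still a linear search
    const index = this.heap.findIndex((entry) => entry.task === task);
    if (index !== -1) this.removeAt(index);
  }

  // True when a should be dequeued before b
  private before(a: HeapEntry, b: HeapEntry): boolean {
    return a.task.priority !== b.task.priority
      ? a.task.priority > b.task.priority
      : a.seq < b.seq;
  }

  private removeAt(index: number): HeapTask {
    const removed = this.heap[index];
    const last = this.heap.pop() as HeapEntry;
    if (index < this.heap.length) {
      // Move the last entry into the hole, then restore heap order both ways
      this.heap[index] = last;
      this.siftUp(index);
      this.siftDown(index);
    }
    return removed.task;
  }

  private siftUp(index: number): void {
    while (index > 0) {
      const parent = (index - 1) >> 1;
      if (!this.before(this.heap[index], this.heap[parent])) break;
      [this.heap[index], this.heap[parent]] = [this.heap[parent], this.heap[index]];
      index = parent;
    }
  }

  private siftDown(index: number): void {
    const n = this.heap.length;
    while (true) {
      let best = index;
      const left = 2 * index + 1;
      const right = left + 1;
      if (left < n && this.before(this.heap[left], this.heap[best])) best = left;
      if (right < n && this.before(this.heap[right], this.heap[best])) best = right;
      if (best === index) break;
      [this.heap[index], this.heap[best]] = [this.heap[best], this.heap[index]];
      index = best;
    }
  }
}

const heapQueue = new HeapPriorityQueue();
const low = { name: "low", priority: 1 };
const cancelled = { name: "cancelled", priority: 5 };
heapQueue.push(low);
heapQueue.push({ name: "urgent", priority: 10 });
heapQueue.push(cancelled);
heapQueue.push({ name: "also-low", priority: 1 });
heapQueue.remove(cancelled);

const order: string[] = [];
let next: HeapTask | null;
while ((next = heapQueue.shift()) !== null) order.push(next.name);
console.log(order);
```

The sequence number is a deliberate design choice: a plain heap does not preserve insertion order, so without it two tasks of equal priority could be dequeued in either order.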

### PiscinaTask Interface

Public interface representing queued tasks with metadata.

```typescript { .api }
/**
 * Public interface for queued tasks
 */
interface PiscinaTask extends Task {
  /** Unique task identifier */
  readonly taskId: number;
  /** Worker filename for this task */
  readonly filename: string;
  /** Task name identifier */
  readonly name: string;
  /** Task creation timestamp */
  readonly created: number;
  /** Whether task supports cancellation */
  readonly isAbortable: boolean;
}
```

## Queue Selection Guidelines

### ArrayTaskQueue
- **Best for**: General purpose applications, variable queue sizes
- **Pros**: Simple implementation, dynamic sizing, familiar array operations
- **Cons**: O(n) shift operations can impact performance with large queues
- **Use when**: Queue sizes are typically small to medium, or simplicity is preferred

### FixedQueue
- **Best for**: High-throughput applications, performance-critical scenarios
- **Pros**: O(1) push and shift, predictable memory usage, V8-optimized implementation
- **Cons**: More complex internal structure
- **Use when**: Performance is critical or predictable memory usage is needed

### Custom Queue
- **Best for**: Specialized requirements such as priority queues or delayed execution
- **Implementation**: Must implement the full `TaskQueue` interface
- **Validation**: Use `isTaskQueue()` to verify the implementation

**Performance Comparison:**

```typescript
import { Piscina, ArrayTaskQueue, FixedQueue } from "piscina";

// Identical pool settings, so the queue is the only variable being compared
const poolOptions = {
  filename: "worker.js",
  maxThreads: 8,
  concurrentTasksPerWorker: 2
};

// For high-throughput scenarios (recommended)
const performancePool = new Piscina({
  ...poolOptions,
  taskQueue: new FixedQueue()
});

// For general purpose (simpler but slower at scale)
const generalPool = new Piscina({
  ...poolOptions,
  taskQueue: new ArrayTaskQueue()
});

// Benchmark queues
async function benchmarkQueues() {
  const tasks = Array.from({ length: 1000 }, (_, i) => ({ id: i }));

  console.time('FixedQueue');
  await Promise.all(tasks.map(task => performancePool.run(task)));
  console.timeEnd('FixedQueue');

  console.time('ArrayTaskQueue');
  await Promise.all(tasks.map(task => generalPool.run(task)));
  console.timeEnd('ArrayTaskQueue');

  // Shut down the worker threads once the measurements are done
  await Promise.all([performancePool.destroy(), generalPool.destroy()]);
}

benchmarkQueues().catch(console.error);
```