
# Task Queues

Configurable task queuing strategies with built-in FIFO and fixed-size circular buffer implementations for optimal performance in different scenarios.

## Capabilities

### TaskQueue Interface

Base interface that all task queue implementations must satisfy.

```typescript { .api }
/**
 * Interface for task queue implementations
 */
interface TaskQueue {
  /** Current number of tasks in the queue */
  readonly size: number;

  /**
   * Remove and return the first task from the queue
   * @returns First task or null if queue is empty
   */
  shift(): Task | null;

  /**
   * Remove a specific task from the queue
   * @param task - Task to remove
   */
  remove(task: Task): void;

  /**
   * Add a task to the end of the queue
   * @param task - Task to add
   */
  push(task: Task): void;
}

/**
 * Base task interface
 */
interface Task {
  readonly [queueOptionsSymbol]: object | null;
}
```
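As a rough illustration of what satisfying this interface involves, the sketch below implements a last-in, first-out queue. It is a minimal sketch, not part of Piscina: `LifoTaskQueue` is a hypothetical name, and the `Task`, `TaskQueue`, and `queueOptionsSymbol` declarations are local stand-ins that mirror the interfaces above so the block compiles on its own.

```typescript
// Local stand-ins mirroring the interfaces above (assumptions, not imports from Piscina).
const queueOptionsSymbol: unique symbol = Symbol("queueOptions");

interface Task {
  readonly [queueOptionsSymbol]: object | null;
}

interface TaskQueue {
  readonly size: number;
  shift(): Task | null;
  remove(task: Task): void;
  push(task: Task): void;
}

// Hypothetical LIFO queue: the most recently pushed task is handed out first.
class LifoTaskQueue implements TaskQueue {
  private readonly tasks: Task[] = [];

  get size(): number {
    return this.tasks.length;
  }

  // "shift" means "give me the next task to run"; here that is the newest one.
  shift(): Task | null {
    return this.tasks.pop() ?? null;
  }

  push(task: Task): void {
    this.tasks.push(task);
  }

  // Removing a task that is not queued is a no-op.
  remove(task: Task): void {
    const index = this.tasks.indexOf(task);
    if (index !== -1) {
      this.tasks.splice(index, 1);
    }
  }
}

const makeTask = (): Task => ({ [queueOptionsSymbol]: null });
const first = makeTask();
const second = makeTask();

const queue = new LifoTaskQueue();
queue.push(first);
queue.push(second);
console.log(queue.size); // 2
```

The only contract the pool relies on is the four members shown; the ordering policy inside `shift()` is up to the implementation.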

### ArrayTaskQueue

Simple array-based FIFO task queue implementation suitable for most use cases.

```typescript { .api }
/**
 * Simple array-based FIFO task queue implementation
 * Best for: General purpose, variable queue sizes, simple scenarios
 * Performance: O(n) for shift operations due to array shifting
 */
class ArrayTaskQueue implements TaskQueue {
  readonly size: number;

  /**
   * Remove and return the first task (FIFO)
   * @returns First task or null if empty
   */
  shift(): Task | null;

  /**
   * Add task to end of queue
   * @param task - Task to enqueue
   */
  push(task: Task): void;

  /**
   * Remove specific task from queue
   * @param task - Task to remove
   */
  remove(task: Task): void;
}
```

**Usage Examples:**

```typescript
import { Piscina, ArrayTaskQueue } from "piscina";

// Use ArrayTaskQueue explicitly
const pool = new Piscina({
  filename: "worker.js",
  taskQueue: new ArrayTaskQueue()
});

// Without a taskQueue option, Piscina falls back to its default queue
// (FixedQueue, per the FixedQueue section of this page)
const defaultPool = new Piscina({
  filename: "worker.js"
});
```

### FixedQueue

High-performance queue implementation based on a fixed-size circular buffer, optimized for the V8 engine used by Node.js.

```typescript { .api }
/**
 * High-performance fixed-size circular buffer queue implementation
 * Best for: High-throughput scenarios, predictable memory usage
 * Performance: O(1) for push and shift
 * Based on Node.js internal implementation, optimized for V8
 */
class FixedQueue implements TaskQueue {
  readonly size: number;

  /**
   * Remove and return the first task (FIFO)
   * @returns First task or null if empty
   */
  shift(): Task | null;

  /**
   * Add task to end of queue
   * @param task - Task to enqueue
   */
  push(task: Task): void;

  /**
   * Remove specific task from queue
   * @param task - Task to remove
   */
  remove(task: Task): void;
}
```
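To make the circular-buffer idea concrete, here is a minimal sketch of a fixed-capacity ring buffer with constant-time `push` and `shift`. This is not Piscina's `FixedQueue` source: `RingBuffer` and its members are hypothetical, and unlike a real task queue it simply rejects a push when full.

```typescript
// Hypothetical fixed-capacity ring buffer; an illustration, not Piscina's implementation.
class RingBuffer<T> {
  private readonly slots: (T | undefined)[];
  private readonly capacity: number;
  private head = 0;  // index of the next item to hand out
  private count = 0; // number of items currently stored

  constructor(capacity: number) {
    this.capacity = capacity;
    this.slots = new Array<T | undefined>(capacity).fill(undefined);
  }

  get size(): number {
    return this.count;
  }

  // O(1): write at the tail position, wrapping around the end of the array.
  push(item: T): boolean {
    if (this.count === this.capacity) return false; // full
    const tail = (this.head + this.count) % this.capacity;
    this.slots[tail] = item;
    this.count++;
    return true;
  }

  // O(1): read at the head and advance it; no elements are moved.
  shift(): T | null {
    if (this.count === 0) return null;
    const item = this.slots[this.head] as T;
    this.slots[this.head] = undefined; // release the reference
    this.head = (this.head + 1) % this.capacity;
    this.count--;
    return item;
  }
}

const buffer = new RingBuffer<string>(2);
buffer.push("a");
buffer.push("b");
console.log(buffer.shift()); // "a"
```

Because `shift` only advances an index, its cost does not depend on how many items are queued, which is the property that distinguishes this layout from shifting a plain array.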

**Usage Examples:**

```typescript
import { Piscina, FixedQueue } from "piscina";

// Use FixedQueue for high-performance scenarios
const highThroughputPool = new Piscina({
  filename: "worker.js",
  taskQueue: new FixedQueue(),
  maxThreads: 16,
  concurrentTasksPerWorker: 4
});

// FixedQueue is the default when not specified
const pool = new Piscina({
  filename: "worker.js"
  // Uses FixedQueue by default
});
```

### Queue Validation

Utility function to validate task queue implementations.

```typescript { .api }
/**
 * Validates that an object implements the TaskQueue interface
 * @param value - Object to validate
 * @returns True if object is a valid TaskQueue
 */
function isTaskQueue(value: TaskQueue): boolean;
```
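A check of this kind is typically a structural ("duck typing") test. The sketch below shows one way such a test could be written; `looksLikeTaskQueue` and `QueueShape` are hypothetical names, and the exact conditions Piscina's `isTaskQueue` applies may differ.

```typescript
// Hypothetical structural check; an assumption about the approach, not Piscina's source.
interface QueueShape {
  readonly size: number;
  shift(): unknown;
  push(task: unknown): void;
  remove(task: unknown): void;
}

function looksLikeTaskQueue(value: unknown): value is QueueShape {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate["size"] === "number" &&
    typeof candidate["shift"] === "function" &&
    typeof candidate["push"] === "function" &&
    typeof candidate["remove"] === "function"
  );
}

const incomplete = { size: 0 }; // no methods
const complete = {
  size: 0,
  shift: () => null,
  push: () => {},
  remove: () => {}
};

console.log(looksLikeTaskQueue(incomplete)); // false
console.log(looksLikeTaskQueue(complete));   // true
```

Accepting `unknown` and returning a type predicate lets callers validate untyped input and then use the value as a queue without a cast.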

**Usage Examples:**

```typescript
import { isTaskQueue, ArrayTaskQueue, FixedQueue } from "piscina";

const arrayQueue = new ArrayTaskQueue();
const fixedQueue = new FixedQueue();
const invalidQueue = { size: 0 }; // Missing required methods

console.log(isTaskQueue(arrayQueue)); // true
console.log(isTaskQueue(fixedQueue)); // true
// The cast is needed because the declared parameter type is TaskQueue
console.log(isTaskQueue(invalidQueue as unknown as TaskQueue)); // false

// Custom queue validation (Task and TaskQueue are the interfaces documented above)
class CustomQueue implements TaskQueue {
  readonly size = 0;
  shift(): Task | null { return null; }
  push(_task: Task): void { }
  remove(_task: Task): void { }
}

console.log(isTaskQueue(new CustomQueue())); // true
```

### Custom Queue Implementation

You can implement custom task queues for specialized scenarios.

```typescript { .api }
/**
 * Example custom priority queue implementation.
 * The priority is read from the options object attached to each task under
 * Piscina.queueOptionsSymbol; tasks without options default to priority 0.
 */
class PriorityTaskQueue implements TaskQueue {
  private tasks: Task[] = [];

  get size(): number {
    return this.tasks.length;
  }

  /** Read the priority from the task's queue options */
  private priorityOf(task: Task): number {
    const options = task[Piscina.queueOptionsSymbol] as { priority?: number } | null;
    return options?.priority ?? 0;
  }

  shift(): Task | null {
    if (this.tasks.length === 0) return null;

    // Find highest priority task; ties keep insertion (FIFO) order
    let highestIndex = 0;
    for (let i = 1; i < this.tasks.length; i++) {
      if (this.priorityOf(this.tasks[i]) > this.priorityOf(this.tasks[highestIndex])) {
        highestIndex = i;
      }
    }

    return this.tasks.splice(highestIndex, 1)[0];
  }

  push(task: Task): void {
    this.tasks.push(task);
  }

  remove(task: Task): void {
    const index = this.tasks.indexOf(task);
    if (index !== -1) {
      this.tasks.splice(index, 1);
    }
  }
}
```

**Usage Examples:**

```typescript
import { Piscina } from "piscina";

// Custom priority queue usage (PriorityTaskQueue is the class defined above)
const priorityPool = new Piscina({
  filename: "worker.js",
  taskQueue: new PriorityTaskQueue()
});

// Attach a priority to each task through the queue options symbol.
// Both tasks are submitted before awaiting: the priority only affects
// ordering while tasks are waiting in the queue.
await Promise.all([
  priorityPool.run({
    data: "urgent task",
    [Piscina.queueOptionsSymbol]: { priority: 10 }
  }),
  priorityPool.run({
    data: "normal task",
    [Piscina.queueOptionsSymbol]: { priority: 1 }
  })
]);
```

### PiscinaTask Interface

Public interface representing queued tasks with metadata.

```typescript { .api }
/**
 * Public interface for queued tasks
 */
interface PiscinaTask extends Task {
  /** Unique task identifier */
  readonly taskId: number;
  /** Worker filename for this task */
  readonly filename: string;
  /** Task name identifier */
  readonly name: string;
  /** Task creation timestamp */
  readonly created: number;
  /** Whether task supports cancellation */
  readonly isAbortable: boolean;
}
```
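A custom queue can use this metadata for diagnostics. The sketch below finds the task that has been waiting longest, using only `taskId` and `created`. It is a hedged illustration: `QueuedTaskInfo`, `WaitReport`, and `longestWaiting` are hypothetical names, and it assumes `created` and the supplied `now` value come from the same clock and use the same unit, which this page does not specify.

```typescript
// Local stand-in carrying only the PiscinaTask fields this sketch needs (an assumption).
interface QueuedTaskInfo {
  readonly taskId: number;
  readonly name: string;
  readonly created: number;
}

interface WaitReport {
  taskId: number;
  waited: number;
}

// Return the task with the largest (now - created), or null for an empty list.
function longestWaiting(tasks: readonly QueuedTaskInfo[], now: number): WaitReport | null {
  let longest: WaitReport | null = null;
  for (const task of tasks) {
    const waited = now - task.created;
    if (longest === null || waited > longest.waited) {
      longest = { taskId: task.taskId, waited };
    }
  }
  return longest;
}

const queued: QueuedTaskInfo[] = [
  { taskId: 1, name: "default", created: 100 },
  { taskId: 2, name: "default", created: 40 },
  { taskId: 3, name: "resize", created: 75 }
];

console.log(longestWaiting(queued, 150)); // { taskId: 2, waited: 110 }
```

Passing `now` in as a parameter keeps the function deterministic and avoids committing to a particular clock.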

## Queue Selection Guidelines

### ArrayTaskQueue

- **Best for**: General purpose applications, variable queue sizes
- **Pros**: Simple implementation, dynamic sizing, familiar array operations
- **Cons**: O(n) shift operations can impact performance with large queues
- **Use when**: Queue sizes are typically small to medium, or simplicity is preferred

### FixedQueue

- **Best for**: High-throughput applications, performance-critical scenarios
- **Pros**: O(1) push and shift, optimized memory usage, V8-optimized implementation
- **Cons**: More complex internal structure
- **Use when**: Performance is critical, predictable memory usage needed

### Custom Queue

- **Best for**: Specialized requirements like priority queues, delayed execution
- **Implementation**: Must implement TaskQueue interface completely
- **Validation**: Use `isTaskQueue()` to verify implementation
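For the priority-queue case, one design option is to keep the backing array sorted on `push` so that `shift` does not have to scan. The sketch below illustrates that trade-off under stated assumptions: `SortedPriorityQueue`, `priorityOf`, and `makeTask` are hypothetical, `Task` and `queueOptionsSymbol` are local stand-ins for the real types, and the priority is assumed to live in a `priority` field of the task's queue options.

```typescript
// Local stand-ins so the sketch compiles on its own (assumptions, not Piscina imports).
const queueOptionsSymbol: unique symbol = Symbol("queueOptions");

interface Task {
  readonly [queueOptionsSymbol]: object | null;
}

// Hypothetical helper: numeric priority from the queue options, defaulting to 0.
function priorityOf(task: Task): number {
  const options = task[queueOptionsSymbol] as { priority?: number } | null;
  return options?.priority ?? 0;
}

class SortedPriorityQueue {
  // Ascending priority; among equal priorities the oldest task sits closest to the end.
  private readonly tasks: Task[] = [];

  get size(): number {
    return this.tasks.length;
  }

  push(task: Task): void {
    const priority = priorityOf(task);
    // Binary search for the first index whose priority is >= the new task's priority.
    let low = 0;
    let high = this.tasks.length;
    while (low < high) {
      const mid = (low + high) >>> 1;
      if (priorityOf(this.tasks[mid] as Task) < priority) low = mid + 1;
      else high = mid;
    }
    this.tasks.splice(low, 0, task);
  }

  // The highest-priority, oldest task is always the last element.
  shift(): Task | null {
    return this.tasks.pop() ?? null;
  }

  remove(task: Task): void {
    const index = this.tasks.indexOf(task);
    if (index !== -1) this.tasks.splice(index, 1);
  }
}

const makeTask = (priority?: number): Task => ({
  [queueOptionsSymbol]: priority === undefined ? null : { priority }
});

const normal = makeTask(1);
const urgentA = makeTask(10);
const urgentB = makeTask(10);
const unspecified = makeTask();

const sorted = new SortedPriorityQueue();
for (const task of [normal, urgentA, urgentB, unspecified]) sorted.push(task);
console.log(sorted.size); // 4
```

Insertion still costs O(n) because `splice` moves elements, so this mainly helps when tasks are dequeued on a hot path; a binary heap would be the usual next step if both operations need to be logarithmic.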

**Performance Comparison:**

```typescript
import { Piscina, ArrayTaskQueue, FixedQueue } from "piscina";

// Both pools use the same thread settings so that the timings below
// reflect the queue implementation rather than the pool configuration

// For high-throughput scenarios (recommended)
const performancePool = new Piscina({
  filename: "worker.js",
  taskQueue: new FixedQueue(),
  maxThreads: 8,
  concurrentTasksPerWorker: 2
});

// For general purpose (simpler but slower at scale)
const generalPool = new Piscina({
  filename: "worker.js",
  taskQueue: new ArrayTaskQueue(),
  maxThreads: 8,
  concurrentTasksPerWorker: 2
});

// Benchmark queues
async function benchmarkQueues() {
  const tasks = Array.from({ length: 1000 }, (_, i) => ({ id: i }));

  console.time('FixedQueue');
  await Promise.all(tasks.map(task => performancePool.run(task)));
  console.timeEnd('FixedQueue');

  console.time('ArrayTaskQueue');
  await Promise.all(tasks.map(task => generalPool.run(task)));
  console.timeEnd('ArrayTaskQueue');
}
```