or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

concurrency-control.md events.md index.md queue-management.md

docs/queue-management.md

0

# Queue Management

1

2

Core queue operations for adding, controlling, and monitoring promise-based tasks. Provides methods for adding individual tasks, batch operations, and comprehensive queue state management.

3

4

## Capabilities

5

6

### Task Addition

7

8

#### add Method

9

10

Adds a sync or async task to the queue and returns a promise that resolves with the task result.

11

12

```typescript { .api }

13

/**

14

* Adds a sync or async task to the queue. Always returns a promise.

15

* @param function_ - The task function to execute

16

* @param options - Optional configuration for the task

17

* @returns Promise that resolves with the task result, or with void if the task times out and throwOnTimeout is not enabled

18

*/

19

add<TaskResultType>(function_: Task<TaskResultType>, options?: Partial<EnqueueOptionsType>): Promise<TaskResultType | void>;

20

21

/**

22

* Adds a task with throwOnTimeout enabled, guaranteeing a result

23

* @param function_ - The task function to execute

24

* @param options - Configuration with throwOnTimeout: true

25

* @returns Promise that resolves with task result (never void)

26

*/

27

add<TaskResultType>(function_: Task<TaskResultType>, options: {throwOnTimeout: true} & Exclude<EnqueueOptionsType, 'throwOnTimeout'>): Promise<TaskResultType>;

28

```

29

30

**Usage Examples:**

31

32

```typescript

33

import PQueue from "p-queue";

34

35

const queue = new PQueue({ concurrency: 2 });

36

37

// Basic task addition

38

const result = await queue.add(async () => {

39

const response = await fetch('https://api.example.com/users');

40

return response.json();

41

});

42

43

// Task with priority and ID

44

await queue.add(

45

async ({ signal }) => {

46

// Task can check for abort signal

47

if (signal?.aborted) throw signal.reason;

48

return processImportantData();

49

},

50

{

51

priority: 10,

52

id: 'important-task',

53

timeout: 5000

54

}

55

);

56

57

// Task with abort signal

58

const controller = new AbortController();

59

const taskPromise = queue.add(

60

async ({ signal }) => {

61

// Handle abortion within task

62

signal?.addEventListener('abort', () => cleanup());

63

return longRunningOperation();

64

},

65

{ signal: controller.signal }

66

);

67

68

// Can abort the task

69

setTimeout(() => controller.abort(), 1000);

70

```

71

72

#### addAll Method

73

74

Adds multiple sync or async tasks to the queue simultaneously.

75

76

```typescript { .api }

77

/**

78

* Same as .add(), but accepts an array of sync or async functions

79

* @param functions - Array of task functions to execute

80

* @param options - Optional configuration applied to all tasks

81

* @returns Promise that resolves with an array of the task results once every task has resolved

82

*/

83

addAll<TaskResultsType>(

84

functions: ReadonlyArray<Task<TaskResultsType>>,

85

options?: Partial<EnqueueOptionsType>

86

): Promise<Array<TaskResultsType | void>>;

87

88

/**

89

* Adds multiple tasks with throwOnTimeout enabled

90

* @param functions - Array of task functions to execute

91

* @param options - Configuration with throwOnTimeout: true

92

* @returns Promise with guaranteed results (no void values)

93

*/

94

addAll<TaskResultsType>(

95

functions: ReadonlyArray<Task<TaskResultsType>>,

96

options?: {throwOnTimeout: true} & Partial<Exclude<EnqueueOptionsType, 'throwOnTimeout'>>

97

): Promise<TaskResultsType[]>;

98

```

99

100

**Usage Examples:**

101

102

```typescript

103

// Process multiple files

104

const files = ['file1.txt', 'file2.txt', 'file3.txt'];

105

const results = await queue.addAll(

106

files.map(filename => async () => {

107

const data = await readFile(filename);

108

return processFileData(data);

109

}),

110

{ priority: 5 }

111

);

112

113

// Batch API requests with error handling

114

const urls = ['api/users', 'api/posts', 'api/comments'];

115

const responses = await queue.addAll(

116

urls.map(url => async ({ signal }) => {

117

try {

118

const response = await fetch(`https://api.example.com/${url}`, {

119

signal

120

});

121

return response.json();

122

} catch (error) {

123

console.error(`Failed to fetch ${url}:`, error);

124

return null;

125

}

126

})

127

);

128

```

129

130

### Queue Control

131

132

#### start Method

133

134

Starts or resumes executing enqueued tasks within the concurrency limit.

135

136

```typescript { .api }

137

/**

138

* Start (or resume) executing enqueued tasks within concurrency limit.

139

* No need to call this if queue is not paused (via options.autoStart = false or by .pause() method.)

140

* @returns The queue instance for chaining

141

*/

142

start(): this;

143

```

144

145

**Usage Examples:**

146

147

```typescript

148

// Create paused queue

149

const queue = new PQueue({

150

concurrency: 2,

151

autoStart: false

152

});

153

154

// Add tasks while paused

155

queue.add(async () => task1());

156

queue.add(async () => task2());

157

queue.add(async () => task3());

158

159

console.log(queue.size); // 3 (tasks are queued but not running)

160

console.log(queue.pending); // 0 (no tasks executing)

161

162

// Start processing

163

queue.start();

164

165

console.log(queue.pending); // 2 (up to concurrency limit)

166

```

167

168

#### pause Method

169

170

Puts queue execution on hold, preventing new tasks from starting.

171

172

```typescript { .api }

173

/**

174

* Put queue execution on hold.

175

*/

176

pause(): void;

177

```

178

179

**Usage Examples:**

180

181

```typescript

182

const queue = new PQueue({ concurrency: 3 });

183

184

// Add some tasks

185

queue.add(async () => longTask1());

186

queue.add(async () => longTask2());

187

queue.add(async () => longTask3());

188

189

// Pause after 1 second

190

setTimeout(() => {

191

queue.pause();

192

console.log('Queue paused');

193

194

// Resume after another 2 seconds

195

setTimeout(() => {

196

queue.start();

197

console.log('Queue resumed');

198

}, 2000);

199

}, 1000);

200

```

201

202

#### clear Method

203

204

Clears all queued tasks that haven't started executing yet.

205

206

```typescript { .api }

207

/**

208

* Clear the queue.

209

*/

210

clear(): void;

211

```

212

213

**Usage Examples:**

214

215

```typescript

216

const queue = new PQueue({ concurrency: 1 });

217

218

// Add tasks

219

queue.add(async () => task1());

220

queue.add(async () => task2());

221

queue.add(async () => task3());

222

223

console.log(queue.size); // 2 (one task running, two queued)

224

225

// Clear queued tasks

226

queue.clear();

227

228

console.log(queue.size); // 0 (queued tasks cleared)

229

console.log(queue.pending); // 1 (running task continues)

230

```

231

232

### State Monitoring

233

234

#### onEmpty Method

235

236

Returns a promise that resolves when the queue becomes empty (no more tasks waiting).

237

238

```typescript { .api }

239

/**

240

* Can be called multiple times. Useful if you, for example, add additional items at a later time.

241

* @returns A promise that settles when the queue becomes empty.

242

*/

243

onEmpty(): Promise<void>;

244

```

245

246

#### onIdle Method

247

248

Returns a promise that resolves when the queue becomes empty and all promises have completed.

249

250

```typescript { .api }

251

/**

252

* The difference with .onEmpty is that .onIdle guarantees that all work from the queue has finished.

253

* .onEmpty merely signals that the queue is empty, but it could mean that some promises haven't completed yet.

254

* @returns A promise that settles when the queue becomes empty, and all promises have completed; queue.size === 0 && queue.pending === 0.

255

*/

256

onIdle(): Promise<void>;

257

```

258

259

#### onSizeLessThan Method

260

261

Returns a promise that resolves when the queue size is less than the given limit.

262

263

```typescript { .api }

264

/**

265

* @param limit - The size limit to wait for

266

* @returns A promise that settles when the queue size is less than the given limit: queue.size < limit.

267

*

268

* If you want to avoid having the queue grow beyond a certain size you can await queue.onSizeLessThan() before adding a new item.

269

*

270

* Note that this only limits the number of items waiting to start. There could still be up to `concurrency` jobs already running that this call does not include in its calculation.

271

*/

272

onSizeLessThan(limit: number): Promise<void>;

273

```

274

275

**Usage Examples:**

276

277

```typescript

278

const queue = new PQueue({ concurrency: 2 });

279

280

// Monitor queue states

281

queue.add(async () => {

282

await delay(1000);

283

return 'task1';

284

});

285

286

queue.add(async () => {

287

await delay(2000);

288

return 'task2';

289

});

290

291

// Wait for queue to be empty (no more tasks waiting)

292

await queue.onEmpty();

293

console.log('No more tasks in queue');

294

295

// Wait for all work to complete

296

await queue.onIdle();

297

console.log('All tasks completed');

298

299

// Limit queue growth

300

async function addTaskSafely(taskFn) {

301

// Wait if queue has too many items

302

await queue.onSizeLessThan(10);

303

return queue.add(taskFn);

304

}

305

```

306

307

### Task Configuration

308

309

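Tasks added to the queue can be configured with the options below. The `EnqueueOptionsType` that appears in the method signatures is the queue's options type parameter, which extends (and by default is) `QueueAddOptions`: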

310

311

```typescript { .api }

312

type QueueAddOptions = {

313

readonly priority?: number;

314

id?: string;

315

readonly signal?: AbortSignal;

316

timeout?: number;

317

throwOnTimeout?: boolean;

318

};

319

320

type TaskOptions = {

321

readonly signal?: AbortSignal;

322

};

323

324

type Task<TaskResultType> =

325

| ((options: TaskOptions) => PromiseLike<TaskResultType>)

326

| ((options: TaskOptions) => TaskResultType);

327

```

328

329

**Configuration Examples:**

330

331

```typescript

332

// Priority-based execution (higher numbers = higher priority)

333

await queue.add(async () => criticalTask(), { priority: 10 });

334

await queue.add(async () => normalTask(), { priority: 0 });

335

await queue.add(async () => lowPriorityTask(), { priority: -5 });

336

337

// Task with timeout

338

await queue.add(

339

async () => {

340

// This task will time out after 5 seconds

341

const result = await slowOperation();

342

return result;

343

},

344

{

345

timeout: 5000,

346

throwOnTimeout: true // Will throw TimeoutError instead of returning void

347

}

348

);

349

350

// Task with unique ID for later priority updates

351

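const profileUpdate = queue.add(

352

async () => updateUserProfile(userId),

353

{ id: `profile-update-${userId}`, priority: 5 }

354

);

355

356

// Later, while the task is still waiting in the queue (not yet started), increase its priority.
// Do not await the add() call first: once the task has run, its id is no longer in the queue.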

357

queue.setPriority(`profile-update-${userId}`, 15);

358

```