or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

concurrency-control.mdevents.mdindex.mdqueue-management.md

concurrency-control.mddocs/

0

# Concurrency Control

1

2

Advanced concurrency management with configurable limits, interval-based throttling, and priority handling for fine-grained control over async operation execution.

3

4

## Capabilities

5

6

### Concurrency Management

7

8

#### concurrency Property

9

10

Controls the maximum number of tasks that can run simultaneously.

11

12

```typescript { .api }

13

/**

14

* Get the current concurrency limit

15

*/

16

get concurrency(): number;

17

18

/**

19

* Set a new concurrency limit. Changes take effect immediately.

20

* @param newConcurrency - Number from 1 and up

21

* @throws TypeError if not a number >= 1

22

*/

23

set concurrency(newConcurrency: number);

24

```

25

26

**Usage Examples:**

27

28

```typescript

29

import PQueue from "p-queue";

30

31

const queue = new PQueue({ concurrency: 2 });

32

33

// Check current concurrency

34

console.log(queue.concurrency); // 2

35

36

// Add some tasks

37

queue.add(async () => task1());

38

queue.add(async () => task2());

39

queue.add(async () => task3());

40

queue.add(async () => task4());

41

42

// Initially 2 tasks run, 2 are queued

43

console.log(queue.pending); // 2

44

console.log(queue.size); // 2

45

46

// Increase concurrency dynamically

47

queue.concurrency = 4;

48

// Now all 4 tasks can run simultaneously

49

50

// Decrease concurrency

51

queue.concurrency = 1;

52

// New tasks will be limited to 1 at a time

53

```

54

55

### Queue State Inspection

56

57

#### size Property

58

59

Number of queued items waiting to run.

60

61

```typescript { .api }

62

/**

63

* Size of the queue, the number of queued items waiting to run.

64

*/

65

get size(): number;

66

```

67

68

#### pending Property

69

70

Number of running items (no longer in the queue).

71

72

```typescript { .api }

73

/**

74

* Number of running items (no longer in the queue).

75

*/

76

get pending(): number;

77

```

78

79

#### isPaused Property

80

81

Whether the queue is currently paused.

82

83

```typescript { .api }

84

/**

85

* Whether the queue is currently paused.

86

*/

87

get isPaused(): boolean;

88

```

89

90

**Usage Examples:**

91

92

```typescript

93

const queue = new PQueue({ concurrency: 3 });

94

95

// Monitor queue state

96

console.log(`Queued: ${queue.size}, Running: ${queue.pending}, Paused: ${queue.isPaused}`);

97

98

// Add tasks and monitor changes

99

queue.add(async () => delay(1000));

100

queue.add(async () => delay(1000));

101

queue.add(async () => delay(1000));

102

queue.add(async () => delay(1000));

103

queue.add(async () => delay(1000));

104

105

// Check state after adding tasks

106

console.log(`Queued: ${queue.size}, Running: ${queue.pending}`); // Queued: 2, Running: 3

107

108

// Pause and check state

109

queue.pause();

110

console.log(`Paused: ${queue.isPaused}`); // Paused: true

111

```

112

113

#### timeout Property

114

115

Per-operation timeout in milliseconds. Operations fulfill once timeout elapses if they haven't already. Applies to each future operation.

116

117

```typescript { .api }

118

/**

119

* Per-operation timeout in milliseconds. Operations fulfill once timeout elapses if they haven't already.

120

* Applies to each future operation.

121

*/

122

timeout?: number;

123

```

124

125

**Usage Examples:**

126

127

```typescript

128

import PQueue from "p-queue";

129

130

const queue = new PQueue({ concurrency: 2, timeout: 5000 });

131

132

// Set timeout for all future operations

133

queue.timeout = 3000;

134

135

// Add task that will timeout after 3 seconds

136

await queue.add(async () => {

137

// This operation will timeout after 3 seconds

138

await someSlowOperation();

139

});

140

141

// Individual tasks can override the queue timeout

142

await queue.add(async () => {

143

return fastOperation();

144

}, { timeout: 1000 }); // This task has 1 second timeout

145

```

146

147

#### sizeBy Method

148

149

Get the size of the queue filtered by specific options.

150

151

```typescript { .api }

152

/**

153

* Size of the queue, filtered by the given options.

154

* For example, this can be used to find the number of items remaining in the queue with a specific priority level.

155

* @param options - Filter options to match against queued items

156

* @returns Number of matching items in the queue

157

*/

158

sizeBy(options: Readonly<Partial<EnqueueOptionsType>>): number;

159

```

160

161

**Usage Examples:**

162

163

```typescript

164

// Add tasks with different priorities

165

await queue.add(async () => task1(), { priority: 10 });

166

await queue.add(async () => task2(), { priority: 5 });

167

await queue.add(async () => task3(), { priority: 10 });

168

await queue.add(async () => task4(), { priority: 0 });

169

170

// Check queue size by priority

171

console.log(queue.sizeBy({ priority: 10 })); // 2 tasks with priority 10

172

console.log(queue.sizeBy({ priority: 5 })); // 1 task with priority 5

173

console.log(queue.size); // 4 total tasks

174

```

175

176

### Priority Management

177

178

#### setPriority Method

179

180

Updates the priority of a queued task by its ID, affecting execution order.

181

182

```typescript { .api }

183

/**

184

* Updates the priority of a promise function by its id, affecting its execution order.

185

* Requires a defined concurrency limit to take effect.

186

*

187

* @param id - The unique identifier of the task

188

* @param priority - The new priority level (higher numbers = higher priority)

189

* @throws ReferenceError if no task with the given ID exists

190

*/

191

setPriority(id: string, priority: number): void;

192

```

193

194

**Usage Examples:**

195

196

```typescript

197

const queue = new PQueue({ concurrency: 1 });

198

199

// Add tasks with IDs and priorities

200

queue.add(async () => 'πŸ¦„', { priority: 1 });

201

queue.add(async () => 'πŸ¦€', { priority: 0, id: 'crab-task' });

202

queue.add(async () => 'πŸ¦„', { priority: 1 });

203

queue.add(async () => 'πŸ¦„', { priority: 1 });

204

205

// Before execution, increase priority of crab task

206

queue.setPriority('crab-task', 2);

207

// Now crab task will run second (after first unicorn task that's already running)

208

209

// Decrease priority example

210

const queue2 = new PQueue({ concurrency: 1 });

211

212

queue2.add(async () => 'πŸ¦„', { priority: 1 });

213

queue2.add(async () => 'πŸ¦€', { priority: 1, id: 'crab-task' });

214

queue2.add(async () => 'πŸ¦„');

215

queue2.add(async () => 'πŸ¦„', { priority: 0 });

216

217

queue2.setPriority('crab-task', -1);

218

// Now crab task will execute last

219

```

220

221

### Interval-Based Throttling

222

223

p-queue supports interval-based rate limiting to prevent overwhelming external services.

224

225

#### Configuration Options

226

227

```typescript { .api }

228

type Options<QueueType, QueueOptions> = {

229

readonly intervalCap?: number; // Max runs per interval (default: Infinity)

230

readonly interval?: number; // Interval length in ms (default: 0)

231

readonly carryoverConcurrencyCount?: boolean; // Carry over pending tasks (default: false)

232

// ... other options

233

};

234

```

235

236

**Usage Examples:**

237

238

```typescript

239

// Rate limiting: Max 10 requests per 1 second

240

const apiQueue = new PQueue({

241

concurrency: 5,

242

intervalCap: 10,

243

interval: 1000

244

});

245

246

// This will spread requests to comply with rate limits

247

for (let i = 0; i < 50; i++) {

248

apiQueue.add(async () => {

249

const response = await fetch(`https://api.example.com/data/${i}`);

250

return response.json();

251

});

252

}

253

254

// Advanced rate limiting with carryover

255

const restrictedQueue = new PQueue({

256

concurrency: 2,

257

intervalCap: 5,

258

interval: 2000,

259

carryoverConcurrencyCount: true // Tasks in progress count toward next interval

260

});

261

262

// Burst prevention: Max 3 operations per 5 seconds

263

const burstQueue = new PQueue({

264

concurrency: 10, // High concurrency allowed

265

intervalCap: 3, // But only 3 per interval

266

interval: 5000 // Every 5 seconds

267

});

268

```

269

270

### Advanced Configuration

271

272

#### Queue Initialization Options

273

274

```typescript { .api }

275

type Options<QueueType extends Queue<RunFunction, QueueOptions>, QueueOptions extends QueueAddOptions> = {

276

readonly concurrency?: number; // Concurrency limit (default: Infinity, min: 1)

277

readonly autoStart?: boolean; // Auto-execute tasks when added (default: true)

278

readonly queueClass?: new () => QueueType; // Custom queue implementation

279

readonly intervalCap?: number; // Max runs in interval (default: Infinity, min: 1)

280

readonly interval?: number; // Interval length in ms (default: 0, min: 0)

281

readonly carryoverConcurrencyCount?: boolean; // Carry over tasks to next interval (default: false)

282

timeout?: number; // Default timeout for all tasks

283

throwOnTimeout?: boolean; // Whether timeout throws exception (default: false)

284

};

285

```

286

287

**Advanced Usage Examples:**

288

289

```typescript

290

// Comprehensive configuration

291

const advancedQueue = new PQueue({

292

concurrency: 3, // Run up to 3 tasks simultaneously

293

autoStart: false, // Don't start automatically (manual control)

294

intervalCap: 10, // Max 10 operations per interval

295

interval: 60000, // 1 minute intervals

296

carryoverConcurrencyCount: true, // Count running tasks toward next interval

297

timeout: 30000, // 30 second default timeout

298

throwOnTimeout: false // Return void on timeout instead of throwing

299

});

300

301

// Add tasks then start manually

302

queue.add(async () => heavyTask1());

303

queue.add(async () => heavyTask2());

304

queue.add(async () => heavyTask3());

305

306

// Start when ready

307

queue.start();

308

309

// Custom queue implementation

310

class FIFOQueue {

311

constructor() {

312

this.queue = [];

313

}

314

315

get size() { return this.queue.length; }

316

317

enqueue(run, options) {

318

this.queue.push({ run, ...options });

319

}

320

321

dequeue() {

322

return this.queue.shift()?.run;

323

}

324

325

filter(options) {

326

return this.queue

327

.filter(item => item.priority === options.priority)

328

.map(item => item.run);

329

}

330

331

setPriority(id, priority) {

332

const item = this.queue.find(item => item.id === id);

333

if (item) item.priority = priority;

334

}

335

}

336

337

const fifoQueue = new PQueue({

338

queueClass: FIFOQueue,

339

concurrency: 2

340

});

341

```

342

343

### Performance Monitoring

344

345

```typescript

346

// Monitor queue performance

347

const queue = new PQueue({ concurrency: 5 });

348

349

function logQueueState() {

350

console.log({

351

size: queue.size,

352

pending: queue.pending,

353

isPaused: queue.isPaused,

354

concurrency: queue.concurrency

355

});

356

}

357

358

// Log state periodically

359

const monitor = setInterval(logQueueState, 1000);

360

361

// Stop monitoring when idle

362

queue.onIdle().then(() => {

363

clearInterval(monitor);

364

console.log('Queue processing complete');

365

});

366

```