# p-queue

p-queue is a promise queue library with fine-grained concurrency control for asynchronous operations. It lets you cap the number of tasks running at once, throttle how many tasks start per time interval, and order execution by priority, which helps avoid overwhelming APIs or system resources.

## Package Information

- **Package Name**: p-queue
- **Package Type**: npm
- **Language**: TypeScript
- **Installation**: `npm install p-queue`

## Core Imports

```typescript
import PQueue from "p-queue";
```

**Note**: This package is ESM-only and does not provide CommonJS exports. CommonJS projects must either migrate to ESM or load the package with a dynamic `import()`.

## Basic Usage

```typescript
import PQueue from "p-queue";

// Create queue with concurrency limit
const queue = new PQueue({ concurrency: 2 });

// Add tasks to the queue
await queue.add(async () => {
  const response = await fetch('https://api.example.com/data');
  return response.json();
});

// Add multiple tasks (processFile stands in for your own async function)
const results = await queue.addAll([
  async () => processFile('file1.txt'),
  async () => processFile('file2.txt'),
  async () => processFile('file3.txt'),
]);

// Wait for all tasks to complete
await queue.onIdle();
```

## Architecture

p-queue is built around several key components:

- **PQueue Class**: Main queue class extending EventEmitter3 for real-time monitoring
- **Concurrency Control**: Sophisticated limiting of simultaneous operations with interval-based throttling
- **Priority System**: Configurable priority-based execution order with dynamic priority updates
- **Event System**: Comprehensive event emission for monitoring queue state and task lifecycle
- **Queue Implementations**: Pluggable queue classes with default PriorityQueue implementation
- **Task Management**: Flexible task handling with timeout support and abort signal integration
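
To make the concurrency-control idea in the list above concrete, here is a minimal, self-contained sketch of a promise queue that never runs more than a fixed number of tasks at once. It is an illustration only and not p-queue's implementation: `TinyQueue`, `Job`, and `demo` are hypothetical names, and priorities, intervals, timeouts, and events are all left out.

```typescript
// Illustrative sketch only: NOT p-queue's implementation.
// `TinyQueue`, `Job`, and `demo` are hypothetical names.
type Job<T> = () => Promise<T>;

class TinyQueue {
  #concurrency: number;
  #running = 0;
  #waiting: Array<() => void> = [];

  constructor(concurrency: number) {
    this.#concurrency = concurrency;
  }

  // Schedule a job; the returned promise settles with the job's outcome.
  add<T>(job: Job<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const run = () => {
        this.#running++;
        job()
          .then(resolve, reject)
          .finally(() => {
            this.#running--;
            // A slot is free again: start the oldest waiting job, if any.
            this.#waiting.shift()?.();
          });
      };
      if (this.#running < this.#concurrency) run();
      else this.#waiting.push(run);
    });
  }
}

// Runs five short jobs with a limit of two and records the peak overlap.
async function demo(): Promise<{ results: number[]; peak: number }> {
  const queue = new TinyQueue(2);
  let active = 0;
  let peak = 0;
  const job = (value: number) => async () => {
    active++;
    peak = Math.max(peak, active);
    await new Promise((resolve) => setTimeout(resolve, 10));
    active--;
    return value;
  };
  const results = await Promise.all([1, 2, 3, 4, 5].map((n) => queue.add(job(n))));
  return { results, peak };
}
```

In this sketch `demo()` resolves with the results in submission order, and `peak` never exceeds the limit of 2 because a waiting job starts only after a running one settles.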

## Capabilities

### Queue Management

Core queue operations for adding, controlling, and monitoring promise-based tasks. Includes methods for adding individual tasks, batch operations, and queue state management.

```typescript { .api }
class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = QueueAddOptions> extends EventEmitter<EventName> {
  constructor(options?: Options<QueueType, EnqueueOptionsType>);

  // Core task management
  // The more specific overload is listed first so that `throwOnTimeout: true` narrows the result type
  add<TaskResultType>(function_: Task<TaskResultType>, options: {throwOnTimeout: true} & Exclude<EnqueueOptionsType, 'throwOnTimeout'>): Promise<TaskResultType>;
  add<TaskResultType>(function_: Task<TaskResultType>, options?: Partial<EnqueueOptionsType>): Promise<TaskResultType | void>;
  addAll<TaskResultsType>(functions: ReadonlyArray<Task<TaskResultsType>>, options?: Partial<EnqueueOptionsType>): Promise<Array<TaskResultsType | void>>;

  // Queue control
  start(): this;
  pause(): void;
  clear(): void;

  // State monitoring
  onEmpty(): Promise<void>;
  onIdle(): Promise<void>;
  onSizeLessThan(limit: number): Promise<void>;
}
```

[Queue Management](./queue-management.md)

### Concurrency Control

Advanced concurrency management with configurable limits, interval-based throttling, and priority handling for fine-grained control over async operation execution.

```typescript { .api }
// Concurrency properties and methods
class PQueue {
  get concurrency(): number;
  set concurrency(newConcurrency: number);

  timeout?: number;

  get size(): number;
  get pending(): number;
  get isPaused(): boolean;

  sizeBy(options: Readonly<Partial<EnqueueOptionsType>>): number;
  setPriority(id: string, priority: number): void;
}
```

[Concurrency Control](./concurrency-control.md)
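
As a rough mental model of interval-based throttling, the sketch below computes when each task could start at the earliest if at most `intervalCap` tasks may start per window of `interval` milliseconds. It is a simplified model under stated assumptions, not p-queue code: it assumes fixed windows, assumes all tasks are queued at time 0, and ignores the concurrency limit and task duration. The function name `earliestStartMs` is hypothetical.

```typescript
// Simplified model, NOT p-queue code. Assumptions: fixed windows of `interval` ms,
// at most `intervalCap` task starts per window, all tasks queued at t = 0,
// and no concurrency limit or task duration taken into account.
function earliestStartMs(taskIndex: number, intervalCap: number, interval: number): number {
  if (!Number.isInteger(taskIndex) || taskIndex < 0) {
    throw new RangeError("taskIndex must be a non-negative integer");
  }
  if (!Number.isInteger(intervalCap) || intervalCap < 1) {
    throw new RangeError("intervalCap must be a positive integer");
  }
  if (!(interval >= 0)) {
    throw new RangeError("interval must be a non-negative number");
  }
  // Tasks 0..intervalCap-1 fall in window 0, the next intervalCap in window 1, and so on.
  return Math.floor(taskIndex / intervalCap) * interval;
}

// Five tasks with intervalCap = 2 and interval = 1000 ms.
const starts = [0, 1, 2, 3, 4].map((index) => earliestStartMs(index, 2, 1000));
// starts is [0, 0, 1000, 1000, 2000]
```

Under this model, raising `intervalCap` or lowering `interval` both increase throughput, while the concurrency limit independently bounds how many started tasks may overlap.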

### Event System

Comprehensive event emission system for monitoring queue state, task lifecycle, and execution progress. Built on EventEmitter3 for efficient event handling.

```typescript { .api }
type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error';

// Event emission (inherited from EventEmitter3)
class PQueue extends EventEmitter<EventName> {
  on(event: EventName, listener: (...args: any[]) => void): this;
  off(event: EventName, listener: (...args: any[]) => void): this;
  emit(event: EventName, ...args: any[]): boolean;
}
```

[Event System](./events.md)

## Types

### Core Types

```typescript { .api }
type Task<TaskResultType> =
  | ((options: TaskOptions) => PromiseLike<TaskResultType>)
  | ((options: TaskOptions) => TaskResultType);

type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error';

type RunFunction = () => Promise<unknown>;
```

### Configuration Options

```typescript { .api }
type Options<QueueType extends Queue<RunFunction, QueueOptions>, QueueOptions extends QueueAddOptions> = {
  readonly concurrency?: number;
  readonly autoStart?: boolean;
  readonly queueClass?: new () => QueueType;
  readonly intervalCap?: number;
  readonly interval?: number;
  readonly carryoverConcurrencyCount?: boolean;
  timeout?: number;
  throwOnTimeout?: boolean;
};

type QueueAddOptions = {
  readonly priority?: number;
  id?: string;
  readonly signal?: AbortSignal;
  timeout?: number;
  throwOnTimeout?: boolean;
};

type TaskOptions = {
  readonly signal?: AbortSignal;
};
```

### Queue Interface

```typescript { .api }
interface Queue<Element, Options> {
  size: number;
  filter: (options: Readonly<Partial<Options>>) => Element[];
  dequeue: () => Element | undefined;
  enqueue: (run: Element, options?: Partial<Options>) => void;
  setPriority: (id: string, priority: number) => void;
}
```
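
The interface above is what a pluggable queue class has to satisfy. The following sketch shows one possible implementation that keeps entries sorted by descending priority and preserves insertion order among equal priorities. It is a hedged illustration, not the library's built-in `PriorityQueue`: `SortedQueue` and `AddOptions` are hypothetical names, and the types are redeclared locally only so the example is self-contained.

```typescript
// Local copies of the documented types so the sketch runs on its own;
// in a real project they would come from p-queue's type definitions.
type RunFunction = () => Promise<unknown>;
type AddOptions = { priority?: number; id?: string };

interface Queue<Element, Options> {
  size: number;
  filter: (options: Readonly<Partial<Options>>) => Element[];
  dequeue: () => Element | undefined;
  enqueue: (run: Element, options?: Partial<Options>) => void;
  setPriority: (id: string, priority: number) => void;
}

type Entry = { run: RunFunction; priority: number; id: string | undefined };

// Hypothetical example class: highest priority first, FIFO among equal priorities.
class SortedQueue implements Queue<RunFunction, AddOptions> {
  #entries: Entry[] = [];

  get size(): number {
    return this.#entries.length;
  }

  enqueue(run: RunFunction, options?: Partial<AddOptions>): void {
    const entry: Entry = { run, priority: options?.priority ?? 0, id: options?.id };
    // Insert before the first entry with a strictly lower priority.
    const index = this.#entries.findIndex((other) => other.priority < entry.priority);
    if (index === -1) this.#entries.push(entry);
    else this.#entries.splice(index, 0, entry);
  }

  dequeue(): RunFunction | undefined {
    return this.#entries.shift()?.run;
  }

  // Returns the queued run functions whose priority matches `options.priority`.
  filter(options: Readonly<Partial<AddOptions>>): RunFunction[] {
    return this.#entries
      .filter((entry) => entry.priority === options.priority)
      .map((entry) => entry.run);
  }

  // Re-inserts the entry with the given id at its new priority position.
  setPriority(id: string, priority: number): void {
    const entry = this.#entries.find((candidate) => candidate.id === id);
    if (!entry) throw new ReferenceError(`No queued task with id "${id}"`);
    this.#entries = this.#entries.filter((candidate) => candidate !== entry);
    this.enqueue(entry.run, { priority, id });
  }
}

const low: RunFunction = async () => "low";
const high: RunFunction = async () => "high";

const queue = new SortedQueue();
queue.enqueue(low, { priority: 0, id: "low" });
queue.enqueue(high, { priority: 5, id: "high" });

// `high` is dequeued first because it has the larger priority.
const first = queue.dequeue();
```

Given the `queueClass?: new () => QueueType` option shown under Configuration Options, a class like this would presumably be passed as `new PQueue({ queueClass: SortedQueue })`; treat that wiring as an assumption to check against the library's own documentation.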