# Queuing Strategies

Queuing strategies manage backpressure: they control how data is buffered within a stream and determine when backpressure should be applied.

## Capabilities

### CountQueuingStrategy Class

A queuing strategy that counts the number of chunks in the queue, regardless of their individual sizes.

```typescript { .api }
/**
 * A queuing strategy that counts the number of chunks
 */
class CountQueuingStrategy implements QueuingStrategy<any> {
  constructor(options: { highWaterMark: number });

  /** Returns the high water mark provided to the constructor */
  readonly highWaterMark: number;

  /** Measures the size of chunk by always returning 1 */
  readonly size: (chunk: any) => 1;
}
```

**Usage Examples:**

```typescript
import {
  ReadableStream,
  WritableStream,
  TransformStream,
  CountQueuingStrategy
} from "web-streams-polyfill";

// Readable stream that buffers up to 5 chunks
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("chunk 1");
    controller.enqueue("chunk 2");
    controller.enqueue("chunk 3");
    // 3 chunks queued so far; backpressure is signaled once 5 are queued
  }
}, new CountQueuingStrategy({ highWaterMark: 5 }));

// Writable stream with count-based backpressure
const writable = new WritableStream({
  write(chunk) {
    console.log("Processing:", chunk);
    return new Promise(resolve => setTimeout(resolve, 100));
  }
}, new CountQueuingStrategy({ highWaterMark: 3 }));

// Transform stream with different strategies for each side
const transform = new TransformStream({
  transform(chunk, controller) {
    // Split each input chunk into multiple output chunks
    const parts = chunk.toString().split(' ');
    parts.forEach(part => controller.enqueue(part));
  }
},
  new CountQueuingStrategy({ highWaterMark: 2 }), // writable side
  new CountQueuingStrategy({ highWaterMark: 10 }) // readable side
);
```
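
To make the relationship between the high water mark and the backpressure signal concrete, the sketch below records `controller.desiredSize` while chunks are enqueued. It assumes a runtime that exposes the Streams classes as globals (Node.js 18+ or a modern browser); with the polyfill, import the same names from `"web-streams-polyfill"` instead. The `observedSizes` array is a name introduced only for this illustration.

```typescript
// Assumption: ReadableStream and CountQueuingStrategy are available as globals.
// Collects desiredSize (high water mark minus queued chunk count) at each step.
const observedSizes: (number | null)[] = [];

new ReadableStream<string>({
  start(controller) {
    observedSizes.push(controller.desiredSize); // queue is empty
    controller.enqueue("a");
    observedSizes.push(controller.desiredSize); // one chunk queued
    controller.enqueue("b");
    controller.enqueue("c");
    observedSizes.push(controller.desiredSize); // queue has reached the mark
    controller.enqueue("d"); // still accepted
    observedSizes.push(controller.desiredSize); // queue is over the mark
  }
}, new CountQueuingStrategy({ highWaterMark: 3 }));

console.log(observedSizes); // [3, 2, 0, -1]
```

The fourth `enqueue()` still succeeds: the high water mark is a signal to the producer, not a hard cap on the queue.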

### ByteLengthQueuingStrategy Class

A queuing strategy that measures each chunk by its `byteLength`. It suits streams of binary data where the size of individual chunks matters. Text must be encoded to bytes first (for example with `TextEncoder`), because strings have no `byteLength` property.

```typescript { .api }
/**
 * A queuing strategy that counts the number of bytes in each chunk
 */
class ByteLengthQueuingStrategy implements QueuingStrategy<ArrayBufferView> {
  constructor(options: { highWaterMark: number });

  /** Returns the high water mark provided to the constructor */
  readonly highWaterMark: number;

  /** Measures the size of chunk by returning the value of its byteLength property */
  readonly size: (chunk: ArrayBufferView) => number;
}
```

**Usage Examples:**

```typescript
import {
  ReadableStream,
  WritableStream,
  TransformStream,
  ByteLengthQueuingStrategy
} from "web-streams-polyfill";

// Readable stream that buffers up to 1MB of data
const byteStream = new ReadableStream({
  start(controller) {
    const chunk1 = new Uint8Array(512 * 1024); // 512KB
    const chunk2 = new Uint8Array(256 * 1024); // 256KB

    controller.enqueue(chunk1);
    controller.enqueue(chunk2);
    // 768KB queued so far; backpressure is signaled once 1MB is queued
  }
}, new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 }));

// File upload stream with byte-based backpressure
const uploadStream = new WritableStream({
  async write(chunk) {
    // Upload chunk to server
    await fetch('/upload', {
      method: 'POST',
      body: chunk
    });
  }
}, new ByteLengthQueuingStrategy({ highWaterMark: 64 * 1024 })); // 64KB buffer

// Transform stream that processes binary data
const compressionTransform = new TransformStream({
  transform(chunk, controller) {
    // Simulate compression (input: Uint8Array, output: "compressed" Uint8Array)
    // by keeping only the first half of each chunk
    const compressed = new Uint8Array(Math.floor(chunk.byteLength / 2));
    // Honor byteOffset: the chunk may be a view into a larger buffer
    compressed.set(new Uint8Array(chunk.buffer, chunk.byteOffset, compressed.length));
    controller.enqueue(compressed);
  }
},
  new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 }), // 1MB input buffer
  new ByteLengthQueuingStrategy({ highWaterMark: 512 * 1024 }) // 512KB output buffer
);
```
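
Because the strategy reads `byteLength`, text has to be converted to bytes before it is enqueued. The following is a minimal sketch of that step, assuming the Streams classes and `TextEncoder` are available as globals (Node.js 18+ or a modern browser); with the polyfill, import the stream classes from `"web-streams-polyfill"`. The variable names are illustrative only.

```typescript
// Assumption: ReadableStream, ByteLengthQueuingStrategy and TextEncoder are globals.
const strategy = new ByteLengthQueuingStrategy({ highWaterMark: 16 });

// Encode text to UTF-8 bytes so that each chunk has a byteLength.
const encoded = new TextEncoder().encode("héllo");

// Size is measured in bytes, not characters: "é" occupies two bytes in UTF-8.
const encodedSize = strategy.size(encoded);

// A raw string has no byteLength, so the strategy cannot measure it.
const stringSize = strategy.size("héllo" as unknown as ArrayBufferView);

const remaining: (number | null)[] = [];
new ReadableStream<Uint8Array>({
  start(controller) {
    controller.enqueue(encoded);
    remaining.push(controller.desiredSize); // high water mark minus queued bytes
  }
}, strategy);

console.log(encodedSize, stringSize, remaining); // 6 undefined [ 10 ]
```

Five characters count as six bytes here, so a byte-based high water mark can fill up faster than a character count would suggest.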

### Custom Queuing Strategies

You can create custom queuing strategies by implementing the `QueuingStrategy` interface.

```typescript { .api }
interface QueuingStrategy<T = any> {
  /** The high water mark value */
  highWaterMark?: number;

  /** Function that computes and returns the finite non-negative size of the given chunk value */
  size?: (chunk: T) => number;
}
```

**Usage Examples:**

```typescript
import { ReadableStream, WritableStream } from "web-streams-polyfill";

// Custom strategy that measures string length
const stringLengthStrategy = {
  highWaterMark: 1000, // Buffer up to 1000 characters
  size(chunk: string) {
    return chunk.length;
  }
};

const textStream = new ReadableStream({
  start(controller) {
    controller.enqueue("Hello"); // 5 characters
    controller.enqueue("World"); // 5 characters
    controller.enqueue("!"); // 1 character
    // Total: 11 characters (well under 1000 limit)
  }
}, stringLengthStrategy);

// Custom strategy for objects based on property count
const objectComplexityStrategy = {
  highWaterMark: 100, // Buffer objects with up to 100 total properties
  size(chunk: object) {
    return Object.keys(chunk).length;
  }
};

const objectStream = new WritableStream({
  write(chunk) {
    console.log(`Processing object with ${Object.keys(chunk).length} properties`);
  }
}, objectComplexityStrategy);

// Custom strategy with priority-based sizing
interface PriorityItem {
  data: any;
  priority: 'low' | 'medium' | 'high';
}

const priorityStrategy = {
  highWaterMark: 50,
  size(chunk: PriorityItem) {
    switch (chunk.priority) {
      case 'high': return 10; // High priority items take more buffer space
      case 'medium': return 5;
      case 'low': return 1;
      default: return 1;
    }
  }
};

const priorityStream = new ReadableStream({
  start(controller) {
    controller.enqueue({ data: "urgent", priority: 'high' as const });
    controller.enqueue({ data: "normal", priority: 'medium' as const });
    controller.enqueue({ data: "background", priority: 'low' as const });
  }
}, priorityStrategy);
```
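
The `size` function must return a finite, non-negative number. As a hedged illustration of what happens otherwise, the sketch below uses a deliberately broken strategy (`brokenStrategy`, a hypothetical name) and captures the error thrown by `enqueue()`. It assumes the Streams classes are available as globals (Node.js 18+ or a modern browser); with the polyfill, import `ReadableStream` from `"web-streams-polyfill"`.

```typescript
// Hypothetical strategy whose size() returns an invalid (negative) value.
const brokenStrategy: QueuingStrategy<string> = {
  highWaterMark: 10,
  size: () => -1
};

// Errors thrown by enqueue() are collected here for inspection.
const caught: unknown[] = [];

new ReadableStream<string>({
  start(controller) {
    try {
      controller.enqueue("x"); // size() is evaluated as part of this call
    } catch (error) {
      caught.push(error);
    }
  }
}, brokenStrategy);

console.log(caught[0] instanceof RangeError); // true
```

The failed `enqueue()` also puts the stream into an errored state, so it is worth validating sizes inside `size` itself when chunks come from untrusted input.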

## Queuing Strategy Types

```typescript { .api }
interface QueuingStrategyInit {
  /** The high water mark value, which must be a non-negative number */
  highWaterMark: number;
}

type QueuingStrategySizeCallback<T = any> = (chunk: T) => number;
```

## Backpressure Management

Queuing strategies work together with stream controllers to manage backpressure. A writer's `desiredSize` is the high water mark minus the total size of the queued chunks; once it drops to zero or below, the stream signals backpressure:

```typescript
import { WritableStream, CountQueuingStrategy } from "web-streams-polyfill";

const slowProcessor = new WritableStream({
  async write(chunk, controller) {
    // Simulate slow processing
    await new Promise(resolve => setTimeout(resolve, 1000));
    console.log("Processed:", chunk);
  }
}, new CountQueuingStrategy({ highWaterMark: 2 }));

const writer = slowProcessor.getWriter();

async function writeWithBackpressure() {
  try {
    // Do not await each write here: a write() promise settles only after the
    // sink has processed the chunk, by which time the queue has drained again
    const first = writer.write("chunk 1");
    console.log("Desired size:", writer.desiredSize); // 1 (2 - 1)

    const second = writer.write("chunk 2");
    console.log("Desired size:", writer.desiredSize); // 0 (2 - 2, backpressure!)

    const third = writer.write("chunk 3");
    console.log("Desired size:", writer.desiredSize); // -1 (2 - 3, over the mark)

    // writer.ready stays pending while desiredSize <= 0; await it before
    // writing more so the producer slows to the pace of the sink
    await writer.ready;

    // Wait for the remaining queued chunks to be processed
    await Promise.all([first, second, third]);
  } finally {
    writer.releaseLock();
  }
}

writeWithBackpressure();
```
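
For producers that write many chunks, a common pattern is to await `writer.ready` before each write. The helper below, `writeAll`, is a hypothetical name and a minimal sketch of that pattern rather than part of the library. It assumes the Streams classes are available as globals (Node.js 18+ or a modern browser); with the polyfill, import them from `"web-streams-polyfill"`.

```typescript
// Hypothetical helper: writes every chunk while respecting backpressure.
async function writeAll<T>(stream: WritableStream<T>, chunks: readonly T[]): Promise<void> {
  const writer = stream.getWriter();
  try {
    for (const chunk of chunks) {
      // Resolves once desiredSize is positive, i.e. the queue has room.
      await writer.ready;
      // Not awaited, so the queue can fill up to the high water mark.
      // A failed write also rejects ready and close(), so it surfaces there.
      writer.write(chunk).catch(() => {});
    }
    // Resolves after all queued chunks have been written and the sink is closed.
    await writer.close();
  } finally {
    writer.releaseLock();
  }
}

// Usage: a sink that records what it receives.
const processed: string[] = [];
const recordingSink = new WritableStream<string>({
  write(chunk) {
    processed.push(chunk);
  }
}, new CountQueuingStrategy({ highWaterMark: 2 }));

const done = writeAll(recordingSink, ["a", "b", "c", "d"]);
done.then(() => console.log(processed)); // [ 'a', 'b', 'c', 'd' ]
```

Awaiting `ready` rather than each `write()` keeps up to `highWaterMark` chunks in flight, which usually gives better throughput than strictly serial writes.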

## Default Strategies

When no strategy is provided, readable and writable streams default to a count-based strategy with a high water mark of 1; readable byte streams default to a high water mark of 0:

```typescript
import { ReadableStream, WritableStream, CountQueuingStrategy } from "web-streams-polyfill";

// Placeholder underlying source and sink; substitute your own
const source = {};
const sink = {};

// These are equivalent:
const stream1 = new ReadableStream(source);
const stream2 = new ReadableStream(source, new CountQueuingStrategy({ highWaterMark: 1 }));

// For writable streams:
const writable1 = new WritableStream(sink);
const writable2 = new WritableStream(sink, new CountQueuingStrategy({ highWaterMark: 1 }));

// For byte streams, the default high water mark is 0:
const byteStream1 = new ReadableStream({ type: 'bytes' });
const byteStream2 = new ReadableStream({ type: 'bytes' }, { highWaterMark: 0 });
```
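
One way to check these defaults is to read `desiredSize` from a freshly constructed stream, where the queue is still empty and the value therefore equals the high water mark. This is a sketch under the assumption that the Streams classes are available as globals (Node.js 18+ or a modern browser); with the polyfill, import them from `"web-streams-polyfill"`. The `defaults` record is a name introduced for illustration.

```typescript
// With an empty queue, desiredSize equals the effective high water mark.
const defaults: Record<string, number | null> = {};

// Default readable stream
new ReadableStream({
  start(controller) {
    defaults.readable = controller.desiredSize;
  }
});

// Readable byte stream
new ReadableStream({
  type: "bytes",
  start(controller) {
    defaults.readableBytes = controller.desiredSize;
  }
});

// Default writable stream, observed through its writer
defaults.writable = new WritableStream().getWriter().desiredSize;

console.log(defaults); // { readable: 1, readableBytes: 0, writable: 1 }
```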