Web Streams polyfill and ponyfill implementation based on the WHATWG specification
—
Backpressure management through queuing strategies that control how data is buffered within streams and determine when backpressure should be applied.
A queuing strategy that counts the number of chunks in the queue, regardless of their individual sizes.
/**
* A queuing strategy that counts the number of chunks
*/
class CountQueuingStrategy implements QueuingStrategy<any> {
constructor(options: { highWaterMark: number });
/** Returns the high water mark provided to the constructor */
readonly highWaterMark: number;
/** Measures the size of chunk by always returning 1 */
readonly size: (chunk: any) => 1;
}Usage Examples:
import { ReadableStream, CountQueuingStrategy } from "web-streams-polyfill";
// Readable stream that buffers up to 5 chunks
const stream = new ReadableStream({
start(controller) {
controller.enqueue("chunk 1");
controller.enqueue("chunk 2");
controller.enqueue("chunk 3");
// Stream will signal backpressure after 5 chunks
}
}, new CountQueuingStrategy({ highWaterMark: 5 }));
// Writable stream with count-based backpressure
const writable = new WritableStream({
write(chunk) {
console.log("Processing:", chunk);
return new Promise(resolve => setTimeout(resolve, 100));
}
}, new CountQueuingStrategy({ highWaterMark: 3 }));
// Transform stream with different strategies for each side
const transform = new TransformStream({
transform(chunk, controller) {
// Split each input chunk into multiple output chunks
const parts = chunk.toString().split(' ');
parts.forEach(part => controller.enqueue(part));
}
},
new CountQueuingStrategy({ highWaterMark: 2 }), // writable side
new CountQueuingStrategy({ highWaterMark: 10 }) // readable side
);A queuing strategy that measures the size of chunks by their byte length, useful for streams dealing with binary data or text where the size of individual chunks matters.
/**
* A queuing strategy that counts the number of bytes in each chunk
*/
class ByteLengthQueuingStrategy implements QueuingStrategy<ArrayBufferView> {
constructor(options: { highWaterMark: number });
/** Returns the high water mark provided to the constructor */
readonly highWaterMark: number;
/** Measures the size of chunk by returning the value of its byteLength property */
readonly size: (chunk: ArrayBufferView) => number;
}Usage Examples:
import { ReadableStream, ByteLengthQueuingStrategy } from "web-streams-polyfill";
// Readable stream that buffers up to 1MB of data
const byteStream = new ReadableStream({
start(controller) {
const chunk1 = new Uint8Array(512 * 1024); // 512KB
const chunk2 = new Uint8Array(256 * 1024); // 256KB
controller.enqueue(chunk1);
controller.enqueue(chunk2);
// Stream will signal backpressure after 1MB total
}
}, new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 }));
// File upload stream with byte-based backpressure
const uploadStream = new WritableStream({
async write(chunk) {
// Upload chunk to server
await fetch('/upload', {
method: 'POST',
body: chunk
});
}
}, new ByteLengthQueuingStrategy({ highWaterMark: 64 * 1024 })); // 64KB buffer
// Transform stream that processes binary data
const compressionTransform = new TransformStream({
transform(chunk, controller) {
// Simulate compression (input: Uint8Array, output: compressed Uint8Array)
const compressed = new Uint8Array(chunk.byteLength / 2); // 50% compression
compressed.set(new Uint8Array(chunk.buffer.slice(0, compressed.length)));
controller.enqueue(compressed);
}
},
new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 }), // 1MB input buffer
new ByteLengthQueuingStrategy({ highWaterMark: 512 * 1024 }) // 512KB output buffer
);You can create custom queuing strategies by implementing the QueuingStrategy interface.
interface QueuingStrategy<T = any> {
/** The high water mark value */
highWaterMark?: number;
/** Function that computes and returns the finite non-negative size of the given chunk value */
size?: (chunk: T) => number;
}Usage Examples:
import { ReadableStream, WritableStream } from "web-streams-polyfill";
// Custom strategy that measures string length
const stringLengthStrategy = {
highWaterMark: 1000, // Buffer up to 1000 characters
size(chunk: string) {
return chunk.length;
}
};
const textStream = new ReadableStream({
start(controller) {
controller.enqueue("Hello"); // 5 characters
controller.enqueue("World"); // 5 characters
controller.enqueue("!"); // 1 character
// Total: 11 characters (well under 1000 limit)
}
}, stringLengthStrategy);
// Custom strategy for objects based on property count
const objectComplexityStrategy = {
highWaterMark: 100, // Buffer objects with up to 100 total properties
size(chunk: object) {
return Object.keys(chunk).length;
}
};
const objectStream = new WritableStream({
write(chunk) {
console.log(`Processing object with ${Object.keys(chunk).length} properties`);
}
}, objectComplexityStrategy);
// Custom strategy with priority-based sizing
interface PriorityItem {
data: any;
priority: 'low' | 'medium' | 'high';
}
const priorityStrategy = {
highWaterMark: 50,
size(chunk: PriorityItem) {
switch (chunk.priority) {
case 'high': return 10; // High priority items take more buffer space
case 'medium': return 5;
case 'low': return 1;
default: return 1;
}
}
};
const priorityStream = new ReadableStream({
start(controller) {
controller.enqueue({ data: "urgent", priority: 'high' as const });
controller.enqueue({ data: "normal", priority: 'medium' as const });
controller.enqueue({ data: "background", priority: 'low' as const });
}
}, priorityStrategy);interface QueuingStrategyInit {
/** The high water mark value, which must be a non-negative number */
highWaterMark: number;
}
type QueuingStrategySizeCallback<T = any> = (chunk: T) => number;Queuing strategies work together with stream controllers to manage backpressure:
import { WritableStream, CountQueuingStrategy } from "web-streams-polyfill";
const slowProcessor = new WritableStream({
async write(chunk, controller) {
// Simulate slow processing
await new Promise(resolve => setTimeout(resolve, 1000));
console.log("Processed:", chunk);
}
}, new CountQueuingStrategy({ highWaterMark: 2 }));
const writer = slowProcessor.getWriter();
async function writeWithBackpressure() {
try {
await writer.write("chunk 1");
console.log("Desired size:", writer.desiredSize); // 1 (2 - 1)
await writer.write("chunk 2");
console.log("Desired size:", writer.desiredSize); // 0 (2 - 2)
await writer.write("chunk 3");
console.log("Desired size:", writer.desiredSize); // -1 (2 - 3, backpressure!)
// The write above will wait until there's space in the queue
// before resolving, providing automatic backpressure handling
} finally {
writer.releaseLock();
}
}
writeWithBackpressure();When no strategy is provided, streams use default strategies:
// These are equivalent:
const stream1 = new ReadableStream(source);
const stream2 = new ReadableStream(source, new CountQueuingStrategy({ highWaterMark: 1 }));
// For writable streams:
const writable1 = new WritableStream(sink);
const writable2 = new WritableStream(sink, new CountQueuingStrategy({ highWaterMark: 1 }));
// For byte streams, the default high water mark is 0:
const byteStream1 = new ReadableStream({ type: 'bytes' });
const byteStream2 = new ReadableStream({ type: 'bytes' }, { highWaterMark: 0 });Install with Tessl CLI
npx tessl i tessl/npm-web-streams-polyfill