Web Streams polyfill and ponyfill implementation based on the WHATWG Streams specification.
Transform stream functionality that connects a writable side to a readable side, allowing data to be transformed as it flows through.
A transform stream consists of a pair of streams: a writable stream (writable side) and a readable stream (readable side). Writes to the writable side result in new data being made available for reading from the readable side.
/**
 * A transform stream consists of a writable stream and a readable stream connected together.
 * Chunks written to the writable side are transformed and then made available on the readable side.
 */
class TransformStream<I = any, O = any> {
/**
 * @param transformer optional hooks (start/transform/flush/cancel) defining the transformation
 * @param writableStrategy queuing strategy for the writable (input) side
 * @param readableStrategy queuing strategy for the readable (output) side
 */
constructor(
transformer?: Transformer<I, O>,
writableStrategy?: QueuingStrategy<I>,
readableStrategy?: QueuingStrategy<O>
);
/** The readable side of the transform stream */
readonly readable: ReadableStream<O>;
/** The writable side of the transform stream */
readonly writable: WritableStream<I>;
}

Usage Examples:
import { TransformStream } from "web-streams-polyfill";
// Create a transform stream that converts text to uppercase
const upperCaseTransform = new TransformStream({
  // Re-emit each chunk's text with every character upper-cased.
  transform(chunk, controller) {
    const text = chunk.toString();
    controller.enqueue(text.toUpperCase());
  }
});
// Create a transform stream that filters out empty lines
const filterEmptyLines = new TransformStream({
  // Drop chunks that are empty (or whitespace-only) after trimming.
  transform(chunk, controller) {
    const trimmed = chunk.toString().trim();
    if (trimmed.length === 0) {
      return; // skip blank lines entirely
    }
    controller.enqueue(trimmed);
  }
});
// Create a transform stream that adds line numbers
// The line counter lives in a closure so no mutable state leaks into
// module scope (the original module-level `let lineNumber` could be
// clobbered by unrelated code sharing this module).
const addLineNumbers = (() => {
  let lineNumber = 1; // next number to prepend
  return new TransformStream({
    start() {
      lineNumber = 1; // reset at construction so numbering always starts at 1
    },
    transform(chunk, controller) {
      controller.enqueue(`${lineNumber++}: ${chunk.toString()}`);
    }
  });
})();
// Chain transforms together: blanks filtered first, then upper-cased,
// then numbered. Order matters — numbering counts only surviving lines.
// NOTE(review): assumes `inputStream` and `outputStream` are defined
// elsewhere, and that top-level await is allowed here — TODO confirm.
await inputStream
.pipeThrough(filterEmptyLines)
.pipeThrough(upperCaseTransform)
.pipeThrough(addLineNumbers)
.pipeTo(outputStream);

Controller provided to transformers for managing the transform stream's readable side.
/**
 * Controller for transform streams that manages the readable side.
 * An instance is passed to the transformer's start/transform/flush hooks.
 */
class TransformStreamDefaultController<O> {
/** Returns the desired size to fill the controlled transform stream's readable side internal queue */
readonly desiredSize: number | null;
/** Enqueue a chunk to the controlled transform stream's readable side */
enqueue(chunk: O): void;
/** Error both sides of the controlled transform stream */
error(reason?: any): void;
/** Close the controlled transform stream's readable side and error the writable side */
terminate(): void;
}

Usage Examples:
import { TransformStream } from "web-streams-polyfill";
// Transform stream that splits input into multiple chunks
// Fan out: one incoming chunk can become several outgoing word chunks.
const splitterTransform = new TransformStream({
  transform(chunk, controller) {
    for (const word of chunk.toString().split(' ')) {
      const cleaned = word.trim();
      if (cleaned) {
        controller.enqueue(cleaned);
      }
    }
  }
});
// Transform stream with error handling
// Transform stream with error handling: parses each chunk as JSON and
// requires an `id` field; any failure errors both sides of the stream.
const validationTransform = new TransformStream({
  transform(chunk, controller) {
    try {
      const data = JSON.parse(chunk.toString());
      if (!data.id) {
        controller.error(new Error("Missing required 'id' field"));
        return;
      }
      controller.enqueue(data);
    } catch (error) {
      // `error` is `unknown` under strict TS (useUnknownInCatchVariables);
      // narrow before reading `.message`.
      const message = error instanceof Error ? error.message : String(error);
      controller.error(new Error(`Invalid JSON: ${message}`));
    }
  }
});
// Transform stream that terminates early
// Forward only the first `n` chunks, then shut the stream down.
const takeFirstN = (n: number) => {
  let seen = 0; // chunks forwarded so far
  return new TransformStream({
    transform(chunk, controller) {
      if (seen >= n) {
        controller.terminate(); // Stop processing more chunks
        return;
      }
      controller.enqueue(chunk);
      seen++;
    }
  });
};

Configuration object for transform streams that defines how data is transformed.
/**
 * Configuration object passed to the TransformStream constructor; every hook is optional.
 */
interface Transformer<I = any, O = any> {
/** Called immediately during construction of the TransformStream */
start?: (controller: TransformStreamDefaultController<O>) => void | PromiseLike<void>;
/** Called when a new chunk of data is ready to be transformed */
transform?: (chunk: I, controller: TransformStreamDefaultController<O>) => void | PromiseLike<void>;
/** Called after all chunks written to the writable side have been transformed */
flush?: (controller: TransformStreamDefaultController<O>) => void | PromiseLike<void>;
/** Called when the readable side is cancelled or the writable side is aborted */
cancel?: (reason: any) => void | PromiseLike<void>;
/** Must be undefined for default transform streams */
readableType?: undefined;
/** Must be undefined for default transform streams */
writableType?: undefined;
}

Usage Examples:
import { TransformStream } from "web-streams-polyfill";
// JSON processing transform stream
// JSON processing transform stream: buffers input across chunks and emits
// each complete top-level JSON object as soon as its closing brace arrives.
// State is held in a closure rather than on `this` — strict TypeScript
// flags the untyped `this` in object-literal transformer methods.
const jsonProcessor = (() => {
  let buffer = ''; // unparsed tail carried over between chunks
  return new TransformStream({
    start() {
      console.log("Starting JSON processing");
      buffer = '';
    },
    transform(chunk, controller) {
      buffer += chunk.toString();
      let startIndex = 0;
      let braceCount = 0;
      let inString = false; // braces inside JSON string literals must not count
      let escaped = false;  // tracks backslash escapes inside strings
      for (let i = 0; i < buffer.length; i++) {
        const ch = buffer[i];
        if (inString) {
          if (escaped) escaped = false;
          else if (ch === '\\') escaped = true;
          else if (ch === '"') inString = false;
          continue;
        }
        if (ch === '"') { inString = true; continue; }
        if (ch === '{') braceCount++;
        else if (ch === '}') {
          braceCount--;
          // A balanced closing brace completes one top-level object.
          if (braceCount === 0) {
            const jsonStr = buffer.slice(startIndex, i + 1);
            try {
              controller.enqueue(JSON.parse(jsonStr));
            } catch (error) {
              // `error` is `unknown` under strict TS; narrow before use.
              const message = error instanceof Error ? error.message : String(error);
              controller.error(new Error(`Invalid JSON: ${message}`));
              return;
            }
            startIndex = i + 1;
          }
        }
      }
      // Keep remaining incomplete JSON in buffer
      buffer = buffer.slice(startIndex);
    },
    flush(controller) {
      if (buffer.trim()) {
        controller.error(new Error("Incomplete JSON at end of stream"));
      }
      console.log("JSON processing completed");
    }
  });
})();
// Compression transform stream
// Compression transform stream: batches chunks and emits one "compressed"
// chunk per batch of 10 (flush() emits any shorter remainder).
// Batch state lives in a closure instead of `this` — strict TypeScript
// flags the untyped `this` in object-literal transformer methods.
const compressionTransform = (() => {
  let pending: unknown[] = []; // chunks waiting to be batch-compressed
  return new TransformStream({
    start() {
      pending = [];
    },
    transform(chunk, controller) {
      // Collect chunks for batch compression
      pending.push(chunk);
      // Flush when we have enough data
      if (pending.length >= 10) {
        const combined = pending.join('');
        pending = [];
        // Simulate compression
        controller.enqueue(`compressed(${combined})`);
      }
    },
    flush(controller) {
      // Flush remaining chunks
      if (pending.length > 0) {
        const combined = pending.join('');
        controller.enqueue(`compressed(${combined})`);
      }
    }
  });
})();
// Rate limiting transform stream
// Delay chunks so that at most `itemsPerSecond` items pass per second.
const rateLimiter = (itemsPerSecond: number) => {
  const minGapMs = 1000 / itemsPerSecond; // minimum spacing between emissions
  let previousEmit = Date.now();
  return new TransformStream({
    async transform(chunk, controller) {
      const elapsed = Date.now() - previousEmit;
      // Sleep just long enough to honor the configured rate.
      if (elapsed < minGapMs) {
        await new Promise(wake =>
          setTimeout(wake, minGapMs - elapsed)
        );
      }
      controller.enqueue(chunk);
      previousEmit = Date.now();
    }
  });
};

type TransformerStartCallback<O> = (
controller: TransformStreamDefaultController<O>
) => void | PromiseLike<void>;
/** Signature of Transformer.transform: receives each written chunk plus the controller */
type TransformerTransformCallback<I, O> = (
chunk: I,
controller: TransformStreamDefaultController<O>
) => void | PromiseLike<void>;
/** Signature of Transformer.flush: invoked once all written chunks have been transformed */
type TransformerFlushCallback<O> = (
controller: TransformStreamDefaultController<O>
) => void | PromiseLike<void>;
type TransformerCancelCallback = (reason: any) => void | PromiseLike<void>;

// Pass through transform (no modification)
const identityTransform = new TransformStream();
// Equivalent to the explicit form below (given a distinct name —
// redeclaring `const identityTransform` in the same scope is a compile error):
const explicitIdentityTransform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk);
  }
});

// Buffer chunks and emit arrays
// Buffers chunks and emits them as arrays of `bufferSize` items;
// flush() emits any shorter remainder at end of stream.
// NOTE(review): relies on transformer methods being invoked with the
// transformer object as `this` — appears to work at runtime, but strict
// TypeScript flags the untyped `this`; consider closure state instead — TODO confirm.
const bufferTransform = (bufferSize: number) => new TransformStream({
start(controller) {
this.buffer = [];
},
transform(chunk, controller) {
this.buffer.push(chunk);
if (this.buffer.length >= bufferSize) {
// Spread-copy so later mutations of the buffer don't alias the emitted array.
controller.enqueue([...this.buffer]);
this.buffer = [];
}
},
flush(controller) {
if (this.buffer.length > 0) {
controller.enqueue([...this.buffer]);
}
}
});

// Transform with async operations
const asyncTransform = new TransformStream({
  // Each chunk is processed asynchronously (e.g. an API call);
  // any failure errors both sides of the stream.
  async transform(chunk, controller) {
    try {
      const result = await processAsync(chunk);
      controller.enqueue(result);
    } catch (error) {
      controller.error(error);
    }
  }
});
// Simulates an asynchronous operation: resolves after ~100 ms with the
// input wrapped in an object alongside a completion timestamp.
async function processAsync(data: any): Promise<any> {
// Simulate async work
await new Promise(resolve => setTimeout(resolve, 100));
return { processed: data, timestamp: Date.now() };
}

Install with Tessl CLI
npx tessl i tessl/npm-web-streams-polyfill