Node.js Streams, a user-land copy of the stream library from Node.js
—
Essential utilities for stream composition, error handling, and lifecycle management. These functions provide robust patterns for working with multiple streams and are crucial for building reliable streaming applications.
Pipes between streams forwarding errors and cleaning up properly, providing a robust way to compose multiple streams.
/**
* Pipe between streams, handling errors and cleanup automatically
* @param streams - Sequence of streams to pipe together
* @param callback - Called when pipeline completes or errors
* @returns The last stream in the pipeline
*/
function pipeline(
    source: NodeJS.ReadableStream,
    ...streamsAndCallback: Array<
        NodeJS.WritableStream | NodeJS.ReadWriteStream | ((err: NodeJS.ErrnoException | null) => void)
    >
): NodeJS.WritableStream;
/**
* Promise-based pipeline (available via promises.pipeline)
* @param streams - Sequence of streams to pipe together
* @returns Promise that resolves when pipeline completes
*/
function pipeline(...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>): Promise<void>;

Usage Examples:
const { pipeline, Readable, Transform } = require('readable-stream');
const fs = require('fs');
// Callback-based pipeline
pipeline(
fs.createReadStream('input.txt'),
new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
}),
fs.createWriteStream('output.txt'),
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);
// Promise-based pipeline
const { pipeline: pipelineAsync } = require('readable-stream').promises;
async function processFile() {
try {
await pipelineAsync(
fs.createReadStream('input.txt'),
new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toLowerCase());
callback();
}
}),
fs.createWriteStream('output.txt')
);
console.log('Pipeline completed successfully');
} catch (error) {
console.error('Pipeline failed:', error);
}
}

Get notified when a stream is no longer readable, writable, or has experienced an error or premature close.
/**
* Get notified when stream is finished
* @param stream - Stream to monitor
* @param options - Options for what conditions to wait for
* @param callback - Called when stream is finished or errors
* @returns Cleanup function to remove listeners
*/
function finished(
stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream,
options: FinishedOptions,
callback: (err?: NodeJS.ErrnoException | null) => void
): () => void;
/**
* Simplified version with just callback
* @param stream - Stream to monitor
* @param callback - Called when stream is finished or errors
* @returns Cleanup function to remove listeners
*/
function finished(
stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream,
callback: (err?: NodeJS.ErrnoException | null) => void
): () => void;
/**
* Promise-based finished (available via promises.finished)
* @param stream - Stream to monitor
* @param options - Options for what conditions to wait for
* @returns Promise that resolves when stream is finished
*/
function finished(
stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream,
options?: FinishedOptions
): Promise<void>;

Usage Examples:
const { finished, Readable } = require('readable-stream');
// Monitor a stream with callback
const readable = new Readable({
read() {
this.push('data');
this.push(null);
}
});
const cleanup = finished(readable, (err) => {
if (err) {
console.error('Stream error:', err);
} else {
console.log('Stream finished successfully');
}
});
// Promise-based monitoring
const { finished: finishedAsync } = require('readable-stream').promises;
async function monitorStream(stream) {
try {
await finishedAsync(stream);
console.log('Stream completed');
} catch (error) {
console.error('Stream failed:', error);
}
}

Compose multiple transform streams into a single transform stream, useful for creating reusable transformation pipelines.
/**
* Compose multiple transform streams into a single transform
* @param streams - Transform streams to compose
* @returns A single transform stream representing the composition
*/
function compose(...streams: Array<NodeJS.ReadWriteStream>): NodeJS.ReadWriteStream;

Usage Examples:
const { compose, Transform } = require('readable-stream');
// Create individual transforms
const upperCase = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
const addPrefix = new Transform({
transform(chunk, encoding, callback) {
this.push('PREFIX: ' + chunk.toString());
callback();
}
});
// Compose them into a single transform
const composedTransform = compose(upperCase, addPrefix);
// Use the composed transform
composedTransform.write('hello world');
composedTransform.end();
composedTransform.on('data', (chunk) => {
console.log('Result:', chunk.toString()); // "PREFIX: HELLO WORLD"
});

Add AbortSignal support to a stream, allowing for cancellation of stream operations.
/**
* Add abort signal support to a stream
* @param signal - AbortSignal to use for cancellation
* @param stream - Stream to add abort support to
* @returns The stream with abort signal attached
*/
function addAbortSignal<T extends NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>(
signal: AbortSignal,
stream: T
): T;

Usage Examples:
const { addAbortSignal, Readable } = require('readable-stream');
// Create an abort controller
const controller = new AbortController();
const { signal } = controller;
// Create a stream with abort support
const readable = addAbortSignal(signal, new Readable({
read() {
// Simulate slow reading
setTimeout(() => {
this.push('chunk');
}, 1000);
}
}));
// Abort after 500ms
setTimeout(() => {
controller.abort();
}, 500);
readable.on('error', (err) => {
if (err.name === 'AbortError') {
console.log('Stream was aborted');
}
});

Destroy a stream, calling its destroy method if available, or emitting an error.
/**
* Destroy a stream
* @param stream - Stream to destroy
* @param error - Optional error to emit
*/
function destroy(stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream, error?: Error): void;

Usage Examples:
const { destroy, Readable } = require('readable-stream');
const readable = new Readable({
read() {
this.push('data');
}
});
// Destroy the stream with an error
destroy(readable, new Error('Something went wrong'));
readable.on('error', (err) => {
console.error('Stream destroyed:', err.message);
});

Utility functions for checking stream states.
/**
* Check if a stream has been read from or disturbed
* @param stream - Stream to check
* @returns true if stream has been disturbed
*/
function isDisturbed(stream: NodeJS.ReadableStream): boolean;
/**
* Check if a stream has errored
* @param stream - Stream to check
* @returns true if stream has errored
*/
function isErrored(stream: NodeJS.ReadableStream | NodeJS.WritableStream): boolean;

Usage Examples:
const { isDisturbed, isErrored, Readable } = require('readable-stream');
const readable = new Readable({
read() {
this.push('data');
this.push(null);
}
});
console.log(isDisturbed(readable)); // false
readable.read(); // Read some data
console.log(isDisturbed(readable)); // true
console.log(isErrored(readable)); // false

Functions for configuring the default high water mark for streams.
/**
* Set the default high water mark for streams
* @param objectMode - Whether this is for object mode streams
* @param value - The high water mark value
*/
function setDefaultHighWaterMark(objectMode: boolean, value: number): void;
/**
* Get the default high water mark for streams
* @param objectMode - Whether this is for object mode streams
* @returns The current default high water mark
*/
function getDefaultHighWaterMark(objectMode: boolean): number;

Usage Examples:
const { setDefaultHighWaterMark, getDefaultHighWaterMark } = require('readable-stream');
// Get current defaults
console.log('Current buffer HWM:', getDefaultHighWaterMark(false)); // 16384
console.log('Current object HWM:', getDefaultHighWaterMark(true)); // 16
// Set new defaults
setDefaultHighWaterMark(false, 32768); // Increase buffer default
setDefaultHighWaterMark(true, 32); // Increase object default

Internal utility functions exported for compatibility with Node.js streams.
/**
* Convert Uint8Array to Buffer (internal utility)
* @param chunk - Uint8Array to convert
* @returns Buffer representation
*/
function _uint8ArrayToBuffer(chunk: Uint8Array): Buffer;
/**
* Check if value is Uint8Array (internal utility)
* @param value - Value to check
* @returns true if value is Uint8Array
*/
function _isUint8Array(value: any): boolean;

Usage Examples:
const { _uint8ArrayToBuffer, _isUint8Array } = require('readable-stream');
// Check if value is Uint8Array
const buffer = new Uint8Array([1, 2, 3]);
console.log(_isUint8Array(buffer)); // true
console.log(_isUint8Array([1, 2, 3])); // false
// Convert Uint8Array to Buffer
const convertedBuffer = _uint8ArrayToBuffer(buffer);
console.log(Buffer.isBuffer(convertedBuffer)); // true

interface FinishedOptions {
error?: boolean; // Wait for error event (default: true)
readable?: boolean; // Wait for readable to end (default: true)
writable?: boolean; // Wait for writable to finish (default: true)
signal?: AbortSignal; // AbortSignal for cancellation
}

Install with Tessl CLI
npx tessl i tessl/npm-readable-stream