Node.js Streams, a user-land copy of the stream library from Node.js
—
Functional programming methods available on Readable streams for data transformation and processing. These operators provide a chainable API for stream manipulation and are inspired by functional programming patterns.
Operators that return new stream instances, allowing for chainable transformations.
Transforms each chunk using the provided function.
/**
* Transform each chunk using a mapping function
* @param fn - Function to transform each chunk
* @param options - Optional configuration
* @returns New readable stream with transformed data
*/
map(fn: (chunk: any, options?: any) => any, options?: any): Readable;

Usage Examples:
const { Readable } = require('readable-stream');
// Create a source stream
const source = Readable.from([1, 2, 3, 4, 5]);
// Transform each number by doubling it
const doubled = source.map((num) => num * 2);
doubled.on('data', (chunk) => {
console.log('Doubled:', chunk); // 2, 4, 6, 8, 10
});
// String transformation example
const words = Readable.from(['hello', 'world', 'stream']);
const uppercase = words.map((word) => word.toUpperCase());
uppercase.on('data', (chunk) => {
console.log('Uppercase:', chunk); // "HELLO", "WORLD", "STREAM"
});

Filters chunks based on a predicate function.
/**
* Filter chunks based on a predicate
* @param fn - Predicate function that returns true to keep the chunk
* @param options - Optional configuration
* @returns New readable stream with filtered data
*/
filter(fn: (chunk: any, options?: any) => boolean, options?: any): Readable;

Usage Examples:
const { Readable } = require('readable-stream');
// Filter even numbers
const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
const evenNumbers = numbers.filter((num) => num % 2 === 0);
evenNumbers.on('data', (chunk) => {
console.log('Even:', chunk); // 2, 4, 6, 8, 10
});
// Filter strings by length
const words = Readable.from(['cat', 'elephant', 'dog', 'hippopotamus']);
const longWords = words.filter((word) => word.length > 5);
longWords.on('data', (chunk) => {
console.log('Long word:', chunk); // "elephant", "hippopotamus"
});

Skips the first N chunks from the stream.
/**
* Skip the first N chunks
* @param number - Number of chunks to skip
* @param options - Optional configuration
* @returns New readable stream with first N chunks dropped
*/
drop(number: number, options?: any): Readable;

Usage Examples:
const { Readable } = require('readable-stream');
// Skip first 3 numbers
const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7]);
const afterDrop = numbers.drop(3);
afterDrop.on('data', (chunk) => {
console.log('After drop:', chunk); // 4, 5, 6, 7
});

Takes only the first N chunks from the stream.
/**
* Take only the first N chunks
* @param number - Number of chunks to take
* @param options - Optional configuration
* @returns New readable stream with only first N chunks
*/
take(number: number, options?: any): Readable;

Usage Examples:
const { Readable } = require('readable-stream');
// Take first 3 numbers
const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7]);
const firstThree = numbers.take(3);
firstThree.on('data', (chunk) => {
console.log('First three:', chunk); // 1, 2, 3
});

⚠️ DEPRECATED: This operator will be removed in a future version.
Creates index-value pairs for each chunk in the stream.
/**
* @deprecated This operator will be removed in a future version
* Map each chunk to [index, value] pairs
* @param options - Optional configuration
* @returns New readable stream with indexed pairs
*/
asIndexedPairs(options?: any): Readable;

Maps each chunk to an iterable and flattens the results.
/**
* Map each chunk to an iterable and flatten the results
* @param fn - Function that returns an iterable for each chunk
* @param options - Optional configuration
* @returns New readable stream with flattened results
*/
flatMap(fn: (chunk: any, options?: any) => Iterable<any>, options?: any): Readable;

Usage Examples:
const { Readable } = require('readable-stream');
// Split each string into characters
const words = Readable.from(['hello', 'world']);
const characters = words.flatMap((word) => word.split(''));
characters.on('data', (chunk) => {
console.log('Character:', chunk); // 'h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd'
});
// Expand numbers into ranges
const ranges = Readable.from([2, 3]);
const expanded = ranges.flatMap((n) => Array.from({length: n}, (_, i) => i));
expanded.on('data', (chunk) => {
console.log('Expanded:', chunk); // 0, 1, 0, 1, 2
});

Composes the readable stream with another transform stream.
/**
* Compose this readable with a transform stream
* @param stream - Transform stream to compose with
* @param options - Optional configuration
* @returns New readable stream representing the composition
*/
compose(stream: NodeJS.ReadWriteStream, options?: any): Readable;

Usage Examples:
const { Readable, Transform } = require('readable-stream');
// Create a transform stream
const upperCaseTransform = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
// Compose with the transform
const source = Readable.from(['hello', 'world']);
const composed = source.compose(upperCaseTransform);
composed.on('data', (chunk) => {
console.log('Composed result:', chunk.toString()); // "HELLO", "WORLD"
});

Operators that return promises, useful for consuming stream data or performing reductions.
Reduces the stream to a single value using an accumulator function.
/**
* Reduce the stream to a single value
* @param fn - Reducer function
* @param initial - Initial accumulator value
* @param options - Optional configuration
* @returns Promise that resolves to the final accumulated value
*/
reduce(fn: (previous: any, current: any, options?: any) => any, initial?: any, options?: any): Promise<any>;

Usage Examples:
const { Readable } = require('readable-stream');
// Sum all numbers
const numbers = Readable.from([1, 2, 3, 4, 5]);
const sum = await numbers.reduce((acc, num) => acc + num, 0);
console.log('Sum:', sum); // 15
// Concatenate strings
const words = Readable.from(['hello', ' ', 'world']);
const sentence = await words.reduce((acc, word) => acc + word, '');
console.log('Sentence:', sentence); // "hello world"
// Find maximum value
const values = Readable.from([3, 7, 2, 9, 1]);
const max = await values.reduce((acc, val) => Math.max(acc, val), -Infinity);
console.log('Max:', max); // 9

Collects all chunks from the stream into an array.
/**
* Collect all chunks into an array
* @param options - Optional configuration
* @returns Promise that resolves to an array of all chunks
*/
toArray(options?: any): Promise<any[]>;

Usage Examples:
const { Readable } = require('readable-stream');
// Collect all numbers
const numbers = Readable.from([1, 2, 3, 4, 5]);
const array = await numbers.toArray();
console.log('Array:', array); // [1, 2, 3, 4, 5]
// Collect transformed data
const doubled = Readable.from([1, 2, 3]).map(x => x * 2);
const result = await doubled.toArray();
console.log('Doubled array:', result); // [2, 4, 6]

Executes a function for each chunk in the stream.
/**
* Execute a function for each chunk
* @param fn - Function to execute for each chunk
* @param options - Optional configuration
* @returns Promise that resolves when all chunks have been processed
*/
forEach(fn: (chunk: any, options?: any) => void, options?: any): Promise<void>;

Usage Examples:
const { Readable } = require('readable-stream');
// Log each chunk
const numbers = Readable.from([1, 2, 3, 4, 5]);
await numbers.forEach((num) => {
console.log('Processing:', num);
});
console.log('All numbers processed');
// Perform side effects
const data = Readable.from(['file1.txt', 'file2.txt']);
await data.forEach((filename) => {
console.log('Would process file:', filename);
// In real code, you might read/process the file here
});

Tests whether all chunks in the stream pass a predicate test.
/**
* Test whether all chunks pass a predicate
* @param fn - Predicate function to test each chunk
* @param options - Optional configuration
* @returns Promise that resolves to true if all chunks pass the test
*/
every(fn: (chunk: any, options?: any) => boolean, options?: any): Promise<boolean>;

Usage Examples:
const { Readable } = require('readable-stream');
// Check if all numbers are positive
const positiveNumbers = Readable.from([1, 2, 3, 4, 5]);
const allPositive = await positiveNumbers.every(num => num > 0);
console.log('All positive:', allPositive); // true
// Check if all strings are long enough
const words = Readable.from(['hello', 'world', 'stream']);
const allLongEnough = await words.every(word => word.length >= 5);
console.log('All long enough:', allLongEnough); // true

Tests whether at least one chunk in the stream passes a predicate test.
/**
* Test whether at least one chunk passes a predicate
* @param fn - Predicate function to test each chunk
* @param options - Optional configuration
* @returns Promise that resolves to true if any chunk passes the test
*/
some(fn: (chunk: any, options?: any) => boolean, options?: any): Promise<boolean>;

Usage Examples:
const { Readable } = require('readable-stream');
// Check if any number is even
const numbers = Readable.from([1, 3, 5, 7, 8]);
const hasEven = await numbers.some(num => num % 2 === 0);
console.log('Has even number:', hasEven); // true
// Check if any string contains 'a'
const words = Readable.from(['hello', 'world', 'stream']);
const hasA = await words.some(word => word.includes('a'));
console.log('Has word with "a":', hasA); // true ("stream" contains 'a')

Finds the first chunk that passes a predicate test.
/**
* Find the first chunk that passes a predicate
* @param fn - Predicate function to test each chunk
* @param options - Optional configuration
* @returns Promise that resolves to the first matching chunk or undefined
*/
find(fn: (chunk: any, options?: any) => boolean, options?: any): Promise<any>;

Usage Examples:
const { Readable } = require('readable-stream');
// Find first even number
const numbers = Readable.from([1, 3, 4, 6, 7]);
const firstEven = await numbers.find(num => num % 2 === 0);
console.log('First even:', firstEven); // 4
// Find first long word
const words = Readable.from(['cat', 'elephant', 'dog']);
const firstLongWord = await words.find(word => word.length > 5);
console.log('First long word:', firstLongWord); // "elephant"

Stream operators can be chained together to create complex data processing pipelines:
const { Readable } = require('readable-stream');
// Complex processing pipeline
const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
const result = await numbers
.filter(n => n % 2 === 0) // Keep only even numbers: [2, 4, 6, 8, 10]
.map(n => n * n) // Square them: [4, 16, 36, 64, 100]
.drop(1) // Skip first: [16, 36, 64, 100]
.take(3) // Take first 3: [16, 36, 64]
.reduce((sum, n) => sum + n, 0); // Sum them up
console.log('Result:', result); // 116

Stream operators properly propagate errors through the chain:
const { Readable } = require('readable-stream');
const source = Readable.from([1, 2, 3, 4, 5]);
try {
const result = await source
.map(n => {
if (n === 3) throw new Error('Processing error');
return n * 2;
})
.toArray();
} catch (error) {
console.error('Pipeline error:', error.message); // "Processing error"
}

// All operators accept an optional options parameter
interface OperatorOptions {
signal?: AbortSignal;
highWaterMark?: number;
[key: string]: any;
}

Install with Tessl CLI
npx tessl i tessl/npm-readable-stream