tessl/npm-readable-stream

Node.js Streams, a user-land copy of the stream library from Node.js

Stream Operators

Functional-style methods on Readable streams for transforming and consuming data. Operators that return a stream can be chained; operators that return a promise end a chain by consuming the stream.

Capabilities

Stream-Returning Operators

Operators that return new stream instances, allowing for chainable transformations.

map

Transforms each chunk using the provided function. The function may be synchronous or return a promise.

/**
 * Transform each chunk using a mapping function
 * @param fn - Function to transform each chunk
 * @param options - Optional configuration
 * @returns New readable stream with transformed data
 */
map(fn: (chunk: any, options?: any) => any, options?: any): Readable;

Usage Examples:

const { Readable } = require('readable-stream');

// Create a source stream
const source = Readable.from([1, 2, 3, 4, 5]);

// Transform each number by doubling it
const doubled = source.map((num) => num * 2);

doubled.on('data', (chunk) => {
  console.log('Doubled:', chunk); // 2, 4, 6, 8, 10
});

// String transformation example
const words = Readable.from(['hello', 'world', 'stream']);
const uppercase = words.map((word) => word.toUpperCase());

uppercase.on('data', (chunk) => {
  console.log('Uppercase:', chunk); // "HELLO", "WORLD", "STREAM"
});
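The mapping function can also be asynchronous. The sketch below is illustrative only: `fetchLength` and `wordLengths` are hypothetical names, the async work is simulated with a timer, and it loads Node's built-in `stream` module on the assumption that it exposes the same operators as `readable-stream` (swap the `require` in a project that depends on the package).

```javascript
// Assumption: Node's built-in stream module stands in for 'readable-stream'.
const { Readable } = require('node:stream');

// Hypothetical async lookup, simulated with a short timer.
function fetchLength(word) {
  return new Promise((resolve) => setTimeout(() => resolve(word.length), 5));
}

// Map each word through the async function and collect the results in order.
async function wordLengths(words) {
  return Readable.from(words)
    .map((word) => fetchLength(word))
    .toArray();
}

wordLengths(['hello', 'world', 'stream']).then((lengths) => {
  console.log('Lengths:', lengths); // [5, 5, 6]
});
```

Each returned promise is awaited before its value is emitted, so downstream consumers see plain values rather than promises.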

filter

Filters chunks based on a predicate function.

/**
 * Filter chunks based on a predicate
 * @param fn - Predicate function that returns true to keep the chunk
 * @param options - Optional configuration
 * @returns New readable stream with filtered data
 */
filter(fn: (chunk: any, options?: any) => boolean, options?: any): Readable;

Usage Examples:

const { Readable } = require('readable-stream');

// Filter even numbers
const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
const evenNumbers = numbers.filter((num) => num % 2 === 0);

evenNumbers.on('data', (chunk) => {
  console.log('Even:', chunk); // 2, 4, 6, 8, 10
});

// Filter strings by length
const words = Readable.from(['cat', 'elephant', 'dog', 'hippopotamus']);
const longWords = words.filter((word) => word.length > 5);

longWords.on('data', (chunk) => {
  console.log('Long word:', chunk); // "elephant", "hippopotamus"
});

drop

Skips the first N chunks from the stream.

/**
 * Skip the first N chunks
 * @param number - Number of chunks to skip
 * @param options - Optional configuration
 * @returns New readable stream with first N chunks dropped
 */
drop(number: number, options?: any): Readable;

Usage Examples:

const { Readable } = require('readable-stream');

// Skip first 3 numbers
const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7]);
const afterDrop = numbers.drop(3);

afterDrop.on('data', (chunk) => {
  console.log('After drop:', chunk); // 4, 5, 6, 7
});

take

Takes only the first N chunks from the stream.

/**
 * Take only the first N chunks
 * @param number - Number of chunks to take
 * @param options - Optional configuration
 * @returns New readable stream with only first N chunks
 */
take(number: number, options?: any): Readable;

Usage Examples:

const { Readable } = require('readable-stream');

// Take first 3 numbers
const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7]);
const firstThree = numbers.take(3);

firstThree.on('data', (chunk) => {
  console.log('First three:', chunk); // 1, 2, 3
});

asIndexedPairs (Deprecated)

⚠️ DEPRECATED: This operator will be removed in a future version.

Creates index-value pairs for each chunk in the stream.

/**
 * @deprecated This operator will be removed in a future version
 * Map each chunk to [index, value] pairs
 * @param options - Optional configuration
 * @returns New readable stream with indexed pairs
 */
asIndexedPairs(options?: any): Readable;
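Because this operator is deprecated, new code may prefer to build the pairs with `map`. A minimal sketch, assuming the default sequential behaviour of `map` so the counter advances in stream order; `withIndex` and `indexedLetters` are hypothetical names, and Node's built-in `stream` module is used on the assumption that it matches `readable-stream` here.

```javascript
// Assumption: Node's built-in stream module stands in for 'readable-stream'.
const { Readable } = require('node:stream');

// Pair each chunk with its position using map and a closure counter.
function withIndex(readable) {
  let index = 0;
  return readable.map((value) => [index++, value]);
}

// Collect the [index, value] pairs for a small source.
async function indexedLetters() {
  return withIndex(Readable.from(['a', 'b', 'c'])).toArray();
}

indexedLetters().then((pairs) => {
  console.log('Pairs:', pairs); // [[0, 'a'], [1, 'b'], [2, 'c']]
});
```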

flatMap

Maps each chunk to an iterable and flattens the results.

/**
 * Map each chunk to an iterable and flatten the results
 * @param fn - Function that returns an iterable for each chunk
 * @param options - Optional configuration
 * @returns New readable stream with flattened results
 */
flatMap(fn: (chunk: any, options?: any) => Iterable<any>, options?: any): Readable;

Usage Examples:

const { Readable } = require('readable-stream');

// Split each string into characters
const words = Readable.from(['hello', 'world']);
const characters = words.flatMap((word) => word.split(''));

characters.on('data', (chunk) => {
  console.log('Character:', chunk); // 'h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd'
});

// Expand numbers into ranges
const ranges = Readable.from([2, 3]);
const expanded = ranges.flatMap((n) => Array.from({length: n}, (_, i) => i));

expanded.on('data', (chunk) => {
  console.log('Expanded:', chunk); // 0, 1, 0, 1, 2
});

compose

Composes the readable stream with a transform stream.

/**
 * Compose this readable with a transform stream
 * @param stream - Transform stream to compose with
 * @param options - Optional configuration
 * @returns New readable stream representing the composition
 */
compose(stream: NodeJS.ReadWriteStream, options?: any): Readable;

Usage Examples:

const { Readable, Transform } = require('readable-stream');

// Create a transform stream
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// Compose with the transform
const source = Readable.from(['hello', 'world']);
const composed = source.compose(upperCaseTransform);

composed.on('data', (chunk) => {
  console.log('Composed result:', chunk.toString()); // "HELLO", "WORLD"
});

Promise-Returning Operators

Operators that return promises, useful for consuming stream data or performing reductions.

reduce

Reduces the stream to a single value using an accumulator function.

/**
 * Reduce the stream to a single value
 * @param fn - Reducer function
 * @param initial - Initial accumulator value
 * @param options - Optional configuration
 * @returns Promise that resolves to the final accumulated value
 */
reduce(fn: (previous: any, current: any, options?: any) => any, initial?: any, options?: any): Promise<any>;

Usage Examples:

const { Readable } = require('readable-stream');

// await is only valid inside an async function in CommonJS
async function main() {
  // Sum all numbers
  const numbers = Readable.from([1, 2, 3, 4, 5]);
  const sum = await numbers.reduce((acc, num) => acc + num, 0);
  console.log('Sum:', sum); // 15

  // Concatenate strings
  const words = Readable.from(['hello', ' ', 'world']);
  const sentence = await words.reduce((acc, word) => acc + word, '');
  console.log('Sentence:', sentence); // "hello world"

  // Find maximum value
  const values = Readable.from([3, 7, 2, 9, 1]);
  const max = await values.reduce((acc, val) => Math.max(acc, val), -Infinity);
  console.log('Max:', max); // 9
}

main().catch(console.error);

toArray

Collects all chunks from the stream into an array.

/**
 * Collect all chunks into an array
 * @param options - Optional configuration
 * @returns Promise that resolves to an array of all chunks
 */
toArray(options?: any): Promise<any[]>;

Usage Examples:

const { Readable } = require('readable-stream');

// await is only valid inside an async function in CommonJS
async function main() {
  // Collect all numbers
  const numbers = Readable.from([1, 2, 3, 4, 5]);
  const array = await numbers.toArray();
  console.log('Array:', array); // [1, 2, 3, 4, 5]

  // Collect transformed data
  const doubled = Readable.from([1, 2, 3]).map(x => x * 2);
  const result = await doubled.toArray();
  console.log('Doubled array:', result); // [2, 4, 6]
}

main().catch(console.error);

forEach

Executes a function for each chunk in the stream.

/**
 * Execute a function for each chunk
 * @param fn - Function to execute for each chunk
 * @param options - Optional configuration
 * @returns Promise that resolves when all chunks have been processed
 */
forEach(fn: (chunk: any, options?: any) => void, options?: any): Promise<void>;

Usage Examples:

const { Readable } = require('readable-stream');

// await is only valid inside an async function in CommonJS
async function main() {
  // Log each chunk
  const numbers = Readable.from([1, 2, 3, 4, 5]);
  await numbers.forEach((num) => {
    console.log('Processing:', num);
  });
  console.log('All numbers processed');

  // Perform side effects
  const data = Readable.from(['file1.txt', 'file2.txt']);
  await data.forEach((filename) => {
    console.log('Would process file:', filename);
    // In real code, you might read/process the file here
  });
}

main().catch(console.error);

every

Tests whether all chunks in the stream pass a predicate test.

/**
 * Test whether all chunks pass a predicate
 * @param fn - Predicate function to test each chunk
 * @param options - Optional configuration
 * @returns Promise that resolves to true if all chunks pass the test
 */
every(fn: (chunk: any, options?: any) => boolean, options?: any): Promise<boolean>;

Usage Examples:

const { Readable } = require('readable-stream');

// await is only valid inside an async function in CommonJS
async function main() {
  // Check if all numbers are positive
  const positiveNumbers = Readable.from([1, 2, 3, 4, 5]);
  const allPositive = await positiveNumbers.every(num => num > 0);
  console.log('All positive:', allPositive); // true

  // Check if all strings are long enough
  const words = Readable.from(['hello', 'world', 'stream']);
  const allLongEnough = await words.every(word => word.length >= 5);
  console.log('All long enough:', allLongEnough); // true
}

main().catch(console.error);

some

Tests whether at least one chunk in the stream passes a predicate test.

/**
 * Test whether at least one chunk passes a predicate
 * @param fn - Predicate function to test each chunk
 * @param options - Optional configuration
 * @returns Promise that resolves to true if any chunk passes the test
 */
some(fn: (chunk: any, options?: any) => boolean, options?: any): Promise<boolean>;

Usage Examples:

const { Readable } = require('readable-stream');

// await is only valid inside an async function in CommonJS
async function main() {
  // Check if any number is even
  const numbers = Readable.from([1, 3, 5, 7, 8]);
  const hasEven = await numbers.some(num => num % 2 === 0);
  console.log('Has even number:', hasEven); // true

  // Check if any string contains 'a'
  const words = Readable.from(['hello', 'world', 'stream']);
  const hasA = await words.some(word => word.includes('a'));
  console.log('Has word with "a":', hasA); // true ("stream")
}

main().catch(console.error);

find

Finds the first chunk that passes a predicate test.

/**
 * Find the first chunk that passes a predicate
 * @param fn - Predicate function to test each chunk
 * @param options - Optional configuration
 * @returns Promise that resolves to the first matching chunk or undefined
 */
find(fn: (chunk: any, options?: any) => boolean, options?: any): Promise<any>;

Usage Examples:

const { Readable } = require('readable-stream');

// await is only valid inside an async function in CommonJS
async function main() {
  // Find first even number
  const numbers = Readable.from([1, 3, 4, 6, 7]);
  const firstEven = await numbers.find(num => num % 2 === 0);
  console.log('First even:', firstEven); // 4

  // Find first long word
  const words = Readable.from(['cat', 'elephant', 'dog']);
  const firstLongWord = await words.find(word => word.length > 5);
  console.log('First long word:', firstLongWord); // "elephant"
}

main().catch(console.error);
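The signature above notes that the promise resolves to undefined when nothing matches. A small sketch of that case follows; `findFirstNegative` is a hypothetical helper, and Node's built-in `stream` module is used on the assumption that it behaves like `readable-stream` here.

```javascript
// Assumption: Node's built-in stream module stands in for 'readable-stream'.
const { Readable } = require('node:stream');

// Resolve to the first negative number, or undefined when there is none.
async function findFirstNegative(values) {
  return Readable.from(values).find((n) => n < 0);
}

findFirstNegative([1, -2, 3]).then((found) => {
  console.log('Found:', found); // -2
});

findFirstNegative([1, 2, 3]).then((found) => {
  console.log('Found:', found); // undefined
});
```

Callers that need to tell "no match" apart from a matching chunk should compare the result against undefined explicitly.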

Chaining Operators

Stream operators can be chained together to create complex data processing pipelines:

const { Readable } = require('readable-stream');

// await is only valid inside an async function in CommonJS
async function main() {
  // Complex processing pipeline
  const numbers = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

  const result = await numbers
    .filter(n => n % 2 === 0)        // Keep only even numbers: [2, 4, 6, 8, 10]
    .map(n => n * n)                 // Square them: [4, 16, 36, 64, 100]
    .drop(1)                         // Skip first: [16, 36, 64, 100]
    .take(3)                         // Take first 3: [16, 36, 64]
    .reduce((sum, n) => sum + n, 0); // Sum them up

  console.log('Result:', result); // 116
}

main().catch(console.error);

Error Handling

An error thrown inside an operator callback propagates down the chain and rejects the promise returned by the final operator:

const { Readable } = require('readable-stream');

// await is only valid inside an async function in CommonJS
async function main() {
  const source = Readable.from([1, 2, 3, 4, 5]);

  try {
    await source
      .map(n => {
        if (n === 3) throw new Error('Processing error');
        return n * 2;
      })
      .toArray();
  } catch (error) {
    console.error('Pipeline error:', error.message); // "Processing error"
  }
}

main();

Types

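// All operators accept an optional options parameter;
// not every field is meaningful for every operator
interface OperatorOptions {
  signal?: AbortSignal;    // Aborts the operation when signalled
  concurrency?: number;    // Maximum concurrent calls of fn (per the Node.js stream docs)
  highWaterMark?: number;  // Number of items to buffer ahead of the consumer
  [key: string]: any;
}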
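To illustrate the `signal` option, here is a minimal sketch that passes an already-aborted signal to `toArray`. `collectWithSignal` is a hypothetical helper, the string `'aborted'` is just a marker chosen for this example, and Node's built-in `stream` module is used on the assumption that `readable-stream` rejects with the same `AbortError`.

```javascript
// Assumption: Node's built-in stream module stands in for 'readable-stream'.
const { Readable } = require('node:stream');

// Collect all chunks, reporting 'aborted' if the signal cancels the operation.
async function collectWithSignal(values, signal) {
  try {
    return await Readable.from(values).toArray({ signal });
  } catch (err) {
    if (err.name === 'AbortError') return 'aborted';
    throw err; // Unrelated errors are passed on to the caller
  }
}

const controller = new AbortController();
controller.abort();

collectWithSignal([1, 2, 3], controller.signal).then((outcome) => {
  console.log('Outcome:', outcome); // "aborted"
});
```

In real code the signal would more often be aborted later, for example from a timeout or a user action, rather than before the call.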

Install with Tessl CLI

npx tessl i tessl/npm-readable-stream
