CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-readable-stream

Node.js Streams, a user-land copy of the stream library from Node.js

Pending
Overview
Eval results
Files

promise-api.mddocs/

Promise API

Promise-based versions of utility functions that integrate seamlessly with modern async/await patterns. These functions provide the same functionality as their callback-based counterparts but return promises for cleaner async code.

Capabilities

promises.pipeline

Promise-based version of the pipeline utility function for composing streams.

/**
 * Promise-based pipeline for composing multiple streams
 * @param streams - Sequence of streams to pipe together
 * @returns Promise that resolves when the pipeline completes successfully
 */
const promises = {
  pipeline: (...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>) => Promise<void>;
};

Usage Examples:

const { promises, Transform } = require('readable-stream');
const fs = require('fs');

// Async/await pipeline
async function processFile() {
  try {
    await promises.pipeline(
      fs.createReadStream('input.txt'),
      new Transform({
        transform(chunk, encoding, callback) {
          // Convert to uppercase
          this.push(chunk.toString().toUpperCase());
          callback();
        }
      }),
      fs.createWriteStream('output.txt')
    );
    console.log('File processed successfully');
  } catch (error) {
    console.error('Pipeline failed:', error);
  }
}

// Multiple transform pipeline
async function complexProcessing() {
  const upperCase = new Transform({
    transform(chunk, encoding, callback) {
      this.push(chunk.toString().toUpperCase());
      callback();
    }
  });

  const addLineNumbers = new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      const lines = chunk.toString().split('\n');
      lines.forEach((line, index) => {
        if (line.trim()) {
          this.push(`${index + 1}: ${line}\n`);
        }
      });
      callback();
    }
  });

  try {
    await promises.pipeline(
      fs.createReadStream('source.txt'),
      upperCase,
      addLineNumbers,
      fs.createWriteStream('numbered.txt')
    );
    console.log('Complex processing completed');
  } catch (error) {
    console.error('Processing failed:', error);
  }
}

promises.finished

Promise-based version of the finished utility function for monitoring stream completion.

/**
 * Promise-based stream completion monitoring
 * @param stream - Stream to monitor for completion
 * @param options - Optional configuration for what events to wait for
 * @returns Promise that resolves when the stream is finished
 */
const promises = {
  finished: (
    stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream,
    options?: FinishedOptions
  ) => Promise<void>;
};

Usage Examples:

const { promises, Readable, Writable } = require('readable-stream');

// Monitor readable stream completion
async function monitorReadable() {
  const readable = new Readable({
    read() {
      this.push('chunk 1');
      this.push('chunk 2');
      this.push(null); // End stream
    }
  });

  // Start reading the stream
  readable.on('data', (chunk) => {
    console.log('Read:', chunk.toString());
  });

  try {
    await promises.finished(readable);
    console.log('Readable stream finished successfully');
  } catch (error) {
    console.error('Readable stream error:', error);
  }
}

// Monitor writable stream completion
async function monitorWritable() {
  const writable = new Writable({
    write(chunk, encoding, callback) {
      console.log('Writing:', chunk.toString());
      // Simulate async operation
      setTimeout(callback, 100);
    }
  });

  // Start writing to the stream
  writable.write('data 1');
  writable.write('data 2');
  writable.end();

  try {
    await promises.finished(writable);
    console.log('Writable stream finished successfully');
  } catch (error) {
    console.error('Writable stream error:', error);
  }
}

// Monitor with specific options
async function monitorWithOptions() {
  const readable = new Readable({
    read() {
      // Simulate data production
      setTimeout(() => {
        this.push('data');
      }, 100);
    }
  });

  try {
    await promises.finished(readable, {
      readable: true,
      writable: false, // Don't wait for writable events
      error: true      // Wait for error events
    });
    console.log('Stream monitoring completed');
  } catch (error) {
    console.error('Stream monitoring failed:', error);
  }
}

Combining Promise API with Stream Operators

The Promise API works seamlessly with stream operators:

const { promises, Readable } = require('readable-stream');

async function processDataWithOperators() {
  // Create a readable stream
  const dataSource = new Readable({
    objectMode: true,
    read() {
      // Simulate data source
      const data = [1, 2, 3, 4, 5];
      data.forEach(item => this.push(item));
      this.push(null);
    }
  });

  // Transform with operators
  const transformed = dataSource
    .map(x => x * 2)
    .filter(x => x > 5);

  // Use promises to collect results
  try {
    const results = await transformed.toArray();
    console.log('Results:', results); // [6, 8, 10]
  } catch (error) {
    console.error('Processing failed:', error);
  }
}

// Pipeline with operator-transformed streams
async function pipelineWithOperators() {
  const source = Readable.from([1, 2, 3, 4, 5]);
  const doubled = source.map(x => x * 2);
  
  const writable = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      console.log('Received:', chunk);
      callback();
    }
  });

  try {
    await promises.pipeline(doubled, writable);
    console.log('Pipeline with operators completed');
  } catch (error) {
    console.error('Pipeline failed:', error);
  }
}

Error Handling Patterns

The Promise API provides clean error handling patterns:

const { promises, Readable, Transform } = require('readable-stream');

// Graceful error handling
async function robustProcessing() {
  const source = new Readable({
    read() {
      this.push('valid data');
      this.push('invalid data');
      this.push(null);
    }
  });

  const validator = new Transform({
    transform(chunk, encoding, callback) {
      const data = chunk.toString();
      if (data === 'invalid data') {
        callback(new Error('Data validation failed'));
        return;
      }
      this.push(data.toUpperCase());
      callback();
    }
  });

  const output = new Writable({
    write(chunk, encoding, callback) {
      console.log('Processed:', chunk.toString());
      callback();
    }
  });

  try {
    await promises.pipeline(source, validator, output);
    console.log('Processing completed successfully');
  } catch (error) {
    console.error('Processing failed:', error.message);
    // Handle cleanup or recovery here
  }
}

// Multiple pipeline error handling
async function multipleOperations() {
  const operations = [
    () => promises.pipeline(/* pipeline 1 */),
    () => promises.pipeline(/* pipeline 2 */),
    () => promises.pipeline(/* pipeline 3 */)
  ];

  const results = await Promise.allSettled(
    operations.map(op => op())
  );

  results.forEach((result, index) => {
    if (result.status === 'fulfilled') {
      console.log(`Operation ${index + 1} succeeded`);
    } else {
      console.error(`Operation ${index + 1} failed:`, result.reason);
    }
  });
}

AbortSignal Support

The Promise API supports AbortSignal for cancellation:

const { promises, Readable, Transform } = require('readable-stream');

async function cancellableOperation() {
  const controller = new AbortController();
  const { signal } = controller;

  // Cancel after 5 seconds
  setTimeout(() => {
    controller.abort();
  }, 5000);

  const slowSource = new Readable({
    read() {
      // Simulate slow data production
      setTimeout(() => {
        this.push('data');
      }, 1000);
    }
  });

  const slowTransform = new Transform({
    transform(chunk, encoding, callback) {
      // Simulate slow processing
      setTimeout(() => {
        this.push(chunk.toString().toUpperCase());
        callback();
      }, 2000);
    }
  });

  try {
    await promises.finished(slowSource, { signal });
    console.log('Operation completed');
  } catch (error) {
    if (error.name === 'AbortError') {
      console.log('Operation was cancelled');
    } else {
      console.error('Operation failed:', error);
    }
  }
}

Integration with Other APIs

The Promise API integrates well with other Node.js Promise APIs:

const { promises, Readable } = require('readable-stream');
const { promises: fs } = require('fs');

async function fileProcessingWorkflow() {
  try {
    // Read file list
    const files = await fs.readdir('./data');
    
    // Create stream from file list
    const fileStream = Readable.from(files);
    
    // Process each file
    const processedFiles = await fileStream
      .filter(filename => filename.endsWith('.txt'))
      .map(async (filename) => {
        const content = await fs.readFile(`./data/${filename}`, 'utf8');
        return { filename, content, size: content.length };
      })
      .toArray();

    console.log('Processed files:', processedFiles);
  } catch (error) {
    console.error('Workflow failed:', error);
  }
}

Types

interface FinishedOptions {
  error?: boolean;      // Wait for error event (default: true)
  readable?: boolean;   // Wait for readable to end (default: true)
  writable?: boolean;   // Wait for writable to finish (default: true)
  signal?: AbortSignal; // AbortSignal for cancellation
}

// Promise API namespace
interface StreamPromises {
  pipeline: (...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>) => Promise<void>;
  finished: (stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream, options?: FinishedOptions) => Promise<void>;
}

Install with Tessl CLI

npx tessl i tessl/npm-readable-stream

docs

index.md

promise-api.md

stream-classes.md

stream-operators.md

utility-functions.md

tile.json