tessl/npm-stdio

Standard input/output manager for Node.js with command-line parsing, async file reading, interactive terminal, and progress bars


Asynchronous File Reading

Line-by-line processing of standard input or large files with bounded memory use, processing statistics, and automatic flow control.

Capabilities

Main Reading Function

Processes input streams line-by-line asynchronously, with automatic backpressure handling and statistics collection.

/**
 * Process input stream line by line asynchronously
 * @param lineHandler - Function to process each line
 * @param input - Input stream (defaults to process.stdin)
 * @returns Promise that resolves with processing statistics when all lines are processed
 */
function read(
  lineHandler: LineHandler,
  input?: NodeJS.ReadableStream
): Promise<Stats>;

/**
 * Line processing function
 * @param line - The line content as string
 * @param index - Zero-based line number
 * @returns Promise that resolves when line processing is complete
 */
type LineHandler = (line: string, index: number) => Promise<any>;

Usage Examples:

import { read } from "stdio";

// Basic line processing
const stats = await read(async (line, index) => {
  console.log(`Line ${index}: ${line}`);
});
console.log(`Processed ${stats.length} lines in ${stats.timeAverage}ms average`);

// Data transformation with processing time tracking
await read(async (line, index) => {
  const data = JSON.parse(line);
  const processed = await processData(data);
  await saveToDatabase(processed);
  
  if (index % 1000 === 0) {
    console.log(`Processed ${index} lines`);
  }
});

// Reading from file instead of stdin
import { createReadStream } from 'fs';

const fileStream = createReadStream('large-file.txt');
await read(async (line, index) => {
  // Process each line from file
  await processLine(line);
}, fileStream);

// Error handling in line processing
await read(async (line, index) => {
  try {
    const result = await riskyOperation(line);
    console.log(`Line ${index}: ${result}`);
  } catch (error) {
    console.error(`Failed to process line ${index}: ${error.message}`);
    // Throwing here will stop processing and reject the main promise
    throw error;
  }
});

Statistics Interface

Provides processing statistics including timing information and line counts.

interface Stats {
  /** Total number of lines processed */
  length: number;
  /** Processing time for each line in milliseconds */
  times: number[];
  /** Average processing time per line in milliseconds */
  timeAverage: number;
}

Statistics are automatically calculated:

  • length: Incremented for each line received
  • times: Array of processing times for performance analysis
  • timeAverage: Computed average of processing times
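For illustration, the average can be reproduced from the other two fields (a sketch using a hand-built stats object, not one returned by `read`):

```javascript
// Hypothetical stats object with the same shape as the Stats interface
const stats = {
  length: 3,
  times: [2, 4, 6], // per-line processing times in milliseconds
  timeAverage: 0,
};

// timeAverage is the arithmetic mean of the per-line times
stats.timeAverage =
  stats.times.reduce((sum, t) => sum + t, 0) / stats.times.length;

console.log(stats.timeAverage); // 4
```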

Internal State Management

The read function tracks internal state to coordinate line buffering, flow control, and promise resolution.

interface State {
  /** Buffer of lines waiting to be processed */
  buffer: string[];
  /** Whether the input stream is still open */
  isOpen: boolean;
  /** Processing statistics */
  stats: Stats;
  /** Readline interface for stream handling */
  reader: ReadLine;
  /** Promise resolution function */
  resolve: (stats: Stats) => void;
  /** Promise rejection function */
  reject: (error: Error) => void;
  /** User-provided line processing function */
  lineHandler: LineHandler;
  /** Current line index (zero-based) */
  index: number;
}

Advanced Features

Automatic Flow Control

The read function automatically manages stream flow to prevent memory issues:

  • Backpressure Handling: Pauses input stream during line processing
  • Buffer Management: Maintains small line buffer to prevent memory overflow
  • Resume Logic: Resumes stream reading when processing completes

Memory Efficiency

Designed for processing arbitrarily large files:

  • Streaming Processing: Only a small buffer of lines is held in memory at a time
  • No Accumulation: Does not accumulate results (user handles storage)
  • Automatic Cleanup: Properly closes streams and clears buffers

Error Propagation

Errors from the line handler and from the input stream both reject the returned promise:

// Errors thrown in the line handler stop processing
await read(async (line, index) => {
  if (line.startsWith('ERROR')) {
    // Throwing here causes the main read promise to reject
    throw new Error(`Invalid line at ${index}`);
  }
});

// Stream errors are properly propagated
const brokenStream = new BrokenReadableStream(); // hypothetical stream that emits 'error'
try {
  await read(lineHandler, brokenStream);
} catch (error) {
  console.error('Stream error:', error);
}

Performance Monitoring

Built-in timing statistics for performance analysis:

await read(async (line, index) => {
  // Long-running processing
  await complexOperation(line);
  
  // Stats are automatically collected:
  // - Processing time for this line
  // - Running average of all processing times
  // - Total line count
});

// Access final statistics through promise resolution
const stats = await read(lineHandler);
console.log(`Processed ${stats.length} lines`);
console.log(`Average time: ${stats.timeAverage}ms per line`);

Integration with Node.js Streams

Works seamlessly with any Node.js readable stream:

import { createReadStream } from 'fs';
import { createGunzip } from 'zlib';
import { read } from 'stdio';

// Reading compressed files
const gzipStream = createReadStream('data.txt.gz').pipe(createGunzip());
await read(async (line) => {
  // Process decompressed lines
}, gzipStream);

// Reading from HTTP responses
import { get } from 'https';

get('https://api.example.com/data', (response) => {
  read(async (line) => {
    const record = JSON.parse(line);
    await processRecord(record);
  }, response).catch((error) => console.error('Processing failed:', error));
});

Install with Tessl CLI

npx tessl i tessl/npm-stdio
