CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-streamx

An iteration of the Node.js core streams with a series of improvements

Pending
Overview
Eval results
Files

writable.mddocs/

Writable Streams

Writable streams in StreamX provide enhanced drain handling, batch writing support, and proper finish/close lifecycle management. They include integrated backpressure handling and support both individual writes and batch operations.

Capabilities

Writable Class

Creates a writable stream with enhanced lifecycle support and proper resource management.

/**
 * Creates a new writable stream with enhanced lifecycle support
 * @param options - Configuration options for the writable stream
 */
class Writable extends Stream {
  constructor(options?: WritableOptions);
  
  /** Override this method to implement custom write logic */
  _write(data: any, cb: (err?: Error) => void): void;
  
  /** Override this method to implement batch write logic */
  _writev(batch: any[], cb: (err?: Error) => void): void;
  
  /** Lifecycle hook called before the first write operation */
  _open(cb: (err?: Error) => void): void;
  
  /** Cleanup hook called when the stream is destroyed */
  _destroy(cb: (err?: Error) => void): void;
  
  /** Hook called immediately when destroy() is first invoked */
  _predestroy(): void;
  
  /** Hook called just before 'finish' is emitted */
  _final(cb: (err?: Error) => void): void;
  
  /** Write data to the stream */
  write(data: any): boolean;
  
  /** End the writable stream gracefully */
  end(): Writable;
  
  /** Forcefully destroy the stream */
  destroy(err?: Error): void;
}

interface WritableOptions {
  /** Maximum buffer size in bytes (default: 16384) */
  highWaterMark?: number;
  
  /** Optional function to map input data */
  map?: (data: any) => any;
  
  /** Optional function to calculate byte size of data */
  byteLength?: (data: any) => number;
  
  /** AbortSignal that triggers destroy when aborted */
  signal?: AbortSignal;
  
  /** Shorthand for _write method */
  write?: (data: any, cb: (err?: Error) => void) => void;
  
  /** Shorthand for _writev method */
  writev?: (batch: any[], cb: (err?: Error) => void) => void;
  
  /** Shorthand for _final method */
  final?: (cb: (err?: Error) => void) => void;
  
  /** Shorthand for _open method */
  open?: (cb: (err?: Error) => void) => void;
  
  /** Shorthand for _destroy method */
  destroy?: (cb: (err?: Error) => void) => void;
  
  /** Shorthand for _predestroy method */
  predestroy?: () => void;
}

Usage Examples:

const { Writable } = require('streamx');

// Basic writable stream
const writable = new Writable({
  write(data, cb) {
    console.log('Received:', data.toString());
    cb(); // Signal completion
  }
});

// Write some data
writable.write('Hello, ');
writable.write('World!');
writable.end(); // End the stream

writable.on('finish', () => {
  console.log('All writes completed');
});

// Writable with lifecycle hooks
const fileWriter = new Writable({
  open(cb) {
    console.log('Opening file for writing...');
    // Open file or resource
    cb();
  },
  
  write(data, cb) {
    console.log('Writing:', data.toString());
    // Write to file
    cb();
  },
  
  final(cb) {
    console.log('Finalizing writes...');
    // Flush buffers, etc.
    cb();
  },
  
  destroy(cb) {
    console.log('Closing file...');
    // Clean up resources
    cb();
  }
});

Batch Writing

StreamX supports efficient batch writing through the _writev method.

/**
 * Override this method to implement batch write operations
 * @param batch - Array of data items to write
 * @param cb - Callback to signal completion
 */
_writev(batch: any[], cb: (err?: Error) => void): void;

Batch Writing Example:

const batchWriter = new Writable({
  writev(batch, cb) {
    console.log(`Writing batch of ${batch.length} items:`);
    batch.forEach((item, index) => {
      console.log(`  ${index}: ${item.toString()}`);
    });
    cb();
  }
});

// Multiple writes will be batched automatically
batchWriter.write('item 1');
batchWriter.write('item 2');
batchWriter.write('item 3');
batchWriter.end();

Static Methods

StreamX provides static utility methods for writable stream inspection and management.

/**
 * Check if a writable stream is under backpressure
 * @param stream - The writable stream to check
 * @returns True if the stream is backpressured
 */
static isBackpressured(stream: Writable): boolean;

/**
 * Wait for a stream to drain the currently queued writes
 * @param stream - The writable stream to wait for
 * @returns Promise that resolves when drained or false if destroyed
 */
static drained(stream: Writable): Promise<boolean>;

Static Method Examples:

const { Writable } = require('streamx');

const writable = new Writable({
  write(data, cb) {
    // Simulate slow writing
    setTimeout(() => {
      console.log('Written:', data.toString());
      cb();
    }, 100);
  }
});

// Check backpressure
if (Writable.isBackpressured(writable)) {
  console.log('Stream is backpressured, waiting...');
  
  // Wait for drain
  Writable.drained(writable).then((success) => {
    if (success) {
      console.log('Stream drained successfully');
    } else {
      console.log('Stream was destroyed');
    }
  });
}

// Write data
for (let i = 0; i < 10; i++) {
  const canContinue = writable.write(`Message ${i}`);
  if (!canContinue) {
    console.log('Backpressure detected');
    break;
  }
}

Events

Writable streams emit various events during their lifecycle.

interface WritableEvents {
  /** Emitted when the stream buffer is drained and ready for more writes */
  'drain': () => void;
  
  /** Emitted when all writes have been flushed after end() is called */
  'finish': () => void;
  
  /** Emitted when the stream has been fully closed */
  'close': () => void;
  
  /** Emitted when an error occurs */
  'error': (err: Error) => void;
  
  /** Emitted when a readable stream is piped to this writable */
  'pipe': (src: Readable) => void;
}

Properties

interface WritableProperties {
  /** Boolean indicating whether the stream has been destroyed */
  destroyed: boolean;
}

Advanced Configuration

StreamX writable streams support advanced configuration for specialized use cases.

Backpressure Handling:

const writable = new Writable({
  highWaterMark: 1024, // Small buffer for demonstration
  
  write(data, cb) {
    console.log('Writing:', data.toString());
    // Simulate async write
    setTimeout(cb, 10);
  }
});

function writeWithBackpressure(data) {
  const canContinue = writable.write(data);
  
  if (!canContinue) {
    console.log('Backpressure detected, waiting for drain...');
    writable.once('drain', () => {
      console.log('Stream drained, can continue writing');
    });
  }
  
  return canContinue;
}

// Write data with backpressure handling
for (let i = 0; i < 100; i++) {
  writeWithBackpressure(`Data chunk ${i}`);
}

Data Transformation:

const transformWriter = new Writable({
  map: (data) => {
    // Transform data before writing
    if (typeof data === 'string') {
      return Buffer.from(data.toUpperCase());
    }
    return data;
  },
  
  write(data, cb) {
    console.log('Transformed data:', data.toString());
    cb();
  }
});

transformWriter.write('hello world'); // Will be transformed to uppercase

AbortSignal Integration:

const controller = new AbortController();

const writable = new Writable({
  signal: controller.signal,
  
  write(data, cb) {
    // Check if aborted before processing
    if (controller.signal.aborted) {
      return cb(new Error('Aborted'));
    }
    
    console.log('Writing:', data.toString());
    cb();
  }
});

// Write some data
writable.write('test data');

// Abort after 1 second
setTimeout(() => {
  controller.abort();
  console.log('Write operation aborted');
}, 1000);

Error Handling

StreamX provides comprehensive error handling with automatic cleanup.

const errorProneWriter = new Writable({
  write(data, cb) {
    if (data.toString().includes('error')) {
      // Pass error to callback
      return cb(new Error('Write failed'));
    }
    
    console.log('Successfully wrote:', data.toString());
    cb();
  }
});

errorProneWriter.on('error', (err) => {
  console.error('Stream error:', err.message);
});

errorProneWriter.on('close', () => {
  console.log('Stream closed (cleanup completed)');
});

// This will trigger an error
errorProneWriter.write('This contains error');

Install with Tessl CLI

npx tessl i tessl/npm-streamx

docs

duplex-transform.md

index.md

pipeline.md

readable.md

writable.md

tile.json