CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-mafintosh--streamx

An iteration of the Node.js core streams with a series of improvements

Pending
Overview
Eval results
Files

docs/writable.md

Writable Streams

Writable stream implementation providing data consumption, buffering, and backpressure management with proper lifecycle handling.

Capabilities

Writable Constructor

Creates a new Writable stream with configuration options.

/**
 * Writable stream: consumes the data written to it, buffering writes and
 * signalling backpressure to the producer.
 */
class Writable extends Stream {
  /**
   * Creates a new Writable stream
   * @param opts - Optional configuration for the writable stream; see
   *   `WritableOptions` for the available fields
   */
  constructor(opts?: WritableOptions);
}

/**
 * Options accepted by the `Writable` constructor.
 *
 * The `write` and `final` callbacks accept `null` as well as `undefined`
 * for "no error", matching the `cb(null)` convention used by the examples.
 */
interface WritableOptions extends StreamOptions {
  /** Maximum buffer size in bytes (default: 16384) */
  highWaterMark?: number;
  /** Function to map input data */
  map?: (data: any) => any;
  /** Function to calculate byte size of data */
  byteLength?: (data: any) => number;
  /**
   * Write function shorthand (same contract as `_write`).
   * Call `cb` with an Error on failure, or with `null`/no argument on success.
   */
  write?: (data: any, cb: (error?: Error | null) => void) => void;
  /**
   * Final function shorthand (same contract as `_final`).
   * Call `cb` with an Error on failure, or with `null`/no argument on success.
   */
  final?: (cb: (error?: Error | null) => void) => void;
}

Usage Example:

const { Writable } = require("@mafintosh/streamx");

// Shorthand form: supply `write` through the options instead of subclassing.
const writable = new Writable({
  write: (data, cb) => {
    const text = data.toString();
    console.log("Writing:", text);
    // Signal that this chunk has been handled without error
    cb(null);
  }
});

Data Writing

_write Method

Called when the stream wants to write data. Override to implement write logic.

/**
 * Called when stream wants to write data
 * @param data - Data to write
 * @param callback - Callback to call when the write operation is complete;
 *   pass an Error on failure, or `null`/no argument on success
 */
_write(data: any, callback: (error?: Error | null) => void): void;

Usage Example:

const fs = require('fs');

/**
 * Writable stream that writes every chunk it receives to a file.
 */
class FileWritable extends Writable {
  /**
   * @param filename - Path of the file to write to
   */
  constructor(filename) {
    super();
    this.filename = filename;
    // File descriptor; stays null until _open has succeeded
    this.fd = null;
  }

  /**
   * Opens the target file for writing (flag 'w') and stores its descriptor.
   * @param cb - Called with an error if the open failed, otherwise with null
   */
  _open(cb) {
    fs.open(this.filename, 'w', (err, fd) => {
      if (err) return cb(err);
      this.fd = fd;
      cb(null);
    });
  }

  /**
   * Writes one chunk to the open file descriptor.
   * @param data - Chunk to write
   * @param cb - Called with the write error, or with null on success
   */
  _write(data, cb) {
    // Forward only the error; fs.write also reports bytesWritten and the buffer
    fs.write(this.fd, data, (err) => cb(err));
  }

  /**
   * Closes the file descriptor, if one was opened.
   * @param cb - Called once the descriptor is closed (or immediately if none is open)
   */
  _destroy(cb) {
    // Compare against null explicitly: 0 is a valid file descriptor and
    // would be skipped (and leaked) by a truthiness check.
    if (this.fd === null) return cb(null);

    // Clear the field first so the descriptor cannot be closed twice
    const fd = this.fd;
    this.fd = null;
    fs.close(fd, cb);
  }
}

write Method

Writes data to the stream.

/**
 * Write data to the stream
 * @param data - Data to write
 * @returns `true` if the buffer is not full; `false` if backpressure should
 *   be applied, i.e. the caller should wait for the 'drain' event before
 *   writing more
 */
write(data: any): boolean;

Usage Example:

// Minimal sink that logs every chunk it receives.
const writable = new Writable({
  write: (data, cb) => {
    console.log("Received:", data.toString());
    cb(null);
  }
});

// write() returns false once the buffer is full.
const bufferFull = !writable.write("Hello World");
if (bufferFull) {
  // Hold off until 'drain' fires before writing again
  writable.once('drain', () => writable.write("More data"));
}

Stream Finalization

end Method

Ends the writable stream gracefully.

/**
 * End the writable stream gracefully. The 'finish' event is emitted once the
 * stream has been ended and all writes have been processed.
 * @param data - Optional final data to write before ending
 */
end(data?: any): void;

Usage Example:

// Sink that logs each chunk as it is written.
const writable = new Writable({
  write: (data, cb) => {
    const text = data.toString();
    console.log("Writing:", text);
    cb(null);
  }
});

writable.write("First chunk");
writable.write("Second chunk");
// end() accepts one last chunk, which is written before the stream ends
writable.end("Final chunk");

// Alternatively, end without a final chunk:
// writable.end();

// 'finish' fires after every queued write has been processed
writable.on('finish', () => console.log('All writes completed'));

_final Method

Called after all pending writes have been processed, just before the 'finish' event is emitted. Override it to perform any final cleanup.

/**
 * Called before finish event for final cleanup
 * @param callback - Callback to call when final is complete; pass an Error
 *   on failure, or `null`/no argument on success
 */
_final(callback: (error?: Error | null) => void): void;

Usage Example:

/**
 * Writable that collects every chunk in memory and handles the collected
 * chunks in one go when the stream is finalized.
 */
class BufferedWritable extends Writable {
  constructor() {
    super();
    // Chunks received so far, in arrival order
    this.buffer = [];
  }

  /** Stores the chunk and acknowledges it immediately. */
  _write(data, cb) {
    this.buffer.push(data);
    cb(null);
  }

  /** Processes whatever is still buffered, then resets the buffer. */
  _final(cb) {
    // Final processing of remaining buffer
    const remaining = this.buffer;
    this.buffer = [];
    console.log("Final processing buffer:", remaining);
    cb(null);
  }
}

Events

drain Event

Emitted once the buffer has drained after a write() call returned false because the buffer was full; it is then safe to write more data.

// Runs each time the buffer has drained after having been full
writable.on('drain', () => {
  // Buffer is no longer full, safe to write more
});

finish Event

Emitted when the stream has been ended and all writes have been processed.

// Runs once the stream has been ended and all writes have been processed
writable.on('finish', () => console.log('All writes completed'));

close Event

Emitted when the stream has fully closed.

// Runs once the stream has fully closed
writable.on('close', () => console.log('Stream closed'));

error Event

Emitted when an error occurs.

// Runs whenever the stream reports an error
writable.on('error', (err) => console.error('Stream error:', err));

Backpressure Handling

Complete Usage Example:

const { Writable } = require("@mafintosh/streamx");

const writable = new Writable({
  highWaterMark: 1024, // 1KB buffer
  write(data, cb) {
    // Simulate slow write operation
    setTimeout(() => {
      console.log("Wrote:", data.toString());
      cb(null);
    }, 100);
  },
  final(cb) {
    console.log("Final processing of any remaining data");
    cb(null);
  }
});

// Attach the listeners before writing anything so no event can be missed.
writable.on('finish', () => {
  console.log('All writes completed');
});

writable.on('close', () => {
  console.log('Stream closed');
});

writable.on('error', (err) => {
  console.error('Error:', err);
});

/**
 * Writes one chunk and reports when it is safe to write the next one.
 *
 * @param data - Chunk to write
 * @returns A promise that resolves immediately when the buffer still has
 *   room, or after the next 'drain' event when write() signalled backpressure
 */
function writeData(data) {
  const canContinue = writable.write(data);
  if (canContinue) return Promise.resolve();

  console.log("Buffer full, waiting for drain...");
  return new Promise((resolve) => {
    writable.once('drain', () => {
      console.log("Buffer drained, can continue writing");
      resolve();
    });
  });
}

// Write the chunks one after another, waiting out backpressure between them,
// and end the stream only after the last chunk has been accepted.
writeData("First chunk")
  .then(() => writeData("Second chunk"))
  .then(() => writeData("Third chunk"))
  .then(() => writable.end());

Integration with File System

const fs = require('fs');
const { Writable } = require("@mafintosh/streamx");

/**
 * Writable stream that persists every chunk to a file, converting
 * non-Buffer chunks with `Buffer.from` first.
 */
class FileWriter extends Writable {
  /**
   * @param filename - Path of the file to write to
   */
  constructor(filename) {
    super();
    this.filename = filename;
    // File descriptor; stays null until _open has succeeded
    this.fd = null;
  }

  /**
   * Opens the target file for writing (flag 'w') and stores its descriptor.
   * @param cb - Called with an error if the open failed, otherwise with null
   */
  _open(cb) {
    fs.open(this.filename, 'w', (err, fd) => {
      if (err) return cb(err);
      this.fd = fd;
      cb(null);
    });
  }

  /**
   * Writes one chunk to the file, retrying until every byte is on its way.
   * @param data - Chunk to write; anything that is not a Buffer is passed
   *   through `Buffer.from`
   * @param cb - Called with the write error, or with null once the whole
   *   chunk has been written
   */
  _write(data, cb) {
    const chunk = Buffer.isBuffer(data) ? data : Buffer.from(data);

    // fs.write may write fewer bytes than requested, so keep going from the
    // reported offset until the whole chunk has been written.
    const writeFrom = (offset) => {
      fs.write(this.fd, chunk, offset, chunk.length - offset, null, (err, bytesWritten) => {
        if (err) return cb(err);
        const next = offset + bytesWritten;
        if (next < chunk.length) return writeFrom(next);
        cb(null);
      });
    };

    writeFrom(0);
  }

  /**
   * Closes the file descriptor, if one was opened.
   * @param cb - Called once the descriptor is closed (or immediately if none is open)
   */
  _destroy(cb) {
    // Compare against null explicitly: 0 is a valid file descriptor and
    // would be skipped (and leaked) by a truthiness check.
    if (this.fd === null) return cb(null);

    // Clear the field first so the descriptor cannot be closed twice
    const fd = this.fd;
    this.fd = null;
    fs.close(fd, cb);
  }
}

// Usage
const writer = new FileWriter('output.txt');
// Queue both pieces in order, then end the stream
for (const piece of ['Hello ', 'World!']) {
  writer.write(piece);
}
writer.end();

Install with Tessl CLI

npx tessl i tessl/npm-mafintosh--streamx

docs

duplex.md

index.md

readable.md

stream.md

transform.md

writable.md

tile.json