An iteration of the Node.js core streams with a series of improvements
—
Writable stream implementation providing data consumption, buffering, and backpressure management with proper lifecycle handling.
Creates a new Writable stream with configuration options.
/**
* Creates a new Writable stream
* @param opts - Configuration options for the writable stream
*/
class Writable extends Stream {
constructor(opts?: WritableOptions);
}
interface WritableOptions extends StreamOptions {
/** Maximum buffer size in bytes (default: 16384) */
highWaterMark?: number;
/** Function to map input data */
map?: (data: any) => any;
/** Function to calculate byte size of data */
byteLength?: (data: any) => number;
/** Write function shorthand */
write?: (data: any, cb: (error?: Error) => void) => void;
/** Final function shorthand */
final?: (cb: (error?: Error) => void) => void;
}Usage Example:
const { Writable } = require("@mafintosh/streamx");
const writable = new Writable({
write(data, cb) {
console.log("Writing:", data.toString());
cb(null);
}
});Called when the stream wants to write data. Override to implement write logic.
/**
* Called when stream wants to write data
* @param data - Data to write
* @param callback - Callback to call when write operation is complete
*/
_write(data: any, callback: (error?: Error) => void): void;Usage Example:
const fs = require('fs');
class FileWritable extends Writable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_open(cb) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) return cb(err);
this.fd = fd;
cb(null);
});
}
_write(data, cb) {
fs.write(this.fd, data, (err) => {
if (err) return cb(err);
cb(null);
});
}
_destroy(cb) {
if (this.fd) {
fs.close(this.fd, cb);
} else {
cb(null);
}
}
}Writes data to the stream.
/**
* Write data to the stream
* @param data - Data to write
* @returns True if buffer is not full, false if backpressure should be applied
*/
write(data: any): boolean;Usage Example:
const writable = new Writable({
write(data, cb) {
console.log("Received:", data.toString());
cb(null);
}
});
const canContinue = writable.write("Hello World");
if (!canContinue) {
// Wait for 'drain' event before writing more
writable.once('drain', () => {
writable.write("More data");
});
}Ends the writable stream gracefully.
/**
* End the writable stream gracefully
* @param data - Optional final data to write before ending
*/
end(data?: any): void;Usage Example:
const writable = new Writable({
write(data, cb) {
console.log("Writing:", data.toString());
cb(null);
}
});
writable.write("First chunk");
writable.write("Second chunk");
writable.end("Final chunk"); // Write final data and end
// Or end without final data
// writable.end();
writable.on('finish', () => {
console.log('All writes completed');
});Called just before the 'finish' event when all writes have been processed.
/**
* Called before finish event for final cleanup
* @param callback - Callback to call when final is complete
*/
_final(callback: (error?: Error) => void): void;Usage Example:
class BufferedWritable extends Writable {
constructor() {
super();
this.buffer = [];
}
_write(data, cb) {
this.buffer.push(data);
cb(null);
}
_final(cb) {
// Final processing of remaining buffer
console.log("Final processing buffer:", this.buffer);
this.buffer = [];
cb(null);
}
}Emitted when the buffer has been drained after being full.
writable.on('drain', () => {
// Buffer is no longer full, safe to write more
});Emitted when the stream has been ended and all writes have been processed.
writable.on('finish', () => {
console.log('All writes completed');
});Emitted when the stream has fully closed.
writable.on('close', () => {
console.log('Stream closed');
});Emitted when an error occurs.
writable.on('error', (err) => {
console.error('Stream error:', err);
});Complete Usage Example:
const { Writable } = require("@mafintosh/streamx");
const writable = new Writable({
highWaterMark: 1024, // 1KB buffer
write(data, cb) {
// Simulate slow write operation
setTimeout(() => {
console.log("Wrote:", data.toString());
cb(null);
}, 100);
},
final(cb) {
console.log("Final processing of any remaining data");
cb(null);
}
});
function writeData(data) {
const canContinue = writable.write(data);
if (!canContinue) {
console.log("Buffer full, waiting for drain...");
writable.once('drain', () => {
console.log("Buffer drained, can continue writing");
});
}
}
// Write some data
writeData("First chunk");
writeData("Second chunk");
writeData("Third chunk");
// End the stream
writable.end();
writable.on('finish', () => {
console.log('All writes completed');
});
writable.on('close', () => {
console.log('Stream closed');
});
writable.on('error', (err) => {
console.error('Error:', err);
});const fs = require('fs');
const { Writable } = require("@mafintosh/streamx");
class FileWriter extends Writable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_open(cb) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) return cb(err);
this.fd = fd;
cb(null);
});
}
_write(data, cb) {
if (!Buffer.isBuffer(data)) {
data = Buffer.from(data);
}
fs.write(this.fd, data, 0, data.length, null, (err) => {
if (err) return cb(err);
cb(null);
});
}
_destroy(cb) {
if (this.fd) {
fs.close(this.fd, cb);
} else {
cb(null);
}
}
}
// Usage
const writer = new FileWriter('output.txt');
writer.write('Hello ');
writer.write('World!');
writer.end();Install with Tessl CLI
npx tessl i tessl/npm-mafintosh--streamx