An iteration of the Node.js core streams with a series of improvements.
Writable streams in StreamX provide enhanced drain handling, batch writing support, and proper finish/close lifecycle management. They include integrated backpressure handling and support both individual writes and batch operations.
Creates a writable stream with enhanced lifecycle support and proper resource management.
/**
 * Writable stream with enhanced lifecycle hooks, batch writing support,
 * and integrated backpressure handling.
 * @param options - Configuration options for the writable stream
 */
class Writable extends Stream {
constructor(options?: WritableOptions);
/**
 * Override this method to implement custom write logic.
 * @param data - The chunk to persist
 * @param cb - Call when done; pass an Error to fail the write
 */
_write(data: any, cb: (err?: Error) => void): void;
/**
 * Override this method to handle several queued chunks in one call.
 * @param batch - The queued chunks
 * @param cb - Call when the whole batch has been handled
 */
_writev(batch: any[], cb: (err?: Error) => void): void;
/** Lifecycle hook invoked before the first write operation */
_open(cb: (err?: Error) => void): void;
/** Cleanup hook invoked when the stream is destroyed; release resources here */
_destroy(cb: (err?: Error) => void): void;
/** Hook invoked immediately when destroy() is first called */
_predestroy(): void;
/** Hook invoked just before 'finish' is emitted */
_final(cb: (err?: Error) => void): void;
/**
 * Queue data for writing.
 * @returns false when the stream is backpressured (wait for 'drain'
 * before writing more), true otherwise
 */
write(data: any): boolean;
/** End the stream gracefully; 'finish' fires once all writes are flushed */
end(): Writable;
/** Forcefully destroy the stream, optionally with an error */
destroy(err?: Error): void;
}
interface WritableOptions {
/** Maximum buffer size in bytes before write() reports backpressure (default: 16384) */
highWaterMark?: number;
/** Optional function applied to each chunk before it is written */
map?: (data: any) => any;
/** Optional function used to measure a chunk's byte size against highWaterMark */
byteLength?: (data: any) => number;
/** AbortSignal that triggers destroy when aborted */
signal?: AbortSignal;
/** Shorthand for the _write method */
write?: (data: any, cb: (err?: Error) => void) => void;
/** Shorthand for the _writev method */
writev?: (batch: any[], cb: (err?: Error) => void) => void;
/** Shorthand for the _final method */
final?: (cb: (err?: Error) => void) => void;
/** Shorthand for the _open method */
open?: (cb: (err?: Error) => void) => void;
/** Shorthand for the _destroy method */
destroy?: (cb: (err?: Error) => void) => void;
/** Shorthand for the _predestroy method */
predestroy?: () => void;
}
Usage Examples:
const { Writable } = require('streamx');

// A minimal writable: log every chunk, then signal completion via cb().
const writable = new Writable({
  write(data, cb) {
    console.log('Received:', data.toString());
    cb();
  }
});

// 'finish' fires once every queued write has flushed after end().
writable.on('finish', () => {
  console.log('All writes completed');
});

writable.write('Hello, ');
writable.write('World!');
writable.end();
// Writable demonstrating the lifecycle hooks: open runs before the first
// write, final runs just before 'finish', destroy runs on teardown.
const fileWriter = new Writable({
open(cb) {
console.log('Opening file for writing...');
// Acquire the file handle / resource here, then call cb
cb();
},
write(data, cb) {
console.log('Writing:', data.toString());
// Persist the chunk, then signal completion
cb();
},
final(cb) {
console.log('Finalizing writes...');
// Flush buffered data before 'finish' is emitted
cb();
},
destroy(cb) {
console.log('Closing file...');
// Release whatever open() acquired
cb();
}
});
StreamX supports efficient batch writing through the _writev method.
/**
* Override this method to implement batch write operations
* @param batch - Array of data items to write
* @param cb - Callback to signal completion
*/
_writev(batch: any[], cb: (err?: Error) => void): void;
Batch Writing Example:
// Consecutive writes are delivered to writev together as one batch.
const batchWriter = new Writable({
  writev(batch, cb) {
    console.log(`Writing batch of ${batch.length} items:`);
    for (const [index, item] of batch.entries()) {
      console.log(` ${index}: ${item.toString()}`);
    }
    cb();
  }
});

batchWriter.write('item 1');
batchWriter.write('item 2');
batchWriter.write('item 3');
batchWriter.end();
StreamX provides static utility methods for writable stream inspection and management.
/**
 * Check if a writable stream is under backpressure, i.e. write() would
 * currently return false.
 * @param stream - The writable stream to check
 * @returns True if the stream is backpressured
 */
static isBackpressured(stream: Writable): boolean;
/**
 * Wait for a stream to drain the currently queued writes.
 * @param stream - The writable stream to wait for
 * @returns Promise resolving true when drained, or false if the stream
 * was destroyed before it could drain
 */
static drained(stream: Writable): Promise<boolean>;
Static Method Examples:
const { Writable } = require('streamx');
// A deliberately slow writer so queued writes accumulate and trigger
// backpressure.
const writable = new Writable({
  write(data, cb) {
    // Simulate slow writing
    setTimeout(() => {
      console.log('Written:', data.toString());
      cb();
    }, 100);
  }
});
// Queue data until the stream reports backpressure.
for (let i = 0; i < 10; i++) {
  const canContinue = writable.write(`Message ${i}`);
  if (!canContinue) {
    console.log('Backpressure detected');
    break;
  }
}
// Check backpressure AFTER writes are queued: before any write the buffer
// is empty, so isBackpressured() would always be false and the drain
// branch could never run.
if (Writable.isBackpressured(writable)) {
  console.log('Stream is backpressured, waiting...');
  // Wait for the queued writes to flush
  Writable.drained(writable).then((success) => {
    if (success) {
      console.log('Stream drained successfully');
    } else {
      console.log('Stream was destroyed');
    }
  });
}
}
Writable streams emit various events during their lifecycle.
interface WritableEvents {
/** Emitted when a previously backpressured stream can accept writes again */
'drain': () => void;
/** Emitted once all writes have been flushed after end() is called */
'finish': () => void;
/** Emitted when the stream has been fully closed and cleanup has completed */
'close': () => void;
/** Emitted when an error occurs */
'error': (err: Error) => void;
/** Emitted when a readable stream is piped into this writable */
'pipe': (src: Readable) => void;
}
interface WritableProperties {
/** Boolean indicating whether the stream has been destroyed */
destroyed: boolean;
}
StreamX writable streams support advanced configuration for specialized use cases.
Backpressure Handling:
// Writable with a deliberately small buffer so backpressure is easy to hit.
const writable = new Writable({
highWaterMark: 1024, // Small buffer for demonstration
write(data, cb) {
console.log('Writing:', data.toString());
// Simulate async write
setTimeout(cb, 10);
}
});
// Write one chunk and report whether the caller may keep writing.
// NOTE(review): this only logs on 'drain' — a real caller should also stop
// writing until 'drain' fires, which the demo loop below does not do.
function writeWithBackpressure(data) {
const canContinue = writable.write(data);
if (!canContinue) {
console.log('Backpressure detected, waiting for drain...');
// 'once' removes the listener after a single drain notification
writable.once('drain', () => {
console.log('Stream drained, can continue writing');
});
}
// false signals the caller to pause until 'drain'
return canContinue;
}
// Write data with backpressure handling
for (let i = 0; i < 100; i++) {
writeWithBackpressure(`Data chunk ${i}`);
}
Data Transformation:
// map runs on every chunk before write sees it: uppercase strings become
// Buffers, everything else passes through untouched.
const transformWriter = new Writable({
  map: (data) =>
    typeof data === 'string' ? Buffer.from(data.toUpperCase()) : data,
  write(data, cb) {
    console.log('Transformed data:', data.toString());
    cb();
  }
});
transformWriter.write('hello world'); // Will be transformed to uppercase
AbortSignal Integration:
const controller = new AbortController();
const writable = new Writable({
// Per the `signal` option, aborting the controller destroys the stream
// automatically; the manual check below is an extra in-flight guard.
signal: controller.signal,
write(data, cb) {
// Check if aborted before processing
if (controller.signal.aborted) {
return cb(new Error('Aborted'));
}
console.log('Writing:', data.toString());
cb();
}
});
// Write some data
writable.write('test data');
// Abort after 1 second
setTimeout(() => {
controller.abort();
console.log('Write operation aborted');
}, 1000);
StreamX provides comprehensive error handling with automatic cleanup.
// Writer that fails any chunk whose text contains the word 'error'.
const errorProneWriter = new Writable({
  write(data, cb) {
    const text = data.toString();
    if (text.includes('error')) {
      // Reject this chunk by passing an Error to the callback
      cb(new Error('Write failed'));
      return;
    }
    console.log('Successfully wrote:', text);
    cb();
  }
});
errorProneWriter.on('error', (err) => {
console.error('Stream error:', err.message);
});
errorProneWriter.on('close', () => {
console.log('Stream closed (cleanup completed)');
});
// This will trigger an error
errorProneWriter.write('This contains error');
Install with Tessl CLI
npx tessl i tessl/npm-streamx