Node.js Streams, a user-land copy of the stream library from Node.js
—
Promise-based versions of utility functions that integrate seamlessly with modern async/await patterns. These functions provide the same functionality as their callback-based counterparts but return promises for cleaner async code.
Promise-based version of the pipeline utility function for composing streams.
/**
* Promise-based pipeline for composing multiple streams
* @param streams - Sequence of streams to pipe together
* @returns Promise that resolves when the pipeline completes successfully
*/
const promises = {
pipeline: (...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>) => Promise<void>;
};
Usage Examples:
const { promises, Transform } = require('readable-stream');
const fs = require('fs');
// Async/await pipeline
/**
 * Streams input.txt through an upper-casing Transform into output.txt using
 * the promise-based pipeline.
 *
 * Any pipeline error is caught and logged; nothing is rethrown.
 */
async function processFile() {
  try {
    const source = fs.createReadStream('input.txt');

    // Convert to uppercase
    const toUpperCase = new Transform({
      transform(chunk, encoding, callback) {
        const text = chunk.toString();
        this.push(text.toUpperCase());
        callback();
      }
    });

    const destination = fs.createWriteStream('output.txt');

    await promises.pipeline(source, toUpperCase, destination);
    console.log('File processed successfully');
  } catch (error) {
    console.error('Pipeline failed:', error);
  }
}
// Multiple transform pipeline
async function complexProcessing() {
const upperCase = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
const addLineNumbers = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
lines.forEach((line, index) => {
if (line.trim()) {
this.push(`${index + 1}: ${line}\n`);
}
});
callback();
}
});
try {
await promises.pipeline(
fs.createReadStream('source.txt'),
upperCase,
addLineNumbers,
fs.createWriteStream('numbered.txt')
);
console.log('Complex processing completed');
} catch (error) {
console.error('Processing failed:', error);
}
}Promise-based version of the finished utility function for monitoring stream completion.
/**
* Promise-based stream completion monitoring
* @param stream - Stream to monitor for completion
* @param options - Optional configuration for what events to wait for
* @returns Promise that resolves when the stream is finished
*/
const promises = {
finished: (
stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream,
options?: FinishedOptions
) => Promise<void>;
};
Usage Examples:
const { promises, Readable, Writable } = require('readable-stream');
// Monitor readable stream completion
/**
 * Creates a Readable that emits two chunks and then ends, consumes it through
 * a 'data' listener, and waits for it with promises.finished().
 *
 * The outcome is logged; errors are caught and logged, not rethrown.
 */
async function monitorReadable() {
  const chunks = ['chunk 1', 'chunk 2'];

  const source = new Readable({
    read() {
      for (const chunk of chunks) {
        this.push(chunk);
      }
      this.push(null); // End stream
    }
  });

  // Start reading the stream
  const logChunk = (chunk) => {
    console.log('Read:', chunk.toString());
  };
  source.on('data', logChunk);

  try {
    await promises.finished(source);
    console.log('Readable stream finished successfully');
  } catch (error) {
    console.error('Readable stream error:', error);
  }
}
// Monitor writable stream completion
/**
 * Creates a Writable whose write handler logs each chunk and completes after
 * a 100 ms timer, writes two chunks, ends the stream, and waits for it with
 * promises.finished().
 *
 * The outcome is logged; errors are caught and logged, not rethrown.
 */
async function monitorWritable() {
  const sink = new Writable({
    write(chunk, encoding, callback) {
      console.log('Writing:', chunk.toString());
      // Simulate async operation
      setTimeout(callback, 100);
    }
  });

  // Start writing to the stream
  for (const payload of ['data 1', 'data 2']) {
    sink.write(payload);
  }
  sink.end();

  try {
    await promises.finished(sink);
    console.log('Writable stream finished successfully');
  } catch (error) {
    console.error('Writable stream error:', error);
  }
}
// Monitor with specific options
async function monitorWithOptions() {
const readable = new Readable({
read() {
// Simulate data production
setTimeout(() => {
this.push('data');
}, 100);
}
});
try {
await promises.finished(readable, {
readable: true,
writable: false, // Don't wait for writable events
error: true // Wait for error events
});
console.log('Stream monitoring completed');
} catch (error) {
console.error('Stream monitoring failed:', error);
}
}The Promise API works seamlessly with stream operators:
// Writable is needed by pipelineWithOperators, which constructs a Writable sink.
const { promises, Readable, Writable } = require('readable-stream');
/**
 * Builds an object-mode Readable that emits the numbers 1 through 5, doubles
 * each value, keeps those greater than 5, and logs the collected array.
 *
 * Errors are caught and logged, not rethrown.
 */
async function processDataWithOperators() {
  // Create a readable stream
  const dataSource = new Readable({
    objectMode: true,
    read() {
      // Simulate data source
      for (const item of [1, 2, 3, 4, 5]) {
        this.push(item);
      }
      this.push(null);
    }
  });

  // Transform with operators
  const doubled = dataSource.map((value) => value * 2);
  const transformed = doubled.filter((value) => value > 5);

  // Use promises to collect results
  try {
    const results = await transformed.toArray();
    console.log('Results:', results); // [6, 8, 10]
  } catch (error) {
    console.error('Processing failed:', error);
  }
}
// Pipeline with operator-transformed streams
async function pipelineWithOperators() {
const source = Readable.from([1, 2, 3, 4, 5]);
const doubled = source.map(x => x * 2);
const writable = new Writable({
objectMode: true,
write(chunk, encoding, callback) {
console.log('Received:', chunk);
callback();
}
});
try {
await promises.pipeline(doubled, writable);
console.log('Pipeline with operators completed');
} catch (error) {
console.error('Pipeline failed:', error);
}
}The Promise API provides clean error handling patterns:
// Writable is needed by robustProcessing, which constructs a Writable output.
const { promises, Readable, Transform, Writable } = require('readable-stream');
// Graceful error handling
/**
 * Runs a source -> validator -> output pipeline in which the validator fails
 * any chunk whose text is exactly 'invalid data' and upper-cases the rest.
 *
 * A pipeline failure is caught and its message is logged; nothing is
 * rethrown.
 */
async function robustProcessing() {
  const source = new Readable({
    read() {
      for (const item of ['valid data', 'invalid data', null]) {
        this.push(item);
      }
    }
  });

  const validator = new Transform({
    transform(chunk, encoding, callback) {
      const text = chunk.toString();
      if (text !== 'invalid data') {
        this.push(text.toUpperCase());
        callback();
      } else {
        callback(new Error('Data validation failed'));
      }
    }
  });

  const output = new Writable({
    write(chunk, encoding, callback) {
      console.log('Processed:', chunk.toString());
      callback();
    }
  });

  try {
    await promises.pipeline(source, validator, output);
    console.log('Processing completed successfully');
  } catch (error) {
    const { message } = error;
    console.error('Processing failed:', message);
    // Handle cleanup or recovery here
  }
}
// Multiple pipeline error handling
async function multipleOperations() {
const operations = [
() => promises.pipeline(/* pipeline 1 */),
() => promises.pipeline(/* pipeline 2 */),
() => promises.pipeline(/* pipeline 3 */)
];
const results = await Promise.allSettled(
operations.map(op => op())
);
results.forEach((result, index) => {
if (result.status === 'fulfilled') {
console.log(`Operation ${index + 1} succeeded`);
} else {
console.error(`Operation ${index + 1} failed:`, result.reason);
}
});
}The Promise API supports AbortSignal for cancellation:
const { promises, Readable, Transform } = require('readable-stream');
async function cancellableOperation() {
const controller = new AbortController();
const { signal } = controller;
// Cancel after 5 seconds
setTimeout(() => {
controller.abort();
}, 5000);
const slowSource = new Readable({
read() {
// Simulate slow data production
setTimeout(() => {
this.push('data');
}, 1000);
}
});
const slowTransform = new Transform({
transform(chunk, encoding, callback) {
// Simulate slow processing
setTimeout(() => {
this.push(chunk.toString().toUpperCase());
callback();
}, 2000);
}
});
try {
await promises.finished(slowSource, { signal });
console.log('Operation completed');
} catch (error) {
if (error.name === 'AbortError') {
console.log('Operation was cancelled');
} else {
console.error('Operation failed:', error);
}
}
}The Promise API integrates well with other Node.js Promise APIs:
const { promises, Readable } = require('readable-stream');
const { promises: fs } = require('fs');
async function fileProcessingWorkflow() {
try {
// Read file list
const files = await fs.readdir('./data');
// Create stream from file list
const fileStream = Readable.from(files);
// Process each file
const processedFiles = await fileStream
.filter(filename => filename.endsWith('.txt'))
.map(async (filename) => {
const content = await fs.readFile(`./data/${filename}`, 'utf8');
return { filename, content, size: content.length };
})
.toArray();
console.log('Processed files:', processedFiles);
} catch (error) {
console.error('Workflow failed:', error);
}
}interface FinishedOptions {
error?: boolean; // Wait for error event (default: true)
readable?: boolean; // Wait for readable to end (default: true)
writable?: boolean; // Wait for writable to finish (default: true)
signal?: AbortSignal; // AbortSignal for cancellation
}
// Promise API namespace
interface StreamPromises {
pipeline: (...streams: Array<NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream>) => Promise<void>;
finished: (stream: NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream, options?: FinishedOptions) => Promise<void>;
}
Install with Tessl CLI
npx tessl i tessl/npm-readable-stream