Object transformations implementing the Node.js stream.Transform API
—
Convenient callback-based API for processing data arrays with automatic result collection. The callback API provides a simple interface for transforming datasets where you want all results collected and returned via a callback function.
Process an array of records with automatic result collection via callback.
/**
* Transform an array of records and, when a callback is given, collect every
* result and hand the full array to that callback on completion.
*
* The handler may return its result directly, report it through a handler
* callback, or return a promise.
*
* @param records - Array of data to transform
* @param handler - Function to transform each record
* @param callback - Optional completion callback invoked as `(err, output)`;
*   `output` is the array of transformed results
* @returns Transformer stream instance
*/
function transform<T, U>(
records: Array<T>,
handler: Handler<T, U>,
callback?: Callback
): Transformer;
/**
* Transform an array of records with configuration options and, when a
* callback is given, collect every result for that callback.
*
* @param records - Array of data to transform
* @param options - Configuration options; the examples on this page use
*   `parallel` (number of records processed concurrently) and `params`
*   (a value passed to the handler as an extra argument)
* @param handler - Function to transform each record
* @param callback - Optional completion callback invoked as `(err, output)`;
*   `output` is the array of transformed results
* @returns Transformer stream instance
*/
function transform<T, U>(
records: Array<T>,
options: Options,
handler: Handler<T, U>,
callback?: Callback
): Transformer;
/**
* Completion callback for result collection
* @param err - Error if transformation failed
* @param output - Array of transformed results
*/
type Callback = (err?: null | Error, output?: any[]) => void;
Usage Examples:
import { transform } from "stream-transform";
// Basic callback transformation: join the cells of each row with a pipe
const rows = [
  ["a", "b", "c"],
  ["1", "2", "3"],
  ["x", "y", "z"]
];
const joinCells = (cells) => cells.join("|");
transform(rows, joinCells, (failure, joined) => {
  if (!failure) {
    console.log(joined); // ["a|b|c", "1|2|3", "x|y|z"]
    return;
  }
  console.error("Transformation failed:", failure);
});
// Object transformation with callback
const users = [
{ name: " Alice ", email: "ALICE@EXAMPLE.COM", age: "25" },
{ name: " Bob ", email: "BOB@EXAMPLE.COM", age: "30" }
];
transform(users, (user) => {
return {
name: user.name.trim(),
email: user.email.toLowerCase(),
age: parseInt(user.age)
};
}, (err, cleanUsers) => {
if (err) throw err;
console.log("Cleaned users:", cleanUsers);
});Enhanced callback API with configuration options for parallel processing and custom parameters.
/**
* Transform with options and callback
* @param records - Array of data to transform
* @param options - Configuration for transformation
* @param handler - Function to transform each record
* @param callback - Completion callback receiving results
* @returns Transformer stream instance
*/
function transform<T, U>(
records: Array<T>,
options: Options,
handler: Handler<T, U>,
callback?: Callback
): Transformer;
Usage Examples:
import { transform } from "stream-transform";
// Parallel processing with callback
const largeDataset = Array.from({ length: 10000 }, (_, index) => {
  return { id: index, value: Math.random() };
});
// Resolves after `ms` milliseconds; stands in for real asynchronous work
const pause = (ms) => new Promise((done) => setTimeout(done, ms));
const parallelOptions = {
  parallel: 50 // Process 50 records concurrently
};
transform(largeDataset, parallelOptions, async (record) => {
  // Simulate async processing
  await pause(10);
  const enriched = { ...record, processed: true, timestamp: Date.now() };
  return enriched;
}, (err, results) => {
  if (err) {
    throw err;
  }
  console.log(`Processed ${results.length} records in parallel`);
});
// Custom parameters with callback
const config = {
apiEndpoint: "https://api.example.com",
timeout: 5000
};
transform([1, 2, 3, 4, 5], {
parallel: 3,
params: config
}, async (id, params) => {
const response = await fetch(`${params.apiEndpoint}/items/${id}`, {
timeout: params.timeout
});
return await response.json();
}, (err, apiResults) => {
if (err) {
console.error("API transformation failed:", err);
return;
}
console.log("API results:", apiResults);
});Callback API automatically enables consumption mode, collecting all results without requiring explicit stream handling.
// Auto-consumption is automatically enabled when callback is provided
// No need to manually read from the stream - results are collected automatically
Usage Examples:
import { transform } from "stream-transform";
// Automatic result collection - no stream handling needed
transform(["apple", "banana", "cherry"], (fruit) => {
  return fruit.toUpperCase();
}, (err, results) => {
  // Check for failure first: `results` is undefined when `err` is set
  if (err) {
    console.error("Transformation failed:", err);
    return;
  }
  // Results are automatically collected
  console.log(results); // ["APPLE", "BANANA", "CHERRY"]
});
// Even with complex async operations
const urls = [
"https://api.example.com/users/1",
"https://api.example.com/users/2",
"https://api.example.com/users/3"
];
transform(urls, {
parallel: 2
}, async (url) => {
const response = await fetch(url);
return await response.json();
}, (err, users) => {
if (err) {
console.error("Failed to fetch users:", err);
return;
}
// All users are automatically collected
console.log(`Fetched ${users.length} users`);
});Callback API supports all handler execution patterns with automatic result collection.
/**
* Synchronous handler for callback API.
* Returns the transformed value directly.
* @param record - The record to transform
* @param params - Optional extra argument; in this page's examples it carries
*   the value supplied through the `params` option
* @returns The transformed record
*/
type SyncHandler<T, U> = (record: T, params?: any) => U;
/**
* Asynchronous handler for callback API.
* Returns nothing; the outcome is reported through `callback`.
* @param record - The record to transform
* @param callback - Called as `callback(err)` on failure or
*   `callback(null, result)` on success, as shown in the usage examples
* @param params - Optional extra argument; in this page's examples it carries
*   the value supplied through the `params` option
*/
type AsyncHandler<T, U> = (record: T, callback: HandlerCallback<U>, params?: any) => void;
/**
* Promise-based handler for callback API
*/
type PromiseHandler<T, U> = (record: T, params?: any) => Promise<U>;
Usage Examples:
import * as fs from "node:fs";
import { transform } from "stream-transform";
// Synchronous handler with callback
transform([1, 2, 3, 4, 5], (num) => {
  return num * num;
}, (err, squares) => {
  // Surface failures instead of logging an undefined result
  if (err) throw err;
  console.log(squares); // [1, 4, 9, 16, 25]
});
// Asynchronous handler with callback: each file's contents are reported
// through the handler callback once the read completes
const readFileRecord = (filename, callback) => {
  fs.readFile(filename, 'utf8', (readError, content) => {
    if (readError) {
      callback(readError);
      return;
    }
    callback(null, { filename: filename, content: content, size: content.length });
  });
};
transform(["file1.txt", "file2.txt"], readFileRecord, (err, fileData) => {
  if (err) {
    throw err;
  }
  console.log("Files read:", fileData);
});
// Promise-based handler with callback
transform(["user1", "user2", "user3"], async (username) => {
const user = await database.findUser(username);
return {
username,
profile: user,
lastSeen: user.lastLoginAt
};
}, (err, userProfiles) => {
if (err) throw err;
console.log("User profiles loaded:", userProfiles);
});Comprehensive error handling with callback-based error reporting.
/**
* Error handling in callback API
* @param err - Error object if transformation failed, null if successful
* @param results - Transformed results if successful, undefined if error
*/
type Callback = (err?: null | Error, results?: any[]) => void;
Usage Examples:
import { transform } from "stream-transform";
// Basic error handling: the handler throws on any non-numeric value
const doubleNumber = (value) => {
  const kind = typeof value;
  if (kind === "number") {
    return value * 2;
  }
  throw new Error(`Invalid input: expected number, got ${kind}`);
};
transform([1, 2, "invalid", 4], doubleNumber, (err, results) => {
  if (!err) {
    console.log("Results:", results);
    return;
  }
  // results will be undefined
  console.error("Transformation failed:", err.message);
});
// Async error handling: failures are passed as the first handler-callback argument
transform(["valid", "also-valid", ""], (input, callback) => {
  const isBlank = !input || input.trim() === "";
  if (isBlank) {
    callback(new Error("Empty input not allowed"));
  } else {
    // Simulate async processing
    setTimeout(() => callback(null, input.toUpperCase()), 10);
  }
}, (err, results) => {
  if (!err) {
    console.log("Processed:", results);
    return;
  }
  console.error("Processing error:", err.message);
});
// Promise rejection handling
transform([1, 2, 3], async (num) => {
if (num === 2) {
throw new Error("Number 2 is not allowed");
}
return num * 10;
}, (err, results) => {
if (err) {
console.error("Promise rejected:", err.message);
return;
}
console.log("Results:", results);
});The callback API is optimized for convenience but has performance characteristics to consider.
// Memory usage: All results are held in memory until completion
// Best for: Small to medium datasets (< 100,000 records)
// Consider stream API for: Large datasets or memory-constrained environments
Usage Examples:
import { transform } from "stream-transform";
// Good: Small dataset with callback
const smallDataset = Array.from({ length: 1000 }, (_, i) => i);
transform(smallDataset, (num) => num * 2, (err, results) => {
  // `results` is undefined on failure, so check `err` before reading its length
  if (err) throw err;
  // Memory usage is acceptable
  console.log(`Processed ${results.length} items`);
});
// Consider alternatives: Large dataset (one million numbers)
const largeDataset = Array.from({ length: 1000000 }, (_, i) => i);
// Option 1: Use stream API for large datasets
// Only a handler is passed here: no records array and no completion callback.
const streamTransform = transform((num) => num * 2);
// Process with streams to avoid memory buildup
// NOTE(review): presumably records are then written to or piped through
// `streamTransform` like any Node.js Transform stream — confirm against the stream API docs.
// Option 2: If callback needed, consider chunking
const chunkSize = 10000;
async function processInChunks(data, handler) {
const results = [];
for (let i = 0; i < data.length; i += chunkSize) {
const chunk = data.slice(i, i + chunkSize);
const chunkResults = await new Promise((resolve, reject) => {
transform(chunk, handler, (err, res) => {
if (err) reject(err);
else resolve(res);
});
});
results.push(...chunkResults);
}
return results;
}Install with Tessl CLI
npx tessl i tessl/npm-stream-transform