tessl/npm-stream-transform

Object transformations implementing the Node.js stream.Transform API


Callback API

The callback API transforms an in-memory array of records and hands the complete set of results to a single completion callback. Use it when you want every result collected for you rather than reading them from the stream yourself.

Capabilities

Transform with Records and Callback

Process an array of records with automatic result collection via callback.

/**
 * Transform an array of records with callback
 * @param records - Array of data to transform
 * @param handler - Function to transform each record
 * @param callback - Completion callback receiving results
 * @returns Transformer stream instance
 */
function transform<T, U>(
  records: Array<T>, 
  handler: Handler<T, U>, 
  callback?: Callback
): Transformer;

/**
 * Transform with options and callback
 * @param records - Array of data to transform
 * @param options - Configuration options
 * @param handler - Function to transform each record
 * @param callback - Completion callback receiving results
 * @returns Transformer stream instance
 */
function transform<T, U>(
  records: Array<T>, 
  options: Options, 
  handler: Handler<T, U>, 
  callback?: Callback
): Transformer;

/**
 * Completion callback for result collection
 * @param err - Error if transformation failed
 * @param output - Array of transformed results
 */
type Callback = (err?: null | Error, output?: any[]) => void;

Usage Examples:

import { transform } from "stream-transform";

// Basic callback transformation
transform([
  ["a", "b", "c"],
  ["1", "2", "3"],
  ["x", "y", "z"]
], (record) => {
  return record.join("|");
}, (err, results) => {
  if (err) {
    console.error("Transformation failed:", err);
    return;
  }
  console.log(results); // ["a|b|c", "1|2|3", "x|y|z"]
});

// Object transformation with callback
const users = [
  { name: " Alice ", email: "ALICE@EXAMPLE.COM", age: "25" },
  { name: " Bob ", email: "BOB@EXAMPLE.COM", age: "30" }
];

transform(users, (user) => {
  return {
    name: user.name.trim(),
    email: user.email.toLowerCase(),
    age: parseInt(user.age, 10)
  };
}, (err, cleanUsers) => {
  if (err) throw err;
  console.log("Cleaned users:", cleanUsers);
});
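
The two signatures above differ only in whether an options object sits between the records and the handler. As a rough illustration of how such an overloaded call can be resolved, here is a minimal sketch that sorts positional arguments by their runtime type. `resolveArgs` is a hypothetical helper written for this page, not part of stream-transform, and it makes no claim about how the library itself parses its arguments.

```javascript
// Hypothetical helper, not part of stream-transform: sort positional
// arguments into named slots by their runtime type.
function resolveArgs(...args) {
  const resolved = {
    records: undefined,
    options: {},
    handler: undefined,
    callback: undefined,
  };
  args.forEach((arg, i) => {
    if (Array.isArray(arg)) {
      resolved.records = arg;
    } else if (typeof arg === "function") {
      // The first function is the handler; a later function in the
      // last position is treated as the completion callback.
      if (resolved.handler && i === args.length - 1) resolved.callback = arg;
      else resolved.handler = arg;
    } else if (arg !== null && typeof arg === "object") {
      resolved.options = { ...arg };
    } else if (arg !== null && arg !== undefined) {
      throw new Error(`Invalid argument at position ${i}: ${JSON.stringify(arg)}`);
    }
  });
  return resolved;
}

const handler = (record) => record;
const done = (err, output) => {};

console.log(resolveArgs([1, 2], handler, done).options); // {}
console.log(resolveArgs([1, 2], { parallel: 3 }, handler, done).options); // { parallel: 3 }
```

Under this sketch, omitting the options object simply leaves `options` as an empty object, which mirrors the relationship between the three-argument and four-argument forms documented above.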

Transform with Options and Callback

Pass an options object between the records and the handler to control concurrency (parallel) and to supply custom parameters (params) that are forwarded to the handler.

/**
 * Transform with options and callback
 * @param records - Array of data to transform
 * @param options - Configuration for transformation
 * @param handler - Function to transform each record
 * @param callback - Completion callback receiving results
 * @returns Transformer stream instance
 */
function transform<T, U>(
  records: Array<T>, 
  options: Options, 
  handler: Handler<T, U>, 
  callback?: Callback
): Transformer;

Usage Examples:

import { transform } from "stream-transform";

// Parallel processing with callback
const largeDataset = Array.from({ length: 10000 }, (_, i) => ({ id: i, value: Math.random() }));

transform(largeDataset, {
  parallel: 50 // Process 50 records concurrently
}, async (record) => {
  // Simulate async processing
  await new Promise(resolve => setTimeout(resolve, 10));
  return {
    ...record,
    processed: true,
    timestamp: Date.now()
  };
}, (err, results) => {
  if (err) throw err;
  console.log(`Processed ${results.length} records in parallel`);
});

// Custom parameters with callback
const config = {
  apiEndpoint: "https://api.example.com",
  timeout: 5000
};

transform([1, 2, 3, 4, 5], {
  parallel: 3,
  params: config
}, async (id, params) => {
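  // fetch() has no `timeout` option; abort the request with AbortSignal.timeout() instead
  const response = await fetch(`${params.apiEndpoint}/items/${id}`, {
    signal: AbortSignal.timeout(params.timeout)
  });
  return await response.json();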
}, (err, apiResults) => {
  if (err) {
    console.error("API transformation failed:", err);
    return;
  }
  console.log("API results:", apiResults);
});
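
The `parallel` option above caps how many handler calls are in flight at once. To make that idea concrete, here is a minimal sketch of bounded concurrency using only promises. `mapWithLimit` is a hypothetical helper, not stream-transform code, and it is not a description of the library's internals. It stores results by input index, so its output order always matches the input; this page does not state whether the library gives the same guarantee when handlers finish out of order.

```javascript
// Hypothetical helper illustrating a concurrency cap; not stream-transform code.
async function mapWithLimit(items, limit, worker) {
  const results = new Array(items.length);
  let next = 0; // index of the next item to start
  let inFlight = 0; // worker calls currently running
  let peak = 0; // highest concurrency observed

  // Each runner pulls the next unstarted item until none remain.
  async function run() {
    while (next < items.length) {
      const index = next++;
      inFlight++;
      peak = Math.max(peak, inFlight);
      try {
        results[index] = await worker(items[index]);
      } finally {
        inFlight--;
      }
    }
  }

  // Start at most `limit` runners; together they drain the whole list.
  const runners = Array.from({ length: Math.min(limit, items.length) }, () => run());
  await Promise.all(runners);
  return { results, peak };
}

mapWithLimit([1, 2, 3, 4, 5], 2, async (n) => {
  await new Promise((resolve) => setTimeout(resolve, 5));
  return n * 10;
}).then(({ results, peak }) => {
  console.log(results); // [10, 20, 30, 40, 50]
  console.log(peak); // 2
});
```

A higher limit shortens total wall-clock time for I/O-bound handlers at the cost of more simultaneous requests, which is the same trade-off to weigh when choosing a `parallel` value.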

Auto-consumption Mode

Passing a callback automatically enables consumption mode: the transformer reads and collects every result itself, so no explicit stream handling is required.

// Auto-consumption is automatically enabled when callback is provided
// No need to manually read from the stream - results are collected automatically

Usage Examples:

import { transform } from "stream-transform";

// Automatic result collection - no stream handling needed
transform(["apple", "banana", "cherry"], (fruit) => {
  return fruit.toUpperCase();
}, (err, results) => {
  // Results are automatically collected
  console.log(results); // ["APPLE", "BANANA", "CHERRY"]
});

// Even with complex async operations
const urls = [
  "https://api.example.com/users/1",
  "https://api.example.com/users/2", 
  "https://api.example.com/users/3"
];

transform(urls, {
  parallel: 2
}, async (url) => {
  const response = await fetch(url);
  return await response.json();
}, (err, users) => {
  if (err) {
    console.error("Failed to fetch users:", err);
    return;
  }
  // All users are automatically collected
  console.log(`Fetched ${users.length} users`);
});
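
To show what auto-consumption saves you from writing, here is a minimal sketch of manual result collection. `collect` is a hypothetical helper, not part of stream-transform: it drains any async-iterable source (Node.js readable streams are async iterable) into an array and reports through an error-first callback. The `upperCased` generator is a stand-in for a transformer so that the example runs without any dependency.

```javascript
// Hypothetical helper: drain an async-iterable source and report the
// collected items through an error-first callback.
function collect(source, callback) {
  (async () => {
    const output = [];
    for await (const item of source) output.push(item);
    return output;
  })().then(
    (output) => callback(null, output),
    (err) => callback(err)
  );
}

// Stand-in for a transformer stream, used only to keep the sketch self-contained.
async function* upperCased(words) {
  for (const word of words) yield word.toUpperCase();
}

collect(upperCased(["apple", "banana", "cherry"]), (err, results) => {
  if (err) throw err;
  console.log(results); // ["APPLE", "BANANA", "CHERRY"]
});
```

With the callback API none of this plumbing is needed, because supplying the callback is what triggers collection.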

Handler Function Patterns

The callback API supports all three handler styles (synchronous, callback-based, and promise-based); results are collected the same way in each case.

/**
 * Synchronous handler for callback API
 */
type SyncHandler<T, U> = (record: T, params?: any) => U;

/**
 * Asynchronous handler for callback API
 */
type AsyncHandler<T, U> = (record: T, callback: HandlerCallback<U>, params?: any) => void;

/**
 * Promise-based handler for callback API
 */
type PromiseHandler<T, U> = (record: T, params?: any) => Promise<U>;

Usage Examples:

import fs from "node:fs";
import { transform } from "stream-transform";

// Synchronous handler with callback
transform([1, 2, 3, 4, 5], (num) => {
  return num * num;
}, (err, squares) => {
  console.log(squares); // [1, 4, 9, 16, 25]
});

// Asynchronous handler with callback
transform(["file1.txt", "file2.txt"], (filename, callback) => {
  fs.readFile(filename, 'utf8', (err, content) => {
    if (err) return callback(err);
    callback(null, { filename, content, size: content.length });
  });
}, (err, fileData) => {
  if (err) throw err;
  console.log("Files read:", fileData);
});

// Promise-based handler with callback
// (`database` stands in for your own data-access layer; it is not provided by stream-transform)
transform(["user1", "user2", "user3"], async (username) => {
  const user = await database.findUser(username);
  return {
    username,
    profile: user,
    lastSeen: user.lastLoginAt
  };
}, (err, userProfiles) => {
  if (err) throw err;
  console.log("User profiles loaded:", userProfiles);
});
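
The three handler types above are distinguishable by how many parameters they declare. The sketch below shows one way a dispatcher could tell them apart from `Function.prototype.length`. It is an assumption for illustration, not a statement of how stream-transform is implemented, and `classifyHandler` is a hypothetical name.

```javascript
// Hypothetical helper, not part of stream-transform: classify a handler
// by its declared parameter count.
function classifyHandler(handler, hasParams = false) {
  // When a params object is configured, one declared parameter is reserved for it.
  const arity = handler.length - (hasParams ? 1 : 0);
  if (arity === 1) return "sync-or-promise"; // (record) or (record, params)
  if (arity === 2) return "callback"; // (record, callback) or (record, callback, params)
  throw new Error("Invalid handler arguments");
}

console.log(classifyHandler((record) => record)); // "sync-or-promise"
console.log(classifyHandler(async (record) => record)); // "sync-or-promise"
console.log(classifyHandler((record, callback) => callback(null, record))); // "callback"
console.log(classifyHandler((record, params) => record, true)); // "sync-or-promise"
```

Under this scheme a synchronous handler and a promise-returning handler look identical until they are called, and a handler declared as `(record, params)` without a configured params object would be mistaken for the callback style, so it is worth keeping the declared parameters in line with the options you pass.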

Error Handling

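Errors are reported through the completion callback. If a handler throws, passes an error to its own callback, or returns a promise that rejects, the completion callback receives that error as its first argument and no results.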

/**
 * Error handling in callback API
 * @param err - Error object if transformation failed, null if successful
 * @param results - Transformed results if successful, undefined if error
 */
type Callback = (err?: null | Error, results?: any[]) => void;

Usage Examples:

import { transform } from "stream-transform";

// Basic error handling
transform([1, 2, "invalid", 4], (value) => {
  if (typeof value !== "number") {
    throw new Error(`Invalid input: expected number, got ${typeof value}`);
  }
  return value * 2;
}, (err, results) => {
  if (err) {
    console.error("Transformation failed:", err.message);
    // results will be undefined
    return;
  }
  console.log("Results:", results);
});

// Async error handling
transform(["valid", "also-valid", ""], (input, callback) => {
  if (!input || input.trim() === "") {
    return callback(new Error("Empty input not allowed"));
  }
  
  // Simulate async processing
  setTimeout(() => {
    callback(null, input.toUpperCase());
  }, 10);
}, (err, results) => {
  if (err) {
    console.error("Processing error:", err.message);
    return;
  }
  console.log("Processed:", results);
});

// Promise rejection handling
transform([1, 2, 3], async (num) => {
  if (num === 2) {
    throw new Error("Number 2 is not allowed");
  }
  return num * 10;
}, (err, results) => {
  if (err) {
    console.error("Promise rejected:", err.message);
    return;
  }
  console.log("Results:", results);
});

Performance Considerations

The callback API trades memory for convenience: every result is buffered until the transformation completes.

// Memory usage: All results are held in memory until completion
// Best for: Small to medium datasets (< 100,000 records)
// Consider stream API for: Large datasets or memory-constrained environments

Usage Examples:

import { transform } from "stream-transform";

// Good: Small dataset with callback
const smallDataset = Array.from({ length: 1000 }, (_, i) => i);
transform(smallDataset, (num) => num * 2, (err, results) => {
  // Memory usage is acceptable
  console.log(`Processed ${results.length} items`);
});

// Consider alternatives: Large dataset
const largeDataset = Array.from({ length: 1000000 }, (_, i) => i);

// Option 1: Use stream API for large datasets
const streamTransform = transform((num) => num * 2);
// Write records to this stream and read results incrementally to avoid buffering the full output

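// Option 2: If a callback is needed, consider chunking.
// Note: this limits how many records each transform() call handles, but `results`
// below still accumulates the full output; consume each chunk's results as they
// arrive instead if memory is the concern.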
const chunkSize = 10000;
async function processInChunks(data, handler) {
  const results = [];
  for (let i = 0; i < data.length; i += chunkSize) {
    const chunk = data.slice(i, i + chunkSize);
    const chunkResults = await new Promise((resolve, reject) => {
      transform(chunk, handler, (err, res) => {
        if (err) reject(err);
        else resolve(res);
      });
    });
    results.push(...chunkResults);
  }
  return results;
}

Install with Tessl CLI

npx tessl i tessl/npm-stream-transform
