or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.mdindex.mdparser-stream.mdparsing-functions.md
tile.json

parser-stream.mddocs/

CSV Parser Stream

The CsvParserStream class is a Node.js Transform stream that handles CSV parsing with chainable transformation and validation capabilities. It extends the standard Transform stream interface while providing additional methods for data processing.

Capabilities

CsvParserStream Class

Core streaming CSV parser with transformation and validation support.

/**
 * CSV parsing transform stream with chainable operations
 */
class CsvParserStream<I extends Row, O extends Row> extends Transform {
  /**
   * Create a new CSV parser stream
   * @param parserOptions - Configuration options for parsing behavior
   */
  constructor(parserOptions: ParserOptions);
  
  /**
   * Set a transformation function for processing each row
   * @param transformFunction - Function to transform each parsed row
   * @returns This instance for method chaining
   */
  transform(transformFunction: RowTransformFunction<I, O>): CsvParserStream<I, O>;
  
  /**
   * Set a validation function for validating each row
   * @param validateFunction - Function to validate each processed row
   * @returns This instance for method chaining
   */
  validate(validateFunction: RowValidate<O>): CsvParserStream<I, O>;
  
  /**
   * Override emit to handle end event properly
   * @param event - Event name
   * @param rest - Event arguments
   * @returns True if event had listeners
   */
  emit(event: string | symbol, ...rest: any[]): boolean;
}

Row Transformation

Transform each parsed row using synchronous or asynchronous functions.

type SyncRowTransform<I extends Row, O extends Row> = (row: I) => O;
type AsyncRowTransform<I extends Row, O extends Row> = (row: I, cb: RowTransformCallback<O>) => void;
type RowTransformFunction<I extends Row, O extends Row> = SyncRowTransform<I, O> | AsyncRowTransform<I, O>;

type RowTransformCallback<R extends Row> = (error?: Error | null, row?: R) => void;

Synchronous Transform Examples:

import { parse } from "@fast-csv/parse";

// Basic data type conversion
parse({ headers: true })
  .transform(row => ({
    id: parseInt(row.id),
    name: row.name.trim(),
    email: row.email.toLowerCase(),
    active: row.active === "true"
  }))
  .on("data", row => console.log(row));

// Complex object restructuring
parse({ headers: true })
  .transform(row => ({
    user: {
      id: parseInt(row.user_id),
      profile: {
        name: row.name,
        email: row.email
      }
    },
    metadata: {
      created: new Date(row.created_at),
      active: row.status === "active"
    }
  }))
  .on("data", row => console.log(row));

Asynchronous Transform Examples:

import { parse } from "@fast-csv/parse";

// Async transformation with database lookup
parse({ headers: true })
  .transform((row, cb) => {
    // Simulate async operation
    setTimeout(() => {
      const transformedRow = {
        ...row,
        timestamp: Date.now()
      };
      cb(null, transformedRow);
    }, 10);
  })
  .on("data", row => console.log(row));

// Error handling in async transform
parse({ headers: true })
  .transform((row, cb) => {
    try {
      if (!row.email.includes("@")) {
        return cb(new Error(`Invalid email: ${row.email}`));
      }
      cb(null, { ...row, emailValid: true });
    } catch (error) {
      cb(error);
    }
  })
  .on("error", error => console.error(error));

Row Validation

Validate processed rows using synchronous or asynchronous validation functions.

type SyncRowValidate<R extends Row> = (row: R) => boolean;
type AsyncRowValidate<R extends Row> = (row: R, cb: RowValidateCallback) => void;
type RowValidate<R extends Row> = AsyncRowValidate<R> | SyncRowValidate<R>;

type RowValidateCallback = (error?: Error | null, isValid?: boolean, reason?: string) => void;

interface RowValidationResult<R extends Row> {
  row: R | null;
  isValid: boolean;
  reason?: string;
}

Synchronous Validation Examples:

import { parse } from "@fast-csv/parse";

// Simple validation
parse({ headers: true })
  .validate(row => row.age >= 18)
  .on("data", row => console.log(row)) // Only adults
  .on("error", error => console.error(error));

// Complex validation with multiple conditions
parse({ headers: true })
  .transform(row => ({
    id: parseInt(row.id),
    email: row.email,
    age: parseInt(row.age)
  }))
  .validate(row => {
    return row.id > 0 && 
           row.email.includes("@") && 
           row.age >= 18 && 
           row.age <= 120;
  })
  .on("data", row => console.log(row));

Asynchronous Validation Examples:

import { parse } from "@fast-csv/parse";

// Async validation with detailed error reporting
parse({ headers: true })
  .validate((row, cb) => {
    // Simulate async validation (e.g., database check)
    setTimeout(() => {
      if (!row.email.includes("@")) {
        return cb(null, false, "Invalid email format");
      }
      if (parseInt(row.age) < 18) {
        return cb(null, false, "Must be 18 or older");
      }
      cb(null, true);
    }, 5);
  })
  .on("data", row => console.log(row))
  .on("error", error => console.error(error));

Method Chaining

Transform and validate operations can be chained together:

import { parseFile } from "@fast-csv/parse";

parseFile("users.csv", { headers: true })
  .transform(row => ({
    id: parseInt(row.id),
    name: row.name.trim().toUpperCase(),
    email: row.email.toLowerCase(),
    age: parseInt(row.age),
    active: row.status === "active"
  }))
  .validate(row => row.age >= 18 && row.email.includes("@"))
  .on("data", validUser => {
    console.log("Valid adult user:", validUser);
  })
  .on("data-invalid", (invalidRow, rowNumber, reason) => {
    console.log(`Row ${rowNumber} failed validation:`, invalidRow);
    console.log(`Validation failed because: ${reason || "validation function returned false"}`);
  })
  .on("error", error => {
    console.error("Processing error:", error);
  })
  .on("end", rowCount => {
    console.log(`Processing complete. Total rows processed: ${rowCount}`);
  });

Error Handling and Validation Events

The CsvParserStream provides comprehensive error handling for both parsing errors and validation failures:

Validation Error Handling

When validation fails, the data-invalid event is emitted instead of the data event:

import { parseFile } from "@fast-csv/parse";

parseFile("users.csv", { headers: true })
  .validate((row, callback) => {
    // Async validation with detailed error reporting
    if (!row.email || !row.email.includes("@")) {
      return callback(null, false, "Invalid email address");
    }
    if (!row.age || parseInt(row.age) < 18) {
      return callback(null, false, "Age must be 18 or older");
    }
    callback(null, true);
  })
  .on("data", validRow => {
    console.log("Valid row:", validRow);
  })
  .on("data-invalid", (invalidRow, rowNumber, reason) => {
    console.error(`Row ${rowNumber} validation failed:`, reason);
    console.error("Row data:", invalidRow);
    // Log to error tracking system, skip row, etc.
  })
  .on("end", totalRows => {
    console.log(`Finished processing ${totalRows} total rows`);
  });

Comprehensive Error Handling Pattern

import { parseFile } from "@fast-csv/parse";

let validRows = 0;
let invalidRows = 0;
let errorRows = 0;

parseFile("data.csv", { headers: true })
  .transform(row => {
    // Transform and potentially throw errors
    try {
      return {
        id: parseInt(row.id),
        email: row.email.toLowerCase(),
        age: parseInt(row.age)
      };
    } catch (error) {
      throw new Error(`Transform failed: ${error.message}`);
    }
  })
  .validate(row => row.id > 0 && row.email.includes("@") && row.age >= 0)
  .on("data", row => {
    validRows++;
    console.log("Processing valid row:", row);
  })
  .on("data-invalid", (row, rowNumber, reason) => {
    invalidRows++;
    console.warn(`Invalid row ${rowNumber}: ${reason}`);
  })
  .on("error", error => {
    errorRows++;
    console.error("Processing error:", error.message);
  })
  .on("end", totalRows => {
    console.log("Processing Summary:");
    console.log(`- Total rows: ${totalRows}`);
    console.log(`- Valid rows: ${validRows}`);
    console.log(`- Invalid rows: ${invalidRows}`);
    console.log(`- Error rows: ${errorRows}`);
  });

Stream Events

CsvParserStream emits standard Node.js stream events plus additional CSV-specific events:

Standard Stream Events

  • data: Emitted for each parsed and processed row
  • end: Emitted when all data has been processed (includes total row count as parameter)
  • error: Emitted when parsing or processing errors occur
  • close: Emitted when the stream is closed

CSV-Specific Events

  • headers: Emitted when headers are parsed (only when headers option is used)
  • data-invalid: Emitted when row validation fails (includes the row, row number, and validation reason)
import { parseFile } from "@fast-csv/parse";

parseFile("data.csv", { headers: true })
  .on("headers", headers => {
    console.log("CSV headers:", headers);
  })
  .on("data", row => {
    console.log("Row data:", row);
  })
  .on("data-invalid", (row, rowNumber, reason) => {
    console.log(`Invalid row ${rowNumber}:`, row, `Reason: ${reason}`);
  })
  .on("end", rowCount => {
    console.log(`Parsing complete. Processed ${rowCount} rows`);
  })
  .on("error", error => {
    console.error("Error:", error);
  });