or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

errors.mdindex.mdoptions.mdstreaming.mdsync.md
tile.json

streaming.mddocs/

Streaming API

The streaming API provides the core CSV parsing functionality through Node.js Transform streams, ideal for processing large files and real-time data streams with memory efficiency.

Capabilities

Parse Function

Creates a Parser instance that implements the Node.js Transform stream interface. Supports multiple calling patterns for flexible usage.

/**
 * Creates a Parser transform stream for CSV processing
 * @param input - Optional CSV input data
 * @param options - Parsing configuration options
 * @param callback - Optional callback for collecting all records
 * @returns Parser transform stream instance
 */
function parse(): Parser;
function parse(options: Options): Parser;
function parse(callback: Callback): Parser;
function parse(options: Options, callback: Callback): Parser;
function parse(input: string | Buffer | Uint8Array): Parser;
function parse(input: string | Buffer | Uint8Array, callback: Callback): Parser;
function parse(input: string | Buffer | Uint8Array, options: Options): Parser;
function parse(input: string | Buffer | Uint8Array, options: Options, callback: Callback): Parser;

Usage Examples:

import { parse } from "csv-parse";

// Transform stream without callback
const parser = parse({ columns: true });
parser.on("readable", function () {
  let record;
  while ((record = parser.read()) !== null) {
    console.log(record);
  }
});

// With input data and callback
parse("name,age\nAlice,25", { columns: true }, (err, records) => {
  if (err) throw err;
  console.log(records); // [{ name: 'Alice', age: '25' }]
});

// Stream processing of large files
import fs from "fs";

fs.createReadStream("large-file.csv")
  .pipe(parse({ columns: true }))
  .on("data", (record) => {
    // Process each record as it's parsed
    console.log(record);
  })
  .on("error", (err) => {
    console.error("Parsing error:", err);
  })
  .on("end", () => {
    console.log("Parsing complete");
  });

Parser Class

Transform stream class that extends Node.js stream.Transform for CSV parsing operations.

/**
 * CSV parser transform stream extending Node.js Transform
 */
class Parser extends Transform {
  /** 
   * Creates a new Parser instance
   * @param options - Configuration options for parsing behavior
   */
  constructor(options: Options);
  
  /** Normalized parsing options */
  readonly options: OptionsNormalized;
  
  /** Current parsing state and statistics */
  readonly info: Info;
  
  /**
   * Internal method for pushing parsed records
   * @param line - Parsed record data
   * @returns CsvError if parsing error occurred, undefined otherwise
   */
  __push(line: any): CsvError | undefined;
}

Usage Examples:

import { Parser } from "csv-parse";

// Manual parser instantiation
const parser = new Parser({
  columns: true,
  skip_empty_lines: true,
  trim: true
});

// Access parser state
parser.on("readable", function () {
  console.log("Current info:", this.info);
  console.log("Records processed:", this.info.records);
  console.log("Lines processed:", this.info.lines);
  
  let record;
  while ((record = this.read()) !== null) {
    console.log(record);
  }
});

// Write data to parser
parser.write("name,age,city\n");
parser.write("Alice,25,New York\n");
parser.write("Bob,30,Boston\n");
parser.end();

Stream Events

The Parser emits standard Node.js stream events plus additional CSV-specific events.

// Standard Transform stream events
parser.on("readable", () => void);
parser.on("data", (record: any) => void);
parser.on("end", () => void);
parser.on("error", (err: CsvError) => void);
parser.on("close", () => void);

// CSV-specific events
parser.on("skip", (err: CsvError, chunk: string) => void);

Usage Examples:

import { parse } from "csv-parse";

const parser = parse({ 
  columns: true,
  skip_records_with_error: true
});

// Handle successfully parsed records
parser.on("data", (record) => {
  console.log("Parsed record:", record);
});

// Handle skipped records (when skip_records_with_error is true)
parser.on("skip", (err, rawRecord) => {
  console.warn("Skipped record due to error:", rawRecord);
  console.warn("Error:", err.message);
});

// Handle fatal errors
parser.on("error", (err) => {
  console.error("Fatal parsing error:", err.code, err.message);
});

// Handle completion
parser.on("end", () => {
  console.log("Parsing completed successfully");
  console.log("Final statistics:", parser.info);
});

Web Streams Compatibility

For environments supporting Web Streams API, use the dedicated csv-parse/stream entry point which provides a different parse function that returns a TransformStream instead of a Node.js Transform stream.

// Import from csv-parse/stream
import { 
  parse,
  CsvError,
  CastingContext,
  CastingFunction, 
  CastingDateFunction,
  ColumnOption,
  Options,
  OptionsNormalized,
  Info,
  CsvErrorCode
} from "csv-parse/stream";

/**
 * Creates a Web Streams compatible TransformStream for CSV parsing
 * Note: This is a different function from the main parse() function
 * @param options - Parsing configuration options
 * @returns TransformStream instance for Web Streams API
 */
function parse(options?: Options): TransformStream;

Usage Examples:

import { parse } from "csv-parse/stream";

// Web Streams API usage
const csvData = new ReadableStream({
  start(controller) {
    controller.enqueue("name,age\n");
    controller.enqueue("Alice,25\n");
    controller.enqueue("Bob,30\n");
    controller.close();
  }
});

const parser = parse({ columns: true });

csvData
  .pipeThrough(parser)
  .pipeTo(new WritableStream({
    write(record) {
      console.log("Parsed record:", record);
    }
  }));

Advanced Stream Usage

Complex streaming scenarios with backpressure handling and error recovery.

Usage Examples:

import { parse } from "csv-parse";
import { pipeline } from "stream/promises";
import fs from "fs";

// Pipeline with error handling
try {
  await pipeline(
    fs.createReadStream("input.csv"),
    parse({ 
      columns: true,
      skip_records_with_error: true,
      relax_column_count: true
    }),
    async function* (source) {
      for await (const record of source) {
        // Transform each record
        yield {
          ...record,
          processed_at: new Date().toISOString()
        };
      }
    },
    fs.createWriteStream("output.json")
  );
  console.log("Pipeline completed successfully");
} catch (error) {
  console.error("Pipeline failed:", error);
}

// Manual backpressure handling
const parser = parse({ columns: true });
let backpressure = false;

parser.on("data", (record) => {
  if (!backpressure) {
    const canContinue = processRecord(record);
    if (!canContinue) {
      backpressure = true;
      parser.pause();
      
      // Resume when ready
      setTimeout(() => {
        backpressure = false;
        parser.resume();
      }, 100);
    }
  }
});

function processRecord(record) {
  // Process record and return false if backpressure needed
  console.log("Processing:", record);
  return true;
}