The streaming API provides the core CSV parsing functionality through Node.js Transform streams, making it well suited for processing large files and real-time data streams while keeping memory usage low.
Creates a Parser instance that implements the Node.js Transform stream interface. Supports multiple calling patterns: input data, options, and a callback can each be supplied or omitted.
/**
 * Creates a Parser transform stream for CSV processing
 * @param input - Optional CSV input data
 * @param options - Parsing configuration options
 * @param callback - Optional callback for collecting all records
 * @returns Parser transform stream instance
 */
function parse(): Parser;
function parse(options: Options): Parser;
function parse(callback: Callback): Parser;
function parse(options: Options, callback: Callback): Parser;
function parse(input: string | Buffer | Uint8Array): Parser;
function parse(input: string | Buffer | Uint8Array, callback: Callback): Parser;
function parse(input: string | Buffer | Uint8Array, options: Options): Parser;
function parse(input: string | Buffer | Uint8Array, options: Options, callback: Callback): Parser;

Usage Examples:
import { parse } from "csv-parse";
// Transform stream without callback
const parser = parse({ columns: true });
parser.on("readable", function () {
let record;
while ((record = parser.read()) !== null) {
console.log(record);
}
});
// With input data and callback
parse("name,age\nAlice,25", { columns: true }, (err, records) => {
if (err) throw err;
console.log(records); // [{ name: 'Alice', age: '25' }]
});
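// Buffer or Uint8Array input (an illustrative sketch, not from the original examples):
// options without a callback, records consumed from the stream
const bufferParser = parse(Buffer.from("name,age\nCarol,41"), { columns: true });
bufferParser.on("data", (record) => {
  console.log(record); // { name: 'Carol', age: '41' }
});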
// Stream processing of large files
import fs from "fs";
fs.createReadStream("large-file.csv")
.pipe(parse({ columns: true }))
.on("data", (record) => {
// Process each record as it's parsed
console.log(record);
})
.on("error", (err) => {
console.error("Parsing error:", err);
})
.on("end", () => {
console.log("Parsing complete");
});Transform stream class that extends Node.js stream.Transform for CSV parsing operations.
/**
 * CSV parser transform stream extending Node.js Transform
 */
class Parser extends Transform {
  /**
   * Creates a new Parser instance
   * @param options - Configuration options for parsing behavior
   */
  constructor(options: Options);
  /** Normalized parsing options */
  readonly options: OptionsNormalized;
  /** Current parsing state and statistics */
  readonly info: Info;
  /**
   * Internal method for pushing parsed records
   * @param line - Parsed record data
   * @returns CsvError if a parsing error occurred, undefined otherwise
   */
  __push(line: any): CsvError | undefined;
}

Usage Examples:
import { Parser } from "csv-parse";
// Manual parser instantiation
const parser = new Parser({
  columns: true,
  skip_empty_lines: true,
  trim: true
});
// Access parser state
parser.on("readable", function () {
console.log("Current info:", this.info);
console.log("Records processed:", this.info.records);
console.log("Lines processed:", this.info.lines);
let record;
while ((record = this.read()) !== null) {
console.log(record);
}
});
// Write data to parser
parser.write("name,age,city\n");
parser.write("Alice,25,New York\n");
parser.write("Bob,30,Boston\n");
parser.end();

The Parser emits standard Node.js stream events plus additional CSV-specific events.
// Standard Transform stream events
parser.on("readable", () => void);
parser.on("data", (record: any) => void);
parser.on("end", () => void);
parser.on("error", (err: CsvError) => void);
parser.on("close", () => void);
// CSV-specific events
parser.on("skip", (err: CsvError, chunk: string) => void);Usage Examples:
import { parse } from "csv-parse";
const parser = parse({
  columns: true,
  skip_records_with_error: true
});
// Handle successfully parsed records
parser.on("data", (record) => {
console.log("Parsed record:", record);
});
// Handle skipped records (when skip_records_with_error is true)
parser.on("skip", (err, rawRecord) => {
console.warn("Skipped record due to error:", rawRecord);
console.warn("Error:", err.message);
});
// Handle fatal errors
parser.on("error", (err) => {
console.error("Fatal parsing error:", err.code, err.message);
});
// Handle completion
parser.on("end", () => {
console.log("Parsing completed successfully");
console.log("Final statistics:", parser.info);
});For environments supporting Web Streams API, use the dedicated csv-parse/stream entry point which provides a different parse function that returns a TransformStream instead of a Node.js Transform stream.
// Import from csv-parse/stream
import {
  parse,
  CsvError,
  CastingContext,
  CastingFunction,
  CastingDateFunction,
  ColumnOption,
  Options,
  OptionsNormalized,
  Info,
  CsvErrorCode
} from "csv-parse/stream";
/**
 * Creates a Web Streams compatible TransformStream for CSV parsing
 * Note: This is a different function from the main parse() function
 * @param options - Parsing configuration options
 * @returns TransformStream instance for Web Streams API
 */
function parse(options?: Options): TransformStream;

Usage Examples:
import { parse } from "csv-parse/stream";
// Web Streams API usage
const csvData = new ReadableStream({
  start(controller) {
    controller.enqueue("name,age\n");
    controller.enqueue("Alice,25\n");
    controller.enqueue("Bob,30\n");
    controller.close();
  }
});
const parser = parse({ columns: true });
csvData
  .pipeThrough(parser)
  .pipeTo(new WritableStream({
    write(record) {
      console.log("Parsed record:", record);
    }
  }));

Complex streaming scenarios with backpressure handling and error recovery.
Usage Examples:
import { parse } from "csv-parse";
import { pipeline } from "stream/promises";
import fs from "fs";
// Pipeline with error handling
try {
  await pipeline(
    fs.createReadStream("input.csv"),
    parse({
      columns: true,
      skip_records_with_error: true,
      relax_column_count: true
    }),
    async function* (source) {
      for await (const record of source) {
        // Transform each record and serialize it as one JSON document per line
        yield JSON.stringify({
          ...record,
          processed_at: new Date().toISOString()
        }) + "\n";
      }
    },
    fs.createWriteStream("output.ndjson")
  );
  console.log("Pipeline completed successfully");
} catch (error) {
  console.error("Pipeline failed:", error);
}
// Manual backpressure handling
const parser = parse({ columns: true });
parser.on("data", (record) => {
  const canContinue = processRecord(record);
  if (!canContinue) {
    // Pause the parser and resume once the downstream consumer has caught up
    parser.pause();
    setTimeout(() => {
      parser.resume();
    }, 100);
  }
});
function processRecord(record) {
  // Process record and return false if backpressure needed
  console.log("Processing:", record);
  return true;
}