CSV parsing implementing the Node.js stream.Transform API
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
The streaming API provides the core CSV parsing functionality through Node.js Transform streams, ideal for processing large files and real-time data streams with memory efficiency.
Creates a Parser instance that implements the Node.js Transform stream interface. Supports multiple calling patterns for flexible usage.
/**
* Creates a Parser transform stream for CSV processing
* @param input - Optional CSV input data
* @param options - Parsing configuration options
* @param callback - Optional callback for collecting all records
* @returns Parser transform stream instance
*/
function parse(): Parser;
function parse(options: Options): Parser;
function parse(callback: Callback): Parser;
function parse(options: Options, callback: Callback): Parser;
function parse(input: string | Buffer | Uint8Array): Parser;
function parse(input: string | Buffer | Uint8Array, callback: Callback): Parser;
function parse(input: string | Buffer | Uint8Array, options: Options): Parser;
function parse(input: string | Buffer | Uint8Array, options: Options, callback: Callback): Parser;Usage Examples:
import { parse } from "csv-parse";
// Transform stream without callback
const parser = parse({ columns: true });
parser.on("readable", function () {
let record;
while ((record = parser.read()) !== null) {
console.log(record);
}
});
// With input data and callback
parse("name,age\nAlice,25", { columns: true }, (err, records) => {
if (err) throw err;
console.log(records); // [{ name: 'Alice', age: '25' }]
});
// Stream processing of large files
import fs from "fs";
fs.createReadStream("large-file.csv")
.pipe(parse({ columns: true }))
.on("data", (record) => {
// Process each record as it's parsed
console.log(record);
})
.on("error", (err) => {
console.error("Parsing error:", err);
})
.on("end", () => {
console.log("Parsing complete");
});Transform stream class that extends Node.js stream.Transform for CSV parsing operations.
/**
* CSV parser transform stream extending Node.js Transform
*/
class Parser extends Transform {
/**
* Creates a new Parser instance
* @param options - Configuration options for parsing behavior
*/
constructor(options: Options);
/** Normalized parsing options */
readonly options: OptionsNormalized;
/** Current parsing state and statistics */
readonly info: Info;
/**
* Internal method for pushing parsed records
* @param line - Parsed record data
* @returns CsvError if parsing error occurred, undefined otherwise
*/
__push(line: any): CsvError | undefined;
}Usage Examples:
import { Parser } from "csv-parse";
// Manual parser instantiation
const parser = new Parser({
columns: true,
skip_empty_lines: true,
trim: true
});
// Access parser state
parser.on("readable", function () {
console.log("Current info:", this.info);
console.log("Records processed:", this.info.records);
console.log("Lines processed:", this.info.lines);
let record;
while ((record = this.read()) !== null) {
console.log(record);
}
});
// Write data to parser
parser.write("name,age,city\n");
parser.write("Alice,25,New York\n");
parser.write("Bob,30,Boston\n");
parser.end();The Parser emits standard Node.js stream events plus additional CSV-specific events.
// Standard Transform stream events
parser.on("readable", () => void);
parser.on("data", (record: any) => void);
parser.on("end", () => void);
parser.on("error", (err: CsvError) => void);
parser.on("close", () => void);
// CSV-specific events
parser.on("skip", (err: CsvError, chunk: string) => void);Usage Examples:
import { parse } from "csv-parse";
const parser = parse({
columns: true,
skip_records_with_error: true
});
// Handle successfully parsed records
parser.on("data", (record) => {
console.log("Parsed record:", record);
});
// Handle skipped records (when skip_records_with_error is true)
parser.on("skip", (err, rawRecord) => {
console.warn("Skipped record due to error:", rawRecord);
console.warn("Error:", err.message);
});
// Handle fatal errors
parser.on("error", (err) => {
console.error("Fatal parsing error:", err.code, err.message);
});
// Handle completion
parser.on("end", () => {
console.log("Parsing completed successfully");
console.log("Final statistics:", parser.info);
});For environments supporting Web Streams API, use the dedicated csv-parse/stream entry point which provides a different parse function that returns a TransformStream instead of a Node.js Transform stream.
// Import from csv-parse/stream
import {
parse,
CsvError,
CastingContext,
CastingFunction,
CastingDateFunction,
ColumnOption,
Options,
OptionsNormalized,
Info,
CsvErrorCode
} from "csv-parse/stream";
/**
* Creates a Web Streams compatible TransformStream for CSV parsing
* Note: This is a different function from the main parse() function
* @param options - Parsing configuration options
* @returns TransformStream instance for Web Streams API
*/
function parse(options?: Options): TransformStream;Usage Examples:
import { parse } from "csv-parse/stream";
// Web Streams API usage
const csvData = new ReadableStream({
start(controller) {
controller.enqueue("name,age\n");
controller.enqueue("Alice,25\n");
controller.enqueue("Bob,30\n");
controller.close();
}
});
const parser = parse({ columns: true });
csvData
.pipeThrough(parser)
.pipeTo(new WritableStream({
write(record) {
console.log("Parsed record:", record);
}
}));Complex streaming scenarios with backpressure handling and error recovery.
Usage Examples:
import { parse } from "csv-parse";
import { pipeline } from "stream/promises";
import fs from "fs";
// Pipeline with error handling
try {
await pipeline(
fs.createReadStream("input.csv"),
parse({
columns: true,
skip_records_with_error: true,
relax_column_count: true
}),
async function* (source) {
for await (const record of source) {
// Transform each record
yield {
...record,
processed_at: new Date().toISOString()
};
}
},
fs.createWriteStream("output.json")
);
console.log("Pipeline completed successfully");
} catch (error) {
console.error("Pipeline failed:", error);
}
// Manual backpressure handling
const parser = parse({ columns: true });
let backpressure = false;
parser.on("data", (record) => {
if (!backpressure) {
const canContinue = processRecord(record);
if (!canContinue) {
backpressure = true;
parser.pause();
// Resume when ready
setTimeout(() => {
backpressure = false;
parser.resume();
}, 100);
}
}
});
function processRecord(record) {
// Process record and return false if backpressure needed
console.log("Processing:", record);
return true;
}Install with Tessl CLI
npx tessl i tessl/npm-csv-parse