The CsvParserStream class is a Node.js Transform stream that parses CSV input and adds chainable methods for transforming and validating each parsed row. It is the stream returned by parse() and parseFile(), so the standard stream methods and events remain available alongside the CSV-specific ones described below.

Its declaration:
```typescript
/**
 * CSV parsing transform stream with chainable operations
 */
class CsvParserStream<I extends Row, O extends Row> extends Transform {
  /**
   * Create a new CSV parser stream (normally created for you by parse() or parseFile())
   * @param parserOptions - Configuration options for parsing behavior
   */
  constructor(parserOptions: ParserOptions);

  /**
   * Set the function that transforms each parsed row (replaces any previously set transform)
   * @param transformFunction - Synchronous or callback-style function applied to each parsed row
   * @returns This instance for method chaining
   */
  transform(transformFunction: RowTransformFunction<I, O>): CsvParserStream<I, O>;

  /**
   * Set the function that validates each transformed row (replaces any previously set validator)
   * @param validateFunction - Synchronous or callback-style function applied to each transformed row
   * @returns This instance for method chaining
   */
  validate(validateFunction: RowValidate<O>): CsvParserStream<I, O>;

  /**
   * Override of emit that emits "end" only once and passes the total row count to "end" listeners
   * @param event - Event name
   * @param rest - Event arguments
   * @returns True if the event had listeners
   */
  emit(event: string | symbol, ...rest: any[]): boolean;
}
```

Transform each parsed row using synchronous or asynchronous functions.
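```typescript
type SyncRowTransform<I extends Row, O extends Row> = (row: I) => O;
type AsyncRowTransform<I extends Row, O extends Row> = (row: I, cb: RowTransformCallback<O>) => void;
type RowTransformFunction<I extends Row, O extends Row> = SyncRowTransform<I, O> | AsyncRowTransform<I, O>;
type RowTransformCallback<R extends Row> = (error?: Error | null, row?: R) => void;
```

A transform declared with exactly one parameter is treated as synchronous and must return the new row. Any other function is called with a callback, which it must invoke once, passing either an error or the transformed row.

Synchronous Transform Examples: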
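A synchronous transform can also be written as a standalone function with explicit input and output row types, which keeps it unit-testable without a CSV file. In this sketch, RawUser, User, and toUser are hypothetical names chosen for illustration; the only assumption about the parser is that a one-parameter transform is called synchronously and that a throw from it is reported as an error.

```typescript
// Hypothetical row shapes: RawUser is what parse({ headers: true }) yields
// (every value is a string); User is the typed output of the transform.
type RawUser = { id: string; name: string; email: string; active: string };
type User = { id: number; name: string; email: string; active: boolean };

// Synchronous transform: one declared parameter, returns the new row.
function toUser(row: RawUser): User {
  const id = Number.parseInt(row.id, 10);
  if (Number.isNaN(id)) {
    // When used with CsvParserStream, a throw here is reported as an "error" event
    throw new Error(`Invalid id: ${row.id}`);
  }
  return {
    id,
    name: row.name.trim(),
    email: row.email.toLowerCase(),
    active: row.active === "true",
  };
}
```

To plug it in, the row types can be passed as type arguments, for example parse<RawUser, User>({ headers: true }).transform(toUser); a validate() callback chained after it then receives a User.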
```javascript
import { createReadStream } from "fs";
import { parse } from "@fast-csv/parse";

// Basic data type conversion (with headers: true every value arrives as a string)
createReadStream("users.csv")
  .pipe(parse({ headers: true }))
  .transform(row => ({
    id: parseInt(row.id, 10),
    name: row.name.trim(),
    email: row.email.toLowerCase(),
    active: row.active === "true"
  }))
  .on("data", row => console.log(row))
  .on("error", error => console.error(error));

// Complex object restructuring
createReadStream("users.csv")
  .pipe(parse({ headers: true }))
  .transform(row => ({
    user: {
      id: parseInt(row.user_id, 10),
      profile: {
        name: row.name,
        email: row.email
      }
    },
    metadata: {
      created: new Date(row.created_at),
      active: row.status === "active"
    }
  }))
  .on("data", row => console.log(row))
  .on("error", error => console.error(error));
```

Asynchronous Transform Examples:
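Much real asynchronous work returns a Promise rather than taking a callback. A small adapter can turn a promise-returning function into the (row, cb) shape that AsyncRowTransform describes. fromPromise and RowCallback are hypothetical helpers, not part of @fast-csv/parse. The adapter deliberately declares two parameters, because the parser chooses between the synchronous and callback forms by the function's declared parameter count.

```typescript
// Callback shape of an asynchronous row transform (mirrors RowTransformCallback).
type RowCallback<O> = (error?: Error | null, row?: O) => void;

// Hypothetical adapter: wraps a promise-returning function in the (row, cb) form.
// The returned arrow function declares two parameters, so its length is 2.
function fromPromise<I, O>(fn: (row: I) => Promise<O>): (row: I, cb: RowCallback<O>) => void {
  return (row: I, cb: RowCallback<O>): void => {
    // The executor turns a synchronous throw from fn into a rejection.
    new Promise<O>((resolve) => resolve(fn(row))).then(
      (result) => cb(null, result),
      // Only a rejection from fn reaches this handler; if cb itself throws in the
      // success handler, cb is not called a second time.
      (error: unknown) => cb(error instanceof Error ? error : new Error(String(error))),
    );
  };
}
```

Usage would look like .transform(fromPromise(async row => ({ ...row, timestamp: Date.now() }))).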
```javascript
import { createReadStream } from "fs";
import { parse } from "@fast-csv/parse";

// Async transformation (setTimeout stands in for real async work such as a database lookup)
createReadStream("users.csv")
  .pipe(parse({ headers: true }))
  .transform((row, cb) => {
    setTimeout(() => {
      cb(null, { ...row, timestamp: Date.now() });
    }, 10);
  })
  .on("data", row => console.log(row))
  .on("error", error => console.error(error));

// Error handling in async transform: an error passed to cb is emitted as "error"
createReadStream("users.csv")
  .pipe(parse({ headers: true }))
  .transform((row, cb) => {
    // Check explicitly instead of wrapping in try/catch, so cb is never called twice
    if (!row.email || !row.email.includes("@")) {
      return cb(new Error(`Invalid email: ${row.email}`));
    }
    cb(null, { ...row, emailValid: true });
  })
  .on("data", row => console.log(row))
  .on("error", error => console.error(error));
```

Validate each processed (transformed) row using synchronous or asynchronous validation functions.
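```typescript
type SyncRowValidate<R extends Row> = (row: R) => boolean;
type AsyncRowValidate<R extends Row> = (row: R, cb: RowValidateCallback) => void;
type RowValidate<R extends Row> = AsyncRowValidate<R> | SyncRowValidate<R>;
type RowValidateCallback = (error?: Error | null, isValid?: boolean, reason?: string) => void;

interface RowValidationResult<R extends Row> {
  row: R | null;
  isValid: boolean;
  reason?: string;
}
```

Validators run on the transformed row (type O). As with transforms, a validator declared with one parameter is treated as synchronous and returns a boolean; a callback-style validator can additionally pass a reason, which is forwarded to data-invalid listeners.

Synchronous Validation Examples: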
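With headers: true every field arrives as a string, so a comparison such as row.age >= 18 relies on JavaScript's implicit conversion. That conversion turns a blank cell into 0, which silently passes upper-bound checks such as row.age <= 120. A minimal sketch of a stricter synchronous validator, assuming a row with a string age column; RawPerson, toInt, and isAdult are hypothetical names:

```typescript
// Hypothetical raw row: with headers: true, every value is a string.
type RawPerson = { age: string };

// Converts a CSV cell to an integer, or returns null for blank or non-integer cells.
function toInt(value: string | undefined): number | null {
  if (value === undefined || value.trim() === "") return null;
  const n = Number(value);
  return Number.isInteger(n) ? n : null;
}

// Synchronous validator (one declared parameter): true keeps the row.
function isAdult(row: RawPerson): boolean {
  const age = toInt(row.age);
  return age !== null && age >= 18 && age <= 120;
}
```

It would be attached with .validate(isAdult).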
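```javascript
import { createReadStream } from "fs";
import { parse } from "@fast-csv/parse";

// Simple validation: values are strings, so convert before comparing
createReadStream("users.csv")
  .pipe(parse({ headers: true }))
  .validate(row => Number(row.age) >= 18)
  .on("data", row => console.log(row)) // Only rows that passed validation (adults)
  .on("error", error => console.error(error));

// Complex validation with multiple conditions, applied to the transformed row
createReadStream("users.csv")
  .pipe(parse({ headers: true }))
  .transform(row => ({
    id: parseInt(row.id, 10),
    email: row.email,
    age: parseInt(row.age, 10)
  }))
  .validate(row =>
    row.id > 0 &&
    row.email.includes("@") &&
    row.age >= 18 &&
    row.age <= 120
  )
  .on("data", row => console.log(row))
  .on("error", error => console.error(error));
```

Asynchronous Validation Examples: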
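An asynchronous validator must call its callback exactly once; if it never does, that row is never resolved and parsing stalls. The sketch below wraps a callback-style validator with a time limit. withTimeout and its timeoutMs parameter are hypothetical, and treating a timeout as a failed row (rather than an error) is a design choice you may want to change.

```typescript
type ValidateCallback = (error?: Error | null, isValid?: boolean, reason?: string) => void;
type AsyncValidate<R> = (row: R, cb: ValidateCallback) => void;

// Hypothetical wrapper: guarantees cb is called exactly once, either with the
// wrapped validator's result or with a failure reason after timeoutMs.
function withTimeout<R>(validate: AsyncValidate<R>, timeoutMs: number): AsyncValidate<R> {
  return (row: R, cb: ValidateCallback): void => {
    let settled = false;
    let timer: ReturnType<typeof setTimeout> | undefined;
    const finish: ValidateCallback = (error, isValid, reason) => {
      if (settled) return; // ignore late or duplicate calls
      settled = true;
      if (timer !== undefined) clearTimeout(timer);
      cb(error, isValid, reason);
    };
    timer = setTimeout(() => finish(null, false, `Validation timed out after ${timeoutMs} ms`), timeoutMs);
    try {
      validate(row, finish);
    } catch (error) {
      if (settled) throw error; // the error came from cb itself; do not swallow it
      // A synchronous throw from the wrapped validator is reported as an error
      finish(error instanceof Error ? error : new Error(String(error)));
    }
  };
}
```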
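```javascript
import { createReadStream } from "fs";
import { parse } from "@fast-csv/parse";

// Async validation with a reason for each rejected row
createReadStream("users.csv")
  .pipe(parse({ headers: true }))
  .validate((row, cb) => {
    // setTimeout stands in for a real async check, such as a database lookup
    setTimeout(() => {
      // Guard against a missing column: a throw inside setTimeout would crash the process
      if (!row.email || !row.email.includes("@")) {
        return cb(null, false, "Invalid email format");
      }
      if (parseInt(row.age, 10) < 18) {
        return cb(null, false, "Must be 18 or older");
      }
      cb(null, true);
    }, 5);
  })
  .on("data", row => console.log(row))
  .on("data-invalid", (row, rowNumber, reason) => console.warn(`Row ${rowNumber}: ${reason}`))
  .on("error", error => console.error(error));
```

Transform and validate can be chained; the validator receives the row returned by the transform: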
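Conceptually, each parsed row goes through the transform first and the validator second, and the row reported to data-invalid is the transformed one, not the raw CSV row. The library-free sketch below models that order with synchronous functions only; runRow and RowOutcome are hypothetical names, and this is a simplified model rather than the library's implementation.

```typescript
// Hypothetical per-row outcome, mirroring the data / data-invalid events.
type RowOutcome<O> = { event: "data"; row: O } | { event: "data-invalid"; row: O };

// Synchronous model of the order: transform first, then validate the transformed
// row. A throw from either step propagates to the caller, which on the real
// stream would surface as an "error" event.
function runRow<I, O>(row: I, transform: (row: I) => O, validate: (row: O) => boolean): RowOutcome<O> {
  const transformed = transform(row);
  return validate(transformed)
    ? { event: "data", row: transformed }
    : { event: "data-invalid", row: transformed };
}
```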
```javascript
import { parseFile } from "@fast-csv/parse";

parseFile("users.csv", { headers: true })
  .transform(row => ({
    id: parseInt(row.id, 10),
    name: row.name.trim().toUpperCase(),
    email: row.email.toLowerCase(),
    age: parseInt(row.age, 10),
    active: row.status === "active"
  }))
  // The validator receives the transformed row, so age is already a number here
  .validate(row => row.age >= 18 && row.email.includes("@"))
  .on("data", validUser => {
    console.log("Valid adult user:", validUser);
  })
  .on("data-invalid", (invalidRow, rowNumber, reason) => {
    console.log(`Row ${rowNumber} failed validation:`, invalidRow);
    // A synchronous validator never supplies a reason, so fall back to a default message
    console.log(`Validation failed because: ${reason || "validation function returned false"}`);
  })
  .on("error", error => {
    console.error("Processing error:", error);
  })
  .on("end", rowCount => {
    // rowCount includes rows that failed validation; the header row is not counted
    console.log(`Processing complete. Total rows processed: ${rowCount}`);
  });
```

CsvParserStream reports the two kinds of failure through separate events: errors raised while parsing, transforming, or validating are emitted as error, while rows that a validator rejects are emitted as data-invalid.
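When the caller needs a single result, both failure channels can be folded into a Promise that resolves on end with the collected rows and rejects on the first error. The sketch relies only on the data, data-invalid, end, and error events and their listener arguments as documented for CsvParserStream, and accepts any EventEmitter, so it can be exercised without a CSV file. collectRows and RowSummary are hypothetical names.

```typescript
import { EventEmitter } from "events";

// Hypothetical summary of one parsing run.
interface RowSummary<O> {
  valid: O[];
  invalid: { row: unknown; rowNumber: number; reason?: string }[];
  totalRows: number;
}

// Resolves when "end" fires (using the row count passed to "end"), rejects on
// the first "error". Attaching the "data" listener starts a paused stream flowing.
// All valid rows are buffered in memory, so this suits small inputs only.
function collectRows<O>(stream: EventEmitter): Promise<RowSummary<O>> {
  return new Promise<RowSummary<O>>((resolve, reject) => {
    const valid: O[] = [];
    const invalid: RowSummary<O>["invalid"] = [];
    stream.on("data", (row: O) => {
      valid.push(row);
    });
    stream.on("data-invalid", (row: unknown, rowNumber: number, reason?: string) => {
      invalid.push({ row, rowNumber, reason });
    });
    stream.once("end", (totalRows: number) => resolve({ valid, invalid, totalRows }));
    stream.once("error", reject);
  });
}
```

With the library it would be used as const summary = await collectRows(parseFile("data.csv", { headers: true }).validate(...)).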
When validation fails, the data-invalid event is emitted instead of the data event, and parsing continues with the next row:

```javascript
import { parseFile } from "@fast-csv/parse";

parseFile("users.csv", { headers: true })
  .validate((row, callback) => {
    // Callback-style validator: reject a row by passing false plus a reason
    if (!row.email || !row.email.includes("@")) {
      return callback(null, false, "Invalid email address");
    }
    if (!row.age || parseInt(row.age, 10) < 18) {
      return callback(null, false, "Age must be 18 or older");
    }
    callback(null, true);
  })
  .on("data", validRow => {
    console.log("Valid row:", validRow);
  })
  .on("data-invalid", (invalidRow, rowNumber, reason) => {
    console.error(`Row ${rowNumber} validation failed:`, reason);
    console.error("Row data:", invalidRow);
    // Log to an error tracking system, skip the row, etc.
  })
  .on("error", error => {
    console.error("Processing error:", error);
  })
  .on("end", totalRows => {
    console.log(`Finished processing ${totalRows} total rows`);
  });
```

The next example counts valid and invalid rows separately and reports errors. In current Node.js versions a stream that errors is destroyed and end is not emitted afterwards, so the summary is printed from the error handler as well as from the end handler:

```javascript
import { parseFile } from "@fast-csv/parse";

let validRows = 0;
let invalidRows = 0;

// Prints the counters; totalRows is only known when "end" fires
const printSummary = totalRows => {
  console.log("Processing Summary:");
  if (totalRows !== undefined) {
    console.log(`- Total rows: ${totalRows}`);
  }
  console.log(`- Valid rows: ${validRows}`);
  console.log(`- Invalid rows: ${invalidRows}`);
};

parseFile("data.csv", { headers: true })
  .transform(row => {
    // An exception thrown by a synchronous transform is emitted as an "error" event
    if (!row.email) {
      throw new Error(`Transform failed: missing email for row with id ${row.id}`);
    }
    return {
      id: parseInt(row.id, 10),
      email: row.email.toLowerCase(),
      age: parseInt(row.age, 10)
    };
  })
  .validate(row => row.id > 0 && row.email.includes("@") && row.age >= 0)
  .on("data", row => {
    validRows++;
    console.log("Processing valid row:", row);
  })
  .on("data-invalid", (row, rowNumber, reason) => {
    invalidRows++;
    // reason is undefined because this validator is synchronous
    console.warn(`Invalid row ${rowNumber}: ${reason ?? "validation function returned false"}`);
  })
  .on("error", error => {
    console.error("Processing error:", error.message);
    printSummary();
  })
  .on("end", totalRows => {
    printSummary(totalRows);
  });
```

CsvParserStream emits standard Node.js stream events plus additional CSV-specific events:
- data: Emitted for each row that has been parsed, transformed, and has passed validation
- end: Emitted once when all data has been processed; listeners receive the total row count
- error: Emitted when parsing, a transform, or a validator fails with an error
- close: Emitted when the stream is closed
- headers: Emitted with the parsed header names (only when the headers option is used)
- data-invalid: Emitted when a row fails validation; listeners receive the row, its row number, and the validation reason

```javascript
import { parseFile } from "@fast-csv/parse";

parseFile("data.csv", { headers: true })
  .on("headers", headers => {
    console.log("CSV headers:", headers);
  })
  .on("data", row => {
    console.log("Row data:", row);
  })
  .on("data-invalid", (row, rowNumber, reason) => {
    console.log(`Invalid row ${rowNumber}:`, row, `Reason: ${reason}`);
  })
  .on("end", rowCount => {
    console.log(`Parsing complete. Processed ${rowCount} rows`);
  })
  .on("error", error => {
    console.error("Error:", error);
  });
```
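The row count passed to end listeners comes from the emit override declared on the class. The following is a minimal sketch of that pattern on a plain object-mode Transform. RowCountingStream is a hypothetical class that only counts the objects passing through it and does no CSV parsing.

```typescript
import { Transform } from "stream";
import type { TransformCallback } from "stream";

// Hypothetical object-mode Transform that forwards rows unchanged and counts them.
class RowCountingStream extends Transform {
  private rowCount = 0;
  private endEmitted = false;

  constructor() {
    super({ objectMode: true });
  }

  _transform(row: unknown, _encoding: BufferEncoding, done: TransformCallback): void {
    this.rowCount += 1;
    done(null, row);
  }

  // Intercept "end": emit it at most once and attach the row count as its argument.
  emit(event: string | symbol, ...rest: any[]): boolean {
    if (event === "end") {
      if (this.endEmitted) return false;
      this.endEmitted = true;
      return super.emit("end", this.rowCount);
    }
    return super.emit(event, ...rest);
  }
}
```

Node's Readable side calls the stream's own emit("end") when the data is exhausted, so the override is enough to change what end listeners receive.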