Apache Flink CSV format support for reading and writing CSV data in stream and batch processing
—
Traditional batch processing of CSV files using RowCsvInputFormat with DataSet API integration, configurable field selection, and comprehensive error handling for large-scale CSV data processing.
Input format for reading CSV files into Row objects for batch processing scenarios.
/**
 * Input format that reads CSV files into {@link Row} objects for batch processing.
 * Extends AbstractCsvInputFormat with Row-specific parsing functionality.
 * Instances are configured and created via {@link #builder(TypeInformation, Path...)}.
 */
public class RowCsvInputFormat extends AbstractCsvInputFormat<Row> {
/**
 * Create a builder for configuring the CSV input format.
 * @param typeInfo Type information describing the fields of each produced Row
 * @param filePaths One or more paths to CSV files to process; all files are expected to share the same schema
 * @return Builder instance for configuration
 */
public static Builder builder(TypeInformation<Row> typeInfo, Path... filePaths);
/**
 * Open a file input split for reading. Must be called before {@link #reachedEnd()}
 * or {@link #nextRecord(Row)}.
 * @param split The file input split to open
 * @throws IOException if the file cannot be opened
 */
public void open(FileInputSplit split) throws IOException;
/**
 * Check whether the end of the current split has been reached.
 * @return true if no more records are available in this split
 * @throws IOException if an I/O error occurs
 */
public boolean reachedEnd() throws IOException;
/**
 * Read the next record from the input split.
 * @param reuse Row object to reuse for the next record (can be null, in which case a new Row is created)
 * @return Row containing the next record, or null if the end of the split has been reached
 * @throws IOException if an I/O error occurs
 */
public Row nextRecord(Row reuse) throws IOException;
/**
 * Builder for configuring CSV input format options. Each setter returns this
 * builder so calls can be chained; {@link #build()} produces the configured format.
 */
public static class Builder {
/**
 * Set the field delimiter character (default: ',').
 * @param delimiter Character used to separate fields within a record
 * @return Builder instance for method chaining
 */
public Builder setFieldDelimiter(char delimiter);
/**
 * Enable or disable comment line processing (default: false).
 * When enabled, lines starting with '#' are skipped entirely.
 * @param allowComments Whether to ignore comment lines
 * @return Builder instance for method chaining
 */
public Builder setAllowComments(boolean allowComments);
/**
 * Set the array element delimiter used when parsing fields of array/complex types
 * (default: ";").
 * @param delimiter String used to separate array elements within a single field
 * @return Builder instance for method chaining
 */
public Builder setArrayElementDelimiter(String delimiter);
/**
 * Set the quote character used to enclose fields containing special characters
 * such as the field delimiter (default: '"').
 * @param quoteCharacter Character used to quote fields
 * @return Builder instance for method chaining
 */
public Builder setQuoteCharacter(char quoteCharacter);
/**
 * Set the escape character for escaping special characters inside quoted fields
 * (no default — escaping is disabled unless set).
 * @param escapeCharacter Character used for escaping within quoted fields
 * @return Builder instance for method chaining
 */
public Builder setEscapeCharacter(char escapeCharacter);
/**
 * Set the literal string that represents null values in the CSV input
 * (no default — null handling is disabled unless set).
 * @param nullLiteral String that is parsed as a null field value
 * @return Builder instance for method chaining
 */
public Builder setNullLiteral(String nullLiteral);
/**
 * Configure parse error handling (default: false).
 * When true, malformed records are silently skipped instead of failing the job;
 * when false, a malformed record aborts processing.
 * @param ignoreParseErrors Whether to skip malformed records
 * @return Builder instance for method chaining
 */
public Builder setIgnoreParseErrors(boolean ignoreParseErrors);
/**
 * Select specific fields by index for projection (optional).
 * Only the specified field indices are read and included in the output Rows;
 * the type information passed to {@link RowCsvInputFormat#builder} must match
 * the projected fields, not the full file schema.
 * @param selectedFields Array of 0-based field indices to include
 * @return Builder instance for method chaining
 */
public Builder setSelectedFields(int[] selectedFields);
/**
 * Build the configured CSV input format.
 * @return RowCsvInputFormat instance with the specified configuration
 */
public RowCsvInputFormat build();
}
}

import org.apache.flink.formats.csv.RowCsvInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
// Define row type information
RowTypeInfo typeInfo = new RowTypeInfo(
BasicTypeInfo.STRING_TYPE_INFO, // name
BasicTypeInfo.INT_TYPE_INFO, // age
BasicTypeInfo.BOOLEAN_TYPE_INFO // active
);
// Create CSV input format
RowCsvInputFormat inputFormat = RowCsvInputFormat
.builder(typeInfo, new Path("employees.csv"))
.setFieldDelimiter(',')
.build();
// Use with DataSet API
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Row> csvData = env.createInput(inputFormat);
// Process the data
csvData.print();

// Configure for pipe-delimited files with custom quoting
RowCsvInputFormat inputFormat = RowCsvInputFormat
.builder(typeInfo, new Path("data.psv"))
.setFieldDelimiter('|')
.setQuoteCharacter('\'')
.setEscapeCharacter('\\')
.build();
// Handles files like: 'John|Doe'|25|'Software\\Engineer'

// Configure for robust parsing with error tolerance
RowCsvInputFormat inputFormat = RowCsvInputFormat
.builder(typeInfo, new Path("messy-data.csv"))
.setIgnoreParseErrors(true) // Skip malformed records
.setAllowComments(true) // Ignore lines starting with #
.setNullLiteral("NULL") // Treat "NULL" strings as null values
.build();
DataSet<Row> cleanData = env.createInput(inputFormat);
// Malformed records and comment lines will be automatically skipped

// Read only specific fields (name and age, skip active field)
int[] selectedFields = {0, 1}; // Include only first two fields
RowCsvInputFormat inputFormat = RowCsvInputFormat
.builder(
new RowTypeInfo(
BasicTypeInfo.STRING_TYPE_INFO, // name
BasicTypeInfo.INT_TYPE_INFO // age
),
new Path("employees.csv")
)
.setSelectedFields(selectedFields)
.build();
// Only name and age fields will be read, improving performance
DataSet<Row> projectedData = env.createInput(inputFormat);

// Process multiple CSV files with the same schema
RowCsvInputFormat inputFormat = RowCsvInputFormat
.builder(
typeInfo,
new Path("2021-data.csv"),
new Path("2022-data.csv"),
new Path("2023-data.csv")
)
.setFieldDelimiter(',')
.build();
DataSet<Row> allData = env.createInput(inputFormat);

import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
// Handle arrays in CSV fields
RowTypeInfo typeInfo = new RowTypeInfo(
BasicTypeInfo.STRING_TYPE_INFO, // name
PrimitiveArrayTypeInfo.STRING_ARRAY_TYPE_INFO // tags
);
RowCsvInputFormat inputFormat = RowCsvInputFormat
.builder(typeInfo, new Path("tagged-data.csv"))
.setArrayElementDelimiter(";")
.build();
// CSV: "John Doe","java;flink;streaming"
// Result: Row with name="John Doe", tags=["java", "flink", "streaming"]

DataSet<Row> csvData = env.createInput(inputFormat);
// Filter records
DataSet<Row> adults = csvData.filter(row -> (Integer) row.getField(1) >= 18);
// Transform records
DataSet<Row> transformed = csvData.map(row -> {
Row newRow = new Row(3);
newRow.setField(0, ((String) row.getField(0)).toUpperCase()); // Uppercase name
newRow.setField(1, row.getField(1)); // Keep age
newRow.setField(2, row.getField(2)); // Keep active status
return newRow;
});

// Group by active status and count
DataSet<Tuple2<Boolean, Long>> counts = csvData
.map(row -> new Tuple2<>((Boolean) row.getField(2), 1L))
.groupBy(0)
.sum(1);
// Calculate average age by active status
DataSet<Tuple2<Boolean, Double>> avgAge = csvData
.map(row -> new Tuple3<>((Boolean) row.getField(2), (Integer) row.getField(1), 1))
.groupBy(0)
.aggregate(Aggregations.SUM, 1)
.aggregate(Aggregations.SUM, 2)
.map(tuple -> new Tuple2<>(tuple.f0, (double) tuple.f1 / tuple.f2));

// Read another CSV file
RowCsvInputFormat departmentFormat = RowCsvInputFormat
.builder(departmentTypeInfo, new Path("departments.csv"))
.build();
DataSet<Row> departments = env.createInput(departmentFormat);
// Join employees with departments
DataSet<Row> enriched = csvData
.join(departments)
.where(row -> row.getField(3)) // Employee department ID
.equalTo(row -> row.getField(0)) // Department ID
.with((emp, dept) -> {
Row result = new Row(5);
result.setField(0, emp.getField(0)); // Employee name
result.setField(1, emp.getField(1)); // Employee age
result.setField(2, emp.getField(2)); // Employee active
result.setField(3, dept.getField(1)); // Department name
result.setField(4, dept.getField(2)); // Department budget
return result;
});

// Configure parallelism for CSV reading
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // Use 4 parallel instances
// Input format will automatically split large files for parallel processing
DataSet<Row> csvData = env.createInput(inputFormat);

// Configure for large files with limited memory
Configuration config = new Configuration();
config.setLong("taskmanager.memory.process.size", 2048L * 1024 * 1024); // 2GB
// Use field projection to reduce memory usage
int[] selectedFields = {0, 1}; // Read only needed fields
RowCsvInputFormat inputFormat = RowCsvInputFormat
.builder(projectedTypeInfo, new Path("large-file.csv"))
.setSelectedFields(selectedFields)
.build();

Large CSV files are automatically split by Flink for parallel processing:
// Configure comprehensive error handling
RowCsvInputFormat inputFormat = RowCsvInputFormat
.builder(typeInfo, new Path("unreliable-data.csv"))
.setIgnoreParseErrors(true) // Skip malformed records
.setAllowComments(true) // Skip comment lines
.setNullLiteral("N/A") // Handle various null representations
.build();
// Monitor processing with counters
DataSet<Row> processed = env.createInput(inputFormat)
.map(new RichMapFunction<Row, Row>() {
private Counter recordCounter;
private Counter errorCounter;
@Override
public void open(Configuration parameters) {
recordCounter = getRuntimeContext().getCounter("records-processed");
errorCounter = getRuntimeContext().getCounter("parse-errors");
}
@Override
public Row map(Row row) throws Exception {
recordCounter.add(1);
// Validate record and count errors
if (row.getField(0) == null) {
errorCounter.add(1);
}
return row;
}
});

// Add data quality checks
DataSet<Row> validated = csvData
.filter(new FilterFunction<Row>() {
@Override
public boolean filter(Row row) throws Exception {
// Validate required fields
if (row.getField(0) == null || ((String) row.getField(0)).trim().isEmpty()) {
return false; // Skip records with empty names
}
// Validate age range
Integer age = (Integer) row.getField(1);
if (age == null || age < 0 || age > 150) {
return false; // Skip records with invalid ages
}
return true;
}
});

The RowCsvInputFormat supports various CSV dialects and formats:
// Tab-separated values (TSV):
.setFieldDelimiter('\t')
// Pipe-delimited values:
.setFieldDelimiter('|')

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-csv