Apache Flink CSV format support for reading and writing CSV data in stream and batch processing
—
Comprehensive configuration system for handling various CSV dialects and processing requirements through the CsvFormatOptions class, providing standardized configuration keys for Table API/SQL integration and programmatic usage.
Configuration options class providing standardized configuration keys for CSV format processing.
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/**
 * Configuration options for CSV format processing.
 *
 * <p>Contains the standard configuration keys used across CSV components. In Table API / SQL DDL
 * each key is prefixed with the format identifier, e.g. {@code 'csv.field-delimiter'}.
 *
 * <p>This class only holds constants and is not meant to be instantiated.
 */
public class CsvFormatOptions {

    /**
     * Field delimiter character for separating values.
     *
     * <p>Default: {@code ","} (comma). Accepts any single character as a string.
     */
    public static final ConfigOption<String> FIELD_DELIMITER =
            ConfigOptions.key("field-delimiter")
                    .stringType()
                    .defaultValue(",")
                    .withDescription("Field delimiter character (',' by default).");

    /**
     * Quote character for enclosing field values containing special characters.
     *
     * <p>Default: {@code "\""} (double quote). Accepts any single character as a string.
     */
    public static final ConfigOption<String> QUOTE_CHARACTER =
            ConfigOptions.key("quote-character")
                    .stringType()
                    .defaultValue("\"")
                    .withDescription(
                            "Quote character for enclosing field values ('\"' by default).");

    /**
     * Flag to disable quote character usage entirely.
     *
     * <p>Default: {@code false}. When true, fields are never quoted regardless of content.
     */
    public static final ConfigOption<Boolean> DISABLE_QUOTE_CHARACTER =
            ConfigOptions.key("disable-quote-character")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Disables the quote character; fields are never quoted (false by default).");

    /**
     * Allow comment lines starting with '#' to be ignored during parsing.
     *
     * <p>Default: {@code false}. When true, lines beginning with '#' are skipped.
     */
    public static final ConfigOption<Boolean> ALLOW_COMMENTS =
            ConfigOptions.key("allow-comments")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("Ignores lines starting with '#' (false by default).");

    /**
     * Skip fields and rows with parse errors instead of failing.
     *
     * <p>Default: {@code false}. When true, malformed records are ignored and processing
     * continues.
     */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
            ConfigOptions.key("ignore-parse-errors")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Skips fields and rows with parse errors instead of failing (false by default).");

    /**
     * Array element delimiter for separating elements within array fields.
     *
     * <p>Default: {@code ";"} (semicolon). Used when serializing/deserializing array and
     * collection types.
     */
    public static final ConfigOption<String> ARRAY_ELEMENT_DELIMITER =
            ConfigOptions.key("array-element-delimiter")
                    .stringType()
                    .defaultValue(";")
                    .withDescription("Array element delimiter string (';' by default).");

    /**
     * Escape character for escaping special characters within quoted fields.
     *
     * <p>Default: none (no escape character). When set, this character can escape quotes and
     * other special characters.
     */
    public static final ConfigOption<String> ESCAPE_CHARACTER =
            ConfigOptions.key("escape-character")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Escape character for escaping values (disabled by default).");

    /**
     * Null literal string that represents null values in CSV data.
     *
     * <p>Default: null (empty string represents null). When set, this exact string is treated
     * as null during parsing.
     */
    public static final ConfigOption<String> NULL_LITERAL =
            ConfigOptions.key("null-literal")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Null literal string that is interpreted as a null value (disabled by default).");

    /**
     * Control BigDecimal output format for numeric serialization.
     *
     * <p>Default: {@code true} (use scientific notation). When false, BigDecimal values are
     * written in plain decimal notation.
     */
    public static final ConfigOption<Boolean> WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION =
            ConfigOptions.key("write-bigdecimal-in-scientific-notation")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription(
                            "Writes BigDecimal values in scientific notation (true by default).");

    /** Constants holder; not instantiable. */
    private CsvFormatOptions() {}
}

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.formats.csv.CsvFormatOptions;
import org.apache.flink.table.api.EnvironmentSettings;
// Configure CSV format in Table API
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);
// Create table with custom CSV configuration.
// Inside a SQL string literal a single quote is escaped by doubling it, so the quote
// character ' is written as '''' (opening quote, escaped quote, closing quote).
tEnv.executeSql(
        "CREATE TABLE csv_source (" +
        " name STRING," +
        " age INT," +
        " salary DECIMAL(10,2)" +
        ") WITH (" +
        " 'connector' = 'filesystem'," +
        " 'path' = '/path/to/file.csv'," +
        " 'format' = 'csv'," +
        " 'csv.field-delimiter' = '|'," +
        " 'csv.quote-character' = ''''," +
        " 'csv.ignore-parse-errors' = 'true'," +
        " 'csv.null-literal' = 'NULL'" +
        ")"
);
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.csv.CsvFormatOptions;
// Start from an empty configuration and populate the CSV options on it.
// The typed set(...) overload checks every value against the option's declared type.
Configuration config = new Configuration();
config.set(CsvFormatOptions.FIELD_DELIMITER, "|");
config.set(CsvFormatOptions.QUOTE_CHARACTER, "'");
config.set(CsvFormatOptions.IGNORE_PARSE_ERRORS, true);
config.set(CsvFormatOptions.NULL_LITERAL, "NULL");
config.set(CsvFormatOptions.ALLOW_COMMENTS, true);
// Hand this configuration to the factory or component that creates the CSV format;
// nothing reads it until it is passed along.

The configuration options can also be read back and applied to the builders of individual CSV components:
import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.formats.csv.RowCsvInputFormat;
// Extract values from configuration
Configuration config = new Configuration();
String delimiter = config.getString(CsvFormatOptions.FIELD_DELIMITER, ",");
String quoteChar = config.getString(CsvFormatOptions.QUOTE_CHARACTER, "\"");
Boolean ignoreErrors = config.getBoolean(CsvFormatOptions.IGNORE_PARSE_ERRORS, false);
// Apply to serialization schema
CsvRowDataSerializationSchema serializer = new CsvRowDataSerializationSchema.Builder(rowType)
.setFieldDelimiter(delimiter.charAt(0))
.setQuoteCharacter(quoteChar.charAt(0))
.build();
// Apply to input format
RowCsvInputFormat inputFormat = RowCsvInputFormat
.builder(typeInfo, paths)
.setFieldDelimiter(delimiter.charAt(0))
.setQuoteCharacter(quoteChar.charAt(0))
.setIgnoreParseErrors(ignoreErrors)
.build();// Default configuration matches RFC 4180 standard
// Every value below is spelled out explicitly even though each one equals the documented
// default: comma delimiter, double-quote quoting, strict parsing.
Configuration standardCsv = new Configuration();
standardCsv.set(CsvFormatOptions.FIELD_DELIMITER, ",");
standardCsv.set(CsvFormatOptions.QUOTE_CHARACTER, "\"");
standardCsv.set(CsvFormatOptions.DISABLE_QUOTE_CHARACTER, false);
standardCsv.set(CsvFormatOptions.ALLOW_COMMENTS, false);
standardCsv.set(CsvFormatOptions.IGNORE_PARSE_ERRORS, false);
// Example input:
//   "name","age","active"
//   "John Doe",25,true
//   "Jane Smith",30,false

// Configure for tab-delimited files
Configuration tsvConfig = new Configuration();
// "\t" is a one-character Java string (the TAB character), so it satisfies the
// single-character requirement of the field delimiter.
tsvConfig.set(CsvFormatOptions.FIELD_DELIMITER, "\t");
tsvConfig.set(CsvFormatOptions.QUOTE_CHARACTER, "\"");
// Example input (<TAB> marks a tab character):
//   name<TAB>age<TAB>active
//   John Doe<TAB>25<TAB>true
//   Jane Smith<TAB>30<TAB>false

// Configure for pipe-delimited data warehouse format
Configuration pipeConfig = new Configuration();
pipeConfig.set(CsvFormatOptions.FIELD_DELIMITER, "|");
// Single quotes enclose fields; a backslash ("\\" in Java source) escapes a quote inside one.
pipeConfig.set(CsvFormatOptions.QUOTE_CHARACTER, "'");
pipeConfig.set(CsvFormatOptions.ESCAPE_CHARACTER, "\\");
// The literal text NULL is read as a null value.
pipeConfig.set(CsvFormatOptions.NULL_LITERAL, "NULL");
// Example input:
//   'John Doe'|25|NULL
//   'Jane O\'Brien'|30|true

// Configure for unquoted, simple delimiter format
Configuration unquotedConfig = new Configuration();
unquotedConfig.set(CsvFormatOptions.FIELD_DELIMITER, ",");
// Quoting is switched off: fields are never quoted, so a value that itself contains ','
// cannot be represented unambiguously.
unquotedConfig.set(CsvFormatOptions.DISABLE_QUOTE_CHARACTER, true);
unquotedConfig.set(CsvFormatOptions.NULL_LITERAL, "NULL");
// Example input:
//   John Doe,25,NULL
//   Jane Smith,30,true

// Configure for robust parsing of messy data
Configuration robustConfig = new Configuration();
robustConfig.set(CsvFormatOptions.IGNORE_PARSE_ERRORS, true);
robustConfig.set(CsvFormatOptions.ALLOW_COMMENTS, true);
robustConfig.set(CsvFormatOptions.NULL_LITERAL, "NULL");
// Handles files with:
//   # This is a comment line
//   name,age,active
//   John Doe,25,true
//   Jane Smith,invalid_age,false
//   Bob Johnson,35,true
// Only lines that START with '#' are treated as comments; a '#' later in a line is ordinary
// field content, so explanations must not be appended to data lines.
// Assuming age is an INT column, "invalid_age" cannot be parsed; with IGNORE_PARSE_ERRORS the
// bad field/row is skipped instead of failing the job.

// Configure numeric formatting for financial data
Configuration financialConfig = new Configuration();
// Write BigDecimal values in plain notation instead of scientific notation.
financialConfig.set(CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION, false);
financialConfig.set(CsvFormatOptions.FIELD_DELIMITER, ",");
// With the default (true) a value such as 100000 can be written as 1E+5; with false it is
// written as 100000. A value like 1234567890.12 has no exponent form in
// BigDecimal.toString() and is written unchanged either way.

// Configure for complex data types with arrays
Configuration arrayConfig = new Configuration();
arrayConfig.set(CsvFormatOptions.FIELD_DELIMITER, "|");
arrayConfig.set(CsvFormatOptions.QUOTE_CHARACTER, "\"");
// Elements inside an array field are separated by "::" instead of the default ";".
arrayConfig.set(CsvFormatOptions.ARRAY_ELEMENT_DELIMITER, "::");
// Example input (the second column is presumably an array of strings):
//   "John Doe"|"java::scala::python"|"senior"
//   "Jane Smith"|"javascript::typescript"|"junior"

// Base configuration for organization standard
Configuration baseConfig = new Configuration();
baseConfig.set(CsvFormatOptions.FIELD_DELIMITER, "|");
baseConfig.set(CsvFormatOptions.QUOTE_CHARACTER, "'");
baseConfig.set(CsvFormatOptions.ALLOW_COMMENTS, true);
// Project-specific overrides: the copy constructor starts from baseConfig's entries, so the
// settings below extend the organization standard without modifying baseConfig itself.
Configuration projectConfig = new Configuration(baseConfig);
projectConfig.set(CsvFormatOptions.IGNORE_PARSE_ERRORS, true);
projectConfig.set(CsvFormatOptions.NULL_LITERAL, "N/A");

// Load configuration from environment or properties
Configuration envConfig = new Configuration();
// Check environment variables. An unset (null) or empty value leaves the format default in
// place instead of configuring an empty, unusable delimiter.
String delimiter = System.getenv("CSV_DELIMITER");
if (delimiter != null && !delimiter.isEmpty()) {
    envConfig.set(CsvFormatOptions.FIELD_DELIMITER, delimiter);
}
// Only an explicit, case-insensitive "true" enables lenient parsing.
String ignoreErrors = System.getenv("CSV_IGNORE_ERRORS");
if ("true".equalsIgnoreCase(ignoreErrors)) {
    envConfig.set(CsvFormatOptions.IGNORE_PARSE_ERRORS, true);
}

// Configure based on data source characteristics
/**
 * Builds a CSV configuration tuned for a known kind of data source.
 *
 * @param sourceType one of {@code "financial"}, {@code "log_files"} or {@code "data_warehouse"}
 * @return a new configuration; any other source type yields an empty configuration, i.e. all
 *     format defaults
 * @throws NullPointerException if {@code sourceType} is null (switching on a null String)
 */
public Configuration createConfigForSource(String sourceType) {
    Configuration result = new Configuration();
    switch (sourceType) {
        case "financial":
            // Comma-separated, plain decimals, explicit NULL marker
            result.set(CsvFormatOptions.FIELD_DELIMITER, ",");
            result.set(CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION, false);
            result.set(CsvFormatOptions.NULL_LITERAL, "NULL");
            break;
        case "log_files":
            // Tab-separated, tolerant of comment lines and parse errors
            result.set(CsvFormatOptions.FIELD_DELIMITER, "\t");
            result.set(CsvFormatOptions.ALLOW_COMMENTS, true);
            result.set(CsvFormatOptions.IGNORE_PARSE_ERRORS, true);
            break;
        case "data_warehouse":
            // Pipe-separated, single-quoted, backslash-escaped
            result.set(CsvFormatOptions.FIELD_DELIMITER, "|");
            result.set(CsvFormatOptions.QUOTE_CHARACTER, "'");
            result.set(CsvFormatOptions.ESCAPE_CHARACTER, "\\");
            break;
        default:
            // Unknown source types intentionally fall back to the format defaults.
            break;
    }
    return result;
}

// Validate configuration consistency
/**
 * Checks that the delimiter, quote and escape settings of a CSV configuration are usable.
 *
 * @param config the configuration to check; unset options fall back to the format defaults
 * @throws IllegalArgumentException if the field delimiter is not exactly one character; if
 *     quoting is enabled and the quote character is not exactly one character or equals the
 *     field delimiter; or if an escape character is set but is not exactly one character
 */
public void validateCsvConfig(Configuration config) {
    String delimiter = config.getString(CsvFormatOptions.FIELD_DELIMITER, ",");
    String quote = config.getString(CsvFormatOptions.QUOTE_CHARACTER, "\"");
    // ESCAPE_CHARACTER has no default; null means "no escape character configured".
    String escape = config.getString(CsvFormatOptions.ESCAPE_CHARACTER, null);
    boolean disableQuote = config.getBoolean(CsvFormatOptions.DISABLE_QUOTE_CHARACTER, false);
    // Validate delimiter is single character
    if (delimiter.length() != 1) {
        throw new IllegalArgumentException(
                "Field delimiter must be a single character but was: '" + delimiter + "'");
    }
    // The quote character only matters while quoting is enabled.
    if (!disableQuote && quote.length() != 1) {
        throw new IllegalArgumentException(
                "Quote character must be a single character but was: '" + quote + "'");
    }
    // Validate delimiter and quote are different
    if (!disableQuote && delimiter.equals(quote)) {
        throw new IllegalArgumentException(
                "Field delimiter and quote character must be different but both were: '"
                        + delimiter + "'");
    }
    // An escape character, when present, is applied per character as well.
    if (escape != null && escape.length() != 1) {
        throw new IllegalArgumentException(
                "Escape character must be a single character but was: '" + escape + "'");
    }
}

All configuration options have sensible defaults that work for standard CSV files:
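- `FIELD_DELIMITER`: `,` (comma) - the most common CSV delimiter
- `QUOTE_CHARACTER`: `"` (double quote) - the RFC 4180 standard
- `DISABLE_QUOTE_CHARACTER`: `false` - preserves data integrity
- `ALLOW_COMMENTS`: `false` - strict parsing by default
- `IGNORE_PARSE_ERRORS`: `false` - fail-fast approach for data quality
- `ARRAY_ELEMENT_DELIMITER`: `;` (semicolon) - doesn't conflict with common CSV content
- `WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION`: `true` - values such as 100000 may be written as `1E+5`; set to `false` for plain notation (both forms are exact)

These defaults ensure that CSV components work correctly with minimal configuration, while still allowing customization when needed.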
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-csv