CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-csv

Apache Flink CSV format support for reading and writing CSV data in stream and batch processing

Pending
Overview
Eval results
Files

configuration.mddocs/

Configuration Options

Comprehensive configuration system for handling various CSV dialects and processing requirements through the CsvFormatOptions class, providing standardized configuration keys for Table API/SQL integration and programmatic usage.

Capabilities

CsvFormatOptions Class

Configuration options class providing standardized configuration keys for CSV format processing.

/**
 * Configuration options for CSV format processing
 * Contains standard configuration keys used across CSV components
 */
public class CsvFormatOptions {
    
    /**
     * Field delimiter character for separating values
     * Default: "," (comma)
     * Accepts: Any single character as string
     */
    public static final ConfigOption<String> FIELD_DELIMITER;
    
    /**
     * Quote character for enclosing field values containing special characters
     * Default: "\"" (double quote)
     * Accepts: Any single character as string
     */
    public static final ConfigOption<String> QUOTE_CHARACTER;
    
    /**
     * Flag to disable quote character usage entirely
     * Default: false
     * When true, fields are never quoted regardless of content
     */
    public static final ConfigOption<Boolean> DISABLE_QUOTE_CHARACTER;
    
    /**
     * Allow comment lines starting with '#' to be ignored during parsing
     * Default: false
     * When true, lines beginning with '#' are skipped
     */
    public static final ConfigOption<Boolean> ALLOW_COMMENTS;
    
    /**
     * Skip fields and rows with parse errors instead of failing
     * Default: false
     * When true, malformed records are ignored and processing continues
     */
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    
    /**
     * Array element delimiter for separating elements within array fields
     * Default: ";" (semicolon)
     * Used when serializing/deserializing array and collection types
     */
    public static final ConfigOption<String> ARRAY_ELEMENT_DELIMITER;
    
    /**
     * Escape character for escaping special characters within quoted fields
     * Default: null (no escape character)
     * When set, this character can escape quotes and other special characters
     */
    public static final ConfigOption<String> ESCAPE_CHARACTER;
    
    /**
     * Null literal string that represents null values in CSV data
     * Default: null (empty string represents null)
     * When set, this exact string is treated as null during parsing
     */
    public static final ConfigOption<String> NULL_LITERAL;
    
    /**
     * Control BigDecimal output format for numeric serialization
     * Default: true (use scientific notation)
     * When false, uses standard decimal notation for large numbers
     */
    public static final ConfigOption<Boolean> WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION;
}

Usage Examples

Table API/SQL Configuration

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.formats.csv.CsvFormatOptions;

// Configure CSV format in Table API
TableEnvironment tEnv = TableEnvironment.create(settings);

// Create table with custom CSV configuration
tEnv.executeSql(
    "CREATE TABLE csv_source (" +
    "  name STRING," +
    "  age INT," +
    "  salary DECIMAL(10,2)" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/path/to/file.csv'," +
    "  'format' = 'csv'," +
    "  'csv.field-delimiter' = '|'," +
    "  'csv.quote-character' = '''," +
    "  'csv.ignore-parse-errors' = 'true'," +
    "  'csv.null-literal' = 'NULL'" +
    ")"
);

Programmatic Configuration

import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.csv.CsvFormatOptions;

// Create configuration object
Configuration config = new Configuration();

// Set CSV format options
config.setString(CsvFormatOptions.FIELD_DELIMITER, "|");
config.setString(CsvFormatOptions.QUOTE_CHARACTER, "'");
config.setBoolean(CsvFormatOptions.IGNORE_PARSE_ERRORS, true);
config.setString(CsvFormatOptions.NULL_LITERAL, "NULL");
config.setBoolean(CsvFormatOptions.ALLOW_COMMENTS, true);

// Use configuration with format factories
// Configuration is automatically applied to CSV components

Builder Pattern Integration

The configuration options integrate seamlessly with builder patterns in CSV components:

import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.formats.csv.RowCsvInputFormat;

// Extract values from configuration
Configuration config = new Configuration();
String delimiter = config.getString(CsvFormatOptions.FIELD_DELIMITER, ",");
String quoteChar = config.getString(CsvFormatOptions.QUOTE_CHARACTER, "\"");
Boolean ignoreErrors = config.getBoolean(CsvFormatOptions.IGNORE_PARSE_ERRORS, false);

// Apply to serialization schema
CsvRowDataSerializationSchema serializer = new CsvRowDataSerializationSchema.Builder(rowType)
    .setFieldDelimiter(delimiter.charAt(0))
    .setQuoteCharacter(quoteChar.charAt(0))
    .build();

// Apply to input format  
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(typeInfo, paths)
    .setFieldDelimiter(delimiter.charAt(0))
    .setQuoteCharacter(quoteChar.charAt(0))
    .setIgnoreParseErrors(ignoreErrors)
    .build();

Configuration Scenarios

Standard CSV (RFC 4180)

// Default configuration matches RFC 4180 standard
Configuration standardCsv = new Configuration();
standardCsv.setString(CsvFormatOptions.FIELD_DELIMITER, ",");
standardCsv.setString(CsvFormatOptions.QUOTE_CHARACTER, "\"");
standardCsv.setBoolean(CsvFormatOptions.DISABLE_QUOTE_CHARACTER, false);
standardCsv.setBoolean(CsvFormatOptions.ALLOW_COMMENTS, false);
standardCsv.setBoolean(CsvFormatOptions.IGNORE_PARSE_ERRORS, false);

// Handles: "name","age","active"
//          "John Doe",25,true
//          "Jane Smith",30,false

Tab-Separated Values (TSV)

// Configure for tab-delimited files
Configuration tsvConfig = new Configuration();
tsvConfig.setString(CsvFormatOptions.FIELD_DELIMITER, "\t");
tsvConfig.setString(CsvFormatOptions.QUOTE_CHARACTER, "\"");

// Handles: name    age    active
//          John Doe    25    true
//          Jane Smith    30    false

Pipe-Delimited with Custom Quoting

// Configure for pipe-delimited data warehouse format
Configuration pipeConfig = new Configuration();
pipeConfig.setString(CsvFormatOptions.FIELD_DELIMITER, "|");
pipeConfig.setString(CsvFormatOptions.QUOTE_CHARACTER, "'");
pipeConfig.setString(CsvFormatOptions.ESCAPE_CHARACTER, "\\");
pipeConfig.setString(CsvFormatOptions.NULL_LITERAL, "NULL");

// Handles: 'John Doe'|25|NULL
//          'Jane O\'Brien'|30|true

Unquoted Format

// Configure for unquoted, simple delimiter format
Configuration unquotedConfig = new Configuration();
unquotedConfig.setString(CsvFormatOptions.FIELD_DELIMITER, ",");
unquotedConfig.setBoolean(CsvFormatOptions.DISABLE_QUOTE_CHARACTER, true);
unquotedConfig.setString(CsvFormatOptions.NULL_LITERAL, "NULL");

// Handles: John Doe,25,NULL
//          Jane Smith,30,true

Fault-Tolerant Configuration

// Configure for robust parsing of messy data
Configuration robustConfig = new Configuration();
robustConfig.setBoolean(CsvFormatOptions.IGNORE_PARSE_ERRORS, true);
robustConfig.setBoolean(CsvFormatOptions.ALLOW_COMMENTS, true);
robustConfig.setString(CsvFormatOptions.NULL_LITERAL, "NULL");

// Handles files with:
// # This is a comment line
// name,age,active
// John Doe,25,true
// Jane Smith,invalid_age,false  # This record will be skipped
// Bob Johnson,35,true

Scientific Notation Control

// Configure numeric formatting for financial data
Configuration financialConfig = new Configuration();
financialConfig.setBoolean(CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION, false);
financialConfig.setString(CsvFormatOptions.FIELD_DELIMITER, ",");

// Large numbers output as: 1234567890.12 instead of 1.23456789012E9

Complex Array Handling

// Configure for complex data types with arrays
Configuration arrayConfig = new Configuration();
arrayConfig.setString(CsvFormatOptions.ARRAY_ELEMENT_DELIMITER, "::");
arrayConfig.setString(CsvFormatOptions.FIELD_DELIMITER, "|");
arrayConfig.setString(CsvFormatOptions.QUOTE_CHARACTER, "\"");

// Handles: "John Doe"|"java::scala::python"|"senior"
//          "Jane Smith"|"javascript::typescript"|"junior"

Integration Patterns

Configuration Inheritance

// Base configuration for organization standard
Configuration baseConfig = new Configuration();
baseConfig.setString(CsvFormatOptions.FIELD_DELIMITER, "|");
baseConfig.setString(CsvFormatOptions.QUOTE_CHARACTER, "'");
baseConfig.setBoolean(CsvFormatOptions.ALLOW_COMMENTS, true);

// Project-specific overrides
Configuration projectConfig = new Configuration(baseConfig);
projectConfig.setBoolean(CsvFormatOptions.IGNORE_PARSE_ERRORS, true);
projectConfig.setString(CsvFormatOptions.NULL_LITERAL, "N/A");

Environment-Based Configuration

// Load configuration from environment or properties
Configuration envConfig = new Configuration();

// Check environment variables
String delimiter = System.getenv("CSV_DELIMITER");
if (delimiter != null) {
    envConfig.setString(CsvFormatOptions.FIELD_DELIMITER, delimiter);
}

String ignoreErrors = System.getenv("CSV_IGNORE_ERRORS");
if ("true".equalsIgnoreCase(ignoreErrors)) {
    envConfig.setBoolean(CsvFormatOptions.IGNORE_PARSE_ERRORS, true);
}

Dynamic Configuration

// Configure based on data source characteristics
public Configuration createConfigForSource(String sourceType) {
    Configuration config = new Configuration();
    
    switch (sourceType) {
        case "financial":
            config.setString(CsvFormatOptions.FIELD_DELIMITER, ",");
            config.setBoolean(CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION, false);
            config.setString(CsvFormatOptions.NULL_LITERAL, "NULL");
            break;
            
        case "log_files":
            config.setString(CsvFormatOptions.FIELD_DELIMITER, "\t");
            config.setBoolean(CsvFormatOptions.ALLOW_COMMENTS, true);
            config.setBoolean(CsvFormatOptions.IGNORE_PARSE_ERRORS, true);
            break;
            
        case "data_warehouse":
            config.setString(CsvFormatOptions.FIELD_DELIMITER, "|");
            config.setString(CsvFormatOptions.QUOTE_CHARACTER, "'");
            config.setString(CsvFormatOptions.ESCAPE_CHARACTER, "\\");
            break;
    }
    
    return config;
}

Validation and Error Handling

Configuration Validation

// Validate configuration consistency
public void validateCsvConfig(Configuration config) {
    String delimiter = config.getString(CsvFormatOptions.FIELD_DELIMITER, ",");
    String quote = config.getString(CsvFormatOptions.QUOTE_CHARACTER, "\"");
    Boolean disableQuote = config.getBoolean(CsvFormatOptions.DISABLE_QUOTE_CHARACTER, false);
    
    // Validate delimiter is single character
    if (delimiter.length() != 1) {
        throw new IllegalArgumentException("Field delimiter must be a single character");
    }
    
    // Validate quote character consistency
    if (!disableQuote && quote.length() != 1) {
        throw new IllegalArgumentException("Quote character must be a single character");
    }
    
    // Validate delimiter and quote are different
    if (!disableQuote && delimiter.equals(quote)) {
        throw new IllegalArgumentException("Field delimiter and quote character must be different");
    }
}

Default Value Handling

All configuration options have sensible defaults that work for standard CSV files:

  • Field delimiter: Comma (,) - most common CSV delimiter
  • Quote character: Double quote (") - RFC 4180 standard
  • Disable quoting: false - preserves data integrity
  • Allow comments: false - strict parsing by default
  • Ignore errors: false - fail-fast approach for data quality
  • Array delimiter: Semicolon (;) - doesn't conflict with common CSV content
  • Escape character: None - relies on proper quoting
  • Null literal: None - empty strings represent null values
  • Scientific notation: true - preserves precision for large numbers

These defaults ensure that CSV components work correctly with minimal configuration while providing flexibility for customization when needed.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-csv

docs

batch-processing.md

configuration.md

index.md

schema-conversion.md

serialization.md

stream-processing.md

tile.json