CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-csv

Apache Flink CSV format support for reading and writing CSV data in stream and batch processing

Pending
Overview
Eval results
Files

batch-processing.mddocs/

Batch Processing

Traditional batch processing of CSV files using RowCsvInputFormat with DataSet API integration, configurable field selection, and comprehensive error handling for large-scale CSV data processing.

Capabilities

RowCsvInputFormat Class

Input format for reading CSV files into Row objects for batch processing scenarios.

// API reference sketch: method bodies are omitted; signatures are shown as
// declared in org.apache.flink.formats.csv.RowCsvInputFormat.
/**
 * Input format that reads CSV files into Row objects for batch processing
 * Extends AbstractCsvInputFormat with Row-specific functionality
 */
public class RowCsvInputFormat extends AbstractCsvInputFormat<Row> {
    
    /**
     * Create a builder for configuring the CSV input format
     * @param typeInfo Type information for the Row structure
     * @param filePaths Paths to CSV files to process
     * @return Builder instance for configuration
     */
    // NOTE(review): typeInfo is expected to be a RowTypeInfo in practice —
    // confirm against the flink-csv source before passing other TypeInformation.
    public static Builder builder(TypeInformation<Row> typeInfo, Path... filePaths);
    
    /**
     * Open a file input split for reading
     * @param split The file input split to open
     * @throws IOException if file cannot be opened
     */
    public void open(FileInputSplit split) throws IOException;
    
    /**
     * Check if the end of the current split has been reached
     * @return true if no more records are available
     * @throws IOException if I/O error occurs
     */
    public boolean reachedEnd() throws IOException;
    
    /**
     * Read the next record from the input split
     * @param reuse Row object to reuse for the next record (can be null)
     * @return Row containing the next record, or null if end reached
     * @throws IOException if I/O error occurs
     */
    public Row nextRecord(Row reuse) throws IOException;
    
    /**
     * Builder class for configuring CSV input format options
     */
    public static class Builder {
        
        /**
         * Set the field delimiter character (default: ',')
         * @param delimiter Character used to separate fields
         * @return Builder instance for method chaining
         */
        public Builder setFieldDelimiter(char delimiter);
        
        /**
         * Enable or disable comment line processing (default: false)
         * Lines starting with '#' will be ignored when enabled
         * @param allowComments Whether to ignore comment lines
         * @return Builder instance for method chaining
         */
        public Builder setAllowComments(boolean allowComments);
        
        /**
         * Set the array element delimiter for complex types (default: ';')
         * @param delimiter String used to separate array elements
         * @return Builder instance for method chaining
         */
        public Builder setArrayElementDelimiter(String delimiter);
        
        /**
         * Set the quote character for field enclosure (default: '"')
         * @param quoteCharacter Character used to quote fields with special characters
         * @return Builder instance for method chaining
         */
        public Builder setQuoteCharacter(char quoteCharacter);
        
        /**
         * Set the escape character for escaping special characters (no default)
         * @param escapeCharacter Character used for escaping within quoted fields
         * @return Builder instance for method chaining
         */
        public Builder setEscapeCharacter(char escapeCharacter);
        
        /**
         * Set the null literal string for null value representation (no default)
         * @param nullLiteral String that represents null values in CSV
         * @return Builder instance for method chaining
         */
        public Builder setNullLiteral(String nullLiteral);
        
        /**
         * Configure parse error handling (default: false)
         * When true, malformed records are skipped instead of failing the job
         * @param ignoreParseErrors Whether to skip malformed records
         * @return Builder instance for method chaining
         */
        public Builder setIgnoreParseErrors(boolean ignoreParseErrors);
        
        /**
         * Select specific fields by index for projection (optional)
         * Only specified field indices will be read and included in output
         * @param selectedFields Array of field indices to include (0-based)
         * @return Builder instance for method chaining
         */
        public Builder setSelectedFields(int[] selectedFields);
        
        /**
         * Build the configured CSV input format
         * @return RowCsvInputFormat instance with specified configuration
         */
        public RowCsvInputFormat build();
    }
}

Usage Examples

Basic CSV Reading

import org.apache.flink.formats.csv.RowCsvInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;

// Define row type information — one entry per CSV column, in file order
RowTypeInfo typeInfo = new RowTypeInfo(
    BasicTypeInfo.STRING_TYPE_INFO,   // name
    BasicTypeInfo.INT_TYPE_INFO,      // age
    BasicTypeInfo.BOOLEAN_TYPE_INFO   // active
);

// Create CSV input format (',' is already the default delimiter; shown for clarity)
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(typeInfo, new Path("employees.csv"))
    .setFieldDelimiter(',')
    .build();

// Use with DataSet API — the input format carries the Row type information
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Row> csvData = env.createInput(inputFormat);

// Process the data (print() triggers execution of the batch job)
csvData.print();

Custom Delimiter and Quoting

// Configure for pipe-delimited files with custom quoting.
// Single-quote is used as the quote character and backslash as the escape
// character inside quoted fields.
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(typeInfo, new Path("data.psv"))
    .setFieldDelimiter('|')
    .setQuoteCharacter('\'')
    .setEscapeCharacter('\\')
    .build();

// Handles files like: 'John|Doe'|25|'Software\\Engineer'
// (the doubled backslash is an escaped literal backslash in the file content)

Error Handling and Comments

// Configure for robust parsing with error tolerance.
// With ignoreParseErrors enabled, malformed records are dropped silently —
// there is no built-in count of how many were skipped.
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(typeInfo, new Path("messy-data.csv"))
    .setIgnoreParseErrors(true)     // Skip malformed records
    .setAllowComments(true)         // Ignore lines starting with #
    .setNullLiteral("NULL")         // Treat "NULL" strings as null values
    .build();

DataSet<Row> cleanData = env.createInput(inputFormat);
// Malformed records and comment lines will be automatically skipped

Field Projection

// Read only specific fields (name and age, skip active field).
// selectedFields are 0-based indices into the CSV columns; the RowTypeInfo
// passed to builder() describes the projected output row (here: 2 fields).
// NOTE(review): confirm against flink-csv docs that the builder typeInfo is
// the projected schema rather than the full file schema.
int[] selectedFields = {0, 1}; // Include only first two fields

RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(
        new RowTypeInfo(
            BasicTypeInfo.STRING_TYPE_INFO,  // name
            BasicTypeInfo.INT_TYPE_INFO      // age
        ),
        new Path("employees.csv")
    )
    .setSelectedFields(selectedFields)
    .build();

// Only name and age fields will be read, improving performance
DataSet<Row> projectedData = env.createInput(inputFormat);

Multiple File Processing

// Process multiple CSV files with the same schema — builder() accepts varargs
// paths; all files must match the single typeInfo.
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(
        typeInfo,
        new Path("2021-data.csv"),
        new Path("2022-data.csv"),
        new Path("2023-data.csv")
    )
    .setFieldDelimiter(',')
    .build();

DataSet<Row> allData = env.createInput(inputFormat);

Complex Type Handling

import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;

// Handle arrays in CSV fields.
// NOTE: String is not a primitive type, so PrimitiveArrayTypeInfo has no
// STRING_ARRAY_TYPE_INFO constant; String[] is described by BasicArrayTypeInfo.
RowTypeInfo typeInfo = new RowTypeInfo(
    BasicTypeInfo.STRING_TYPE_INFO,             // name
    BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO   // tags
);

RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(typeInfo, new Path("tagged-data.csv"))
    .setArrayElementDelimiter(";")   // elements within one field split on ';'
    .build();

// CSV: "John Doe","java;flink;streaming"
// Result: Row with name="John Doe", tags=["java", "flink", "streaming"]

Integration with DataSet Operations

Filtering and Transformation

DataSet<Row> csvData = env.createInput(inputFormat);

// Filter records — keep only rows whose age field (index 1) is >= 18.
// Throws NullPointerException if the age field is null; validate first if needed.
DataSet<Row> adults = csvData.filter(row -> (Integer) row.getField(1) >= 18);

// Transform records — build a new Row rather than mutating the input
DataSet<Row> transformed = csvData.map(row -> {
    Row newRow = new Row(3);
    newRow.setField(0, ((String) row.getField(0)).toUpperCase()); // Uppercase name
    newRow.setField(1, row.getField(1)); // Keep age
    newRow.setField(2, row.getField(2)); // Keep active status
    return newRow;
});

Aggregation

// Group by active status and count.
// NOTE: Tuple-returning lambdas lose generic type information to erasure; in
// real code add .returns(Types.TUPLE(...)) after the map, or use an explicit
// MapFunction class.
DataSet<Tuple2<Boolean, Long>> counts = csvData
    .map(row -> new Tuple2<>((Boolean) row.getField(2), 1L))
    .groupBy(0)
    .sum(1);

// Calculate average age by active status
DataSet<Tuple2<Boolean, Double>> avgAge = csvData
    .map(row -> new Tuple3<>((Boolean) row.getField(2), (Integer) row.getField(1), 1))
    .groupBy(0)
    .aggregate(Aggregations.SUM, 1)
    .and(Aggregations.SUM, 2)   // chain with and(): a second aggregate() call would nest, not combine
    .map(tuple -> new Tuple2<>(tuple.f0, (double) tuple.f1 / tuple.f2));

Joining with Other DataSets

// Read another CSV file
RowCsvInputFormat departmentFormat = RowCsvInputFormat
    .builder(departmentTypeInfo, new Path("departments.csv"))
    .build();

DataSet<Row> departments = env.createInput(departmentFormat);

// Join employees with departments.
// NOTE(review): this assumes the employee rows have at least 4 fields
// (index 3 = department ID) — the 3-field typeInfo from the earlier examples
// would not work here. Also, KeySelector lambdas returning getField(...) lose
// key type information to erasure; confirm a type hint is not required.
DataSet<Row> enriched = csvData
    .join(departments)
    .where(row -> row.getField(3)) // Employee department ID
    .equalTo(row -> row.getField(0)) // Department ID
    .with((emp, dept) -> {
        Row result = new Row(5);
        result.setField(0, emp.getField(0)); // Employee name
        result.setField(1, emp.getField(1)); // Employee age
        result.setField(2, emp.getField(2)); // Employee active
        result.setField(3, dept.getField(1)); // Department name
        result.setField(4, dept.getField(2)); // Department budget
        return result;
    });

Performance Optimization

Parallelism Configuration

// Configure parallelism for CSV reading — applies to all operators created on
// this environment unless overridden per operator
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // Use 4 parallel instances

// Input format will automatically split large files for parallel processing
DataSet<Row> csvData = env.createInput(inputFormat);

Memory Management

// Configure for large files with limited memory.
// taskmanager.memory.process.size is a MemorySize option configured as a
// string such as "2g" — not a raw byte count via setLong. It is also a
// cluster-level setting: this Configuration only takes effect when supplied
// at cluster/environment creation (e.g. flink-conf.yaml or
// ExecutionEnvironment.createLocalEnvironment(config)).
Configuration config = new Configuration();
config.setString("taskmanager.memory.process.size", "2g"); // 2 GB total process memory

// Use field projection to reduce memory usage
int[] selectedFields = {0, 1}; // Read only needed fields
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(projectedTypeInfo, new Path("large-file.csv"))
    .setSelectedFields(selectedFields)
    .build();

File Splitting

Large CSV files are automatically split by Flink for parallel processing:

  • Automatic splitting: Files larger than the configured split size are divided
  • Balanced distribution: Splits are distributed evenly across available task slots
  • Header handling: note — RowCsvInputFormat is not shown to skip header rows automatically; verify header behavior against the flink-csv documentation before relying on it
  • Record boundary: Splits occur at record boundaries to maintain data integrity

Error Handling and Monitoring

Parse Error Recovery

// Configure comprehensive error handling
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(typeInfo, new Path("unreliable-data.csv"))
    .setIgnoreParseErrors(true)      // Skip malformed records
    .setAllowComments(true)          // Skip comment lines
    .setNullLiteral("N/A")           // Handle various null representations
    .build();

// Monitor processing with accumulators.
// NOTE: RuntimeContext has no getCounter(String) method. For DataSet jobs the
// supported way to count records is the accumulator API: getLongCounter(name)
// returns an org.apache.flink.api.common.accumulators.LongCounter whose final
// value is available from the JobExecutionResult.
DataSet<Row> processed = env.createInput(inputFormat)
    .map(new RichMapFunction<Row, Row>() {
        private LongCounter recordCounter;
        private LongCounter errorCounter;
        
        @Override
        public void open(Configuration parameters) {
            // Registers the accumulators with the runtime and returns them
            recordCounter = getRuntimeContext().getLongCounter("records-processed");
            errorCounter = getRuntimeContext().getLongCounter("parse-errors");
        }
        
        @Override
        public Row map(Row row) throws Exception {
            recordCounter.add(1);
            
            // Validate record and count errors (null first field = suspect record)
            if (row.getField(0) == null) {
                errorCounter.add(1);
            }
            
            return row;
        }
    });

Data Quality Validation

// Add data quality checks.
// Note: the casts assume the schema from the earlier examples (String name,
// Integer age); a mismatched schema would raise ClassCastException here.
DataSet<Row> validated = csvData
    .filter(new FilterFunction<Row>() {
        @Override
        public boolean filter(Row row) throws Exception {
            // Validate required fields
            if (row.getField(0) == null || ((String) row.getField(0)).trim().isEmpty()) {
                return false; // Skip records with empty names
            }
            
            // Validate age range
            Integer age = (Integer) row.getField(1);
            if (age == null || age < 0 || age > 150) {
                return false; // Skip records with invalid ages
            }
            
            return true;
        }
    });

File Format Support

The RowCsvInputFormat supports various CSV dialects and formats:

  • Standard CSV: RFC 4180 compliant CSV files
  • Tab-separated: Using tab delimiter with .setFieldDelimiter('\t')
  • Pipe-delimited: Common in data warehousing with .setFieldDelimiter('|')
  • Custom delimiters: Any single character delimiter
  • Quoted fields: Proper handling of quoted fields with embedded delimiters
  • Escaped content: Support for escape characters within quoted fields
  • Comment lines: Optional skipping of comment lines beginning with #

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-csv

docs

batch-processing.md

configuration.md

index.md

schema-conversion.md

serialization.md

stream-processing.md

tile.json