Apache Flink CSV format support for reading and writing CSV data in stream and batch processing
```bash
npx @tessl/cli install tessl/maven-org-apache-flink--flink-csv@2.1.0
```

The Apache Flink CSV format module adds CSV support to Flink, enabling reading and writing of CSV data in both stream and batch processing. It includes serialization and deserialization schemas for converting between CSV and Flink's internal row data structures, bulk writers for efficient CSV output, input formats for reading CSV files, and configurable options for handling various CSV dialects.
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>2.1.0</version>
</dependency>
```

```java
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.formats.csv.RowCsvInputFormat;
import org.apache.flink.formats.csv.CsvFormatOptions;
import org.apache.flink.formats.csv.CsvRowSchemaConverter;
```

```java
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
// Create CSV reader format for POJO
CsvReaderFormat<MyPojo> csvFormat = CsvReaderFormat.forPojo(MyPojo.class);
// Create a file source over the CSV file(s) to read
Path path = new Path("path/to/file.csv");
FileSource<MyPojo> source = FileSource
    .forRecordStreamFormat(csvFormat, path)
    .build();
// Use in streaming job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyPojo> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "csv-source");
```

```java
import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.table.types.logical.RowType;
// Create serialization schema
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setFieldDelimiter('|')
    .setQuoteCharacter('"')
    .build();
// Serialize row data
byte[] csvBytes = schema.serialize(rowData);
```

```java
import org.apache.flink.formats.csv.RowCsvInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.types.Row;
// Create CSV input format (typeInfo is a TypeInformation<Row> describing the columns)
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(typeInfo, new Path("path/to/file.csv"))
    .setFieldDelimiter(',')
    .setIgnoreParseErrors(true)
    .build();
// Use with DataSet API
DataSet<Row> csvData = env.createInput(inputFormat);
```

The Apache Flink CSV format module is organized around several key components:
- `CsvReaderFormat` for streaming file sources, with type-safe POJO and schema-based reading
- `CsvRowDataSerializationSchema` for converting internal row data to CSV bytes
- `RowCsvInputFormat` for batch processing of CSV files
- `CsvFormatOptions` defining all configurable CSV dialect options
- `CsvRowSchemaConverter` for converting Flink type information to Jackson CSV schemas

`CsvReaderFormat` provides streaming CSV file processing with type-safe deserialization, automatic schema detection, and configurable error handling.
```java
public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
    public static <T> CsvReaderFormat<T> forPojo(Class<T> pojoType);
    public static <T> CsvReaderFormat<T> forSchema(CsvSchema schema, TypeInformation<T> typeInformation);
    public CsvReaderFormat<T> withIgnoreParseErrors();
}
```
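Beyond the POJO path shown in the quick start, `forSchema` pairs an explicit Jackson `CsvSchema` with Flink type information. A minimal sketch, assuming the two-argument `forSchema` listed above and the `MyPojo` type from the earlier example:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.csv.CsvReaderFormat;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

// Describe the CSV layout explicitly: two columns separated by ';'
CsvSchema schema = CsvSchema.builder()
    .addColumn("name", CsvSchema.ColumnType.STRING)
    .addColumn("age", CsvSchema.ColumnType.NUMBER)
    .setColumnSeparator(';')
    .build();

// Pair the schema with type information and skip malformed rows
CsvReaderFormat<MyPojo> format = CsvReaderFormat
    .forSchema(schema, TypeInformation.of(MyPojo.class))
    .withIgnoreParseErrors();
```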
`CsvRowDataSerializationSchema` converts Flink's internal row data structures to CSV bytes, with extensive configuration options for different CSV dialects.

```java
public class CsvRowDataSerializationSchema implements SerializationSchema<RowData> {
    public byte[] serialize(RowData element);

    public static class Builder {
        public Builder(RowType rowType);
        public Builder setFieldDelimiter(char delimiter);
        public Builder setQuoteCharacter(char quoteCharacter);
        public Builder disableQuoteCharacter();
        public Builder setEscapeCharacter(char escapeCharacter);
        public CsvRowDataSerializationSchema build();
    }
}
```
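As a complete sketch of the write path, with the `rowType`/`rowData` placeholders from the quick start spelled out (field names default to `f0`, `f1` with `RowType.of`, and in a real job the runtime calls `open()` on the schema before `serialize()`):

```java
import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

// Row layout: (STRING, INT)
RowType rowType = RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType());

CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setFieldDelimiter(';')
    .disableQuoteCharacter()
    .build();

// One row serialized to UTF-8 CSV bytes, e.g. "Alice;42\n"
byte[] bytes = schema.serialize(GenericRowData.of(StringData.fromString("Alice"), 42));
```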
`RowCsvInputFormat` supports traditional batch processing of CSV files, with DataSet API integration and configurable field selection.

```java
public class RowCsvInputFormat extends AbstractCsvInputFormat<Row> {
    public static Builder builder(TypeInformation<Row> typeInfo, Path... filePaths);

    public static class Builder {
        public Builder setFieldDelimiter(char delimiter);
        public Builder setQuoteCharacter(char quoteCharacter);
        public Builder setIgnoreParseErrors(boolean ignoreParseErrors);
        public Builder setSelectedFields(int[] selectedFields);
        public RowCsvInputFormat build();
    }
}
```
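For example, `setSelectedFields` projects a subset of columns. A sketch, assuming the produced `Row` carries only the selected fields in selection order:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.RowCsvInputFormat;

// Type information for the projected columns only: column 0 (STRING) and column 2 (DOUBLE)
RowTypeInfo projectedType = new RowTypeInfo(Types.STRING, Types.DOUBLE);

RowCsvInputFormat format = RowCsvInputFormat
    .builder(projectedType, new Path("path/to/file.csv"))
    .setSelectedFields(new int[] {0, 2})
    .setIgnoreParseErrors(true)
    .build();
```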
`CsvFormatOptions` defines a comprehensive set of configuration options for handling various CSV dialects and processing requirements.

```java
public class CsvFormatOptions {
    public static final ConfigOption<String> FIELD_DELIMITER;
    public static final ConfigOption<String> QUOTE_CHARACTER;
    public static final ConfigOption<Boolean> DISABLE_QUOTE_CHARACTER;
    public static final ConfigOption<Boolean> ALLOW_COMMENTS;
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    public static final ConfigOption<String> ARRAY_ELEMENT_DELIMITER;
    public static final ConfigOption<String> ESCAPE_CHARACTER;
    public static final ConfigOption<String> NULL_LITERAL;
    public static final ConfigOption<Boolean> WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION;
}
```
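When the format is used through a Table API connector, these options surface as string keys with a `csv.` prefix. A hedged sketch (the table name, columns, and path are illustrative; key names derive from the constants above):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Declare a CSV-backed filesystem table configured through format options
tEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id STRING," +
    "  amount DECIMAL(10, 2)" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 'path/to/orders'," +
    "  'format' = 'csv'," +
    "  'csv.field-delimiter' = ';'," +
    "  'csv.allow-comments' = 'true'," +
    "  'csv.ignore-parse-errors' = 'true'," +
    "  'csv.null-literal' = 'n/a'" +
    ")");
```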
`CsvRowSchemaConverter` provides utility functions for converting Flink type information to Jackson CSV schemas.

```java
public class CsvRowSchemaConverter {
    public static CsvSchema convert(RowTypeInfo rowType);
    public static CsvSchema convert(RowType rowType);
}
```
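A small sketch combining the converter with schema-based reading (the field names are illustrative):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.csv.CsvRowSchemaConverter;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

// Derive a Jackson CsvSchema from Flink row type information
RowTypeInfo rowTypeInfo = new RowTypeInfo(
    new TypeInformation<?>[] {Types.STRING, Types.INT},
    new String[] {"name", "age"});
CsvSchema csvSchema = CsvRowSchemaConverter.convert(rowTypeInfo);

// The derived schema can then drive a reader, e.g. CsvReaderFormat.forSchema(csvSchema, ...)
```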
```java
// Core Flink types used throughout the API
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.core.fs.Path;
import org.apache.flink.configuration.ConfigOption;
// Jackson CSV types for schema definition
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
// Serialization interfaces
import org.apache.flink.api.common.serialization.SerializationSchema;
// File processing interfaces
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.api.common.io.FileInputFormat;
```