Apache Flink CSV format support for reading and writing CSV data in stream and batch processing

Streaming CSV file processing with the CsvReaderFormat class, providing type-safe deserialization, automatic schema detection, and configurable error handling for DataStream API integration.

CsvReaderFormat is the main class for reading CSV files in streaming applications; it extends Flink's SimpleStreamFormat:
/**
* StreamFormat for reading CSV files with type safety and error handling
* @param <T> The type of objects to deserialize CSV records into
*/
public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
/**
* Create a CSV reader format for POJO classes with automatic schema detection
* @param pojoType The POJO class to deserialize into
* @return CsvReaderFormat instance configured for the POJO type
*/
public static <T> CsvReaderFormat<T> forPojo(Class<T> pojoType);
/**
* Create a CSV reader format with predefined schema and type information
* @param schema Jackson CsvSchema defining the CSV structure
* @param typeInformation Flink TypeInformation for the target type
* @return CsvReaderFormat instance configured with the provided schema
*/
public static <T> CsvReaderFormat<T> forSchema(CsvSchema schema, TypeInformation<T> typeInformation);
/**
* Create a CSV reader format with custom mapper factory and schema generator
* @param mapperFactory Supplier for creating Jackson CsvMapper instances
* @param schemaGenerator Function to generate CsvSchema from CsvMapper
* @param typeInformation Flink TypeInformation for the target type
* @return CsvReaderFormat instance with custom configuration
*/
public static <T> CsvReaderFormat<T> forSchema(
SerializableSupplier<CsvMapper> mapperFactory,
SerializableFunction<CsvMapper, CsvSchema> schemaGenerator,
TypeInformation<T> typeInformation
);
/**
* Configure the reader to ignore parsing errors and skip malformed records
* @return New CsvReaderFormat instance with error ignoring enabled
*/
public CsvReaderFormat<T> withIgnoreParseErrors();
/**
* Create a reader instance for processing a specific input stream
* @param config Flink configuration
* @param stream Input stream to read from
* @return StreamFormat.Reader instance for reading records
*/
public StreamFormat.Reader<T> createReader(Configuration config, FSDataInputStream stream);
/**
* Get the type information for objects produced by this format
* @return TypeInformation describing the output type
*/
public TypeInformation<T> getProducedType();
}

import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// Define POJO class
public class Person {
public String name;
public int age;
public boolean active;
public Person() {} // Default constructor required
}
// Create CSV reader format
CsvReaderFormat<Person> csvFormat = CsvReaderFormat.forPojo(Person.class);
// Create file source
FileSource<Person> source = FileSource
.forRecordStreamFormat(csvFormat, new Path("persons.csv"))
.build();
// Use in streaming job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Person> persons = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"csv-source"
);
persons.print();
env.execute("CSV Reading Job");

import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
// Define custom schema
CsvSchema schema = CsvSchema.builder()
.addColumn("name", CsvSchema.ColumnType.STRING)
.addColumn("age", CsvSchema.ColumnType.NUMBER)
.addColumn("salary", CsvSchema.ColumnType.NUMBER)
.setUseHeader(true)
.setColumnSeparator('|')
.build();
// Create reader format with schema
CsvReaderFormat<Row> csvFormat = CsvReaderFormat.forSchema(
schema,
TypeInformation.of(Row.class)
);
// Use with file source
FileSource<Row> source = FileSource
.forRecordStreamFormat(csvFormat, new Path("employees.csv"))
.build();

// Create reader format that ignores parsing errors
CsvReaderFormat<Person> csvFormat = CsvReaderFormat
.forPojo(Person.class)
.withIgnoreParseErrors();
// Malformed records will be skipped instead of causing job failure
FileSource<Person> source = FileSource
.forRecordStreamFormat(csvFormat, new Path("dirty-data.csv"))
.build();
DataStream<Person> cleanData = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"fault-tolerant-csv-source"
);

import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.databind.DeserializationFeature;
// Create custom mapper factory
SerializableSupplier<CsvMapper> mapperFactory = () -> {
CsvMapper mapper = new CsvMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true);
return mapper;
};
// Create schema generator
SerializableFunction<CsvMapper, CsvSchema> schemaGenerator = mapper ->
mapper.schemaFor(Person.class).withHeader();
// Create reader format with custom configuration
CsvReaderFormat<Person> csvFormat = CsvReaderFormat.forSchema(
mapperFactory,
schemaGenerator,
TypeInformation.of(Person.class)
);

import org.apache.flink.connector.file.src.enumerate.ContinuousFileEnumerator;
// Monitor directory for new CSV files
FileSource<Person> source = FileSource
.forRecordStreamFormat(csvFormat, new Path("input-directory/"))
.monitorContinuously(Duration.ofSeconds(10))
.build();
DataStream<Person> stream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"monitoring-csv-source"
);

// Process multiple CSV files with same format
FileSource<Person> source = FileSource
.forRecordStreamFormat(
csvFormat,
new Path("file1.csv"),
new Path("file2.csv"),
new Path("file3.csv")
)
.build();

The CsvReaderFormat provides compile-time type safety through generic type parameters and runtime validation through Jackson's deserialization:
- The generic type parameter <T> ensures type consistency between the configured schema and the records produced.
- withIgnoreParseErrors() allows graceful handling of malformed records.
- Create CsvReaderFormat instances once and reuse them across multiple sources.
- Use withIgnoreParseErrors() for better throughput when data quality issues are expected.

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-csv