
tessl/maven-org-apache-flink--flink-csv

Apache Flink CSV format support for reading and writing CSV data in stream and batch processing

Stream Processing

The CsvReaderFormat class reads CSV files in streaming jobs. It deserializes each record into a typed object, can derive the CSV schema from a POJO class, and can optionally skip records that fail to parse. It plugs into the DataStream API through FileSource.

Capabilities

CsvReaderFormat Class

Main class for reading CSV files in streaming applications, extending Flink's SimpleStreamFormat.

/**
 * StreamFormat for reading CSV files with type safety and error handling
 * @param <T> The type of objects to deserialize CSV records into
 */
public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
    
    /**
     * Create a CSV reader format for POJO classes with automatic schema detection
     * @param pojoType The POJO class to deserialize into
     * @return CsvReaderFormat instance configured for the POJO type
     */
    public static <T> CsvReaderFormat<T> forPojo(Class<T> pojoType);
    
    /**
     * Create a CSV reader format with predefined schema and type information
     * @param schema Jackson CsvSchema defining the CSV structure
     * @param typeInformation Flink TypeInformation for the target type
     * @return CsvReaderFormat instance configured with the provided schema
     */
    public static <T> CsvReaderFormat<T> forSchema(CsvSchema schema, TypeInformation<T> typeInformation);
    
    /**
     * Create a CSV reader format with custom mapper factory and schema generator
     * @param mapperFactory Supplier for creating Jackson CsvMapper instances  
     * @param schemaGenerator Function to generate CsvSchema from CsvMapper
     * @param typeInformation Flink TypeInformation for the target type
     * @return CsvReaderFormat instance with custom configuration
     */
    public static <T> CsvReaderFormat<T> forSchema(
        SerializableSupplier<CsvMapper> mapperFactory,
        SerializableFunction<CsvMapper, CsvSchema> schemaGenerator,
        TypeInformation<T> typeInformation
    );
    
    /**
     * Configure the reader to ignore parsing errors and skip malformed records
     * @return New CsvReaderFormat instance with error ignoring enabled
     */
    public CsvReaderFormat<T> withIgnoreParseErrors();
    
    /**
     * Create a reader instance for processing a specific input stream
     * @param config Flink configuration
     * @param stream Input stream to read from
     * @return StreamFormat.Reader instance for reading records
     */
    public StreamFormat.Reader<T> createReader(Configuration config, FSDataInputStream stream);
    
    /**
     * Get the type information for objects produced by this format
     * @return TypeInformation describing the output type
     */
    public TypeInformation<T> getProducedType();
}

Usage Examples

Basic POJO Reading

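import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// Flink bundles Jackson under a shaded package; adjust this import if your version differs
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPropertyOrder;

// Define POJO class; the property order should match the column order in the CSV file
@JsonPropertyOrder({"name", "age", "active"})
public class Person {
    public String name;
    public int age;
    public boolean active;
    
    public Person() {} // Default constructor required
}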

// Create CSV reader format
CsvReaderFormat<Person> csvFormat = CsvReaderFormat.forPojo(Person.class);

// Create file source
FileSource<Person> source = FileSource
    .forRecordStreamFormat(csvFormat, new Path("persons.csv"))
    .build();

// Use in streaming job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Person> persons = env.fromSource(
    source, 
    WatermarkStrategy.noWatermarks(), 
    "csv-source"
);

persons.print();
env.execute("CSV Reading Job");

Schema-Based Reading

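// Flink bundles Jackson under a shaded package; adjust this import if your version differs
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.types.Row;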

// Define custom schema
CsvSchema schema = CsvSchema.builder()
    .addColumn("name", CsvSchema.ColumnType.STRING)
    .addColumn("age", CsvSchema.ColumnType.NUMBER)  
    .addColumn("salary", CsvSchema.ColumnType.NUMBER)
    .setUseHeader(true)
    .setColumnSeparator('|')
    .build();

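// Create reader format with schema.
// The target class must be one Jackson can bind columns to; if Row does not
// deserialize in your Flink version, use a POJO with matching fields instead.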
CsvReaderFormat<Row> csvFormat = CsvReaderFormat.forSchema(
    schema,
    TypeInformation.of(Row.class)
);

// Use with file source
FileSource<Row> source = FileSource
    .forRecordStreamFormat(csvFormat, new Path("employees.csv"))
    .build();

Error Handling

// Create reader format that ignores parsing errors
CsvReaderFormat<Person> csvFormat = CsvReaderFormat
    .forPojo(Person.class)
    .withIgnoreParseErrors();

// Malformed records will be skipped instead of causing job failure
FileSource<Person> source = FileSource
    .forRecordStreamFormat(csvFormat, new Path("dirty-data.csv"))
    .build();

DataStream<Person> cleanData = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "fault-tolerant-csv-source"
);
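
To make the skip-versus-fail behavior concrete, here is a minimal plain-Java sketch of the same idea. It does not use Flink or Jackson and is not how CsvReaderFormat is implemented; the class SkipMalformedSketch, its nested Person type, and the methods parseLine and readAll are hypothetical names chosen for illustration. It assumes a three-column "name,age,active" layout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: mimics "skip malformed records" without Flink or Jackson.
class SkipMalformedSketch {

    // Hypothetical stand-in for the Person POJO used in the examples.
    static final class Person {
        final String name;
        final int age;
        final boolean active;

        Person(String name, int age, boolean active) {
            this.name = name;
            this.age = age;
            this.active = active;
        }
    }

    // Parses one "name,age,active" line; throws IllegalArgumentException on bad input.
    static Person parseLine(String line) {
        String[] parts = line.split(",", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected 3 columns: " + line);
        }
        String active = parts[2].trim();
        if (!active.equals("true") && !active.equals("false")) {
            throw new IllegalArgumentException("not a boolean: " + active);
        }
        // Integer.parseInt throws NumberFormatException, a subclass of IllegalArgumentException.
        int age = Integer.parseInt(parts[1].trim());
        return new Person(parts[0].trim(), age, Boolean.parseBoolean(active));
    }

    // ignoreParseErrors == true: bad lines are dropped and reading continues.
    // ignoreParseErrors == false: the first bad line aborts the read with an exception.
    static List<Person> readAll(List<String> lines, boolean ignoreParseErrors) {
        List<Person> out = new ArrayList<>();
        for (String line : lines) {
            try {
                out.add(parseLine(line));
            } catch (IllegalArgumentException e) {
                if (!ignoreParseErrors) {
                    throw e;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Ada,36,true", "Bob,not-a-number,false", "Eve,29,false");
        System.out.println(readAll(lines, true).size()); // prints 2
    }
}
```

The point to take away is that a skipped record leaves no trace in the output, so a job that needs to count or inspect bad records has to capture them some other way.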

Custom Mapper Configuration

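// Flink bundles Jackson under a shaded package; adjust these imports if your version differs
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.function.SerializableFunction;
import org.apache.flink.util.function.SerializableSupplier;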

// Create custom mapper factory
SerializableSupplier<CsvMapper> mapperFactory = () -> {
    CsvMapper mapper = new CsvMapper();
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true);
    return mapper;
};

// Create schema generator
SerializableFunction<CsvMapper, CsvSchema> schemaGenerator = mapper -> 
    mapper.schemaFor(Person.class).withHeader();

// Create reader format with custom configuration
CsvReaderFormat<Person> csvFormat = CsvReaderFormat.forSchema(
    mapperFactory,
    schemaGenerator,
    TypeInformation.of(Person.class)
);

Integration with File Sources

Monitoring Directory

import java.time.Duration;

// Monitor the directory and check for new CSV files every 10 seconds
FileSource<Person> source = FileSource
    .forRecordStreamFormat(csvFormat, new Path("input-directory/"))
    .monitorContinuously(Duration.ofSeconds(10))
    .build();

DataStream<Person> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "monitoring-csv-source"
);

Processing Multiple Files

// Process multiple CSV files with same format
FileSource<Person> source = FileSource
    .forRecordStreamFormat(
        csvFormat,
        new Path("file1.csv"),
        new Path("file2.csv"),
        new Path("file3.csv")
    )
    .build();

Type Safety and Validation

The CsvReaderFormat provides compile-time type safety through generic type parameters and runtime validation through Jackson's deserialization:

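  • Compile-time safety: The type parameter <T> carries through FileSource<T> and DataStream<T>, so downstream operators see the declared record type
  • Runtime validation: Jackson checks column values against the target field types during deserialization; a value that cannot be converted raises a parse error
  • Error recovery: withIgnoreParseErrors() skips records that fail to parse instead of failing the job
  • Schema validation: An explicit CsvSchema fixes the column names, order, types, and separator the reader expects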

Performance Considerations

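  • Schema reuse: Create a CsvReaderFormat once and pass the same instance to every source that reads the same layout
  • POJO optimization: Types that meet Flink's POJO rules (public class, public no-argument constructor, fields public or reachable through getters and setters) are handled by Flink's POJO serializer instead of the slower generic Kryo fallback
  • Error handling: withIgnoreParseErrors() keeps the job running when some records are malformed; skipped records are dropped, so enable it only when losing them is acceptable
  • File splitting: CsvReaderFormat extends SimpleStreamFormat, which is meant for non-splittable formats, so each file is read by a single reader; parallelism comes from reading several files at once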
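
The POJO bullet above depends on the record class having a simple shape, including the default constructor that the Person example marks as required. As a rough aid, the following plain-Java sketch checks a class for the public-field style used in this document. It is a simplification and not Flink's own type analysis: it ignores getter/setter access and field type checks. PojoShapeCheck, looksLikeSimplePojo, and NoDefaultCtor are hypothetical names.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Illustration only: a simplified shape check, not Flink's type extraction logic.
class PojoShapeCheck {

    // Mirrors the Person class from the usage examples.
    public static class Person {
        public String name;
        public int age;
        public boolean active;

        public Person() {}
    }

    // Hypothetical counter-example: it has no public no-argument constructor.
    public static class NoDefaultCtor {
        public String name;

        public NoDefaultCtor(String name) {
            this.name = name;
        }
    }

    // Returns true if the class is public, has a public no-argument constructor,
    // and all of its declared instance fields are public.
    static boolean looksLikeSimplePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            type.getConstructor(); // looks up a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue; // not part of the record's data
            }
            if (!Modifier.isPublic(mods)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(looksLikeSimplePojo(Person.class));        // prints true
        System.out.println(looksLikeSimplePojo(NoDefaultCtor.class)); // prints false
    }
}
```

A check like this could run in a unit test next to the record class, so that a later refactoring that removes the default constructor is caught before the job is deployed.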

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-csv