tessl/maven-org-apache-flink--flink-csv

Apache Flink CSV format support for reading and writing CSV data in stream and batch processing

Describes: pkg:maven/org.apache.flink/flink-csv@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-csv@2.1.0


Apache Flink CSV Format

The Apache Flink CSV format module provides comprehensive CSV support for Flink, enabling CSV data to be read and written in both stream and batch processing scenarios. It includes serialization and deserialization schemas for converting between CSV and Flink's internal row data structures, bulk writers for efficient CSV output, input formats for reading CSV files, and configurable options for handling various CSV dialects.

Package Information

  • Package Name: flink-csv
  • Package Type: Maven
  • Language: Java
  • Group ID: org.apache.flink
  • Artifact ID: flink-csv
  • Version: 2.1.0
  • Installation: Add to Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>2.1.0</version>
</dependency>

Core Imports

import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.formats.csv.RowCsvInputFormat;
import org.apache.flink.formats.csv.CsvFormatOptions;
import org.apache.flink.formats.csv.CsvRowSchemaConverter;

Basic Usage

Reading CSV files with DataStream API

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Create CSV reader format for a POJO class
CsvReaderFormat<MyPojo> csvFormat = CsvReaderFormat.forPojo(MyPojo.class);

// Create a file source over the CSV files
Path path = new Path("path/to/file.csv");
FileSource<MyPojo> source = FileSource
    .forRecordStreamFormat(csvFormat, path)
    .build();

// Use in a streaming job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyPojo> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "csv-source");

Writing CSV data with serialization schema

import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.table.types.logical.RowType;

// Create serialization schema; rowType describes the fields of the rows being written
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
    .setFieldDelimiter('|')
    .setQuoteCharacter('"')
    .build();

// Serialize a RowData record to CSV bytes
byte[] csvBytes = schema.serialize(rowData);

Batch processing with input format

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.RowCsvInputFormat;
import org.apache.flink.types.Row;

// Create CSV input format; typeInfo describes the produced rows (e.g. a RowTypeInfo)
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(typeInfo, new Path("path/to/file.csv"))
    .setFieldDelimiter(',')
    .setIgnoreParseErrors(true)
    .build();

// Use with the DataSet API
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Row> csvData = env.createInput(inputFormat);

Architecture

The Apache Flink CSV format module is organized around several key components:

  • Reader Formats: CsvReaderFormat for streaming file sources with type-safe POJO and schema-based reading
  • Serialization Schemas: CsvRowDataSerializationSchema for converting internal row data to CSV bytes
  • Input Formats: RowCsvInputFormat for batch processing of CSV files
  • Configuration Options: CsvFormatOptions defining all configurable CSV dialect options
  • Schema Conversion: CsvRowSchemaConverter for converting Flink type information to Jackson CSV schemas
  • Format Factories: Internal factories for Table API/SQL integration with automatic format discovery

Capabilities

Stream Processing (Reader Formats)

Streaming CSV file processing with type-safe deserialization, automatic schema detection, and configurable error handling.

public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
    public static <T> CsvReaderFormat<T> forPojo(Class<T> pojoType);
    public static <T> CsvReaderFormat<T> forSchema(CsvSchema schema, TypeInformation<T> typeInformation);
    public CsvReaderFormat<T> withIgnoreParseErrors();
}
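
As a sketch of the schema-based variant, a Jackson CsvSchema can drive the reader instead of POJO reflection. Here Person is a hypothetical POJO whose fields match the columns:

import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.csv.CsvReaderFormat;

// Jackson CsvSchema describing the file layout: name;age with a header row
CsvSchema schema = CsvSchema.builder()
    .addColumn("name", CsvSchema.ColumnType.STRING)
    .addColumn("age", CsvSchema.ColumnType.NUMBER)
    .setColumnSeparator(';')
    .setUseHeader(true)
    .build();

// Person is a hypothetical POJO with matching name/age fields
CsvReaderFormat<Person> format = CsvReaderFormat
    .forSchema(schema, TypeInformation.of(Person.class))
    .withIgnoreParseErrors();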

Data Serialization

Convert Flink's internal row data structures to CSV format with extensive configuration options for different CSV dialects.

public class CsvRowDataSerializationSchema implements SerializationSchema<RowData> {
    public byte[] serialize(RowData element);
    
    public static class Builder {
        public Builder(RowType rowType);
        public Builder setFieldDelimiter(char delimiter);
        public Builder setQuoteCharacter(char quoteCharacter);
        public Builder disableQuoteCharacter();
        public Builder setEscapeCharacter(char escapeCharacter);
        public CsvRowDataSerializationSchema build();
    }
}
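
A minimal sketch of building a schema from a RowType and serializing one row; field names and values are illustrative, and GenericRowData and StringData are Flink's standard concrete row and string types:

import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

// Row shape: (id INT, name STRING)
RowType rowType = RowType.of(
    new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
    new String[] {"id", "name"});

CsvRowDataSerializationSchema schema =
    new CsvRowDataSerializationSchema.Builder(rowType)
        .setFieldDelimiter(';')
        .build();

// GenericRowData is a standard mutable RowData implementation
RowData row = GenericRowData.of(42, StringData.fromString("alice"));
byte[] bytes = schema.serialize(row);  // yields a line like "42;alice"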

Batch Processing (Input Formats)

Traditional batch processing of CSV files with DataSet API integration and configurable field selection.

public class RowCsvInputFormat extends AbstractCsvInputFormat<Row> {
    public static Builder builder(TypeInformation<Row> typeInfo, Path... filePaths);
    
    public static class Builder {
        public Builder setFieldDelimiter(char delimiter);
        public Builder setQuoteCharacter(char quoteCharacter);
        public Builder setIgnoreParseErrors(boolean ignoreParseErrors);
        public Builder setSelectedFields(int[] selectedFields);
        public RowCsvInputFormat build();
    }
}
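
A sketch of field projection with setSelectedFields, assuming the type information describes the projected row (here: the first and third columns of the file):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.RowCsvInputFormat;
import org.apache.flink.types.Row;

// Type information for the two projected fields (an assumption of this sketch)
TypeInformation<Row> typeInfo = new RowTypeInfo(Types.STRING, Types.INT);

RowCsvInputFormat format = RowCsvInputFormat
    .builder(typeInfo, new Path("path/to/file.csv"))
    .setFieldDelimiter(',')
    .setSelectedFields(new int[] {0, 2})  // read only columns 0 and 2
    .build();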

Configuration Options

Comprehensive configuration system for handling various CSV dialects and processing requirements.

public class CsvFormatOptions {
    public static final ConfigOption<String> FIELD_DELIMITER;
    public static final ConfigOption<String> QUOTE_CHARACTER;
    public static final ConfigOption<Boolean> DISABLE_QUOTE_CHARACTER;
    public static final ConfigOption<Boolean> ALLOW_COMMENTS;
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    public static final ConfigOption<String> ARRAY_ELEMENT_DELIMITER;
    public static final ConfigOption<String> ESCAPE_CHARACTER;
    public static final ConfigOption<String> NULL_LITERAL;
    public static final ConfigOption<Boolean> WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION;
}
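
In the Table API/SQL, these options are set with keys prefixed by the format name (csv.). A sketch of a filesystem table declared from Java, with illustrative table name and path:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// 'csv.*' keys map to the ConfigOptions above, e.g. FIELD_DELIMITER -> csv.field-delimiter
tEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id BIGINT," +
    "  product STRING" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 'path/to/orders'," +
    "  'format' = 'csv'," +
    "  'csv.field-delimiter' = ';'," +
    "  'csv.allow-comments' = 'true'," +
    "  'csv.ignore-parse-errors' = 'true'," +
    "  'csv.null-literal' = 'n/a'" +
    ")");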

Schema Conversion

Utility functions for converting between Flink type information and Jackson CSV schemas.

public class CsvRowSchemaConverter {
    public static CsvSchema convert(RowTypeInfo rowType);
    public static CsvSchema convert(RowType rowType);
}
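
A minimal sketch of the RowTypeInfo variant; the field layout is illustrative:

import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.csv.CsvRowSchemaConverter;

RowTypeInfo rowTypeInfo = new RowTypeInfo(
    new TypeInformation<?>[] {Types.INT, Types.STRING},
    new String[] {"id", "name"});

// The resulting CsvSchema has one column per row field, with matching names
CsvSchema csvSchema = CsvRowSchemaConverter.convert(rowTypeInfo);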

Types

// Core Flink types used throughout the API
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.core.fs.Path;
import org.apache.flink.configuration.ConfigOption;

// Jackson CSV types for schema definition
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

// Serialization interfaces
import org.apache.flink.api.common.serialization.SerializationSchema;

// File processing interfaces
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.api.common.io.FileInputFormat;