Apache Flink CSV format support for reading and writing CSV data in stream and batch processing
npx @tessl/cli install tessl/maven-org-apache-flink--flink-csv@2.1.0
# Apache Flink CSV Format

The Apache Flink CSV format module provides comprehensive CSV support for Apache Flink, enabling reading and writing of CSV data in both stream and batch processing. It includes serialization and deserialization schemas for converting between CSV and Flink's internal row data structures, bulk writers for efficient CSV output, input formats for reading CSV files, and configurable options for handling various CSV dialects.

## Package Information

- **Package Name**: flink-csv
- **Package Type**: Maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-csv
- **Version**: 2.1.0
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.formats.csv.RowCsvInputFormat;
import org.apache.flink.formats.csv.CsvFormatOptions;
import org.apache.flink.formats.csv.CsvRowSchemaConverter;
```

## Basic Usage

### Reading CSV files with the DataStream API

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Create a CSV reader format for a POJO class
CsvReaderFormat<MyPojo> csvFormat = CsvReaderFormat.forPojo(MyPojo.class);

// Create a file source over the CSV files at the given path
FileSource<MyPojo> source = FileSource
        .forRecordStreamFormat(csvFormat, path)
        .build();

// Use in a streaming job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyPojo> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "csv-source");
```
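
The example above assumes a `MyPojo` class. A minimal sketch of such a POJO (the field names `id` and `name` are illustrative, not from the library); the Jackson CSV mapper used internally generally needs a no-argument constructor and accessible setters:

```java
// Illustrative POJO that CsvReaderFormat.forPojo(MyPojo.class) could map
// CSV columns onto; field names are hypothetical.
public class MyPojo {
    private long id;
    private String name;

    public MyPojo() {}  // no-argument constructor required for mapping

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}
```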

### Writing CSV data with a serialization schema

```java
import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.table.types.logical.RowType;

// Create a serialization schema for the given row type
CsvRowDataSerializationSchema schema = new CsvRowDataSerializationSchema.Builder(rowType)
        .setFieldDelimiter('|')
        .setQuoteCharacter('"')
        .build();

// Serialize row data to CSV bytes
byte[] csvBytes = schema.serialize(rowData);
```

### Batch processing with an input format

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.RowCsvInputFormat;
import org.apache.flink.types.Row;

// Create a CSV input format for the given row type information
RowCsvInputFormat inputFormat = RowCsvInputFormat
        .builder(typeInfo, new Path("path/to/file.csv"))
        .setFieldDelimiter(',')
        .setIgnoreParseErrors(true)
        .build();

// Use with the DataSet API
DataSet<Row> csvData = env.createInput(inputFormat);
```
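
The builder's `setSelectedFields` option projects each record down to a subset of columns. A minimal pure-Java sketch of the idea (not Flink's implementation): only the listed column indices of each parsed record are kept, in the given order.

```java
import java.util.Arrays;

// Sketch of field selection: keep only the requested column indices.
public class FieldSelectionSketch {
    static String[] project(String[] record, int[] selectedFields) {
        String[] out = new String[selectedFields.length];
        for (int i = 0; i < selectedFields.length; i++) {
            out[i] = record[selectedFields[i]];  // copy the selected column
        }
        return out;
    }

    public static void main(String[] args) {
        String[] record = "2024-01-01,42,Alice,3.14".split(",");
        // keep only columns 2 (name) and 1 (count), in that order
        System.out.println(Arrays.toString(project(record, new int[]{2, 1})));
        // → [Alice, 42]
    }
}
```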

## Architecture

The Apache Flink CSV format module is organized around several key components:

- **Reader Formats**: `CsvReaderFormat` for streaming file sources with type-safe POJO and schema-based reading
- **Serialization Schemas**: `CsvRowDataSerializationSchema` for converting internal row data to CSV bytes
- **Input Formats**: `RowCsvInputFormat` for batch processing of CSV files
- **Configuration Options**: `CsvFormatOptions` defining all configurable CSV dialect options
- **Schema Conversion**: `CsvRowSchemaConverter` for converting Flink type information to Jackson CSV schemas
- **Format Factories**: Internal factories for Table API/SQL integration with automatic format discovery

## Capabilities

### Stream Processing (Reader Formats)

Streaming CSV file processing with type-safe deserialization, automatic schema derivation, and configurable error handling.

```java { .api }
public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
    public static <T> CsvReaderFormat<T> forPojo(Class<T> pojoType);
    public static <T> CsvReaderFormat<T> forSchema(CsvSchema schema, TypeInformation<T> typeInformation);
    public CsvReaderFormat<T> withIgnoreParseErrors();
}
```

[Stream Processing](./stream-processing.md)
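
The contract of `withIgnoreParseErrors()` is that records which fail to parse are skipped rather than failing the job. A pure-Java sketch of that behavior (not the library's code, which applies it during CSV deserialization):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the ignore-parse-errors contract: drop malformed records
// when enabled, fail fast otherwise.
public class IgnoreParseErrorsSketch {
    static List<Integer> readInts(List<String> lines, boolean ignoreParseErrors) {
        List<Integer> out = new ArrayList<>();
        for (String line : lines) {
            try {
                out.add(Integer.parseInt(line.trim()));
            } catch (NumberFormatException e) {
                if (!ignoreParseErrors) {
                    throw e;  // default: a bad record fails the read
                }
                // otherwise: silently skip the malformed record
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readInts(List.of("1", "oops", "3"), true)); // → [1, 3]
    }
}
```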

### Data Serialization

Convert Flink's internal row data structures to CSV format with extensive configuration options for different CSV dialects.

```java { .api }
public class CsvRowDataSerializationSchema implements SerializationSchema<RowData> {
    public byte[] serialize(RowData element);

    public static class Builder {
        public Builder(RowType rowType);
        public Builder setFieldDelimiter(char delimiter);
        public Builder setQuoteCharacter(char quoteCharacter);
        public Builder disableQuoteCharacter();
        public Builder setEscapeCharacter(char escapeCharacter);
        public CsvRowDataSerializationSchema build();
    }
}
```

[Data Serialization](./serialization.md)
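
How the field delimiter, quote character, and null literal interact when a record is written can be sketched in plain Java. This is a simplified illustration of the dialect rules, not `CsvRowDataSerializationSchema`'s actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of CSV dialect rules for one record: fields containing the
// delimiter or the quote character get quoted (with embedded quotes
// doubled), and nulls are replaced by the configured null literal.
public class CsvDialectSketch {
    static String writeRecord(String[] fields, char delimiter,
                              char quote, String nullLiteral) {
        List<String> out = new ArrayList<>();
        for (String f : fields) {
            if (f == null) {
                out.add(nullLiteral);  // corresponds to csv.null-literal
            } else if (f.indexOf(delimiter) >= 0 || f.indexOf(quote) >= 0) {
                // quote the field and double any embedded quote characters
                out.add(quote
                        + f.replace(String.valueOf(quote), "" + quote + quote)
                        + quote);
            } else {
                out.add(f);
            }
        }
        return String.join(String.valueOf(delimiter), out);
    }

    public static void main(String[] args) {
        String line = writeRecord(
                new String[]{"1", "a|b", null, "say \"hi\""}, '|', '"', "NULL");
        System.out.println(line); // → 1|"a|b"|NULL|"say ""hi"""
    }
}
```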

### Batch Processing (Input Formats)

Traditional batch processing of CSV files with DataSet API integration and configurable field selection.

```java { .api }
public class RowCsvInputFormat extends AbstractCsvInputFormat<Row> {
    public static Builder builder(TypeInformation<Row> typeInfo, Path... filePaths);

    public static class Builder {
        public Builder setFieldDelimiter(char delimiter);
        public Builder setQuoteCharacter(char quoteCharacter);
        public Builder setIgnoreParseErrors(boolean ignoreParseErrors);
        public Builder setSelectedFields(int[] selectedFields);
        public RowCsvInputFormat build();
    }
}
```

[Batch Processing](./batch-processing.md)

### Configuration Options

Comprehensive configuration system for handling various CSV dialects and processing requirements.

```java { .api }
public class CsvFormatOptions {
    public static final ConfigOption<String> FIELD_DELIMITER;
    public static final ConfigOption<String> QUOTE_CHARACTER;
    public static final ConfigOption<Boolean> DISABLE_QUOTE_CHARACTER;
    public static final ConfigOption<Boolean> ALLOW_COMMENTS;
    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS;
    public static final ConfigOption<String> ARRAY_ELEMENT_DELIMITER;
    public static final ConfigOption<String> ESCAPE_CHARACTER;
    public static final ConfigOption<String> NULL_LITERAL;
    public static final ConfigOption<Boolean> WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION;
}
```

[Configuration Options](./configuration.md)
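
As an example of what these options control, `WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION` decides whether a DECIMAL value is written as `1E+3` or `1000`. In plain Java the two renderings correspond to `BigDecimal.toString()` versus `toPlainString()`:

```java
import java.math.BigDecimal;

// The two textual renderings of the same BigDecimal value.
public class BigDecimalNotationSketch {
    public static void main(String[] args) {
        BigDecimal d = new BigDecimal("1E+3");
        System.out.println(d.toString());      // scientific notation: 1E+3
        System.out.println(d.toPlainString()); // plain notation: 1000
    }
}
```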

### Schema Conversion

Utility functions for converting between Flink type information and Jackson CSV schemas.

```java { .api }
public class CsvRowSchemaConverter {
    public static CsvSchema convert(RowTypeInfo rowType);
    public static CsvSchema convert(RowType rowType);
}
```

[Schema Conversion](./schema-conversion.md)
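
Conceptually, the converter assigns each Flink logical type one of Jackson's CSV column categories, since CSV itself carries no richer type system. An illustrative sketch of this kind of mapping (not the converter's actual source; the exact type coverage is an assumption here):

```java
// Hypothetical mapping from Flink logical type names to Jackson CSV
// column categories (NUMBER, BOOLEAN, ARRAY, STRING).
public class SchemaMappingSketch {
    static String columnTypeFor(String flinkType) {
        switch (flinkType) {
            case "TINYINT":
            case "SMALLINT":
            case "INT":
            case "BIGINT":
            case "FLOAT":
            case "DOUBLE":
            case "DECIMAL":
                return "NUMBER";
            case "BOOLEAN":
                return "BOOLEAN";
            case "ARRAY":
            case "ROW":
                return "ARRAY";
            default:
                // CHAR, VARCHAR, DATE, TIME, TIMESTAMP, ... fall back to STRING
                return "STRING";
        }
    }

    public static void main(String[] args) {
        System.out.println(columnTypeFor("BIGINT"));  // NUMBER
        System.out.println(columnTypeFor("VARCHAR")); // STRING
    }
}
```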

## Types

```java { .api }
// Core Flink types used throughout the API
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.core.fs.Path;
import org.apache.flink.configuration.ConfigOption;

// Jackson CSV types for schema definition
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

// Serialization interfaces
import org.apache.flink.api.common.serialization.SerializationSchema;

// File processing interfaces
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.api.common.io.FileInputFormat;
```