# Stream Processing

Stream CSV files with the `CsvReaderFormat` class, which provides type-safe deserialization, automatic schema detection, and configurable error handling for DataStream API integration.

## Capabilities

### CsvReaderFormat Class

Main class for reading CSV files in streaming applications, extending Flink's `SimpleStreamFormat`.

```java { .api }
/**
 * StreamFormat for reading CSV files with type safety and error handling
 * @param <T> The type of objects to deserialize CSV records into
 */
public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {

    /**
     * Create a CSV reader format for POJO classes with automatic schema detection
     * @param pojoType The POJO class to deserialize into
     * @return CsvReaderFormat instance configured for the POJO type
     */
    public static <T> CsvReaderFormat<T> forPojo(Class<T> pojoType);

    /**
     * Create a CSV reader format with a predefined schema and type information
     * @param schema Jackson CsvSchema defining the CSV structure
     * @param typeInformation Flink TypeInformation for the target type
     * @return CsvReaderFormat instance configured with the provided schema
     */
    public static <T> CsvReaderFormat<T> forSchema(CsvSchema schema, TypeInformation<T> typeInformation);

    /**
     * Create a CSV reader format with a custom mapper factory and schema generator
     * @param mapperFactory Supplier for creating Jackson CsvMapper instances
     * @param schemaGenerator Function to generate a CsvSchema from the CsvMapper
     * @param typeInformation Flink TypeInformation for the target type
     * @return CsvReaderFormat instance with custom configuration
     */
    public static <T> CsvReaderFormat<T> forSchema(
            SerializableSupplier<CsvMapper> mapperFactory,
            SerializableFunction<CsvMapper, CsvSchema> schemaGenerator,
            TypeInformation<T> typeInformation);

    /**
     * Configure the reader to ignore parsing errors and skip malformed records
     * @return New CsvReaderFormat instance with error ignoring enabled
     */
    public CsvReaderFormat<T> withIgnoreParseErrors();

    /**
     * Create a reader instance for processing a specific input stream
     * @param config Flink configuration
     * @param stream Input stream to read from
     * @return StreamFormat.Reader instance for reading records
     */
    public StreamFormat.Reader<T> createReader(Configuration config, FSDataInputStream stream);

    /**
     * Get the type information for objects produced by this format
     * @return TypeInformation describing the output type
     */
    public TypeInformation<T> getProducedType();
}
```

## Usage Examples

### Basic POJO Reading

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Define POJO class
public class Person {
    public String name;
    public int age;
    public boolean active;

    public Person() {} // Default constructor required
}

// Create CSV reader format
CsvReaderFormat<Person> csvFormat = CsvReaderFormat.forPojo(Person.class);

// Create file source
FileSource<Person> source = FileSource
    .forRecordStreamFormat(csvFormat, new Path("persons.csv"))
    .build();

// Use in streaming job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Person> persons = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "csv-source"
);

persons.print();
env.execute("CSV Reading Job");
```

### Schema-Based Reading

```java
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;

// Define custom schema
CsvSchema schema = CsvSchema.builder()
    .addColumn("name", CsvSchema.ColumnType.STRING)
    .addColumn("age", CsvSchema.ColumnType.NUMBER)
    .addColumn("salary", CsvSchema.ColumnType.NUMBER)
    .setUseHeader(true)
    .setColumnSeparator('|')
    .build();

// Create reader format with schema
CsvReaderFormat<Row> csvFormat = CsvReaderFormat.forSchema(
    schema,
    TypeInformation.of(Row.class)
);

// Use with file source
FileSource<Row> source = FileSource
    .forRecordStreamFormat(csvFormat, new Path("employees.csv"))
    .build();
```

### Error Handling

```java
// Create reader format that ignores parsing errors
CsvReaderFormat<Person> csvFormat = CsvReaderFormat
    .forPojo(Person.class)
    .withIgnoreParseErrors();

// Malformed records will be skipped instead of causing job failure
FileSource<Person> source = FileSource
    .forRecordStreamFormat(csvFormat, new Path("dirty-data.csv"))
    .build();

DataStream<Person> cleanData = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "fault-tolerant-csv-source"
);
```

### Custom Mapper Configuration

```java
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.util.function.SerializableFunction;
import org.apache.flink.util.function.SerializableSupplier;

// Create custom mapper factory
SerializableSupplier<CsvMapper> mapperFactory = () -> {
    CsvMapper mapper = new CsvMapper();
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true);
    return mapper;
};

// Create schema generator
SerializableFunction<CsvMapper, CsvSchema> schemaGenerator = mapper ->
    mapper.schemaFor(Person.class).withHeader();

// Create reader format with custom configuration
CsvReaderFormat<Person> csvFormat = CsvReaderFormat.forSchema(
    mapperFactory,
    schemaGenerator,
    TypeInformation.of(Person.class)
);
```

## Integration with File Sources

### Monitoring Directory

```java
import java.time.Duration;

// Monitor directory for new CSV files
FileSource<Person> source = FileSource
    .forRecordStreamFormat(csvFormat, new Path("input-directory/"))
    .monitorContinuously(Duration.ofSeconds(10))
    .build();

DataStream<Person> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "monitoring-csv-source"
);
```

### Processing Multiple Files

```java
// Process multiple CSV files with the same format
FileSource<Person> source = FileSource
    .forRecordStreamFormat(
        csvFormat,
        new Path("file1.csv"),
        new Path("file2.csv"),
        new Path("file3.csv")
    )
    .build();
```

## Type Safety and Validation

The `CsvReaderFormat` provides compile-time type safety through generic type parameters and runtime validation through Jackson's deserialization:

- **Compile-time safety**: Generic type parameter `<T>` ensures type consistency
- **Runtime validation**: Jackson validates field types and structure during deserialization
- **Error recovery**: `withIgnoreParseErrors()` allows graceful handling of malformed records
- **Schema validation**: Custom schemas can enforce strict field requirements

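The skip-versus-fail behavior behind `withIgnoreParseErrors()` can be illustrated with a plain-JDK sketch. This is a simplified model of the semantics, not Flink's actual implementation; the `ParseErrorDemo` class and its `parse` helper are hypothetical names introduced here for illustration:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the two error-handling modes: strict parsing fails on the
// first malformed record, while ignore-errors mode (what withIgnoreParseErrors()
// enables) skips the bad record and keeps reading.
public class ParseErrorDemo {
    public record Person(String name, int age) {}

    public static List<Person> parse(List<String> lines, boolean ignoreErrors) {
        List<Person> out = new ArrayList<>();
        for (String line : lines) {
            try {
                String[] fields = line.split(",");
                out.add(new Person(fields[0], Integer.parseInt(fields[1].trim())));
            } catch (RuntimeException e) {
                if (!ignoreErrors) {
                    throw e; // strict mode: the failure propagates and fails the job
                }
                // ignore-errors mode: drop the malformed record and continue
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("alice,30", "bob,not-a-number", "carol,41");
        System.out.println(parse(lines, true).size()); // prints 2: malformed row skipped
    }
}
```

In the real format, the same trade-off applies: strict mode surfaces data-quality problems immediately, while ignore-errors mode trades completeness for pipeline availability.
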
## Performance Considerations

- **Schema reuse**: Create `CsvReaderFormat` instances once and reuse them across multiple sources
- **POJO optimization**: Simple POJOs with public fields provide better performance than complex beans
- **Error handling**: Enable `withIgnoreParseErrors()` for better throughput when data quality issues are expected
- **File splitting**: Large CSV files are automatically split for parallel processing by Flink's file source