# Batch Processing

Traditional batch processing of CSV files using `RowCsvInputFormat`, with DataSet API integration, configurable field selection, and comprehensive error handling for large-scale CSV data processing.
## Capabilities

### RowCsvInputFormat Class

Input format for reading CSV files into `Row` objects for batch processing scenarios.

```java { .api }
/**
 * Input format that reads CSV files into Row objects for batch processing.
 * Extends AbstractCsvInputFormat with Row-specific functionality.
 */
public class RowCsvInputFormat extends AbstractCsvInputFormat<Row> {

    /**
     * Create a builder for configuring the CSV input format.
     * @param typeInfo Type information for the Row structure
     * @param filePaths Paths to CSV files to process
     * @return Builder instance for configuration
     */
    public static Builder builder(TypeInformation<Row> typeInfo, Path... filePaths);

    /**
     * Open a file input split for reading.
     * @param split The file input split to open
     * @throws IOException if the file cannot be opened
     */
    public void open(FileInputSplit split) throws IOException;

    /**
     * Check whether the end of the current split has been reached.
     * @return true if no more records are available
     * @throws IOException if an I/O error occurs
     */
    public boolean reachedEnd() throws IOException;

    /**
     * Read the next record from the input split.
     * @param reuse Row object to reuse for the next record (may be null)
     * @return Row containing the next record, or null if the end was reached
     * @throws IOException if an I/O error occurs
     */
    public Row nextRecord(Row reuse) throws IOException;

    /**
     * Builder class for configuring CSV input format options.
     */
    public static class Builder {

        /**
         * Set the field delimiter character (default: ',').
         * @param delimiter Character used to separate fields
         * @return Builder instance for method chaining
         */
        public Builder setFieldDelimiter(char delimiter);

        /**
         * Enable or disable comment-line processing (default: false).
         * Lines starting with '#' are ignored when enabled.
         * @param allowComments Whether to ignore comment lines
         * @return Builder instance for method chaining
         */
        public Builder setAllowComments(boolean allowComments);

        /**
         * Set the array element delimiter for complex types (default: ';').
         * @param delimiter String used to separate array elements
         * @return Builder instance for method chaining
         */
        public Builder setArrayElementDelimiter(String delimiter);

        /**
         * Set the quote character for field enclosure (default: '"').
         * @param quoteCharacter Character used to quote fields containing special characters
         * @return Builder instance for method chaining
         */
        public Builder setQuoteCharacter(char quoteCharacter);

        /**
         * Set the escape character for escaping special characters (no default).
         * @param escapeCharacter Character used for escaping within quoted fields
         * @return Builder instance for method chaining
         */
        public Builder setEscapeCharacter(char escapeCharacter);

        /**
         * Set the literal string that represents null values (no default).
         * @param nullLiteral String that represents null values in the CSV input
         * @return Builder instance for method chaining
         */
        public Builder setNullLiteral(String nullLiteral);

        /**
         * Configure parse-error handling (default: false).
         * When true, malformed records are skipped instead of failing the job.
         * @param ignoreParseErrors Whether to skip malformed records
         * @return Builder instance for method chaining
         */
        public Builder setIgnoreParseErrors(boolean ignoreParseErrors);

        /**
         * Select specific fields by index for projection (optional).
         * Only the specified field indices are read and included in the output.
         * @param selectedFields Array of field indices to include (0-based)
         * @return Builder instance for method chaining
         */
        public Builder setSelectedFields(int[] selectedFields);

        /**
         * Build the configured CSV input format.
         * @return RowCsvInputFormat instance with the specified configuration
         */
        public RowCsvInputFormat build();
    }
}
```
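Flink's runtime normally drives the `open`/`reachedEnd`/`nextRecord` methods itself; user code rarely calls them directly. To make the protocol concrete, here is a minimal stand-in sketched with plain `java.io` rather than Flink classes — `SketchCsvReader` is an invented illustration of the access pattern, not Flink's implementation:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for the InputFormat protocol: open() prepares the source,
// reachedEnd() checks for more input, nextRecord() parses one line into fields.
class SketchCsvReader {
    private BufferedReader reader;
    private String nextLine;

    public void open(String csvContent) throws IOException {
        reader = new BufferedReader(new StringReader(csvContent));
        nextLine = reader.readLine(); // read ahead so reachedEnd() can answer
    }

    public boolean reachedEnd() {
        return nextLine == null;
    }

    public String[] nextRecord() throws IOException {
        String[] fields = nextLine.split(",");
        nextLine = reader.readLine();
        return fields;
    }
}

public class ReadLoopSketch {
    public static void main(String[] args) throws IOException {
        SketchCsvReader format = new SketchCsvReader();
        format.open("alice,30,true\nbob,25,false");

        // The canonical driver loop: check reachedEnd() before each nextRecord()
        List<String[]> rows = new ArrayList<>();
        while (!format.reachedEnd()) {
            rows.add(format.nextRecord());
        }
        System.out.println(rows.size());    // 2
        System.out.println(rows.get(0)[0]); // alice
    }
}
```

The read-ahead in `open` is what lets `reachedEnd` answer without consuming a record, mirroring how input formats typically separate the end-of-input check from record delivery.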
## Usage Examples

### Basic CSV Reading

```java
import org.apache.flink.formats.csv.RowCsvInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.Row;

// Define row type information
RowTypeInfo typeInfo = new RowTypeInfo(
    BasicTypeInfo.STRING_TYPE_INFO,  // name
    BasicTypeInfo.INT_TYPE_INFO,     // age
    BasicTypeInfo.BOOLEAN_TYPE_INFO  // active
);

// Create the CSV input format
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(typeInfo, new Path("employees.csv"))
    .setFieldDelimiter(',')
    .build();

// Use with the DataSet API
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Row> csvData = env.createInput(inputFormat);

// Print the data (triggers job execution)
csvData.print();
```
### Custom Delimiter and Quoting

```java
// Configure for pipe-delimited files with custom quoting
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(typeInfo, new Path("data.psv"))
    .setFieldDelimiter('|')
    .setQuoteCharacter('\'')
    .setEscapeCharacter('\\')
    .build();

// Handles lines such as: 'John|Doe'|25|'Software \'Engineer\''
// (a quoted field containing the delimiter, and an escaped quote inside a quoted field)
```
### Error Handling and Comments

```java
// Configure for robust parsing with error tolerance
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(typeInfo, new Path("messy-data.csv"))
    .setIgnoreParseErrors(true) // Skip malformed records
    .setAllowComments(true)     // Ignore lines starting with #
    .setNullLiteral("NULL")     // Treat "NULL" strings as null values
    .build();

DataSet<Row> cleanData = env.createInput(inputFormat);
// Malformed records and comment lines are automatically skipped
```
### Field Projection

```java
// Read only specific fields (name and age; skip the active field)
int[] selectedFields = {0, 1}; // Include only the first two fields

RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(
        new RowTypeInfo(
            BasicTypeInfo.STRING_TYPE_INFO, // name
            BasicTypeInfo.INT_TYPE_INFO     // age
        ),
        new Path("employees.csv")
    )
    .setSelectedFields(selectedFields)
    .build();

// Only the name and age fields are read, improving performance
DataSet<Row> projectedData = env.createInput(inputFormat);
```
### Multiple File Processing

```java
// Process multiple CSV files with the same schema
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(
        typeInfo,
        new Path("2021-data.csv"),
        new Path("2022-data.csv"),
        new Path("2023-data.csv")
    )
    .setFieldDelimiter(',')
    .build();

DataSet<Row> allData = env.createInput(inputFormat);
```
### Complex Type Handling

```java
// String is not a primitive type, so string arrays use BasicArrayTypeInfo
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;

// Handle arrays in CSV fields
RowTypeInfo typeInfo = new RowTypeInfo(
    BasicTypeInfo.STRING_TYPE_INFO,           // name
    BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO // tags
);

RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(typeInfo, new Path("tagged-data.csv"))
    .setArrayElementDelimiter(";")
    .build();

// CSV: "John Doe","java;flink;streaming"
// Result: Row with name="John Doe", tags=["java", "flink", "streaming"]
```
## Integration with DataSet Operations

### Filtering and Transformation

```java
DataSet<Row> csvData = env.createInput(inputFormat);

// Filter records
DataSet<Row> adults = csvData.filter(row -> (Integer) row.getField(1) >= 18);

// Transform records; Row's field types cannot be inferred from a lambda,
// so declare them explicitly with returns()
DataSet<Row> transformed = csvData.map(row -> {
    Row newRow = new Row(3);
    newRow.setField(0, ((String) row.getField(0)).toUpperCase()); // Uppercase name
    newRow.setField(1, row.getField(1));                          // Keep age
    newRow.setField(2, row.getField(2));                          // Keep active status
    return newRow;
}).returns(typeInfo);
```
### Aggregation

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

// Group by active status and count; tuple types from lambdas
// need explicit returns() declarations
DataSet<Tuple2<Boolean, Long>> counts = csvData
    .map(row -> new Tuple2<>((Boolean) row.getField(2), 1L))
    .returns(Types.TUPLE(Types.BOOLEAN, Types.LONG))
    .groupBy(0)
    .sum(1);

// Calculate average age by active status; combine the two sums
// in a single grouped aggregation with and()
DataSet<Tuple2<Boolean, Double>> avgAge = csvData
    .map(row -> new Tuple3<>((Boolean) row.getField(2), (Integer) row.getField(1), 1))
    .returns(Types.TUPLE(Types.BOOLEAN, Types.INT, Types.INT))
    .groupBy(0)
    .aggregate(Aggregations.SUM, 1)
    .and(Aggregations.SUM, 2)
    .map(tuple -> new Tuple2<>(tuple.f0, (double) tuple.f1 / tuple.f2))
    .returns(Types.TUPLE(Types.BOOLEAN, Types.DOUBLE));
```
### Joining with Other DataSets

```java
// Read another CSV file
RowCsvInputFormat departmentFormat = RowCsvInputFormat
    .builder(departmentTypeInfo, new Path("departments.csv"))
    .build();

DataSet<Row> departments = env.createInput(departmentFormat);

// Join employees with departments on department ID
DataSet<Row> enriched = csvData
    .join(departments)
    .where(row -> (Integer) row.getField(3))   // Employee department ID
    .equalTo(row -> (Integer) row.getField(0)) // Department ID
    .with((emp, dept) -> {
        Row result = new Row(5);
        result.setField(0, emp.getField(0));  // Employee name
        result.setField(1, emp.getField(1));  // Employee age
        result.setField(2, emp.getField(2));  // Employee active
        result.setField(3, dept.getField(1)); // Department name
        result.setField(4, dept.getField(2)); // Department budget
        return result;
    });
```
## Performance Optimization

### Parallelism Configuration

```java
// Configure parallelism for CSV reading
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // Use 4 parallel instances

// The input format automatically splits large files for parallel processing
DataSet<Row> csvData = env.createInput(inputFormat);
```
### Memory Management

```java
// Memory for large files is governed by cluster configuration;
// taskmanager.memory.process.size is normally set in flink-conf.yaml
Configuration config = new Configuration();
config.setString("taskmanager.memory.process.size", "2g");

// Use field projection to reduce memory usage
int[] selectedFields = {0, 1}; // Read only the needed fields
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(projectedTypeInfo, new Path("large-file.csv"))
    .setSelectedFields(selectedFields)
    .build();
```
### File Splitting

Large CSV files are automatically split by Flink for parallel processing:

- **Automatic splitting**: Files larger than the configured split size are divided
- **Balanced distribution**: Splits are distributed evenly across available task slots
- **Header handling**: The first split includes the header; subsequent splits skip it
- **Record boundaries**: Splits are aligned to record boundaries to maintain data integrity
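The record-boundary rule can be sketched in plain Java. This is an illustration of the concept, not Flink's implementation: a reader assigned a split that begins mid-record skips forward to the first byte after the next newline, so every record is parsed by exactly one split.

```java
import java.nio.charset.StandardCharsets;

public class SplitAlignment {
    // Return the offset at which a reader assigned [splitStart, ...) should
    // begin parsing: the first byte after the next '\n' at or beyond
    // splitStart. Readers of non-first splits skip the partial record,
    // which the previous split's reader parses in full.
    static int alignToRecordBoundary(byte[] data, int splitStart) {
        if (splitStart == 0) {
            return 0; // the first split starts at a record boundary by definition
        }
        int i = splitStart;
        while (i < data.length && data[i - 1] != '\n') {
            i++;
        }
        return i;
    }

    public static void main(String[] args) {
        byte[] data = "alice,30\nbob,25\ncarol,41\n".getBytes(StandardCharsets.UTF_8);
        // A split starting in the middle of "bob,25" begins parsing at "carol"
        System.out.println(alignToRecordBoundary(data, 12)); // 16
    }
}
```

The same idea generalizes to quoted fields with embedded newlines, which is why real CSV splitters must also track quote state when scanning for the boundary.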
## Error Handling and Monitoring

### Parse Error Recovery

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Configure comprehensive error handling
RowCsvInputFormat inputFormat = RowCsvInputFormat
    .builder(typeInfo, new Path("unreliable-data.csv"))
    .setIgnoreParseErrors(true) // Skip malformed records
    .setAllowComments(true)     // Skip comment lines
    .setNullLiteral("N/A")      // Handle a custom null representation
    .build();

// Monitor processing with metrics; counters are registered on the
// operator's metric group
DataSet<Row> processed = env.createInput(inputFormat)
    .map(new RichMapFunction<Row, Row>() {
        private transient Counter recordCounter;
        private transient Counter invalidCounter;

        @Override
        public void open(Configuration parameters) {
            recordCounter = getRuntimeContext().getMetricGroup().counter("records-processed");
            invalidCounter = getRuntimeContext().getMetricGroup().counter("invalid-records");
        }

        @Override
        public Row map(Row row) throws Exception {
            recordCounter.inc();

            // Records that failed to parse were already skipped upstream;
            // here we count records with missing required fields
            if (row.getField(0) == null) {
                invalidCounter.inc();
            }

            return row;
        }
    });
```
### Data Quality Validation

```java
import org.apache.flink.api.common.functions.FilterFunction;

// Add data quality checks
DataSet<Row> validated = csvData
    .filter(new FilterFunction<Row>() {
        @Override
        public boolean filter(Row row) throws Exception {
            // Validate required fields
            if (row.getField(0) == null || ((String) row.getField(0)).trim().isEmpty()) {
                return false; // Skip records with empty names
            }

            // Validate age range
            Integer age = (Integer) row.getField(1);
            if (age == null || age < 0 || age > 150) {
                return false; // Skip records with invalid ages
            }

            return true;
        }
    });
```
## File Format Support

The `RowCsvInputFormat` supports various CSV dialects and formats:

- **Standard CSV**: RFC 4180 compliant CSV files
- **Tab-separated**: Using a tab delimiter with `.setFieldDelimiter('\t')`
- **Pipe-delimited**: Common in data warehousing, with `.setFieldDelimiter('|')`
- **Custom delimiters**: Any single-character delimiter
- **Quoted fields**: Proper handling of quoted fields with embedded delimiters
- **Escaped content**: Support for escape characters within quoted fields
- **Comment lines**: Optional skipping of comment lines beginning with `#`
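The quoting and escaping rules above can be illustrated with a simplified field splitter. This is plain Java for illustration only, not Flink's parser; it handles quoted delimiters and escape characters but none of the format's other options (no embedded newlines, no doubled quotes):

```java
import java.util.ArrayList;
import java.util.List;

public class DialectSketch {
    // Split one CSV line, honoring a quote character (delimiters inside
    // quotes are literal) and an escape character (the following character
    // is taken literally).
    static List<String> splitLine(String line, char delimiter, char quote, char escape) {
        List<String> fields = new ArrayList<>();
        StringBuilder field = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == escape && i + 1 < line.length()) {
                field.append(line.charAt(++i)); // escaped character is literal
            } else if (c == quote) {
                inQuotes = !inQuotes;           // enter or leave a quoted section
            } else if (c == delimiter && !inQuotes) {
                fields.add(field.toString());   // unquoted delimiter ends the field
                field.setLength(0);
            } else {
                field.append(c);
            }
        }
        fields.add(field.toString());
        return fields;
    }

    public static void main(String[] args) {
        // A quoted field containing the delimiter, plus an escaped quote
        System.out.println(splitLine("'John|Doe'|25|'It\\'s fine'", '|', '\'', '\\'));
        // [John|Doe, 25, It's fine]
    }
}
```

The order of checks matters: the escape character is tested before the quote character, so an escaped quote never toggles the quoted state.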