or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-processing.md, configuration.md, index.md, schema-conversion.md, serialization.md, stream-processing.md

docs/batch-processing.md

0

# Batch Processing

1

2

Traditional batch processing of CSV files using `RowCsvInputFormat` with DataSet API integration, configurable field selection, and comprehensive error handling for large-scale CSV data processing.

3

4

## Capabilities

5

6

### RowCsvInputFormat Class

7

8

Input format for reading CSV files into `Row` objects for batch processing scenarios.

9

10

```java { .api }

11

/**

12

* Input format that reads CSV files into Row objects for batch processing

13

* Extends AbstractCsvInputFormat with Row-specific functionality

14

*/

15

public class RowCsvInputFormat extends AbstractCsvInputFormat<Row> {

16

17

/**

18

* Create a builder for configuring the CSV input format

19

* @param typeInfo Type information for the Row structure

20

* @param filePaths Paths to CSV files to process

21

* @return Builder instance for configuration

22

*/

23

public static Builder builder(TypeInformation<Row> typeInfo, Path... filePaths);

24

25

/**

26

* Open a file input split for reading

27

* @param split The file input split to open

28

* @throws IOException if file cannot be opened

29

*/

30

public void open(FileInputSplit split) throws IOException;

31

32

/**

33

* Check if the end of the current split has been reached

34

* @return true if no more records are available

35

* @throws IOException if I/O error occurs

36

*/

37

public boolean reachedEnd() throws IOException;

38

39

/**

40

* Read the next record from the input split

41

* @param reuse Row object to reuse for the next record (can be null)

42

* @return Row containing the next record, or null if end reached

43

* @throws IOException if I/O error occurs

44

*/

45

public Row nextRecord(Row reuse) throws IOException;

46

47

/**

48

* Builder class for configuring CSV input format options

49

*/

50

public static class Builder {

51

52

/**

53

* Set the field delimiter character (default: ',')

54

* @param delimiter Character used to separate fields

55

* @return Builder instance for method chaining

56

*/

57

public Builder setFieldDelimiter(char delimiter);

58

59

/**

60

* Enable or disable comment line processing (default: false)

61

* Lines starting with '#' will be ignored when enabled

62

* @param allowComments Whether to ignore comment lines

63

* @return Builder instance for method chaining

64

*/

65

public Builder setAllowComments(boolean allowComments);

66

67

/**

68

* Set the array element delimiter for complex types (default: ';')

69

* @param delimiter String used to separate array elements

70

* @return Builder instance for method chaining

71

*/

72

public Builder setArrayElementDelimiter(String delimiter);

73

74

/**

75

* Set the quote character for field enclosure (default: '"')

76

* @param quoteCharacter Character used to quote fields with special characters

77

* @return Builder instance for method chaining

78

*/

79

public Builder setQuoteCharacter(char quoteCharacter);

80

81

/**

82

* Set the escape character for escaping special characters (no default)

83

* @param escapeCharacter Character used for escaping within quoted fields

84

* @return Builder instance for method chaining

85

*/

86

public Builder setEscapeCharacter(char escapeCharacter);

87

88

/**

89

* Set the null literal string for null value representation (no default)

90

* @param nullLiteral String that represents null values in CSV

91

* @return Builder instance for method chaining

92

*/

93

public Builder setNullLiteral(String nullLiteral);

94

95

/**

96

* Configure parse error handling (default: false)

97

* When true, malformed records are skipped instead of failing the job

98

* @param ignoreParseErrors Whether to skip malformed records

99

* @return Builder instance for method chaining

100

*/

101

public Builder setIgnoreParseErrors(boolean ignoreParseErrors);

102

103

/**

104

* Select specific fields by index for projection (optional)

105

* Only specified field indices will be read and included in output

106

* @param selectedFields Array of field indices to include (0-based)

107

* @return Builder instance for method chaining

108

*/

109

public Builder setSelectedFields(int[] selectedFields);

110

111

/**

112

* Build the configured CSV input format

113

* @return RowCsvInputFormat instance with specified configuration

114

*/

115

public RowCsvInputFormat build();

116

}

117

}

118

```

119

120

## Usage Examples

121

122

### Basic CSV Reading

123

124

```java

125

import org.apache.flink.formats.csv.RowCsvInputFormat;

126

import org.apache.flink.api.common.typeinfo.TypeInformation;

127

import org.apache.flink.api.java.typeutils.RowTypeInfo;

128

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

129

import org.apache.flink.api.java.ExecutionEnvironment;

130

import org.apache.flink.core.fs.Path;

131

132

// Define row type information

133

RowTypeInfo typeInfo = new RowTypeInfo(

134

BasicTypeInfo.STRING_TYPE_INFO, // name

135

BasicTypeInfo.INT_TYPE_INFO, // age

136

BasicTypeInfo.BOOLEAN_TYPE_INFO // active

137

);

138

139

// Create CSV input format

140

RowCsvInputFormat inputFormat = RowCsvInputFormat

141

.builder(typeInfo, new Path("employees.csv"))

142

.setFieldDelimiter(',')

143

.build();

144

145

// Use with DataSet API

146

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

147

DataSet<Row> csvData = env.createInput(inputFormat);

148

149

// Process the data

150

csvData.print();

151

```

152

153

### Custom Delimiter and Quoting

154

155

```java

156

// Configure for pipe-delimited files with custom quoting

157

RowCsvInputFormat inputFormat = RowCsvInputFormat

158

.builder(typeInfo, new Path("data.psv"))

159

.setFieldDelimiter('|')

160

.setQuoteCharacter('\'')

161

.setEscapeCharacter('\\')

162

.build();

163

164

// Handles files like: 'John|Doe'|25|'Software\\Engineer'

165

```

166

167

### Error Handling and Comments

168

169

```java

170

// Configure for robust parsing with error tolerance

171

RowCsvInputFormat inputFormat = RowCsvInputFormat

172

.builder(typeInfo, new Path("messy-data.csv"))

173

.setIgnoreParseErrors(true) // Skip malformed records

174

.setAllowComments(true) // Ignore lines starting with #

175

.setNullLiteral("NULL") // Treat "NULL" strings as null values

176

.build();

177

178

DataSet<Row> cleanData = env.createInput(inputFormat);

179

// Malformed records and comment lines will be automatically skipped

180

```

181

182

### Field Projection

183

184

```java

185

// Read only specific fields (name and age, skip active field)

186

int[] selectedFields = {0, 1}; // Include only first two fields

187

188

RowCsvInputFormat inputFormat = RowCsvInputFormat

189

.builder(

190

new RowTypeInfo(

191

BasicTypeInfo.STRING_TYPE_INFO, // name

192

BasicTypeInfo.INT_TYPE_INFO // age

193

),

194

new Path("employees.csv")

195

)

196

.setSelectedFields(selectedFields)

197

.build();

198

199

// Only name and age fields will be read, improving performance

200

DataSet<Row> projectedData = env.createInput(inputFormat);

201

```

202

203

### Multiple File Processing

204

205

```java

206

// Process multiple CSV files with the same schema

207

RowCsvInputFormat inputFormat = RowCsvInputFormat

208

.builder(

209

typeInfo,

210

new Path("2021-data.csv"),

211

new Path("2022-data.csv"),

212

new Path("2023-data.csv")

213

)

214

.setFieldDelimiter(',')

215

.build();

216

217

DataSet<Row> allData = env.createInput(inputFormat);

218

```

219

220

### Complex Type Handling

221

222

```java

223

import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;

224

225

// Handle arrays in CSV fields

226

RowTypeInfo typeInfo = new RowTypeInfo(

227

BasicTypeInfo.STRING_TYPE_INFO, // name

228

BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO // tags

229

);

230

231

RowCsvInputFormat inputFormat = RowCsvInputFormat

232

.builder(typeInfo, new Path("tagged-data.csv"))

233

.setArrayElementDelimiter(";")

234

.build();

235

236

// CSV: "John Doe","java;flink;streaming"

237

// Result: Row with name="John Doe", tags=["java", "flink", "streaming"]

238

```

239

240

## Integration with DataSet Operations

241

242

### Filtering and Transformation

243

244

```java

245

DataSet<Row> csvData = env.createInput(inputFormat);

246

247

// Filter records

248

DataSet<Row> adults = csvData.filter(row -> (Integer) row.getField(1) >= 18);

249

250

// Transform records

251

DataSet<Row> transformed = csvData.map(row -> {

252

Row newRow = new Row(3);

253

newRow.setField(0, ((String) row.getField(0)).toUpperCase()); // Uppercase name

254

newRow.setField(1, row.getField(1)); // Keep age

255

newRow.setField(2, row.getField(2)); // Keep active status

256

return newRow;

257

});

258

```

259

260

### Aggregation

261

262

```java

263

// Group by active status and count

264

DataSet<Tuple2<Boolean, Long>> counts = csvData

265

.map(row -> new Tuple2<>((Boolean) row.getField(2), 1L))

266

.groupBy(0)

267

.sum(1);

268

269

// Calculate average age by active status

270

DataSet<Tuple2<Boolean, Double>> avgAge = csvData

271

.map(row -> new Tuple3<>((Boolean) row.getField(2), (Integer) row.getField(1), 1))

272

.groupBy(0)

273

.aggregate(Aggregations.SUM, 1)

274

.aggregate(Aggregations.SUM, 2)

275

.map(tuple -> new Tuple2<>(tuple.f0, (double) tuple.f1 / tuple.f2));

276

```

277

278

### Joining with Other DataSets

279

280

```java

281

// Read another CSV file

282

RowCsvInputFormat departmentFormat = RowCsvInputFormat

283

.builder(departmentTypeInfo, new Path("departments.csv"))

284

.build();

285

286

DataSet<Row> departments = env.createInput(departmentFormat);

287

288

// Join employees with departments

289

DataSet<Row> enriched = csvData

290

.join(departments)

291

.where(row -> row.getField(3)) // Employee department ID

292

.equalTo(row -> row.getField(0)) // Department ID

293

.with((emp, dept) -> {

294

Row result = new Row(5);

295

result.setField(0, emp.getField(0)); // Employee name

296

result.setField(1, emp.getField(1)); // Employee age

297

result.setField(2, emp.getField(2)); // Employee active

298

result.setField(3, dept.getField(1)); // Department name

299

result.setField(4, dept.getField(2)); // Department budget

300

return result;

301

});

302

```

303

304

## Performance Optimization

305

306

### Parallelism Configuration

307

308

```java

309

// Configure parallelism for CSV reading

310

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

311

env.setParallelism(4); // Use 4 parallel instances

312

313

// Input format will automatically split large files for parallel processing

314

DataSet<Row> csvData = env.createInput(inputFormat);

315

```

316

317

### Memory Management

318

319

```java

320

// Configure for large files with limited memory

321

Configuration config = new Configuration();

322

config.setString("taskmanager.memory.process.size", "2g"); // 2GB (MemorySize option, parsed from a string)

323

324

// Use field projection to reduce memory usage

325

int[] selectedFields = {0, 1}; // Read only needed fields

326

RowCsvInputFormat inputFormat = RowCsvInputFormat

327

.builder(projectedTypeInfo, new Path("large-file.csv"))

328

.setSelectedFields(selectedFields)

329

.build();

330

```

331

332

### File Splitting

333

334

Large CSV files are automatically split by Flink for parallel processing:

335

336

- **Automatic splitting**: Files larger than the configured split size are divided

337

- **Balanced distribution**: Splits are distributed evenly across available task slots

338

- **Header handling**: First split includes header, subsequent splits skip headers

339

- **Record boundary**: Splits occur at record boundaries to maintain data integrity

340

341

## Error Handling and Monitoring

342

343

### Parse Error Recovery

344

345

```java

346

// Configure comprehensive error handling

347

RowCsvInputFormat inputFormat = RowCsvInputFormat

348

.builder(typeInfo, new Path("unreliable-data.csv"))

349

.setIgnoreParseErrors(true) // Skip malformed records

350

.setAllowComments(true) // Skip comment lines

351

.setNullLiteral("N/A") // Handle various null representations

352

.build();

353

354

// Monitor processing with counters

355

DataSet<Row> processed = env.createInput(inputFormat)

356

.map(new RichMapFunction<Row, Row>() {

357

private Counter recordCounter;

358

private Counter errorCounter;

359

360

@Override

361

public void open(Configuration parameters) {

362

recordCounter = getRuntimeContext().getMetricGroup().counter("records-processed");

363

errorCounter = getRuntimeContext().getMetricGroup().counter("parse-errors");

364

}

365

366

@Override

367

public Row map(Row row) throws Exception {

368

recordCounter.inc();

369

370

// Validate record and count errors

371

if (row.getField(0) == null) {

372

errorCounter.inc();

373

}

374

375

return row;

376

}

377

});

378

```

379

380

### Data Quality Validation

381

382

```java

383

// Add data quality checks

384

DataSet<Row> validated = csvData

385

.filter(new FilterFunction<Row>() {

386

@Override

387

public boolean filter(Row row) throws Exception {

388

// Validate required fields

389

if (row.getField(0) == null || ((String) row.getField(0)).trim().isEmpty()) {

390

return false; // Skip records with empty names

391

}

392

393

// Validate age range

394

Integer age = (Integer) row.getField(1);

395

if (age == null || age < 0 || age > 150) {

396

return false; // Skip records with invalid ages

397

}

398

399

return true;

400

}

401

});

402

```

403

404

## File Format Support

405

406

The `RowCsvInputFormat` supports various CSV dialects and formats:

407

408

- **Standard CSV**: RFC 4180 compliant CSV files

409

- **Tab-separated**: Using tab delimiter with `.setFieldDelimiter('\t')`

410

- **Pipe-delimited**: Common in data warehousing with `.setFieldDelimiter('|')`

411

- **Custom delimiters**: Any single character delimiter

412

- **Quoted fields**: Proper handling of quoted fields with embedded delimiters

413

- **Escaped content**: Support for escape characters within quoted fields

414

- **Comment lines**: Optional skipping of comment lines beginning with #