# Bulk Formats

Bulk formats provide batch-oriented reading interfaces optimized for high-performance columnar formats such as ORC and Parquet.

## Capabilities

### BulkFormat Interface

Core interface for implementing batch-oriented file reading with optimized performance.

```java { .api }
/**
 * The BulkFormat reads and decodes batches of records at a time. Examples of bulk formats
 * are formats like ORC or Parquet.
 *
 * The outer 'BulkFormat' class acts mainly as a configuration holder and factory for the
 * reader. The actual reading is done by the Reader, which is created in the
 * createReader method. If a bulk reader is created based on a checkpoint during checkpointed
 * streaming execution, then the reader is re-created in the restoreReader method.
 */
@PublicEvolving
public interface BulkFormat<T, SplitT extends FileSourceSplit>
        extends Serializable, ResultTypeQueryable<T> {

    /**
     * Creates a new reader that reads from the split's path, starting
     * at the split's offset and reading length bytes after the offset.
     */
    BulkFormat.Reader<T> createReader(Configuration config, SplitT split) throws IOException;

    /**
     * Creates a new reader that reads from split.path() starting at offset and
     * reads until length bytes after the offset. A number of recordsToSkip records
     * should be read and discarded after the offset. This is typically part of restoring a reader
     * to a checkpointed position.
     */
    BulkFormat.Reader<T> restoreReader(Configuration config, SplitT split) throws IOException;

    /**
     * Checks whether this format is splittable. Splittable formats allow Flink to create multiple
     * splits per file, so that Flink can read multiple regions of the file concurrently.
     */
    boolean isSplittable();

    /**
     * Gets the type produced by this format. This type will be the type produced by the file
     * source as a whole.
     */
    @Override
    TypeInformation<T> getProducedType();
}
```

### BulkFormat.Reader Interface

Nested interface for reading batches of records with efficient iteration.

```java { .api }
/**
 * The actual reader that reads the batches of records.
 */
interface Reader<T> extends Closeable {

    /**
     * Reads one batch. The method should return null when reaching the end of the input. The
     * returned batch will be handed over to the processing threads as one.
     *
     * The returned iterator object and any contained objects may be held onto by the file
     * source for some time, so it should not be immediately reused by the reader.
     *
     * To implement reuse and to save object allocation, consider using a Pool and recycling
     * objects into the Pool in the RecordIterator.releaseBatch() method.
     */
    @Nullable
    RecordIterator<T> readBatch() throws IOException;

    /**
     * Closes the reader and releases all resources.
     */
    @Override
    void close() throws IOException;
}
```
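The pooling recommendation in the `readBatch()` javadoc can be illustrated without Flink on the classpath. The sketch below uses a hypothetical `BatchPool` and `Batch` (not Flink classes, just illustrative stand-ins) to show the lifecycle: borrow a buffer when building a batch, hand the batch downstream, and return the buffer in `releaseBatch()` instead of allocating anew.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PoolRecyclingSketch {

    // A minimal pool of reusable batch buffers (hypothetical; not Flink's Pool).
    static class BatchPool {
        private final Deque<String[]> free = new ArrayDeque<>();

        BatchPool(int poolSize, int batchCapacity) {
            for (int i = 0; i < poolSize; i++) {
                free.push(new String[batchCapacity]);
            }
        }

        String[] borrow() {
            return free.pop();
        }

        void recycle(String[] buffer) {
            free.push(buffer);
        }

        int available() {
            return free.size();
        }
    }

    // A batch handle whose releaseBatch() returns its buffer to the pool,
    // mirroring the role of BulkFormat.RecordIterator.releaseBatch().
    static class Batch {
        private final String[] records;
        private final BatchPool pool;

        Batch(String[] records, BatchPool pool) {
            this.records = records;
            this.pool = pool;
        }

        void releaseBatch() {
            pool.recycle(records);
        }
    }

    public static void main(String[] args) {
        BatchPool pool = new BatchPool(2, 4);
        Batch batch = new Batch(pool.borrow(), pool);
        System.out.println(pool.available()); // 1: one buffer is out with the batch
        batch.releaseBatch();
        System.out.println(pool.available()); // 2: buffer recycled, no new allocation
    }
}
```

Because the file source may hold a batch for a while, the buffer stays out of the pool until `releaseBatch()` is called, which is exactly why the reader must not reuse it immediately.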

### BulkFormat.RecordIterator Interface

Iterator interface for efficiently processing batches of records.

```java { .api }
/**
 * An iterator over records with their position in the file. The iterator is closeable to
 * support clean resource release and recycling.
 *
 * @param <T> The type of the record.
 */
interface RecordIterator<T> {

    /**
     * Gets the next record from the file, together with its position.
     *
     * The position information returned with the record points to the record AFTER the
     * returned record, because it defines the point where the reading should resume once the
     * current record is emitted. The position information is put in the source's state when the
     * record is emitted.
     *
     * Objects returned by this method may be reused by the iterator. By the time this
     * method is called again, no object returned from the previous call will be referenced any
     * more. That makes it possible to have a single MutableRecordAndPosition object and
     * return the same instance (with updated record and position) on every call.
     */
    @Nullable
    RecordAndPosition<T> next();

    /**
     * Releases the batch that this iterator iterated over. This is not supposed to close the
     * reader and its resources, but is simply a signal that this iterator is no longer used.
     * This method can be used as a hook to recycle/reuse heavyweight object structures.
     */
    void releaseBatch();
}
```
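The object-reuse contract described above can be sketched in plain Java. `MutableRecordAndPosition` here is a hypothetical stand-in for Flink's class of the same name; the point is that `next()` may legally update and return one shared instance, so consumers must copy anything they want to keep across calls.

```java
import java.util.Iterator;
import java.util.List;

public class ReuseSketch {

    // Hypothetical stand-in for Flink's MutableRecordAndPosition.
    static class MutableRecordAndPosition<T> {
        private T record;
        private long recordSkipCount;

        void set(T record, long recordSkipCount) {
            this.record = record;
            this.recordSkipCount = recordSkipCount;
        }

        T getRecord() { return record; }
        long getRecordSkipCount() { return recordSkipCount; }
    }

    // Updates and returns the SAME holder on every call, as the
    // RecordIterator.next() contract permits; returns null at end of batch.
    static class ReusingIterator<T> {
        private final Iterator<T> records;
        private final MutableRecordAndPosition<T> reuse = new MutableRecordAndPosition<>();
        private long emitted;

        ReusingIterator(Iterator<T> records) { this.records = records; }

        MutableRecordAndPosition<T> next() {
            if (!records.hasNext()) {
                return null;
            }
            // Position points AFTER the emitted record: on restore, skip `emitted` records.
            reuse.set(records.next(), ++emitted);
            return reuse;
        }
    }

    public static void main(String[] args) {
        ReusingIterator<String> it = new ReusingIterator<>(List.of("a", "b").iterator());
        MutableRecordAndPosition<String> first = it.next();
        MutableRecordAndPosition<String> second = it.next();
        System.out.println(first == second);    // true: one shared instance
        System.out.println(second.getRecord()); // b
        System.out.println(it.next() == null);  // true: end of batch
    }
}
```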

**Usage Examples:**

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

// Example usage with a hypothetical Parquet bulk format
BulkFormat<RowData, FileSourceSplit> parquetFormat = ParquetBulkFormat.builder()
    .setSchema(schema)
    .setProjection(projection)
    .build();

FileSource<RowData> parquetSource = FileSource
    .forBulkFileFormat(parquetFormat, new Path("/data/parquet"))
    .build();

// Use in the DataStream API
DataStream<RowData> rows = env.fromSource(
    parquetSource, WatermarkStrategy.noWatermarks(), "parquet-source");
```

### Utility Record Iterators

Built-in implementations for common record iteration patterns.

```java { .api }
/**
 * Record iterator wrapping an array of records.
 */
public class ArrayResultIterator<T> implements BulkFormat.RecordIterator<T> {
    /**
     * Creates an iterator over an array of records.
     * @param records Array of records to iterate over
     */
    public ArrayResultIterator(T[] records);

    @Nullable
    public RecordAndPosition<T> next();
    public void releaseBatch();
}

/**
 * Record iterator wrapping another iterator.
 */
public class IteratorResultIterator<T> implements BulkFormat.RecordIterator<T> {
    /**
     * Creates an iterator wrapping another iterator.
     * @param iterator Iterator to wrap
     */
    public IteratorResultIterator(Iterator<T> iterator);

    @Nullable
    public RecordAndPosition<T> next();
    public void releaseBatch();
}

/**
 * Record iterator for a single record.
 */
public class SingletonResultIterator<T> implements BulkFormat.RecordIterator<T> {
    /**
     * Creates an iterator for a single record.
     * @param record Single record to return
     */
    public SingletonResultIterator(T record);

    @Nullable
    public RecordAndPosition<T> next();
    public void releaseBatch();
}
```

193

194

### Custom BulkFormat Implementation

195

196

Example of implementing a custom bulk format for efficient batch processing.

197

198

```java { .api }
/**
 * Example custom bulk format for reading newline-delimited JSON records in batches.
 */
public class JsonBulkFormat implements BulkFormat<JsonNode, FileSourceSplit> {
    private final int batchSize;
    private final ObjectMapper mapper;

    public JsonBulkFormat(int batchSize) {
        this.batchSize = batchSize;
        this.mapper = new ObjectMapper();
    }

    @Override
    public Reader<JsonNode> createReader(Configuration config, FileSourceSplit split)
            throws IOException {
        FSDataInputStream stream = FileSystem.get(split.path().toUri())
                .open(split.path(), 4096);
        stream.seek(split.offset());
        return new JsonBulkReader(stream, split.length(), batchSize, mapper);
    }

    @Override
    public Reader<JsonNode> restoreReader(Configuration config, FileSourceSplit split)
            throws IOException {
        // For simplicity, restart from the beginning of the split instead of
        // skipping ahead to the checkpointed position.
        return createReader(config, split);
    }

    @Override
    public boolean isSplittable() {
        // This simple line-based reader does not support multiple splits per file.
        return false;
    }

    @Override
    public TypeInformation<JsonNode> getProducedType() {
        return TypeInformation.of(JsonNode.class);
    }

    private static class JsonBulkReader implements BulkFormat.Reader<JsonNode> {
        private final BufferedReader reader;
        private final long splitLength;
        private final int batchSize;
        private final ObjectMapper mapper;
        private long bytesRead = 0;

        JsonBulkReader(FSDataInputStream stream, long splitLength,
                       int batchSize, ObjectMapper mapper) {
            this.reader = new BufferedReader(
                    new InputStreamReader(stream, StandardCharsets.UTF_8));
            this.splitLength = splitLength;
            this.batchSize = batchSize;
            this.mapper = mapper;
        }

        @Override
        public BulkFormat.RecordIterator<JsonNode> readBatch() throws IOException {
            if (bytesRead >= splitLength) {
                return null;
            }

            List<JsonNode> batch = new ArrayList<>(batchSize);
            String line;
            int count = 0;

            while (count < batchSize && bytesRead < splitLength
                    && (line = reader.readLine()) != null) {
                bytesRead += line.getBytes(StandardCharsets.UTF_8).length + 1;
                batch.add(mapper.readTree(line));
                count++;
            }

            return batch.isEmpty()
                    ? null
                    : new ArrayResultIterator<>(batch.toArray(new JsonNode[0]));
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    }
}
```

### Integration with Table API

Bulk formats can be integrated with Flink's Table API for structured data processing.

```java { .api }
/**
 * Adapter for using bulk formats with file metadata extraction.
 */
public class FileInfoExtractorBulkFormat<T> implements BulkFormat<RowData, FileSourceSplit> {
    /**
     * Creates a bulk format that extracts file metadata along with records.
     * @param wrappedFormat The underlying bulk format
     * @param metadataColumns File metadata columns to extract
     */
    public FileInfoExtractorBulkFormat(
            BulkFormat<T, FileSourceSplit> wrappedFormat,
            String[] metadataColumns);

    @Override
    public Reader<RowData> createReader(Configuration config, FileSourceSplit split)
            throws IOException;

    @Override
    public Reader<RowData> restoreReader(Configuration config, FileSourceSplit split)
            throws IOException;
}

/**
 * Bulk format with column projection support.
 */
public class ProjectingBulkFormat<T> implements BulkFormat<T, FileSourceSplit> {
    /**
     * Creates a bulk format with column projection.
     * @param wrappedFormat The underlying bulk format
     * @param projectedFields Fields to include in the output
     */
    public ProjectingBulkFormat(
            BulkFormat<T, FileSourceSplit> wrappedFormat,
            int[] projectedFields);
}

/**
 * Bulk format with record limit support.
 */
public class LimitableBulkFormat<T> implements BulkFormat<T, FileSourceSplit> {
    /**
     * Creates a bulk format with a record limit.
     * @param wrappedFormat The underlying bulk format
     * @param limit Maximum number of records to read
     */
    public LimitableBulkFormat(BulkFormat<T, FileSourceSplit> wrappedFormat, long limit);
}
```

325

326

**Advanced Usage Examples:**

327

328

```java
// Bulk format with projection for columnar formats
int[] projectedColumns = {0, 2, 4}; // Only read columns 0, 2, and 4
BulkFormat<RowData, FileSourceSplit> projectedFormat = new ProjectingBulkFormat<>(
    originalFormat, projectedColumns);

// Bulk format with file metadata extraction
String[] metadataColumns = {"file.path", "file.size", "file.modification-time"};
BulkFormat<RowData, FileSourceSplit> metadataFormat = new FileInfoExtractorBulkFormat<>(
    originalFormat, metadataColumns);

// Limited bulk format for sampling
BulkFormat<RowData, FileSourceSplit> limitedFormat = new LimitableBulkFormat<>(
    originalFormat, 1000); // Only read the first 1000 records

FileSource<RowData> advancedSource = FileSource
    .forBulkFileFormat(limitedFormat, new Path("/data/samples"))
    .build();
```
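These adapters all follow the same decorator pattern: wrap a format (or its iterator) and intercept records on the way through. The following is a minimal, Flink-free sketch of the record-limit idea behind `LimitableBulkFormat`; the `limited` helper and its names are illustrative, not part of any Flink API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class LimitSketch {

    // Decorates an iterator and stops after `limit` records; the same idea,
    // applied at the BulkFormat level, is what a record-limit wrapper provides.
    static <T> Iterator<T> limited(Iterator<T> inner, long limit) {
        return new Iterator<T>() {
            private long emitted;

            @Override
            public boolean hasNext() {
                return emitted < limit && inner.hasNext();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                emitted++;
                return inner.next();
            }
        };
    }

    public static void main(String[] args) {
        Iterator<Integer> it = limited(List.of(1, 2, 3, 4).iterator(), 2);
        List<Integer> out = new ArrayList<>();
        while (it.hasNext()) {
            out.add(it.next());
        }
        System.out.println(out); // [1, 2]
    }
}
```

Because the wrapper delegates everything except the count, projection and metadata extraction compose the same way: each wrapper only touches the one concern it adds.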

347

348

## Error Handling

349

350

Bulk formats handle various error conditions during batch reading:

351

352

- **IOException**: File system read errors, corrupted file structures
- **RuntimeException**: Format-specific parsing errors, schema mismatches
- **OutOfMemoryError**: Batch sizes too large for available memory

```java
try (BulkFormat.Reader<JsonNode> reader = format.createReader(config, split)) {
    BulkFormat.RecordIterator<JsonNode> batch;

    while ((batch = reader.readBatch()) != null) {
        try {
            RecordAndPosition<JsonNode> recordAndPosition;
            while ((recordAndPosition = batch.next()) != null) {
                JsonNode record = recordAndPosition.getRecord();
                // Process record
            }
        } finally {
            batch.releaseBatch(); // Always release batch resources
        }
    }
} catch (IOException e) {
    // Handle read errors
} catch (OutOfMemoryError e) {
    // Handle memory issues - consider reducing the batch size
}
```

377

378

## Performance Considerations

379

380

- Choose appropriate batch sizes to balance memory usage and I/O efficiency
- Always call `releaseBatch()` to prevent memory leaks
- Use column projection to reduce data transfer and processing overhead
- Consider file format characteristics (row-oriented vs. columnar) when choosing batch sizes
- Bulk formats are typically more efficient than stream formats for high-throughput scenarios
- Implement proper resource cleanup in custom bulk format implementations
- Monitor memory usage and adjust batch sizes based on record size and available heap space
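The batch-size guidance above can be made concrete with a rough sizing heuristic. The formula below is an illustrative rule of thumb, not a Flink API: divide a heap budget across concurrent readers, divide by the average record size, and clamp the result to a sane range.

```java
public class BatchSizing {

    /**
     * Rule-of-thumb batch sizing (illustrative heuristic, not a Flink API):
     * split a heap budget across concurrent readers, divide by the average
     * record size, and clamp so batches are neither degenerate nor huge.
     */
    static int batchSize(long heapBudgetBytes, int concurrentReaders, long avgRecordBytes) {
        long perReaderBudget = heapBudgetBytes / concurrentReaders;
        long size = perReaderBudget / Math.max(1, avgRecordBytes);
        return (int) Math.max(16, Math.min(size, 65_536));
    }

    public static void main(String[] args) {
        // 256 MiB budget, 4 readers, ~1 KiB records -> 65536 records per batch
        System.out.println(batchSize(256L << 20, 4, 1024));
        // 1 MiB budget, 4 readers, ~1 KiB records -> 256 records per batch
        System.out.println(batchSize(1L << 20, 4, 1024));
    }
}
```

Measured average record size from a sample of the data is a better input than a guess; revisit the clamp bounds for unusually large or small records.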