# Bulk Formats

Bulk formats provide batch-oriented reading interfaces optimized for high-performance file formats such as ORC and Parquet.

## Capabilities

### BulkFormat Interface

Core interface for implementing batch-oriented file reading with optimized performance.

```java { .api }
/**
 * The BulkFormat reads and decodes batches of records at a time. Examples of bulk formats
 * are formats like ORC or Parquet.
 *
 * The outer 'BulkFormat' class acts mainly as a configuration holder and factory for the
 * reader. The actual reading is done by the Reader, which is created in the
 * createReader method. If a bulk reader is created based on a checkpoint during checkpointed
 * streaming execution, then the reader is re-created in the restoreReader method.
 */
@PublicEvolving
public interface BulkFormat<T, SplitT extends FileSourceSplit>
        extends Serializable, ResultTypeQueryable<T> {

    /**
     * Creates a new reader that reads from the split's path, starting
     * at the split's offset and reading up to the split's length in bytes.
     */
    BulkFormat.Reader<T> createReader(Configuration config, SplitT split) throws IOException;

    /**
     * Creates a new reader that reads from split.path(), starting at the split's offset and
     * reading up to the split's length in bytes. In addition, a number of records
     * (recordsToSkip) should be read and discarded after the offset. This is typically part
     * of restoring a reader to a checkpointed position.
     */
    BulkFormat.Reader<T> restoreReader(Configuration config, SplitT split) throws IOException;

    /**
     * Checks whether this format is splittable. Splittable formats allow Flink to create multiple
     * splits per file, so that Flink can read multiple regions of the file concurrently.
     */
    boolean isSplittable();

    /**
     * Gets the type produced by this format. This type will be the type produced by the file
     * source as a whole.
     */
    @Override
    TypeInformation<T> getProducedType();
}
```
### BulkFormat.Reader Interface

Nested interface for reading batches of records with efficient iteration.

```java { .api }
/**
 * The actual reader that reads the batches of records.
 */
interface Reader<T> extends Closeable {

    /**
     * Reads one batch. The method should return null when reaching the end of the input. The
     * returned batch will be handed over to the processing threads as one unit.
     *
     * The returned iterator object and any contained objects may be held onto by the file
     * source for some time, so it should not be immediately reused by the reader.
     *
     * To implement reuse and to save object allocation, consider using a Pool and recycling
     * objects into the Pool in the RecordIterator.releaseBatch() method.
     */
    @Nullable
    RecordIterator<T> readBatch() throws IOException;

    /**
     * Closes the reader and releases all resources.
     */
    @Override
    void close() throws IOException;
}
```
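The javadoc above recommends recycling heavyweight batch objects through a pool when `releaseBatch()` is called. A minimal, self-contained sketch of that recycling pattern follows; the class and method names are illustrative, not part of the Flink API:

```java
import java.util.ArrayDeque;

/** Illustrative batch pool: releaseBatch() returns buffers for reuse instead of
 *  letting them be garbage collected. Not a Flink class. */
public class BatchPoolSketch {
    private static final ArrayDeque<int[]> POOL = new ArrayDeque<>();

    /** Hands out a pooled buffer when one is available, otherwise allocates. */
    public static int[] acquire(int size) {
        int[] buffer = POOL.poll();
        return (buffer != null && buffer.length >= size) ? buffer : new int[size];
    }

    /** The releaseBatch() hook: recycle the buffer into the pool for the next batch. */
    public static void releaseBatch(int[] buffer) {
        POOL.offer(buffer);
    }
}
```

Recycling this way keeps per-batch allocation constant regardless of how many batches a split produces, which is the point of the `releaseBatch()` hook.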
### BulkFormat.RecordIterator Interface

Iterator interface for efficiently processing batches of records.

```java { .api }
/**
 * An iterator over records with their position in the file. The iterator is closeable to
 * support clean resource release and recycling.
 *
 * @param <T> The type of the record.
 */
interface RecordIterator<T> {

    /**
     * Gets the next record from the file, together with its position.
     *
     * The position information returned with the record points to the position AFTER the
     * returned record, because it defines the point where reading should resume once the
     * current record is emitted. The position information is put into the source's state
     * when the record is emitted.
     *
     * Objects returned by this method may be reused by the iterator. By the time that this
     * method is called again, no object returned from the previous call will be referenced
     * any more. That makes it possible to have a single MutableRecordAndPosition object and
     * return the same instance (with updated record and position) on every call.
     */
    @Nullable
    RecordAndPosition<T> next();

    /**
     * Releases the batch that this iterator iterated over. This is not supposed to close the
     * reader and its resources, but is simply a signal that this iterator is no longer used.
     * This method can be used as a hook to recycle/reuse heavyweight object structures.
     */
    void releaseBatch();
}
```
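The contract that the stored position points AFTER the current record can be made concrete with a small stand-alone sketch. The types below are simplified stand-ins for illustration, not Flink's `RecordAndPosition`:

```java
import java.util.List;

/** Simplified stand-in (not Flink's RecordAndPosition) illustrating that the position
 *  stored with each record is the offset AFTER it, i.e. where reading resumes. */
public class PositionContractSketch {
    public static final class RecordWithOffset {
        public final String record;
        public final long offsetAfter; // checkpointed so a restored reader resumes here
        RecordWithOffset(String record, long offsetAfter) {
            this.record = record;
            this.offsetAfter = offsetAfter;
        }
    }

    /** Pairs each record with the offset just past its trailing delimiter. */
    public static RecordWithOffset[] attachPositions(List<String> records, long startOffset) {
        RecordWithOffset[] out = new RecordWithOffset[records.size()];
        long offset = startOffset;
        for (int i = 0; i < records.size(); i++) {
            offset += records.get(i).length() + 1; // +1 for a one-byte delimiter
            out[i] = new RecordWithOffset(records.get(i), offset);
        }
        return out;
    }
}
```

Storing the post-record offset means a checkpoint taken after emitting record *i* lets a restored reader seek there and continue with record *i + 1* without re-emitting anything.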
**Usage Examples:**

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.Path;

// Example usage with a hypothetical Parquet bulk format
BulkFormat<RowData, FileSourceSplit> parquetFormat = ParquetBulkFormat.builder()
    .setSchema(schema)
    .setProjection(projection)
    .build();

FileSource<RowData> parquetSource = FileSource
    .forBulkFileFormat(parquetFormat, new Path("/data/parquet"))
    .build();

// Use in the DataStream API
DataStream<RowData> rows = env.fromSource(
    parquetSource, WatermarkStrategy.noWatermarks(), "parquet-source");
```
### Utility Record Iterators

Built-in implementations for common record iteration patterns. Because they implement `BulkFormat.RecordIterator<T>`, their `next()` method returns a `RecordAndPosition<T>` (or null when exhausted) rather than the bare record.

```java { .api }
/**
 * Record iterator wrapping an array of records
 */
public class ArrayResultIterator<T> implements BulkFormat.RecordIterator<T> {
    /**
     * Creates an iterator over an array of records
     * @param records Array of records to iterate over
     */
    public ArrayResultIterator(T[] records);

    public RecordAndPosition<T> next();
    public void releaseBatch();
}

/**
 * Record iterator wrapping another iterator
 */
public class IteratorResultIterator<T> implements BulkFormat.RecordIterator<T> {
    /**
     * Creates an iterator wrapping another iterator
     * @param iterator Iterator to wrap
     */
    public IteratorResultIterator(Iterator<T> iterator);

    public RecordAndPosition<T> next();
    public void releaseBatch();
}

/**
 * Record iterator for a single record
 */
public class SingletonResultIterator<T> implements BulkFormat.RecordIterator<T> {
    /**
     * Creates an iterator over a single record
     * @param record Single record to return
     */
    public SingletonResultIterator(T record);

    public RecordAndPosition<T> next();
    public void releaseBatch();
}
```
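All three utility iterators follow the same drain pattern: `next()` yields records until the batch is exhausted, then returns null, and `releaseBatch()` signals that the batch may be recycled. A self-contained, simplified sketch of that pattern (positions omitted; this is not one of the actual Flink classes):

```java
/** Simplified array-backed record iterator demonstrating the null-terminated
 *  next() contract and the releaseBatch() signal. Not a Flink class. */
public class ArrayIteratorSketch<T> {
    private final T[] records;
    private int pos = 0;
    private boolean released = false;

    public ArrayIteratorSketch(T[] records) {
        this.records = records;
    }

    /** Returns the next record, or null once the batch is exhausted. */
    public T next() {
        return pos < records.length ? records[pos++] : null;
    }

    /** Signals that the batch is no longer used; a real implementation could recycle here. */
    public void releaseBatch() {
        released = true;
    }

    public boolean isReleased() {
        return released;
    }
}
```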
### Custom BulkFormat Implementation

Example of implementing a custom bulk format for efficient batch processing.

```java { .api }
/**
 * Example custom bulk format for reading JSON records in batches
 */
public class JsonBulkFormat implements BulkFormat<JsonNode, FileSourceSplit> {
    private final int batchSize;
    private final ObjectMapper mapper;

    public JsonBulkFormat(int batchSize) {
        this.batchSize = batchSize;
        this.mapper = new ObjectMapper();
    }

    @Override
    public Reader<JsonNode> createReader(Configuration config, FileSourceSplit split)
            throws IOException {
        FSDataInputStream stream = FileSystem.get(split.path().toUri())
            .open(split.path(), 4096);
        stream.seek(split.offset());
        return new JsonBulkReader(stream, split.length(), batchSize, mapper);
    }

    @Override
    public Reader<JsonNode> restoreReader(Configuration config, FileSourceSplit split)
            throws IOException {
        // For simplicity, restart from the beginning of the split. A production
        // implementation should skip the already-emitted records recorded in the split.
        return createReader(config, split);
    }

    @Override
    public boolean isSplittable() {
        // This example reads each file as a single split.
        return false;
    }

    @Override
    public TypeInformation<JsonNode> getProducedType() {
        return TypeInformation.of(JsonNode.class);
    }

    private static class JsonBulkReader implements BulkFormat.Reader<JsonNode> {
        private final BufferedReader reader;
        private final long splitLength;
        private final int batchSize;
        private final ObjectMapper mapper;
        private long bytesRead = 0;

        JsonBulkReader(FSDataInputStream stream, long splitLength,
                       int batchSize, ObjectMapper mapper) {
            this.reader = new BufferedReader(
                new InputStreamReader(stream, StandardCharsets.UTF_8));
            this.splitLength = splitLength;
            this.batchSize = batchSize;
            this.mapper = mapper;
        }

        @Override
        public BulkFormat.RecordIterator<JsonNode> readBatch() throws IOException {
            if (bytesRead >= splitLength) {
                return null;
            }

            List<JsonNode> batch = new ArrayList<>(batchSize);
            String line;

            while (batch.size() < batchSize && bytesRead < splitLength
                    && (line = reader.readLine()) != null) {
                bytesRead += line.getBytes(StandardCharsets.UTF_8).length + 1;
                batch.add(mapper.readTree(line));
            }

            return batch.isEmpty()
                ? null
                : new ArrayResultIterator<>(batch.toArray(new JsonNode[0]));
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    }
}
```
### Integration with Table API

Bulk formats can be integrated with Flink's Table API for structured data processing.

```java { .api }
/**
 * Adapter for using bulk formats with file info extraction
 */
public class FileInfoExtractorBulkFormat<T> implements BulkFormat<RowData, FileSourceSplit> {
    /**
     * Creates a bulk format that extracts file metadata along with records
     * @param wrappedFormat The underlying bulk format
     * @param metadataColumns File metadata columns to extract
     */
    public FileInfoExtractorBulkFormat(
            BulkFormat<T, FileSourceSplit> wrappedFormat,
            String[] metadataColumns);

    @Override
    public Reader<RowData> createReader(Configuration config, FileSourceSplit split)
            throws IOException;

    @Override
    public Reader<RowData> restoreReader(Configuration config, FileSourceSplit split)
            throws IOException;
}

/**
 * Bulk format with column projection support
 */
public class ProjectingBulkFormat<T> implements BulkFormat<T, FileSourceSplit> {
    /**
     * Creates a bulk format with column projection
     * @param wrappedFormat The underlying bulk format
     * @param projectedFields Fields to include in the output
     */
    public ProjectingBulkFormat(
            BulkFormat<T, FileSourceSplit> wrappedFormat,
            int[] projectedFields);
}

/**
 * Bulk format with record limit support
 */
public class LimitableBulkFormat<T> implements BulkFormat<T, FileSourceSplit> {
    /**
     * Creates a bulk format with a record limit
     * @param wrappedFormat The underlying bulk format
     * @param limit Maximum number of records to read
     */
    public LimitableBulkFormat(BulkFormat<T, FileSourceSplit> wrappedFormat, long limit);
}
```
**Advanced Usage Examples:**

```java
// Bulk format with projection for columnar formats
int[] projectedColumns = {0, 2, 4}; // Only read columns 0, 2, and 4
BulkFormat<RowData, FileSourceSplit> projectedFormat = new ProjectingBulkFormat<>(
    originalFormat, projectedColumns);

// Bulk format with file metadata extraction
String[] metadataColumns = {"file.path", "file.size", "file.modification-time"};
BulkFormat<RowData, FileSourceSplit> metadataFormat = new FileInfoExtractorBulkFormat<>(
    originalFormat, metadataColumns);

// Limited bulk format for sampling
BulkFormat<RowData, FileSourceSplit> limitedFormat = new LimitableBulkFormat<>(
    originalFormat, 1000); // Only read the first 1000 records

FileSource<RowData> advancedSource = FileSource
    .forBulkFileFormat(limitedFormat, new Path("/data/samples"))
    .build();
```
## Error Handling

Bulk formats handle various error conditions during batch reading:

- **IOException**: File system read errors, corrupted file structures
- **RuntimeException**: Format-specific parsing errors, schema mismatches
- **OutOfMemoryError**: Batch sizes too large for available memory

```java
try (BulkFormat.Reader<JsonNode> reader = format.createReader(config, split)) {
    BulkFormat.RecordIterator<JsonNode> batch;

    while ((batch = reader.readBatch()) != null) {
        try {
            RecordAndPosition<JsonNode> recordAndPosition;
            while ((recordAndPosition = batch.next()) != null) {
                JsonNode record = recordAndPosition.getRecord();
                // Process record
            }
        } finally {
            batch.releaseBatch(); // Always release batch resources
        }
    }
} catch (IOException e) {
    // Handle read errors
} catch (OutOfMemoryError e) {
    // Handle memory issues - consider reducing the batch size
}
```
## Performance Considerations

- Choose appropriate batch sizes to balance memory usage and I/O efficiency
- Always call `releaseBatch()` to prevent memory leaks
- Use column projection to reduce data transfer and processing overhead
- Consider file format characteristics (row-oriented vs. columnar) when choosing batch sizes
- Bulk formats are typically more efficient than stream formats for high-throughput scenarios
- Implement proper resource cleanup in custom bulk format implementations
- Monitor memory usage and adjust batch sizes based on record size and available heap space
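The batch-size advice above can be turned into a rough sizing heuristic. The sketch below is an illustrative assumption, not a Flink API: it caps the batch so that all in-flight batches together fit in a chosen fraction of the heap.

```java
/** Back-of-envelope batch sizing. The formula and the 65,536-record cap are
 *  illustrative defaults, not values prescribed by Flink. */
public class BatchSizing {
    public static int suggestBatchSize(long heapBytes, long avgRecordBytes,
                                       int inFlightBatches, double heapFraction) {
        // Budget a fraction of the heap, split across the batches in flight.
        long budgetPerBatch = (long) (heapBytes * heapFraction) / inFlightBatches;
        long records = budgetPerBatch / Math.max(1, avgRecordBytes);
        // Clamp to a sane range so tiny records do not produce huge batches.
        return (int) Math.max(1, Math.min(records, 65_536));
    }
}
```

For example, a 1 GiB heap, roughly 1 KiB records, four in-flight batches, and a 25% heap budget suggest batches of 65,536 records, while 1 MiB records under the same budget suggest batches of 64.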