# Vectorized Reading

High-performance vectorized readers that process data in columnar batches, supporting various column types and nested data structures for high throughput.

## Capabilities

### ParquetVectorizedInputFormat

Abstract base class for vectorized Parquet file reading with pluggable batch creation strategies.

```java { .api }
/**
 * Base class for vectorized Parquet file reading
 * @param <T> Type of records produced
 * @param <SplitT> Type of file split
 */
public abstract class ParquetVectorizedInputFormat<T, SplitT> implements FileInputFormat<T, SplitT> {

    /**
     * Creates a reader for the given configuration and split
     * @param config Hadoop configuration for Parquet settings
     * @param split File split to read
     * @return RecordReaderIterator for reading records
     * @throws IOException if reader creation fails
     */
    public RecordReaderIterator<T> createReader(Configuration config, SplitT split) throws IOException;

    /**
     * Restores a reader from checkpoint state
     * @param config Hadoop configuration
     * @param split File split to read
     * @return RecordReaderIterator for reading records
     * @throws IOException if reader restoration fails
     */
    public RecordReaderIterator<T> restoreReader(Configuration config, SplitT split) throws IOException;

    /**
     * Indicates whether this format supports file splitting
     * @return true - vectorized reading supports splitting
     */
    public boolean isSplittable();

    /**
     * Creates the reader batch implementation for vectorized processing (abstract method)
     * @param writableVectors Array of writable column vectors
     * @param columnarBatch Vectorized column batch for processing
     * @param recycler Pool recycler for batch reuse
     * @return ParquetReaderBatch implementation for the specific type
     */
    protected abstract ParquetReaderBatch<T> createReaderBatch(
            WritableColumnVector[] writableVectors,
            VectorizedColumnBatch columnarBatch,
            Pool.Recycler<ParquetReaderBatch<T>> recycler
    );
}
```
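
The `Pool.Recycler` parameter exists so the format can reuse batch objects across reads instead of allocating a fresh batch per call. A minimal, self-contained sketch of that pooling pattern (the `BatchPool` class and its method names are illustrative, not Flink's `Pool` API):

```java
import java.util.ArrayDeque;
import java.util.function.Supplier;

/** Minimal object pool illustrating the recycler pattern used for reader batches. */
class BatchPool<T> {
    private final ArrayDeque<T> free = new ArrayDeque<>();
    private final Supplier<T> factory;

    BatchPool(Supplier<T> factory, int size) {
        this.factory = factory;
        for (int i = 0; i < size; i++) {
            free.push(factory.get()); // pre-allocate the pool
        }
    }

    /** Borrow a batch, allocating only when the pool is empty. */
    T pollEntry() {
        return free.isEmpty() ? factory.get() : free.pop();
    }

    /** Return a batch so the next read can reuse it ("recycling"). */
    void recycle(T batch) {
        free.push(batch);
    }

    int available() {
        return free.size();
    }
}
```

Recycling keeps per-batch allocations (and the GC pressure of large column vectors) off the hot read path.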

### ColumnBatchFactory

Functional interface for creating vectorized column batches from file splits and column vectors.

```java { .api }
/**
 * Factory for creating vectorized column batches
 * @param <SplitT> Type of file split
 */
@FunctionalInterface
public interface ColumnBatchFactory<SplitT> {

    /**
     * Creates a VectorizedColumnBatch from split and column vectors
     * @param split File split containing metadata
     * @param vectors Array of column vectors containing data
     * @return VectorizedColumnBatch for processing
     */
    VectorizedColumnBatch create(SplitT split, ColumnVector[] vectors);

    /**
     * Creates a default factory that doesn't add extra fields
     * @return ColumnBatchFactory without partition field injection
     */
    static ColumnBatchFactory<FileSourceSplit> withoutExtraFields();
}
```

### ParquetColumnarRowSplitReader

Specialized split reader for columnar RowData reading with vectorized processing and partition support.

```java { .api }
/**
 * Split reader for columnar RowData reading with vectorization
 */
public class ParquetColumnarRowSplitReader implements RecordReader<RowData> {

    /**
     * Creates a new ParquetColumnarRowSplitReader
     * @param utcTimestamp Whether to use UTC timezone for timestamps
     * @param caseSensitive Whether field names are case sensitive
     * @param conf Hadoop configuration
     * @param rowType Row type describing the output fields
     * @param fieldNames Array of field names for output schema
     * @param selectedFields Array of selected field names (null for all)
     * @param batchSize Batch size for vectorized reading
     * @param rowDataWrapper Function to transform raw RowData
     * @param splitStart Start offset within the split
     * @param splitLength Length of data to read from split
     * @param fileLength Total length of the file
     * @param footer Parquet file metadata
     * @param blocks Array of column chunks to read
     */
    public ParquetColumnarRowSplitReader(
            boolean utcTimestamp,
            boolean caseSensitive,
            Configuration conf,
            RowType rowType,
            String[] fieldNames,
            String[] selectedFields,
            int batchSize,
            Function<RowData, RowData> rowDataWrapper,
            long splitStart,
            long splitLength,
            long fileLength,
            ParquetMetadata footer,
            ColumnChunk[] blocks
    );

    /**
     * Reads the next batch of records
     * @return RecordIterator for the batch, null if no more data
     * @throws IOException if reading fails
     */
    public RecordIterator<RowData> readBatch() throws IOException;

    /**
     * Closes the reader and releases resources
     * @throws IOException if close fails
     */
    public void close() throws IOException;
}
```

### ParquetSplitReaderUtil

Utility class providing helper methods for vectorized Parquet reading operations.

```java { .api }
/**
 * Utilities for vectorized Parquet file reading
 */
public class ParquetSplitReaderUtil {

    /**
     * Creates column readers for the specified schema and configuration
     * @param utcTimestamp Whether to use UTC timezone
     * @param caseSensitive Whether names are case sensitive
     * @param conf Hadoop configuration
     * @param fieldTypes Array of field logical types
     * @param fieldNames Array of field names
     * @param footer Parquet metadata
     * @param blocks Column chunks to read
     * @param batchSize Batch size for reading
     * @return Array of ColumnReader instances
     */
    public static ColumnReader[] createColumnReaders(
            boolean utcTimestamp,
            boolean caseSensitive,
            Configuration conf,
            LogicalType[] fieldTypes,
            String[] fieldNames,
            ParquetMetadata footer,
            ColumnChunk[] blocks,
            int batchSize
    );

    // ... other static utility methods
}
```

### Column Vector Types

Specialized column vectors for different data types in vectorized processing.

```java { .api }
/**
 * Specialized decimal vector for high-precision numeric data
 */
public class ParquetDecimalVector extends ColumnVector {

    /**
     * Creates a new ParquetDecimalVector
     * @param capacity Maximum number of values to store
     */
    public ParquetDecimalVector(int capacity);

    /**
     * Sets the decimal value at the specified position
     * @param index Position to set
     * @param value Decimal value to set
     */
    public void setDecimal(int index, DecimalData value);

    /**
     * Gets the decimal value at the specified position
     * @param index Position to get
     * @return DecimalData value at position
     */
    public DecimalData getDecimal(int index);
}

/**
 * Dictionary support for vectorized reading
 */
public class ParquetDictionary {

    /**
     * Creates a dictionary from a Parquet dictionary page
     * @param dictionaryPage Dictionary page from the Parquet file
     * @param descriptor Column descriptor for type information
     * @return ParquetDictionary for decoding values
     */
    public static ParquetDictionary create(
            DictionaryPage dictionaryPage,
            ColumnDescriptor descriptor
    );

    /**
     * Decodes a dictionary ID to its actual value
     * @param id Dictionary ID to decode
     * @return Decoded value
     */
    public Object decode(int id);
}
```
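
Dictionary-encoded columns store small integer IDs in the data pages plus a value table read once per column chunk, so decoding is an array lookup. A self-contained sketch of that mechanism (the `SimpleDictionary` class is illustrative, not the real `ParquetDictionary` internals):

```java
/** Illustrates dictionary decoding: IDs index into a value table read once per column chunk. */
class SimpleDictionary {
    private final String[] values;

    SimpleDictionary(String[] values) {
        this.values = values; // the dictionary page, decoded once
    }

    /** Decode a single dictionary ID back to its value. */
    String decode(int id) {
        return values[id];
    }

    /** Decode a whole batch of IDs - the vectorized analogue of decode(). */
    String[] decodeBatch(int[] ids) {
        String[] out = new String[ids.length];
        for (int i = 0; i < ids.length; i++) {
            out[i] = values[ids[i]];
        }
        return out;
    }
}
```

Because repeated values share one table entry, dictionary encoding both shrinks the file and makes the per-row decode cost a single bounds-checked lookup.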

## Column Reader Architecture

### Base Column Reader Interface

```java { .api }
/**
 * Base interface for vectorized column readers
 */
public interface ColumnReader {

    /**
     * Reads a batch of values into the provided column vector
     * @param num Number of values to read
     * @param vector Column vector to populate
     */
    void readBatch(int num, WritableColumnVector vector);

    /**
     * Returns the current repetition level
     * @return Repetition level for nested data
     */
    int getCurrentRepetitionLevel();

    /**
     * Returns the current definition level
     * @return Definition level for null handling
     */
    int getCurrentDefinitionLevel();
}
```
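
Definition levels are how a reader knows which output slots are null: a value is present only when its definition level equals the column's maximum, and non-null values are stored densely. A simplified, self-contained sketch of that logic for a flat nullable column (the `DefinitionLevelDecoder` class is illustrative):

```java
/** Fills a boxed output array, using definition levels to place nulls (flat-column case). */
class DefinitionLevelDecoder {
    static Integer[] readBatch(int[] defLevels, int maxDefLevel, int[] values) {
        Integer[] out = new Integer[defLevels.length];
        int valueIndex = 0; // non-null values are stored densely, with no gaps for nulls
        for (int i = 0; i < defLevels.length; i++) {
            if (defLevels[i] == maxDefLevel) {
                out[i] = values[valueIndex++]; // value present at this slot
            } else {
                out[i] = null; // null slot: no stored value is consumed
            }
        }
        return out;
    }
}
```

Nested columns generalize this: each optional or repeated ancestor adds one possible definition level, which is why `getCurrentDefinitionLevel()` is part of the reader contract.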

### Specialized Column Readers

```java { .api }
/**
 * Column reader implementations for different primitive types
 */

// Boolean column reader
public class BooleanColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

// Integer type readers
public class ByteColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class ShortColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class IntColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class LongColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

// Floating point readers
public class FloatColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class DoubleColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

// String and binary readers
public class BytesColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class FixedLenBytesColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

// Temporal type readers
public class TimestampColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

// Nested type readers
public class NestedColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class NestedPrimitiveColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}
```
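
For primitive readers like `IntColumnReader`, the inner loop of `readBatch` is a tight copy from the decoded page buffer into the column vector. Parquet's PLAIN encoding lays out INT32 values back to back in little-endian order, so a batch read reduces to sequential buffer reads — a self-contained sketch (the `PlainIntBatchReader` class is illustrative):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Reads a batch of PLAIN-encoded INT32 values, as a primitive column reader would. */
class PlainIntBatchReader {
    private final ByteBuffer page;

    PlainIntBatchReader(byte[] pageBytes) {
        // Parquet PLAIN encoding stores fixed-width values little-endian, back to back
        this.page = ByteBuffer.wrap(pageBytes).order(ByteOrder.LITTLE_ENDIAN);
    }

    /** Read `num` values into `vector` starting at `offset` - the vectorized inner loop. */
    void readBatch(int num, int[] vector, int offset) {
        for (int i = 0; i < num; i++) {
            vector[offset + i] = page.getInt();
        }
    }
}
```

This sequential, branch-free loop over contiguous memory is what makes the vectorized path so much faster than per-record decoding.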

## Usage Examples

### Basic Vectorized Reading

```java
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.connector.file.src.FileSource;

// Create vectorized input format with optimal batch size
ParquetColumnarRowInputFormat<FileSourceSplit> vectorizedFormat =
    new ParquetColumnarRowInputFormat<>(
        new Configuration(),
        rowType,
        TypeInformation.of(RowData.class),
        null,  // Read all fields
        null,  // No field ID mapping
        4096,  // Large batch size for performance
        true,  // UTC timestamps
        true   // Case sensitive
    );

// Use with FileSource for high-throughput reading
FileSource<RowData> source = FileSource
    .forBulkFormat(vectorizedFormat, new Path("/large-dataset"))
    .build();

DataStream<RowData> highThroughputStream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "vectorized-parquet-source"
);
```

### Custom Column Batch Factory

```java
import org.apache.flink.formats.parquet.vector.ColumnBatchFactory;

// Custom factory that adds partition information.
// extractPartitionValues and createPartitionVector are application-supplied helpers,
// and batchSize is assumed to be in scope.
ColumnBatchFactory<FileSourceSplit> customFactory = (split, vectors) -> {
    // Extract partition information from the split
    String[] partitionValues = extractPartitionValues(split);

    // Combine data columns with partition columns
    ColumnVector[] combined = new ColumnVector[vectors.length + partitionValues.length];
    System.arraycopy(vectors, 0, combined, 0, vectors.length);

    // Append partition columns as constant vectors
    for (int i = 0; i < partitionValues.length; i++) {
        combined[vectors.length + i] = createPartitionVector(partitionValues[i], batchSize);
    }

    return new VectorizedColumnBatch(combined);
};
```

### Performance Tuning

```java
// Optimize batch size based on available memory and data characteristics
int optimalBatchSize = calculateBatchSize(
    Runtime.getRuntime().maxMemory(), // Available memory
    numberOfColumns,                  // Schema width
    averageRowSize,                   // Data density
    parquetBlockSize                  // File block size
);

// Configure for high-throughput scenarios
Configuration perfConfig = new Configuration();
perfConfig.setInt("parquet.read.batch.size", optimalBatchSize);
perfConfig.setBoolean("parquet.read.vectorized.enable", true);
perfConfig.setInt("parquet.read.allocation.size", 8 * 1024 * 1024); // 8MB

ParquetColumnarRowInputFormat<FileSourceSplit> optimizedFormat =
    new ParquetColumnarRowInputFormat<>(
        perfConfig,
        rowType,
        typeInfo,
        selectedFields,
        null,
        optimalBatchSize,
        utcTimestamp,
        caseSensitive
    );
```
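
`calculateBatchSize` above is application code, not a library function. One plausible implementation — a sketch under the assumption that `averageRowSize` already reflects the full schema width — budgets a small fraction of heap per in-flight batch, avoids exceeding one row group, and clamps to a practical range:

```java
/** One possible batch-size heuristic; the 1% budget and [512, 8192] clamp are illustrative. */
class BatchSizing {
    static int calculateBatchSize(
            long maxMemoryBytes, int numberOfColumns, long averageRowSize, long parquetBlockSize) {
        long budget = maxMemoryBytes / 100;                    // ~1% of heap for one batch
        long rowsByMemory = budget / Math.max(1, averageRowSize);
        long rowsByBlock = parquetBlockSize / Math.max(1, averageRowSize); // stay within a row group
        long rows = Math.min(rowsByMemory, rowsByBlock);
        // numberOfColumns is accepted for symmetry with the call site; here averageRowSize
        // is assumed to already account for schema width.
        return (int) Math.max(512, Math.min(8192, rows));      // clamp to a practical range
    }
}
```

Larger batches amortize per-batch overhead but hold more vector memory per reader, so the right clamp depends on parallelism and heap size.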

### Nested Data Reading

```java
// Schema with nested structures
RowType nestedSchema = RowType.of(
    new LogicalType[] {
        DataTypes.BIGINT().getLogicalType(),         // id
        RowType.of(                                  // address (nested)
            new LogicalType[] {
                DataTypes.STRING().getLogicalType(), // street
                DataTypes.STRING().getLogicalType(), // city
                DataTypes.STRING().getLogicalType()  // country
            },
            new String[] {"street", "city", "country"}
        ),
        new ArrayType(                               // phone_numbers (array)
            DataTypes.STRING().getLogicalType()
        )
    },
    new String[] {"id", "address", "phone_numbers"}
);

// Vectorized reading handles nested structures efficiently
ParquetColumnarRowInputFormat<FileSourceSplit> nestedFormat =
    new ParquetColumnarRowInputFormat<>(
        conf, nestedSchema, typeInfo, null, null,
        2048, // Smaller batches for complex data
        utcTimestamp, caseSensitive
    );
```
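
Under the hood, the nested readers reconstruct array columns like `phone_numbers` from Parquet repetition levels: level 0 marks the start of a new top-level row, while a higher level continues the current list. A simplified, self-contained sketch of that reconstruction for a flat list-of-strings column with no nulls (the `RepetitionLevelDecoder` class is illustrative):

```java
import java.util.ArrayList;
import java.util.List;

/** Rebuilds per-row lists from repetition levels (simplified: no nulls, one nesting level). */
class RepetitionLevelDecoder {
    static List<List<String>> decode(int[] repLevels, String[] values) {
        List<List<String>> rows = new ArrayList<>();
        for (int i = 0; i < repLevels.length; i++) {
            if (repLevels[i] == 0) {
                rows.add(new ArrayList<>()); // level 0: a new top-level row begins
            }
            rows.get(rows.size() - 1).add(values[i]); // continue the current row's list
        }
        return rows;
    }
}
```

Real readers combine this with definition levels to distinguish empty lists and nulls, which is why smaller batch sizes are often used for deeply nested data.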

### Memory-Efficient Reading

```java
// Configure for memory-constrained environments
Configuration memoryConfig = new Configuration();
memoryConfig.setInt("parquet.read.batch.size", 1024);             // Smaller batches
memoryConfig.setDouble("parquet.memory.pool.ratio", 0.7);         // Conservative memory usage
memoryConfig.setBoolean("parquet.strings.signed-min-max", false); // Reduce string overhead

// Use column projection to reduce memory footprint
String[] essentialFields = {"id", "timestamp", "value"};

ParquetColumnarRowInputFormat<FileSourceSplit> memoryEfficientFormat =
    new ParquetColumnarRowInputFormat<>(
        memoryConfig,
        projectedRowType, // Only essential fields
        typeInfo,
        essentialFields,  // Column projection
        null,
        1024,             // Conservative batch size
        utcTimestamp,
        caseSensitive
    );
```

### Parallel Reading with Multiple Splits

```java
// Configure for parallel reading across multiple splits;
// split generation aligns with Parquet row group boundaries
FileSource<RowData> parallelSource = FileSource
    .forBulkFormat(vectorizedFormat, inputPath)
    .monitorContinuously(Duration.ofMinutes(1)) // Monitor for new files
    .build();

// Process with appropriate parallelism; each parallel reader consumes its own splits
DataStream<RowData> parallelStream = env
    .fromSource(parallelSource, WatermarkStrategy.noWatermarks(), "parallel-source")
    .setParallelism(numberOfCores * 2); // CPU-bound processing
```

### ParquetSplitReaderUtil

Utility class providing helper methods for creating column readers and vectors in vectorized Parquet reading.

```java { .api }
/**
 * Utility methods for Parquet vectorized reading components
 */
public class ParquetSplitReaderUtil {

    /**
     * Builds a list of ParquetField representations from RowType fields
     * @param fields List of RowType fields to convert
     * @param fieldNames List of field names for mapping
     * @param columnIO MessageColumnIO for schema information
     * @return List of ParquetField objects for vectorized reading
     */
    public static List<ParquetField> buildFieldsList(
            List<RowType.RowField> fields,
            List<String> fieldNames,
            MessageColumnIO columnIO
    );

    /**
     * Creates a column reader for the specified field and configuration
     * @param utcTimestamp Whether to use UTC timezone for timestamps
     * @param logicalType Flink logical type for the column
     * @param physicalType Parquet physical type representation
     * @param columnDescriptors List of column descriptors for the field
     * @param pageReadStore Page read store for accessing data
     * @param field ParquetField definition
     * @param depth Nesting depth of the field
     * @return ColumnReader instance for reading the field data
     */
    public static ColumnReader createColumnReader(
            boolean utcTimestamp,
            LogicalType logicalType,
            Type physicalType,
            List<ColumnDescriptor> columnDescriptors,
            PageReadStore pageReadStore,
            ParquetField field,
            int depth
    );

    /**
     * Creates a writable column vector for the specified type and configuration
     * @param batchSize Batch size for the vector
     * @param logicalType Flink logical type
     * @param physicalType Parquet physical type
     * @param columnDescriptors Column descriptors for metadata
     * @param depth Nesting depth
     * @return WritableColumnVector for storing read data
     */
    public static WritableColumnVector createWritableColumnVector(
            int batchSize,
            LogicalType logicalType,
            Type physicalType,
            List<ColumnDescriptor> columnDescriptors,
            int depth
    );

    /**
     * Creates a constant-value column vector
     * @param type Logical type of the constant
     * @param value Constant value to fill the vector
     * @param batchSize Size of the vector
     * @return ColumnVector filled with the constant value
     */
    public static ColumnVector createVectorFromConstant(
            LogicalType type,
            Object value,
            int batchSize
    );
}
```
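
`createVectorFromConstant` is what backs partition columns: every row in a batch shares one value, so the vector only needs to store that value once. A minimal sketch of the idea (the `ConstantStringVector` class is illustrative, not Flink's implementation):

```java
/** A column vector whose every slot returns the same value - how partition columns avoid materialization. */
class ConstantStringVector {
    private final String value;
    private final int size;

    ConstantStringVector(String value, int size) {
        this.value = value;
        this.size = size;
    }

    String getString(int rowIndex) {
        if (rowIndex < 0 || rowIndex >= size) {
            throw new IndexOutOfBoundsException("row " + rowIndex);
        }
        return value; // O(1) memory regardless of batch size
    }
}
```

This is why injecting partition fields via a `ColumnBatchFactory` costs essentially nothing at read time.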

## Performance Characteristics

### Throughput Optimization

- **Batch Processing**: Processes multiple rows simultaneously, using SIMD operations where possible
- **Column Pruning**: Only reads required columns from storage, reducing I/O
- **Dictionary Compression**: Efficient handling of dictionary-encoded columns
- **Lazy Evaluation**: Defers expensive operations until data is actually needed

### Memory Management

- **Vectorized Memory Layout**: Contiguous memory access patterns for better CPU cache utilization
- **Controlled Memory Usage**: Configurable batch sizes prevent memory overflow
- **Off-heap Storage**: Column vectors can use off-heap memory to reduce GC pressure

### I/O Efficiency

- **Block-level Reading**: Aligns with Parquet row group boundaries for optimal disk access
- **Parallel I/O**: Multiple threads can read different column chunks simultaneously
- **Compression Handling**: Native support for all Parquet compression codecs

The vectorized reading infrastructure provides significant performance improvements over row-based processing, especially for analytical workloads with wide schemas and large datasets.