# Vectorized Input

High-performance columnar input formats optimized for analytical workloads with vectorized processing, partition support, and efficient memory usage.

## Capabilities

### ParquetColumnarRowInputFormat

Columnar input format that provides RowData iterators using vectorized column batches for maximum performance in analytical queries.

```java { .api }
/**
 * Parquet input format providing a RowData iterator using columnar row data.
 * Extends ParquetVectorizedInputFormat with RowData-specific functionality.
 */
public class ParquetColumnarRowInputFormat<SplitT extends FileSourceSplit>
        extends ParquetVectorizedInputFormat<RowData, SplitT> {

    /**
     * Creates a basic columnar row input format without extra fields.
     * @param hadoopConfig Hadoop configuration for Parquet reading
     * @param projectedType Row type defining the projected schema
     * @param batchSize Number of rows per vectorized batch
     * @param isUtcTimestamp Whether to use the UTC timezone for timestamps
     * @param isCaseSensitive Whether column name matching is case sensitive
     */
    public ParquetColumnarRowInputFormat(
            Configuration hadoopConfig,
            RowType projectedType,
            int batchSize,
            boolean isUtcTimestamp,
            boolean isCaseSensitive
    );

    /**
     * Creates a columnar row input format with extra fields support.
     * @param hadoopConfig Hadoop configuration for Parquet reading
     * @param projectedType Projected row type (excludes extra fields)
     * @param producedType Produced row type (includes extra fields)
     * @param batchFactory Factory for creating column batches with extra fields
     * @param batchSize Number of rows per vectorized batch
     * @param isUtcTimestamp Whether to use the UTC timezone for timestamps
     * @param isCaseSensitive Whether column name matching is case sensitive
     */
    public ParquetColumnarRowInputFormat(
            Configuration hadoopConfig,
            RowType projectedType,
            RowType producedType,
            ColumnBatchFactory<SplitT> batchFactory,
            int batchSize,
            boolean isUtcTimestamp,
            boolean isCaseSensitive
    );

    /**
     * Creates a partitioned columnar row input format that automatically
     * handles partition columns derived from file paths.
     * @param hadoopConfig Hadoop configuration for Parquet reading
     * @param producedRowType Complete row type including partition columns
     * @param partitionKeys List of partition column names
     * @param extractor Extractor for deriving partition values from splits
     * @param batchSize Number of rows per vectorized batch
     * @param isUtcTimestamp Whether to use the UTC timezone for timestamps
     * @param isCaseSensitive Whether column name matching is case sensitive
     * @return Configured ParquetColumnarRowInputFormat for partitioned data
     */
    public static <SplitT extends FileSourceSplit> ParquetColumnarRowInputFormat<SplitT> createPartitionedFormat(
            Configuration hadoopConfig,
            RowType producedRowType,
            List<String> partitionKeys,
            PartitionFieldExtractor<SplitT> extractor,
            int batchSize,
            boolean isUtcTimestamp,
            boolean isCaseSensitive
    );

    /**
     * Returns the type information for the produced RowData.
     * @return TypeInformation for the RowData output
     */
    public TypeInformation<RowData> getProducedType();
}
```

### ParquetVectorizedInputFormat

Abstract base class for vectorized Parquet input formats, providing the foundation for high-performance columnar processing.

```java { .api }
/**
 * Abstract base class for vectorized Parquet input formats.
 * Provides vectorized column batch processing with configurable batch sizes.
 */
public abstract class ParquetVectorizedInputFormat<T, SplitT extends FileSourceSplit>
        implements BulkFormat<T, SplitT> {

    /**
     * Creates a reader for the given file split.
     * @param config Flink configuration
     * @param split File split to read from
     * @return BulkFormat.Reader for processing the split
     * @throws IOException if reader creation fails
     */
    public Reader<T> createReader(Configuration config, SplitT split) throws IOException;

    /**
     * Restores a reader from a checkpointed position.
     * @param config Flink configuration
     * @param split File split to read from
     * @param restoredOffset Checkpointed position to restore from
     * @return BulkFormat.Reader restored at the specified position
     * @throws IOException if reader restoration fails
     */
    public Reader<T> restoreReader(Configuration config, SplitT split, CheckpointedPosition restoredOffset)
            throws IOException;

    /**
     * Checks whether the format supports splitting.
     * @return true if the format can be split across multiple readers
     */
    public boolean isSplittable();
}
```

### ColumnBatchFactory

Factory interface for creating vectorized column batches, with support for extra fields such as partition columns.

```java { .api }
/**
 * Factory for creating vectorized column batches.
 * Supports adding extra fields beyond those present in Parquet files.
 */
@FunctionalInterface
public interface ColumnBatchFactory<SplitT extends FileSourceSplit> {

    /**
     * Creates a vectorized column batch from Parquet column vectors.
     * @param split File split being processed (for extracting partition info)
     * @param parquetVectors Column vectors read from the Parquet file
     * @return VectorizedColumnBatch with original and extra columns
     * @throws IOException if batch creation fails
     */
    VectorizedColumnBatch create(SplitT split, ColumnVector[] parquetVectors) throws IOException;

    /**
     * Creates a factory that does not add extra fields.
     * @return ColumnBatchFactory that passes through Parquet columns unchanged
     */
    static <SplitT extends FileSourceSplit> ColumnBatchFactory<SplitT> withoutExtraFields() {
        return (split, vectors) -> new VectorizedColumnBatch(vectors);
    }
}
```

## Usage Examples

### Basic Vectorized Reading

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.vector.VectorizedColumnBatch;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Define the schema for reading
RowType rowType = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(),
        DataTypes.STRING().getLogicalType(),
        DataTypes.TIMESTAMP(3).getLogicalType()
    },
    new String[]{"id", "name", "created_at"}
);

// Create the input format
Configuration hadoopConfig = new Configuration();
int batchSize = VectorizedColumnBatch.DEFAULT_SIZE; // 2048
boolean utcTimezone = true;
boolean caseSensitive = false;

ParquetColumnarRowInputFormat<FileSourceSplit> inputFormat =
    new ParquetColumnarRowInputFormat<>(
        hadoopConfig, rowType, batchSize, utcTimezone, caseSensitive
    );

// Use with FileSource
FileSource<RowData> parquetSource = FileSource
    .forBulkFileFormat(inputFormat, new Path("/input/parquet/files"))
    .build();

DataStream<RowData> dataStream = env.fromSource(parquetSource,
    WatermarkStrategy.noWatermarks(), "parquet-source");
```

### Partitioned Data Reading

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.table.filesystem.PartitionFieldExtractor;

// Schema including partition columns
RowType producedRowType = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(), // id
        DataTypes.STRING().getLogicalType(), // name
        DataTypes.DATE().getLogicalType(),   // partition: date
        DataTypes.STRING().getLogicalType()  // partition: region
    },
    new String[]{"id", "name", "date", "region"}
);

// Define the partition columns
List<String> partitionKeys = Arrays.asList("date", "region");

// Create a partition field extractor
PartitionFieldExtractor<FileSourceSplit> extractor =
    PartitionFieldExtractor.forFileSystem("__HIVE_DEFAULT_PARTITION__");

// Create the partitioned input format
ParquetColumnarRowInputFormat<FileSourceSplit> partitionedFormat =
    ParquetColumnarRowInputFormat.createPartitionedFormat(
        hadoopConfig,
        producedRowType,
        partitionKeys,
        extractor,
        4096,  // Larger batch size for partitioned data
        true,  // UTC timezone
        false  // Case insensitive
    );

// File structure: /data/date=2023-01-01/region=us-west/part-0000.parquet
FileSource<RowData> partitionedSource = FileSource
    .forBulkFileFormat(partitionedFormat, new Path("/data"))
    .build();
```
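The extractor above recovers partition values from Hive-style `key=value` path segments. A minimal, dependency-free sketch of that parsing logic (`PartitionPathParser` is a hypothetical helper for illustration, not the Flink implementation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionPathParser {

    // Collects hive-style "key=value" directory segments from a file path.
    // Segments without '=' (like the file name itself) are skipped.
    static Map<String, String> parse(String path) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String segment : path.split("/")) {
            int eq = segment.indexOf('=');
            if (eq > 0) {
                values.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return values;
    }

    public static void main(String[] args) {
        Map<String, String> p =
            parse("/data/date=2023-01-01/region=us-west/part-0000.parquet");
        System.out.println(p); // prints {date=2023-01-01, region=us-west}
    }
}
```

Flink's real extractor additionally converts the extracted strings to the declared partition column types and maps the configured default partition name (here `__HIVE_DEFAULT_PARTITION__`) to a null value.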

### Performance Tuning

```java
// Custom configuration for high-throughput pipelines
Configuration optimizedConfig = new Configuration();

// Parquet writer-side properties: these control how files are produced,
// not how they are read, so set them in the job that writes the files
optimizedConfig.setBoolean("parquet.enable.dictionary", true);
optimizedConfig.setInt("parquet.page.size", 1048576);    // 1MB page size
optimizedConfig.setInt("parquet.block.size", 134217728); // 128MB block size

// Larger batch sizes for analytical workloads
int largeBatchSize = 8192; // 4x the default size of 2048

ParquetColumnarRowInputFormat<FileSourceSplit> optimizedFormat =
    new ParquetColumnarRowInputFormat<>(
        optimizedConfig,
        rowType,
        largeBatchSize,
        true,  // UTC timestamps
        false  // Case insensitive
    );

// Configure the FileSource to keep discovering new files
FileSource<RowData> optimizedSource = FileSource
    .forBulkFileFormat(optimizedFormat, inputPath)
    .monitorContinuously(Duration.ofMinutes(1))
    .build();

DataStream<RowData> stream = env
    .fromSource(optimizedSource, WatermarkStrategy.noWatermarks(), "optimized-parquet")
    .setParallelism(Runtime.getRuntime().availableProcessors()); // Scale with CPU cores
```

### Column Projection

```java
// Original file schema: id, name, email, age, created_at, updated_at
// Project only the needed columns for better performance
RowType projectedSchema = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(), // id
        DataTypes.STRING().getLogicalType(), // name
        DataTypes.INT().getLogicalType()     // age
    },
    new String[]{"id", "name", "age"}
);

// Only the projected columns are read from the Parquet files
ParquetColumnarRowInputFormat<FileSourceSplit> projectedFormat =
    new ParquetColumnarRowInputFormat<>(
        hadoopConfig, projectedSchema, batchSize, true, false
    );

// This reads only 3 columns instead of 6, significantly reducing I/O
```

### Custom Column Batch Factory

```java
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.data.vector.DoubleColumnVector;
import org.apache.flink.table.data.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.vector.heap.HeapDoubleVector;

// Custom factory that adds a computed column
ColumnBatchFactory<FileSourceSplit> customFactory = (split, parquetVectors) -> {
    // Original Parquet columns: id, amount
    // Add a computed column: tax (amount * 0.1)

    ColumnVector[] allVectors = new ColumnVector[parquetVectors.length + 1];
    System.arraycopy(parquetVectors, 0, allVectors, 0, parquetVectors.length);

    // Create the computed tax column, sized to the configured batch size
    HeapDoubleVector taxVector = new HeapDoubleVector(batchSize);
    DoubleColumnVector amountVector = (DoubleColumnVector) parquetVectors[1]; // amount is the second column

    for (int i = 0; i < batchSize; i++) {
        if (!amountVector.isNullAt(i)) {
            taxVector.setDouble(i, amountVector.getDouble(i) * 0.1);
        } else {
            taxVector.setNullAt(i);
        }
    }

    allVectors[parquetVectors.length] = taxVector;
    return new VectorizedColumnBatch(allVectors);
};

// Schema including the computed column
RowType schemaWithTax = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(), // id
        DataTypes.DOUBLE().getLogicalType(), // amount
        DataTypes.DOUBLE().getLogicalType()  // tax (computed)
    },
    new String[]{"id", "amount", "tax"}
);

RowType parquetSchema = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(), // id
        DataTypes.DOUBLE().getLogicalType()  // amount
    },
    new String[]{"id", "amount"}
);

ParquetColumnarRowInputFormat<FileSourceSplit> customFormat =
    new ParquetColumnarRowInputFormat<>(
        hadoopConfig,
        parquetSchema, // Schema in the Parquet file
        schemaWithTax, // Final produced schema
        customFactory, // Custom batch factory
        batchSize,
        true,
        false
    );
```

## Performance Characteristics

### Memory Usage

- **Batch Size**: Larger batches (4096-8192) improve throughput but use more memory
- **Column Vectors**: Memory usage scales with batch size × number of columns × data type size
- **Dictionary Compression**: Reduces memory usage for repeated values
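
As a back-of-envelope illustration of the scaling rule above (a sketch only; actual per-vector overhead depends on the vector implementation and the mix of data types):

```java
public class BatchMemoryEstimate {
    public static void main(String[] args) {
        // batch size x number of columns x bytes per value,
        // assuming ten fixed-width 8-byte columns (BIGINT / DOUBLE)
        long batchSize = 8192;
        long numColumns = 10;
        long bytesPerValue = 8;

        long bytesPerBatch = batchSize * numColumns * bytesPerValue;
        System.out.println(bytesPerBatch);        // 655360
        System.out.println(bytesPerBatch / 1024); // 640 (KiB per in-flight batch)
    }
}
```

Each parallel reader holds at least one such batch in flight, so total memory grows with parallelism as well as batch size.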

### I/O Optimization

- **Column Projection**: Only read the required columns to minimize I/O
- **Predicate Pushdown**: Filter row groups at the Parquet level
- **Vectorized Processing**: Process multiple rows per operation

### Parallelization

- **Split-level Parallelism**: Each Parquet file can be processed by a separate parallel instance
- **Row Group Parallelism**: Large files can be split at row group boundaries
- **CPU Vectorization**: Modern CPUs process vectorized operations efficiently
371
372
## Error Handling
373
374
Common error scenarios and solutions:
375
376
```java
377
try {
378
ParquetColumnarRowInputFormat<FileSourceSplit> format =
379
new ParquetColumnarRowInputFormat<>(hadoopConfig, rowType, batchSize, true, false);
380
} catch (IllegalArgumentException e) {
381
// Invalid row type, negative batch size, etc.
382
logger.error("Invalid configuration for Parquet input format", e);
383
} catch (Exception e) {
384
// Other initialization errors
385
logger.error("Failed to create Parquet input format", e);
386
}
387
388
// Runtime reading errors
389
try {
390
BulkFormat.Reader<RowData> reader = format.createReader(config, split);
391
RecordIterator<RowData> iterator = reader.readBatch();
392
} catch (IOException e) {
393
// File not found, permission errors, corrupted files
394
logger.error("Failed to read Parquet file", e);
395
} catch (RuntimeException e) {
396
// Schema mismatches, unsupported data types
397
logger.error("Runtime error during Parquet reading", e);
398
}
399
```