# Columnar Reading

The ORC format provides high-performance columnar reading through the `OrcColumnarRowInputFormat`, enabling vectorized processing with partition support, column projection, and statistics reporting.

## Input Format

```java { .api }
public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
        extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT>
        implements FileBasedStatisticsReportableInputFormat {

    public OrcColumnarRowInputFormat(
        OrcShim<BatchT> shim,
        Configuration hadoopConfig,
        TypeDescription schema,
        int[] selectedFields,
        List<OrcFilters.Predicate> conjunctPredicates,
        int batchSize,
        ColumnBatchFactory<BatchT, SplitT> batchFactory,
        TypeInformation<RowData> producedTypeInfo
    );

    public OrcReaderBatch<RowData, BatchT> createReaderBatch(
        SplitT split,
        OrcVectorizedBatchWrapper<BatchT> orcBatch,
        Pool.Recycler<OrcReaderBatch<RowData, BatchT>> recycler,
        int batchSize
    );

    public TypeInformation<RowData> getProducedType();
    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
```
## Factory Methods

```java { .api }
public static <SplitT extends FileSourceSplit> OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT>
    createPartitionedFormat(
        OrcShim<VectorizedRowBatch> shim,
        Configuration hadoopConfig,
        RowType tableType,
        List<String> partitionKeys,
        PartitionFieldExtractor<SplitT> extractor,
        int[] selectedFields,
        List<OrcFilters.Predicate> conjunctPredicates,
        int batchSize,
        Function<RowType, TypeInformation<RowData>> rowTypeInfoFactory
    );
```
50
51
## Usage Examples
52
53
### Basic Columnar Reading
54
55
```java
56
import org.apache.flink.orc.OrcColumnarRowInputFormat;
57
import org.apache.flink.orc.shim.OrcShim;
58
import org.apache.flink.table.types.logical.*;
59
import org.apache.hadoop.conf.Configuration;
60
61
// Define table schema
62
RowType tableType = RowType.of(
63
new LogicalType[] {
64
new BigIntType(), // id
65
new VarCharType(255), // name
66
new IntType(), // age
67
new BooleanType() // active
68
},
69
new String[] {"id", "name", "age", "active"}
70
);
71
72
// Configure reading
73
Configuration hadoopConfig = new Configuration();
74
int[] selectedFields = {0, 1, 2, 3}; // All fields
75
List<OrcFilters.Predicate> predicates = new ArrayList<>();
76
int batchSize = 1024;
77
78
// Create input format
79
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
80
OrcColumnarRowInputFormat.createPartitionedFormat(
81
OrcShim.defaultShim(),
82
hadoopConfig,
83
tableType,
84
Collections.emptyList(), // No partitions
85
null, // No partition extractor
86
selectedFields,
87
predicates,
88
batchSize,
89
TypeConversions::fromLogicalToDataType
90
);
91
```
92
93
### Reading with Column Projection
94
95
```java
96
// Read only specific columns (id, name)
97
int[] projectedFields = {0, 1}; // Only id and name columns
98
99
RowType projectedType = RowType.of(
100
new LogicalType[] {
101
new BigIntType(), // id
102
new VarCharType(255) // name
103
},
104
new String[] {"id", "name"}
105
);
106
107
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> projectedFormat =
108
OrcColumnarRowInputFormat.createPartitionedFormat(
109
OrcShim.defaultShim(),
110
hadoopConfig,
111
tableType, // Full table type
112
Collections.emptyList(),
113
null,
114
projectedFields, // Only selected fields
115
predicates,
116
batchSize,
117
TypeConversions::fromLogicalToDataType
118
);
119
```
120
121
### Reading Partitioned Data
122
123
```java
124
import org.apache.flink.connector.file.table.PartitionFieldExtractor;
125
126
// Define partitioned table
127
List<String> partitionKeys = Arrays.asList("year", "month");
128
129
RowType partitionedTableType = RowType.of(
130
new LogicalType[] {
131
new BigIntType(), // id
132
new VarCharType(255), // name
133
new IntType(), // age
134
new BooleanType(), // active
135
new IntType(), // year (partition)
136
new IntType() // month (partition)
137
},
138
new String[] {"id", "name", "age", "active", "year", "month"}
139
);
140
141
// Partition field extractor
142
PartitionFieldExtractor<FileSourceSplit> extractor = (split, fieldName, fieldType) -> {
143
// Extract partition values from file path
144
String path = split.path().toString();
145
if ("year".equals(fieldName)) {
146
// Extract year from path like /data/year=2023/month=01/file.orc
147
return extractYearFromPath(path);
148
} else if ("month".equals(fieldName)) {
149
return extractMonthFromPath(path);
150
}
151
return null;
152
};
153
154
// Create partitioned format
155
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> partitionedFormat =
156
OrcColumnarRowInputFormat.createPartitionedFormat(
157
OrcShim.defaultShim(),
158
hadoopConfig,
159
partitionedTableType,
160
partitionKeys,
161
extractor,
162
selectedFields,
163
predicates,
164
batchSize,
165
TypeConversions::fromLogicalToDataType
166
);
167
```
168
169
### Reading with Predicates
170
171
```java
172
import org.apache.flink.orc.OrcFilters;
173
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
174
175
// Create filter predicates
176
List<OrcFilters.Predicate> predicates = Arrays.asList(
177
// age > 25
178
new OrcFilters.Not(new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25)),
179
// active = true
180
new OrcFilters.Equals("active", PredicateLeaf.Type.BOOLEAN, true),
181
// name IS NOT NULL
182
new OrcFilters.Not(new OrcFilters.IsNull("name", PredicateLeaf.Type.STRING))
183
);
184
185
// Use predicates in format
186
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> filteredFormat =
187
OrcColumnarRowInputFormat.createPartitionedFormat(
188
OrcShim.defaultShim(),
189
hadoopConfig,
190
tableType,
191
Collections.emptyList(),
192
null,
193
selectedFields,
194
predicates, // Apply filters
195
batchSize,
196
TypeConversions::fromLogicalToDataType
197
);
198
```
199
200
## Column Batch Factory
201
202
```java { .api }
203
@FunctionalInterface
204
public interface ColumnBatchFactory<BatchT, SplitT extends FileSourceSplit> {
205
VectorizedColumnBatch create(SplitT split, BatchT batch);
206
}
207
```
208
209
Custom batch factory for specialized processing:
210
211
```java
212
ColumnBatchFactory<VectorizedRowBatch, FileSourceSplit> customFactory =
213
(split, orcBatch) -> {
214
// Create Flink column vectors from ORC vectors
215
ColumnVector[] vectors = new ColumnVector[selectedFields.length];
216
217
for (int i = 0; i < vectors.length; i++) {
218
int fieldIndex = selectedFields[i];
219
LogicalType fieldType = tableType.getTypeAt(fieldIndex);
220
221
vectors[i] = AbstractOrcColumnVector.createFlinkVector(
222
orcBatch.cols[i],
223
fieldType
224
);
225
}
226
227
return new VectorizedColumnBatch(vectors);
228
};
229
```
230
231
## Statistics Reporting
232
233
```java { .api }
234
public TableStats reportStatistics(List<Path> files, DataType producedDataType);
235
```
236
237
Extract statistics from ORC files:
238
239
```java
240
import org.apache.flink.core.fs.Path;
241
import org.apache.flink.table.plan.stats.TableStats;
242
243
List<Path> orcFiles = Arrays.asList(
244
new Path("/data/file1.orc"),
245
new Path("/data/file2.orc")
246
);
247
248
// Get statistics from ORC metadata
249
TableStats stats = inputFormat.reportStatistics(orcFiles, dataType);
250
251
System.out.println("Row count: " + stats.getRowCount());
252
System.out.println("Column stats: " + stats.getColumnStats());
253
```
254
255
### OrcFormatStatisticsReportUtil
256
257
Utility class for extracting comprehensive table statistics from ORC files.
258
259
```java { .api }
260
public class OrcFormatStatisticsReportUtil {
261
public static TableStats getTableStatistics(
262
List<Path> files,
263
DataType producedDataType
264
);
265
266
public static TableStats getTableStatistics(
267
List<Path> files,
268
DataType producedDataType,
269
Configuration hadoopConfig
270
);
271
}
272
```
273
274
**Usage Examples:**
275
276
```java
277
import org.apache.flink.orc.util.OrcFormatStatisticsReportUtil;
278
import org.apache.hadoop.conf.Configuration;
279
280
// Get statistics with default configuration
281
List<Path> orcFiles = Arrays.asList(
282
new Path("/warehouse/users/part1.orc"),
283
new Path("/warehouse/users/part2.orc")
284
);
285
286
TableStats stats = OrcFormatStatisticsReportUtil.getTableStatistics(
287
orcFiles,
288
producedDataType
289
);
290
291
// Get statistics with custom Hadoop configuration
292
Configuration customConfig = new Configuration();
293
customConfig.set("orc.read.buffer.size", "262144");
294
customConfig.setBoolean("orc.read.include.file.footer", true);
295
296
TableStats detailedStats = OrcFormatStatisticsReportUtil.getTableStatistics(
297
orcFiles,
298
producedDataType,
299
customConfig
300
);
301
302
// Access statistics information
303
System.out.println("Total row count: " + stats.getRowCount());
304
Map<String, ColumnStats> columnStats = stats.getColumnStats();
305
for (Map.Entry<String, ColumnStats> entry : columnStats.entrySet()) {
306
String columnName = entry.getKey();
307
ColumnStats colStats = entry.getValue();
308
System.out.println(columnName + " - Null count: " + colStats.getNullCount());
309
System.out.println(columnName + " - Min: " + colStats.getMin());
310
System.out.println(columnName + " - Max: " + colStats.getMax());
311
}
312
```
313
314
## Reader Batch Processing
315
316
```java { .api }
317
public abstract class OrcReaderBatch<RowDataT, BatchT> implements Pool.Recyclable {
318
public abstract RecordIterator<RowDataT> convertAndGetIterator(
319
OrcVectorizedBatchWrapper<BatchT> orcBatch,
320
long startingOffset
321
);
322
}
323
```
324
325
The reader processes data in vectorized batches for optimal performance:
326
327
1. **Batch Creation**: `createReaderBatch()` creates vectorized column batch
328
2. **Vector Conversion**: ORC vectors converted to Flink column vectors
329
3. **Iterator Generation**: `convertAndGetIterator()` provides row-by-row access
330
4. **Memory Management**: Batches are recyclable for memory efficiency
331
## Performance Configuration

### Batch Size Tuning

```java
// Smaller batches for memory-constrained environments
int smallBatchSize = 512;

// Larger batches for high-throughput scenarios
int largeBatchSize = 4096;

// Default ORC batch size
int defaultBatchSize = VectorizedRowBatch.DEFAULT_SIZE; // 1024
```

### Hadoop Configuration

```java
Configuration hadoopConfig = new Configuration();

// ORC reader settings
hadoopConfig.setBoolean("orc.use.zerocopy", true);
hadoopConfig.setInt("orc.read.buffer.size", 262144); // 256KB
hadoopConfig.setBoolean("orc.read.include.file.footer", true);

// Compression buffer settings
hadoopConfig.setInt("io.compression.codec.lzo.buffersize", 65536);
hadoopConfig.setInt("io.compression.codec.snappy.buffersize", 65536);
```
361
362
## Integration with DataStream API
363
364
```java
365
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
366
import org.apache.flink.connector.file.src.FileSource;
367
368
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
369
370
// Create file source with ORC format
371
FileSource<RowData> orcSource = FileSource
372
.forBulkFileFormat(inputFormat, new Path("/path/to/orc/files"))
373
.build();
374
375
DataStream<RowData> orcStream = env.fromSource(
376
orcSource,
377
WatermarkStrategy.noWatermarks(),
378
"ORC Source"
379
);
380
381
// Process the stream
382
orcStream
383
.filter(row -> row.getInt(2) > 25) // age > 25
384
.map(row -> row.getString(1)) // extract name
385
.print();
386
```
387
388
## Split Reader Utilities
389
390
### OrcSplitReaderUtil
391
392
Core utility class for ORC split reading operations and type conversions.
393
394
```java { .api }
395
public class OrcSplitReaderUtil {
396
public static <SplitT extends FileSourceSplit> Function<SplitT, RecordIterator<RowData>>
397
genPartColumnarRowReader(
398
Configuration hadoopConfig,
399
String[] fullFieldNames,
400
DataType[] fullFieldTypes,
401
TypeInformation<RowData> typeInfo,
402
int[] selectedFields,
403
List<OrcFilters.Predicate> conjunctPredicates,
404
int batchSize,
405
OrcShim<VectorizedRowBatch> shim,
406
List<String> partitionKeys,
407
PartitionFieldExtractor<SplitT> extractor
408
);
409
410
public static int[] getSelectedOrcFields(
411
RowType tableType,
412
int[] selectedFields,
413
List<String> partitionKeys
414
);
415
416
public static String[] getNonPartNames(
417
String[] fullNames,
418
List<String> partitionKeys
419
);
420
421
public static String[] getNonPartNames(
422
RowType rowType,
423
List<String> partitionKeys
424
);
425
426
public static TypeDescription convertToOrcTypeWithPart(
427
RowType rowType,
428
List<String> partitionKeys
429
);
430
431
public static TypeDescription convertToOrcTypeWithPart(
432
String[] fieldNames,
433
DataType[] fieldTypes,
434
List<String> partitionKeys
435
);
436
437
public static TypeDescription logicalTypeToOrcType(LogicalType logicalType);
438
}
439
```
440
441
**Usage Examples:**
442
443
```java
444
import org.apache.flink.orc.OrcSplitReaderUtil;
445
446
// Get selected ORC field indices
447
int[] selectedOrcFields = OrcSplitReaderUtil.getSelectedOrcFields(
448
tableType,
449
selectedFields,
450
partitionKeys
451
);
452
453
// Convert Flink types to ORC schema
454
TypeDescription orcSchema = OrcSplitReaderUtil.convertToOrcTypeWithPart(
455
tableType,
456
partitionKeys
457
);
458
459
// Convert individual logical type
460
LogicalType stringType = new VarCharType(255);
461
TypeDescription orcStringType = OrcSplitReaderUtil.logicalTypeToOrcType(stringType);
462
```