# Columnar Reading

Helper utilities for creating columnar input formats and split readers with partition support for efficient ORC file reading. Provides high-performance vectorized reading with predicate pushdown and column projection capabilities.

## Capabilities

### Columnar Row Input Format

Helper class for creating partitioned ORC columnar input formats without Hive dependencies.
```java { .api }
/**
 * Helper class for creating OrcColumnarRowFileInputFormat in no-Hive setups.
 * Provides static factory methods for creating partitioned input formats.
 */
public class OrcNoHiveColumnarRowInputFormat {

    /**
     * Creates a partitioned OrcColumnarRowFileInputFormat whose partition columns
     * are generated from split metadata.
     *
     * @param hadoopConfig Hadoop configuration for ORC reading
     * @param tableType Row type describing the complete table schema
     * @param partitionKeys List of partition column names
     * @param extractor Extracts partition values from file splits
     * @param selectedFields Array of field indices to read from files
     * @param conjunctPredicates List of filter predicates for pushdown
     * @param batchSize Number of rows per vectorized batch
     * @return Configured columnar input format for partitioned reading
     */
    public static <SplitT extends FileSourceSplit>
            OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
                    Configuration hadoopConfig,
                    RowType tableType,
                    List<String> partitionKeys,
                    PartitionFieldExtractor<SplitT> extractor,
                    int[] selectedFields,
                    List<OrcFilters.Predicate> conjunctPredicates,
                    int batchSize);
}
```
**Usage Examples:**

```java
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.orc.OrcFilters;
import org.apache.flink.orc.nohive.OrcNoHiveColumnarRowInputFormat;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

import java.util.Arrays;
import java.util.List;

// Define the table schema, including partition columns
RowType tableType = RowType.of(
        new LogicalType[] {
            new BigIntType(),     // user_id
            new VarCharType(255), // name
            new VarCharType(100), // email
            new IntType(),        // age
            new VarCharType(50),  // country (partition)
            new VarCharType(10)   // year (partition)
        },
        new String[] {"user_id", "name", "email", "age", "country", "year"});

// Define partition keys
List<String> partitionKeys = Arrays.asList("country", "year");

// Select only specific fields to read (column projection)
int[] selectedFields = {0, 1, 2, 4, 5}; // user_id, name, email, country, year

// Create a partition extractor; extractFromPath is a user-supplied helper
// that parses values out of paths like /data/country=US/year=2023/file.orc
PartitionFieldExtractor<FileSourceSplit> extractor = (split, fieldName, fieldType) -> {
    String path = split.path().toString();
    if (fieldName.equals("country")) {
        return extractFromPath(path, "country=");
    } else if (fieldName.equals("year")) {
        return extractFromPath(path, "year=");
    }
    return null;
};

// Create filter predicates (built from the nested OrcFilters classes)
List<OrcFilters.Predicate> predicates = Arrays.asList(
        new OrcFilters.Equals("age", PredicateLeaf.Type.LONG, 25L),
        new OrcFilters.LessThan("user_id", PredicateLeaf.Type.LONG, 10000L));

// Create the columnar input format
OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
        OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
                new Configuration(),
                tableType,
                partitionKeys,
                extractor,
                selectedFields,
                predicates,
                1024); // batch size
```
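The `extractFromPath` call above refers to a user-supplied helper, not a Flink API. A minimal sketch of one possible implementation (the class and method names are hypothetical):

```java
// Hypothetical helper: extracts the value of a Hive-style "key=" segment
// from a partition path such as /data/country=US/year=2023/file.orc.
public class PartitionPaths {

    /** Returns the value following the given "key=" prefix, or null if no segment matches. */
    public static String extractFromPath(String path, String keyPrefix) {
        for (String segment : path.split("/")) {
            if (segment.startsWith(keyPrefix)) {
                return segment.substring(keyPrefix.length());
            }
        }
        return null;
    }
}
```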
### Split Reader Utility

Utility for generating ORC split readers with partition support and predicate pushdown.
```java { .api }
/**
 * Utility for generating OrcColumnarRowSplitReader instances.
 * Provides factory methods for creating split readers with partition support.
 */
public class OrcNoHiveSplitReaderUtil {

    /**
     * Generates a partitioned columnar row reader for ORC files.
     *
     * @param conf Hadoop configuration
     * @param fullFieldNames Complete array of field names in table schema
     * @param fullFieldTypes Complete array of field types in table schema
     * @param partitionSpec Map of partition column names to values
     * @param selectedFields Array of field indices to read from files
     * @param conjunctPredicates List of filter predicates for pushdown
     * @param batchSize Number of rows per vectorized batch
     * @param path Path to the ORC file to read
     * @param splitStart Byte offset where the split starts in the file
     * @param splitLength Number of bytes to read in this split
     * @return Configured columnar row split reader
     * @throws IOException if reader creation fails
     */
    public static OrcColumnarRowSplitReader<VectorizedRowBatch> genPartColumnarRowReader(
            Configuration conf,
            String[] fullFieldNames,
            DataType[] fullFieldTypes,
            Map<String, Object> partitionSpec,
            int[] selectedFields,
            List<OrcFilters.Predicate> conjunctPredicates,
            int batchSize,
            org.apache.flink.core.fs.Path path,
            long splitStart,
            long splitLength) throws IOException;
}
```
**Usage Examples:**

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.OrcFilters;
import org.apache.flink.orc.nohive.OrcNoHiveSplitReaderUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Define the complete table schema
String[] fieldNames = {"user_id", "name", "email", "age", "country", "year"};
DataType[] fieldTypes = {
    DataTypes.BIGINT(),
    DataTypes.VARCHAR(255),
    DataTypes.VARCHAR(100),
    DataTypes.INT(),
    DataTypes.VARCHAR(50), // partition column
    DataTypes.VARCHAR(10)  // partition column
};

// Define partition values for this split
Map<String, Object> partitionSpec = new HashMap<>();
partitionSpec.put("country", "US");
partitionSpec.put("year", "2023");

// Select fields to read (excluding age for performance)
int[] selectedFields = {0, 1, 2, 4, 5}; // user_id, name, email, country, year

// Create filter predicates
List<OrcFilters.Predicate> predicates = Arrays.asList(
        new OrcFilters.LessThan("user_id", PredicateLeaf.Type.LONG, 50000L));

// Create the split reader
Path filePath = new Path("hdfs://cluster/data/country=US/year=2023/part-00001.orc");
OrcColumnarRowSplitReader<VectorizedRowBatch> reader =
        OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(
                new Configuration(),
                fieldNames,
                fieldTypes,
                partitionSpec,
                selectedFields,
                predicates,
                2048, // batch size
                filePath,
                0,            // split start
                1024 * 1024); // split length (1 MB)

// Read records: the reader fills vectorized batches internally and
// exposes them row by row through the RowData interface
while (!reader.reachedEnd()) {
    RowData row = reader.nextRecord(null);
    long userId = row.getLong(0);              // user_id
    String name = row.getString(1).toString(); // name
    // ... process row
}
reader.close();
```
### Column Batch Factory

The input format uses a ColumnBatchFactory to create Flink VectorizedColumnBatch instances from ORC VectorizedRowBatch:

```java { .api }
/**
 * Factory interface for creating column batches from ORC row batches.
 * Used internally by input formats to convert ORC vectors to Flink vectors.
 */
interface ColumnBatchFactory<T, SplitT extends FileSourceSplit> {

    /**
     * Creates a VectorizedColumnBatch from an ORC VectorizedRowBatch.
     *
     * @param split File split containing partition metadata
     * @param rowBatch ORC vectorized row batch with column data
     * @return Flink VectorizedColumnBatch for processing
     */
    VectorizedColumnBatch createBatch(SplitT split, T rowBatch);
}
```
## Predicate Pushdown

ORC filters enable predicate pushdown for improved performance. Predicates are constructed from the nested classes of `OrcFilters`; each takes a column name, a `PredicateLeaf.Type` describing the literal, and the literal value(s):

```java
import org.apache.flink.orc.OrcFilters;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

// Comparison and null-check predicates
// (startDate/endDate are java.sql.Date values defined elsewhere)
List<OrcFilters.Predicate> predicates = Arrays.asList(
        new OrcFilters.Equals("status", PredicateLeaf.Type.STRING, "active"),
        new OrcFilters.LessThan("age", PredicateLeaf.Type.LONG, 65L),
        new OrcFilters.LessThanEquals("score", PredicateLeaf.Type.LONG, 100L),
        new OrcFilters.IsNull("deleted_at", PredicateLeaf.Type.TIMESTAMP),
        new OrcFilters.Not(new OrcFilters.IsNull("email", PredicateLeaf.Type.STRING)),
        new OrcFilters.Between("created_date", PredicateLeaf.Type.DATE, startDate, endDate));

// "Greater than" is expressed by negating LessThanEquals
OrcFilters.Predicate salaryGreaterThan = new OrcFilters.Not(
        new OrcFilters.LessThanEquals("salary", PredicateLeaf.Type.FLOAT, 50000.0));

// Set membership
OrcFilters.Predicate countryIn =
        new OrcFilters.In("country", PredicateLeaf.Type.STRING, "US", "CA", "UK");

// Logical combinations
OrcFilters.Predicate combined = new OrcFilters.And(
        new OrcFilters.Equals("status", PredicateLeaf.Type.STRING, "active"),
        new OrcFilters.Or(
                new OrcFilters.Not(new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 18L)),
                new OrcFilters.IsNull("age", PredicateLeaf.Type.LONG)));
```
## Column Projection

Optimize performance by reading only required columns:

```java
// The table has 10 columns but only 4 are needed
String[] allFields = {"id", "name", "email", "age", "salary", "dept", "manager", "created", "updated", "status"};

// Project only the required columns (indices 0, 1, 2, 9)
int[] selectedFields = {0, 1, 2, 9}; // id, name, email, status

// This significantly reduces I/O and memory usage
OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> format =
        OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
                hadoopConfig,
                fullTableType,
                partitionKeys,
                extractor,
                selectedFields, // only read these columns
                predicates,
                batchSize);
```
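The produced rows contain only the selected columns, in the order the indices are listed. A small dependency-free sketch of that index-to-name mapping (the class name is illustrative, not a Flink API):

```java
import java.util.Arrays;

// Illustrative sketch: map projected field indices back to field names,
// mirroring how selectedFields determines the produced row layout.
public class ProjectionDemo {

    /** Returns the names of the projected fields, in projection order. */
    public static String[] projectNames(String[] allFields, int[] selectedFields) {
        return Arrays.stream(selectedFields)
                .mapToObj(i -> allFields[i])
                .toArray(String[]::new);
    }
}
```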
## Partition Handling

Handle partitioned ORC datasets efficiently:

```java
import java.sql.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Partition extractor implementation
PartitionFieldExtractor<FileSourceSplit> extractor = (split, fieldName, fieldType) -> {
    String path = split.path().toString();

    // Parse Hive-style partition paths: /table/year=2023/month=12/file.orc
    Pattern pattern = Pattern.compile(fieldName + "=([^/]+)");
    Matcher matcher = pattern.matcher(path);

    if (matcher.find()) {
        String value = matcher.group(1);

        // Convert the string value to the partition column's type
        switch (fieldType.getTypeRoot()) {
            case INTEGER:
                return Integer.parseInt(value);
            case BIGINT:
                return Long.parseLong(value);
            case VARCHAR:
                return value;
            case DATE:
                return Date.valueOf(value);
            default:
                return value;
        }
    }
    return null;
};
```
## Performance Optimization

Key strategies for optimal columnar reading performance:

1. **Column Projection**: Only read required columns using `selectedFields`
2. **Predicate Pushdown**: Use `conjunctPredicates` to filter at the ORC level
3. **Batch Size Tuning**: Adjust `batchSize` based on memory and processing requirements
4. **Partition Pruning**: Let Flink's partition pruning eliminate unnecessary splits
5. **Compression**: Write ORC files with a fast codec (e.g. ZSTD) so reads do less I/O

```java
// Reader-side configuration
Configuration optimizedConfig = new Configuration();
optimizedConfig.setBoolean("orc.use.zerocopy", true); // enable zero-copy reads where supported

// Note: the vectorized batch size is passed via the batchSize parameter of the
// factory methods above, not via configuration. Compression ("orc.compress",
// e.g. ZSTD) is a writer-side setting baked into the file when it is written.
```
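For batch size tuning, a back-of-envelope estimate of per-batch memory is useful. A sketch with illustrative per-column byte widths (the widths and class name are assumptions for the example, not Flink internals):

```java
// Rough estimate: one vectorized batch holds on the order of
// batchSize * (sum of per-column widths) bytes per split reader.
public class BatchSizing {

    /** Approximate bytes held by one batch, given per-column widths in bytes. */
    public static long approxBatchBytes(int batchSize, int[] columnWidths) {
        long bytesPerRow = 0;
        for (int width : columnWidths) {
            bytesPerRow += width;
        }
        return (long) batchSize * bytesPerRow;
    }
}
```

For example, at a batch size of 2048 over a BIGINT plus three roughly 24-byte string columns (8 + 24 + 24 + 24 = 80 bytes per row), one batch holds on the order of 160 KB per split reader.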
## Error Handling

Handle common reading errors:

```java
// try-with-resources guarantees the reader is closed even if reading fails
try (OrcColumnarRowSplitReader<VectorizedRowBatch> reader =
        OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(/* parameters */)) {

    while (!reader.reachedEnd()) {
        RowData row = reader.nextRecord(null);
        // Process row
    }
} catch (IOException e) {
    // Handle file system errors, corrupt files, or read failures
    logger.error("Failed to read ORC file: " + path, e);
} catch (IllegalArgumentException e) {
    // Handle schema mismatches or invalid column selections
    logger.error("Invalid schema or column selection", e);
}
```