# ORC Integration

Low-level ORC integration providing record readers and batch wrappers for direct ORC file access without Hive dependencies. Handles ORC file structure, metadata, and provides the foundation for higher-level reading and writing operations.

## Capabilities

### ORC No-Hive Shim

Shim implementation for ORC operations without Hive dependencies, providing record readers and batch management.

```java { .api }
/**
 * Shim for ORC reader without Hive dependencies
 * Implements OrcShim interface for ORC file operations using standalone ORC library
 */
public class OrcNoHiveShim implements OrcShim<VectorizedRowBatch> {

    /**
     * Create ORC record reader for specified file and split
     * @param conf Hadoop configuration for ORC settings
     * @param schema ORC type description for the file schema
     * @param selectedFields Array of field indices to read (column projection)
     * @param conjunctPredicates List of filter predicates for pushdown
     * @param path Path to the ORC file to read
     * @param splitStart Byte offset where split starts in file
     * @param splitLength Number of bytes to read in this split
     * @return ORC RecordReader configured for the specified parameters
     * @throws IOException if reader creation fails
     */
    public RecordReader createRecordReader(
        Configuration conf,
        TypeDescription schema,
        int[] selectedFields,
        List<OrcFilters.Predicate> conjunctPredicates,
        org.apache.flink.core.fs.Path path,
        long splitStart,
        long splitLength
    ) throws IOException;

    /**
     * Create batch wrapper for ORC vectorized row batches
     * @param schema ORC type description for creating appropriately sized batch
     * @param batchSize Maximum number of rows per batch
     * @return Batch wrapper containing initialized VectorizedRowBatch
     */
    public OrcNoHiveBatchWrapper createBatchWrapper(TypeDescription schema, int batchSize);

    /**
     * Read next batch of data from ORC record reader
     * @param reader ORC record reader to read from
     * @param rowBatch Vectorized row batch to populate with data
     * @return true if batch was populated with data, false if end of data
     * @throws IOException if read operation fails
     */
    public boolean nextBatch(RecordReader reader, VectorizedRowBatch rowBatch) throws IOException;
}
```

### Batch Wrapper

Wrapper class for ORC VectorizedRowBatch that provides size information and batch access.

```java { .api }
/**
 * Wrapper for ORC VectorizedRowBatch providing additional functionality
 * Implements OrcVectorizedBatchWrapper interface for batch management
 */
public class OrcNoHiveBatchWrapper implements OrcVectorizedBatchWrapper<VectorizedRowBatch> {

    /**
     * Create batch wrapper for the given VectorizedRowBatch
     * @param batch ORC vectorized row batch to wrap
     */
    public OrcNoHiveBatchWrapper(VectorizedRowBatch batch);

    /**
     * Get the wrapped ORC vectorized row batch
     * @return VectorizedRowBatch instance
     */
    public VectorizedRowBatch getBatch();

    /**
     * Get the number of rows currently in the batch
     * @return Current row count in the batch
     */
    public int size();
}
```

**Usage Examples:**

```java
import org.apache.flink.orc.nohive.shim.OrcNoHiveShim;
import org.apache.orc.TypeDescription;
import org.apache.orc.RecordReader;

// Create ORC schema
TypeDescription schema = TypeDescription.fromString(
    "struct<id:bigint,name:string,email:string,age:int,salary:decimal(10,2)>"
);

// Configure Hadoop settings
Configuration conf = new Configuration();
conf.set("orc.compress", "ZLIB");
conf.setBoolean("orc.use.zerocopy", true);

// Create shim instance
OrcNoHiveShim shim = new OrcNoHiveShim();

// Create record reader for entire file
org.apache.flink.core.fs.Path filePath = new org.apache.flink.core.fs.Path("hdfs://cluster/data/users.orc");
int[] selectedFields = {0, 1, 2, 3, 4}; // Read all fields
List<OrcFilters.Predicate> predicates = Arrays.asList(
    OrcFilters.greaterThan("age", 18),
    OrcFilters.isNotNull("email")
);

RecordReader reader = shim.createRecordReader(
    conf,
    schema,
    selectedFields,
    predicates,
    filePath,
    0,              // Start at beginning
    Long.MAX_VALUE  // Read entire file
);

// Create batch wrapper
OrcNoHiveBatchWrapper batchWrapper = shim.createBatchWrapper(schema, 2048);
VectorizedRowBatch batch = batchWrapper.getBatch();

// Read data in batches
while (shim.nextBatch(reader, batch)) {
    System.out.println("Read batch with " + batch.size + " rows");

    // Process batch data
    for (int i = 0; i < batch.size; i++) {
        if (!batch.cols[0].isNull[i]) {
            long id = ((LongColumnVector) batch.cols[0]).vector[i];
            // Process row...
        }
    }

    // Reset batch for next read
    batch.reset();
}

reader.close();
```

### Record Reader Operations

The shim creates ORC RecordReader instances with advanced configuration:

```java
// Create reader with split-specific configuration
public RecordReader createAdvancedReader(
        Configuration conf,
        TypeDescription schema,
        Path filePath,
        long splitStart,
        long splitLength) throws IOException {

    OrcNoHiveShim shim = new OrcNoHiveShim();

    // Configure column projection (read only columns 0, 2, 4)
    int[] selectedFields = {0, 2, 4};

    // Configure predicate pushdown
    List<OrcFilters.Predicate> predicates = Arrays.asList(
        OrcFilters.between("timestamp_col", startTime, endTime),
        OrcFilters.in("status", Arrays.asList("ACTIVE", "PENDING"))
    );

    return shim.createRecordReader(
        conf, schema, selectedFields, predicates,
        filePath, splitStart, splitLength
    );
}
```

### Batch Processing with Shim

```java
import org.apache.flink.orc.nohive.vector.OrcNoHiveBatchWrapper;

// Process ORC file with custom batch size and error handling
public long processOrcFile(Path filePath, TypeDescription schema) throws IOException {
    OrcNoHiveShim shim = new OrcNoHiveShim();
    long totalRows = 0;

    try {
        // Create reader
        RecordReader reader = shim.createRecordReader(
            new Configuration(), schema, null, null,
            filePath, 0, Long.MAX_VALUE
        );

        // Create larger batch for better throughput
        OrcNoHiveBatchWrapper wrapper = shim.createBatchWrapper(schema, 4096);
        VectorizedRowBatch batch = wrapper.getBatch();

        // Process all batches
        while (shim.nextBatch(reader, batch)) {
            totalRows += batch.size;

            // Log progress every 100K rows
            if (totalRows % 100000 == 0) {
                System.out.println("Processed " + totalRows + " rows");
            }

            // Process batch data here
            processBatch(batch);

            // Reset for next batch
            batch.reset();
        }

        reader.close();

    } catch (IOException e) {
        System.err.println("Error processing ORC file: " + e.getMessage());
        throw e;
    }

    return totalRows;
}
```

## ORC File Structure and Metadata

### Schema Handling

```java
import org.apache.orc.TypeDescription;

// Parse ORC schema from string
TypeDescription schema = TypeDescription.fromString(
    "struct<" +
        "user_id:bigint," +
        "profile:struct<name:string,age:int>," +
        "tags:array<string>," +
        "metrics:map<string,double>" +
    ">"
);

// Inspect schema structure
System.out.println("Root type: " + schema.getCategory());
System.out.println("Field count: " + schema.getChildren().size());

for (int i = 0; i < schema.getChildren().size(); i++) {
    TypeDescription field = schema.getChildren().get(i);
    String fieldName = schema.getFieldNames().get(i);
    System.out.println("Field " + i + ": " + fieldName + " (" + field.getCategory() + ")");
}
```

### File Split Processing

```java
// Process specific byte range of large ORC file
public void processFileSplit(Path filePath, long splitStart, long splitLength) throws IOException {
    TypeDescription schema = getSchemaFromFile(filePath);
    OrcNoHiveShim shim = new OrcNoHiveShim();

    // Create reader for specific split
    RecordReader reader = shim.createRecordReader(
        new Configuration(),
        schema,
        null,        // Read all columns
        null,        // No predicates
        filePath,
        splitStart,  // Start byte offset
        splitLength  // Bytes to read
    );

    OrcNoHiveBatchWrapper wrapper = shim.createBatchWrapper(schema, 1024);
    VectorizedRowBatch batch = wrapper.getBatch();

    while (shim.nextBatch(reader, batch)) {
        System.out.println("Split batch: " + batch.size + " rows");
        // Process split data
    }

    reader.close();
}
```

## Configuration Options

### ORC Reader Configuration

```java
Configuration conf = new Configuration();

// Performance settings
conf.setBoolean("orc.use.zerocopy", true);             // Enable zero-copy reads
conf.setInt("orc.row.batch.size", 2048);               // Rows per batch
conf.setBoolean("orc.skip.corrupt.data", false);       // Fail on corrupt data
conf.setBoolean("orc.tolerate.missing.schema", false); // Strict schema validation

// Compression settings
conf.set("orc.compress", "ZLIB");         // Compression algorithm
conf.setInt("orc.compress.size", 262144); // 256KB compression blocks

// Memory settings
conf.setLong("orc.max.file.length", 1024 * 1024 * 1024L); // 1GB max file size
conf.setInt("orc.buffer.size", 262144);                   // 256KB I/O buffer

// Create shim with configuration
OrcNoHiveShim shim = new OrcNoHiveShim();
RecordReader reader = shim.createRecordReader(conf, schema, /* other params */);
```

### Predicate Configuration

```java
import org.apache.flink.orc.OrcFilters;

// Configure complex predicates for pushdown
List<OrcFilters.Predicate> complexPredicates = Arrays.asList(
    // Date range filter
    OrcFilters.between("created_date",
        Date.valueOf("2023-01-01"),
        Date.valueOf("2023-12-31")),

    // Numeric comparisons
    OrcFilters.and(
        OrcFilters.greaterThanEquals("age", 18),
        OrcFilters.lessThan("age", 65)
    ),

    // String operations
    OrcFilters.or(
        OrcFilters.startsWith("email", "admin@"),
        OrcFilters.in("role", Arrays.asList("admin", "moderator"))
    ),

    // Null handling
    OrcFilters.isNotNull("last_login"),

    // Complex logical combinations
    OrcFilters.or(
        OrcFilters.and(
            OrcFilters.equals("status", "premium"),
            OrcFilters.greaterThan("subscription_end", new Date())
        ),
        OrcFilters.equals("status", "free")
    )
);
```

## Stripe and Split Management

### Stripe-Level Processing

```java
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.StripeInformation;

// Analyze file stripes for optimal split planning
public void analyzeOrcStripes(Path filePath) throws IOException {
    Configuration conf = new Configuration();

    // Open ORC file reader
    Reader orcReader = OrcFile.createReader(
        new org.apache.hadoop.fs.Path(filePath.toUri()),
        OrcFile.readerOptions(conf)
    );

    // Examine stripe structure
    List<StripeInformation> stripes = orcReader.getStripes();
    System.out.println("File has " + stripes.size() + " stripes");

    for (int i = 0; i < stripes.size(); i++) {
        StripeInformation stripe = stripes.get(i);
        System.out.println("Stripe " + i + ":");
        System.out.println("  Offset: " + stripe.getOffset());
        System.out.println("  Length: " + stripe.getLength());
        System.out.println("  Rows: " + stripe.getNumberOfRows());
        System.out.println("  Data Length: " + stripe.getDataLength());
    }

    orcReader.close();
}
```

## Error Handling and Recovery

### Robust Reading Pattern

```java
import java.util.concurrent.TimeUnit;

public class RobustOrcReader {
    private static final int MAX_RETRIES = 3;
    private static final long RETRY_DELAY_MS = 1000;

    public long readOrcFileWithRetry(Path filePath, TypeDescription schema) {
        OrcNoHiveShim shim = new OrcNoHiveShim();
        long totalRows = 0;
        int retryCount = 0;

        while (retryCount < MAX_RETRIES) {
            try {
                RecordReader reader = shim.createRecordReader(
                    new Configuration(), schema, null, null,
                    filePath, 0, Long.MAX_VALUE
                );

                OrcNoHiveBatchWrapper wrapper = shim.createBatchWrapper(schema, 1024);
                VectorizedRowBatch batch = wrapper.getBatch();

                while (shim.nextBatch(reader, batch)) {
                    totalRows += batch.size;
                    batch.reset();
                }

                reader.close();
                return totalRows; // Success

            } catch (IOException e) {
                retryCount++;
                System.err.println("Read attempt " + retryCount + " failed: " + e.getMessage());

                if (retryCount >= MAX_RETRIES) {
                    throw new RuntimeException("Failed to read ORC file after " + MAX_RETRIES + " attempts", e);
                }

                // Wait before retry
                try {
                    TimeUnit.MILLISECONDS.sleep(RETRY_DELAY_MS * retryCount);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted during retry delay", ie);
                }
            }
        }

        return totalRows;
    }
}
```

### Schema Validation

```java
// Validate schema compatibility before reading
public boolean validateSchema(Path filePath, TypeDescription expectedSchema) {
    try {
        Configuration conf = new Configuration();
        Reader orcReader = OrcFile.createReader(
            new org.apache.hadoop.fs.Path(filePath.toUri()),
            OrcFile.readerOptions(conf)
        );

        TypeDescription fileSchema = orcReader.getSchema();

        // Compare schemas
        if (!isSchemaCompatible(fileSchema, expectedSchema)) {
            System.err.println("Schema mismatch:");
            System.err.println("Expected: " + expectedSchema);
            System.err.println("Found: " + fileSchema);
            return false;
        }

        orcReader.close();
        return true;

    } catch (IOException e) {
        System.err.println("Failed to read schema from file: " + e.getMessage());
        return false;
    }
}

private boolean isSchemaCompatible(TypeDescription fileSchema, TypeDescription expectedSchema) {
    // Implement schema compatibility logic
    return fileSchema.toString().equals(expectedSchema.toString());
}
```

## Performance Optimization

### Batch Size Tuning

```java
// Optimize batch size based on data characteristics
public int calculateOptimalBatchSize(TypeDescription schema, long availableMemory) {
    // Estimate bytes per row based on schema
    long estimatedBytesPerRow = estimateRowSize(schema);

    // Target 10% of available memory for batch
    long targetBatchMemory = availableMemory / 10;

    // Calculate optimal batch size
    int optimalBatchSize = (int) (targetBatchMemory / estimatedBytesPerRow);

    // Clamp to reasonable bounds
    return Math.max(512, Math.min(optimalBatchSize, 8192));
}

private long estimateRowSize(TypeDescription schema) {
    // Simplified row size estimation
    long size = 0;
    for (TypeDescription child : schema.getChildren()) {
        switch (child.getCategory()) {
            case BOOLEAN:
            case BYTE:
                size += 1;
                break;
            case SHORT:
                size += 2;
                break;
            case INT:
            case FLOAT:
                size += 4;
                break;
            case LONG:
            case DOUBLE:
            case DATE:
            case TIMESTAMP:
                size += 8;
                break;
            case STRING:
            case VARCHAR:
            case CHAR:
                size += 50; // Average string length estimate
                break;
            case DECIMAL:
                size += 16; // Decimal storage estimate
                break;
            default:
                size += 32; // Complex type estimate
        }
    }
    return size;
}
```