Low-level ORC integration providing record readers and batch wrappers for direct ORC file access without Hive dependencies. It handles ORC file structure and metadata, and provides the foundation for higher-level read and write operations.
Shim implementation for ORC operations without Hive dependencies, providing record readers and batch management.
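At a glance, a read with the shim follows a create-reader, wrap-batch, iterate pattern. The sketch below strings together the calls documented in this section; the file path, schema string, and batch size are placeholder values.
import java.util.Collections;
import org.apache.flink.orc.nohive.shim.OrcNoHiveShim;
import org.apache.flink.orc.nohive.vector.OrcNoHiveBatchWrapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
// Minimal read loop: placeholder path, schema, and batch size
OrcNoHiveShim shim = new OrcNoHiveShim();
TypeDescription schema = TypeDescription.fromString("struct<id:bigint,name:string>");
RecordReader reader = shim.createRecordReader(
new Configuration(), schema, new int[]{0, 1}, Collections.emptyList(),
new org.apache.flink.core.fs.Path("file:///tmp/example.orc"), 0, Long.MAX_VALUE);
OrcNoHiveBatchWrapper wrapper = shim.createBatchWrapper(schema, 1024);
while (shim.nextBatch(reader, wrapper.getBatch())) {
// wrapper.size() reports the number of rows populated into the current batch
System.out.println("Rows in batch: " + wrapper.size());
}
reader.close();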
/**
* Shim for ORC reader without Hive dependencies
* Implements the OrcShim interface for ORC file operations using the standalone ORC library
*/
public class OrcNoHiveShim implements OrcShim<VectorizedRowBatch> {
/**
* Create ORC record reader for specified file and split
* @param conf Hadoop configuration for ORC settings
* @param schema ORC type description for the file schema
* @param selectedFields Array of field indices to read (column projection)
* @param conjunctPredicates List of filter predicates for pushdown
* @param path Path to the ORC file to read
* @param splitStart Byte offset where split starts in file
* @param splitLength Number of bytes to read in this split
* @return ORC RecordReader configured for the specified parameters
* @throws IOException if reader creation fails
*/
public RecordReader createRecordReader(
Configuration conf,
TypeDescription schema,
int[] selectedFields,
List<OrcFilters.Predicate> conjunctPredicates,
org.apache.flink.core.fs.Path path,
long splitStart,
long splitLength
) throws IOException;
/**
* Create batch wrapper for ORC vectorized row batches
* @param schema ORC type description for creating appropriately sized batch
* @param batchSize Maximum number of rows per batch
* @return Batch wrapper containing initialized VectorizedRowBatch
*/
public OrcNoHiveBatchWrapper createBatchWrapper(TypeDescription schema, int batchSize);
/**
* Read next batch of data from ORC record reader
* @param reader ORC record reader to read from
* @param rowBatch Vectorized row batch to populate with data
* @return true if batch was populated with data, false if end of data
* @throws IOException if read operation fails
*/
public boolean nextBatch(RecordReader reader, VectorizedRowBatch rowBatch) throws IOException;
}

Wrapper class for ORC VectorizedRowBatch that provides size information and batch access.
/**
* Wrapper for ORC VectorizedRowBatch exposing the underlying batch and its current row count
* Implements OrcVectorizedBatchWrapper interface for batch management
*/
public class OrcNoHiveBatchWrapper implements OrcVectorizedBatchWrapper<VectorizedRowBatch> {
/**
* Create batch wrapper for the given VectorizedRowBatch
* @param batch ORC vectorized row batch to wrap
*/
public OrcNoHiveBatchWrapper(VectorizedRowBatch batch);
/**
* Get the wrapped ORC vectorized row batch
* @return VectorizedRowBatch instance
*/
public VectorizedRowBatch getBatch();
/**
* Get the number of rows currently in the batch
* @return Current row count in the batch
*/
public int size();
}

Usage Examples:
import java.util.Arrays;
import java.util.List;

import org.apache.flink.orc.OrcFilters;
import org.apache.flink.orc.nohive.shim.OrcNoHiveShim;
import org.apache.flink.orc.nohive.vector.OrcNoHiveBatchWrapper;
import org.apache.hadoop.conf.Configuration;
// Literal types for OrcFilters predicates come from the hive-storage-api dependency
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
// In the nohive packaging the vectorized batch classes are relocated under org.apache.orc.storage
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
// Create ORC schema
TypeDescription schema = TypeDescription.fromString(
"struct<id:bigint,name:string,email:string,age:int,salary:decimal(10,2)>"
);
// Configure Hadoop settings
Configuration conf = new Configuration();
conf.set("orc.compress", "ZLIB");
conf.setBoolean("orc.use.zerocopy", true);
// Create shim instance
OrcNoHiveShim shim = new OrcNoHiveShim();
// Create record reader for entire file
org.apache.flink.core.fs.Path filePath = new org.apache.flink.core.fs.Path("hdfs://cluster/data/users.orc");
int[] selectedFields = {0, 1, 2, 3, 4}; // Read all fields
// Leaf predicates take the column name, a PredicateLeaf.Type, and the literal(s);
// ORC has no greater-than leaf, so age > 18 is written as NOT(age <= 18)
List<OrcFilters.Predicate> predicates = Arrays.asList(
new OrcFilters.Not(new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 18L)),
new OrcFilters.Not(new OrcFilters.IsNull("email", PredicateLeaf.Type.STRING))
);
RecordReader reader = shim.createRecordReader(
conf,
schema,
selectedFields,
predicates,
filePath,
0, // Start at beginning
Long.MAX_VALUE // Read entire file
);
// Create batch wrapper
OrcNoHiveBatchWrapper batchWrapper = shim.createBatchWrapper(schema, 2048);
VectorizedRowBatch batch = batchWrapper.getBatch();
// Read data in batches
while (shim.nextBatch(reader, batch)) {
System.out.println("Read batch with " + batch.size + " rows");
// Process batch data
for (int i = 0; i < batch.size; i++) {
// isNull[] is only valid when noNulls is false
if (batch.cols[0].noNulls || !batch.cols[0].isNull[i]) {
long id = ((LongColumnVector) batch.cols[0]).vector[i];
// Process row...
}
}
// Reset batch for next read
batch.reset();
}
reader.close();

The shim creates ORC RecordReader instances with advanced configuration:
// Create reader with split-specific configuration
public RecordReader createAdvancedReader(
Configuration conf,
TypeDescription schema,
org.apache.flink.core.fs.Path filePath,
long splitStart,
long splitLength) throws IOException {
OrcNoHiveShim shim = new OrcNoHiveShim();
// Configure column projection (read only columns 0, 2, 4)
int[] selectedFields = {0, 2, 4};
// Time-range bounds for the pushdown filter (example values)
java.sql.Timestamp startTime = java.sql.Timestamp.valueOf("2023-01-01 00:00:00");
java.sql.Timestamp endTime = java.sql.Timestamp.valueOf("2023-12-31 23:59:59");
// Configure predicate pushdown
List<OrcFilters.Predicate> predicates = Arrays.asList(
new OrcFilters.Between("timestamp_col", PredicateLeaf.Type.TIMESTAMP, startTime, endTime),
new OrcFilters.Or(
new OrcFilters.Equals("status", PredicateLeaf.Type.STRING, "ACTIVE"),
new OrcFilters.Equals("status", PredicateLeaf.Type.STRING, "PENDING"))
);
return shim.createRecordReader(
conf, schema, selectedFields, predicates,
filePath, splitStart, splitLength
);
}

import org.apache.flink.orc.nohive.vector.OrcNoHiveBatchWrapper;
// Process ORC file with custom batch size and error handling
public long processOrcFile(Path filePath, TypeDescription schema) throws IOException {
OrcNoHiveShim shim = new OrcNoHiveShim();
long totalRows = 0;
try {
// Create reader
// Project all top-level fields explicitly and pass an empty predicate list
// (explicit values avoid relying on null handling in the shim)
int[] allFields = new int[schema.getChildren().size()];
for (int i = 0; i < allFields.length; i++) {
allFields[i] = i;
}
RecordReader reader = shim.createRecordReader(
new Configuration(), schema, allFields, java.util.Collections.emptyList(),
filePath, 0, Long.MAX_VALUE
);
// Create larger batch for better throughput
OrcNoHiveBatchWrapper wrapper = shim.createBatchWrapper(schema, 4096);
VectorizedRowBatch batch = wrapper.getBatch();
// Process all batches
while (shim.nextBatch(reader, batch)) {
totalRows += batch.size;
// Log progress every 100K rows
if (totalRows % 100000 == 0) {
System.out.println("Processed " + totalRows + " rows");
}
// Process batch data here
processBatch(batch);
// Reset for next batch
batch.reset();
}
reader.close();
} catch (IOException e) {
System.err.println("Error processing ORC file: " + e.getMessage());
throw e;
}
return totalRows;
}

import org.apache.orc.TypeDescription;
// Parse ORC schema from string
TypeDescription schema = TypeDescription.fromString(
"struct<" +
"user_id:bigint," +
"profile:struct<name:string,age:int>," +
"tags:array<string>," +
"metrics:map<string,double>" +
">"
);
// Inspect schema structure
System.out.println("Root type: " + schema.getCategory());
System.out.println("Field count: " + schema.getChildren().size());
for (int i = 0; i < schema.getChildren().size(); i++) {
TypeDescription field = schema.getChildren().get(i);
String fieldName = schema.getFieldNames().get(i);
System.out.println("Field " + i + ": " + fieldName + " (" + field.getCategory() + ")");
}

// Process specific byte range of large ORC file
public void processFileSplit(Path filePath, long splitStart, long splitLength) throws IOException {
TypeDescription schema = getSchemaFromFile(filePath); // e.g. OrcFile.createReader(...).getSchema(), as in the schema validation example below
OrcNoHiveShim shim = new OrcNoHiveShim();
// Create reader for specific split
// Read all top-level columns via an explicit index array; no predicates
int[] allFields = new int[schema.getChildren().size()];
for (int i = 0; i < allFields.length; i++) {
allFields[i] = i;
}
RecordReader reader = shim.createRecordReader(
new Configuration(),
schema,
allFields, // Read all columns
java.util.Collections.emptyList(), // No predicates
filePath,
splitStart, // Start byte offset
splitLength // Bytes to read
);
OrcNoHiveBatchWrapper wrapper = shim.createBatchWrapper(schema, 1024);
VectorizedRowBatch batch = wrapper.getBatch();
while (shim.nextBatch(reader, batch)) {
System.out.println("Split batch: " + batch.size + " rows");
// Process split data
}
reader.close();
}

Configuration conf = new Configuration();
// Read-path settings
conf.setBoolean("orc.use.zerocopy", true); // Enable zero-copy reads
conf.setBoolean("orc.skip.corrupt.data", false); // Fail on corrupt data
conf.setBoolean("orc.tolerate.missing.schema", false); // Strict schema validation
conf.setLong("orc.max.file.length", 1024 * 1024 * 1024L); // Cap on how much of the file is read when locating the tail (mainly for files still being written)
// Write-path settings (no effect when only reading; shown for completeness)
conf.set("orc.compress", "ZLIB"); // Compression algorithm
conf.setInt("orc.compress.size", 262144); // 256KB compression chunk/buffer size
// Note: the rows-per-batch value for this shim is passed to createBatchWrapper(schema, batchSize),
// not through the configuration.
// Create shim with configuration
OrcNoHiveShim shim = new OrcNoHiveShim();
RecordReader reader = shim.createRecordReader(conf, schema, /* selectedFields, predicates, path, splitStart, splitLength */);

import org.apache.flink.orc.OrcFilters;
// PredicateLeaf provides the literal-type enum used by the OrcFilters predicates
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import java.sql.Date;
// Configure complex predicates for pushdown. Conjuncts are simply added to the list;
// Or/Not compose leaves within a single predicate. ORC leaves only offer "less than"
// style comparisons, so >= and > are written with Not, and string prefix matching
// (such as startsWith) cannot be pushed into ORC and must be applied after reading.
List<OrcFilters.Predicate> complexPredicates = Arrays.asList(
// Date range filter: created_date BETWEEN 2023-01-01 AND 2023-12-31
new OrcFilters.Between("created_date", PredicateLeaf.Type.DATE,
Date.valueOf("2023-01-01"),
Date.valueOf("2023-12-31")),
// Numeric comparisons: 18 <= age < 65
new OrcFilters.Not(new OrcFilters.LessThan("age", PredicateLeaf.Type.LONG, 18L)),
new OrcFilters.LessThan("age", PredicateLeaf.Type.LONG, 65L),
// Set membership: role IN ('admin', 'moderator')
new OrcFilters.Or(
new OrcFilters.Equals("role", PredicateLeaf.Type.STRING, "admin"),
new OrcFilters.Equals("role", PredicateLeaf.Type.STRING, "moderator")),
// Null handling: last_login IS NOT NULL
new OrcFilters.Not(new OrcFilters.IsNull("last_login", PredicateLeaf.Type.TIMESTAMP)),
// Status filter: 'premium' or 'free'; a time-based check such as subscription_end > now
// is row-level logic that is better applied after reading the batch
new OrcFilters.Or(
new OrcFilters.Equals("status", PredicateLeaf.Type.STRING, "premium"),
new OrcFilters.Equals("status", PredicateLeaf.Type.STRING, "free"))
);

import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.StripeInformation;
// Analyze file stripes for optimal split planning
public void analyzeOrcStripes(Path filePath) throws IOException {
Configuration conf = new Configuration();
// Open ORC file reader
Reader orcReader = OrcFile.createReader(
new org.apache.hadoop.fs.Path(filePath.toUri()),
OrcFile.readerOptions(conf)
);
// Examine stripe structure
List<StripeInformation> stripes = orcReader.getStripes();
System.out.println("File has " + stripes.size() + " stripes");
for (int i = 0; i < stripes.size(); i++) {
StripeInformation stripe = stripes.get(i);
System.out.println("Stripe " + i + ":");
System.out.println(" Offset: " + stripe.getOffset());
System.out.println(" Length: " + stripe.getLength());
System.out.println(" Rows: " + stripe.getNumberOfRows());
System.out.println(" Data Length: " + stripe.getDataLength());
}
orcReader.close();
}

import java.util.concurrent.TimeUnit;
public class RobustOrcReader {
private static final int MAX_RETRIES = 3;
private static final long RETRY_DELAY_MS = 1000;
public long readOrcFileWithRetry(Path filePath, TypeDescription schema) {
OrcNoHiveShim shim = new OrcNoHiveShim();
long totalRows = 0;
int retryCount = 0;
while (retryCount < MAX_RETRIES) {
try {
totalRows = 0; // restart the count on each attempt
int[] allFields = new int[schema.getChildren().size()];
for (int i = 0; i < allFields.length; i++) {
allFields[i] = i;
}
RecordReader reader = shim.createRecordReader(
new Configuration(), schema, allFields, java.util.Collections.emptyList(),
filePath, 0, Long.MAX_VALUE
);
OrcNoHiveBatchWrapper wrapper = shim.createBatchWrapper(schema, 1024);
VectorizedRowBatch batch = wrapper.getBatch();
while (shim.nextBatch(reader, batch)) {
totalRows += batch.size;
batch.reset();
}
reader.close();
return totalRows; // Success
} catch (IOException e) {
retryCount++;
System.err.println("Read attempt " + retryCount + " failed: " + e.getMessage());
if (retryCount >= MAX_RETRIES) {
throw new RuntimeException("Failed to read ORC file after " + MAX_RETRIES + " attempts", e);
}
// Wait before retry
try {
TimeUnit.MILLISECONDS.sleep(RETRY_DELAY_MS * retryCount);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted during retry delay", ie);
}
}
}
return totalRows;
}
}

// Validate schema compatibility before reading
public boolean validateSchema(Path filePath, TypeDescription expectedSchema) {
try {
Configuration conf = new Configuration();
Reader orcReader = OrcFile.createReader(
new org.apache.hadoop.fs.Path(filePath.toUri()),
OrcFile.readerOptions(conf)
);
TypeDescription fileSchema = orcReader.getSchema();
orcReader.close(); // close before any early return to avoid leaking the reader
// Compare schemas
if (!isSchemaCompatible(fileSchema, expectedSchema)) {
System.err.println("Schema mismatch:");
System.err.println("Expected: " + expectedSchema);
System.err.println("Found: " + fileSchema);
return false;
}
return true;
} catch (IOException e) {
System.err.println("Failed to read schema from file: " + e.getMessage());
return false;
}
}
private boolean isSchemaCompatible(TypeDescription fileSchema, TypeDescription expectedSchema) {
// Implement schema compatibility logic
return fileSchema.toString().equals(expectedSchema.toString());
}

// Optimize batch size based on data characteristics
public int calculateOptimalBatchSize(TypeDescription schema, long availableMemory) {
// Estimate bytes per row based on schema
long estimatedBytesPerRow = estimateRowSize(schema);
// Target 10% of available memory for batch
long targetBatchMemory = availableMemory / 10;
// Calculate optimal batch size
int optimalBatchSize = (int) (targetBatchMemory / estimatedBytesPerRow);
// Clamp to reasonable bounds
return Math.max(512, Math.min(optimalBatchSize, 8192));
}
private long estimateRowSize(TypeDescription schema) {
// Simplified row size estimation
long size = 0;
for (TypeDescription child : schema.getChildren()) {
switch (child.getCategory()) {
case BOOLEAN:
case BYTE:
size += 1;
break;
case SHORT:
size += 2;
break;
case INT:
case FLOAT:
size += 4;
break;
case LONG:
case DOUBLE:
case DATE:
case TIMESTAMP:
size += 8;
break;
case STRING:
case VARCHAR:
case CHAR:
size += 50; // Average string length estimate
break;
case DECIMAL:
size += 16; // Decimal storage estimate
break;
default:
size += 32; // Complex type estimate
}
}
return size;
}
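The sizing helpers above can feed straight into createBatchWrapper. A minimal sketch follows; the free-heap calculation is just one way to derive availableMemory, and the schema string is a placeholder.
// Derive a batch size from the schema estimate and the currently free heap,
// then create a wrapper sized accordingly (illustrative values)
OrcNoHiveShim shim = new OrcNoHiveShim();
TypeDescription schema = TypeDescription.fromString(
"struct<id:bigint,name:string,salary:decimal(10,2)>");
Runtime runtime = Runtime.getRuntime();
long availableMemory = runtime.maxMemory() - runtime.totalMemory() + runtime.freeMemory();
int batchSize = calculateOptimalBatchSize(schema, availableMemory);
OrcNoHiveBatchWrapper wrapper = shim.createBatchWrapper(schema, batchSize);
System.out.println("Estimated " + estimateRowSize(schema) + " bytes/row, using batch size " + batchSize);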