ORC Integration

Low-level ORC integration providing record readers and batch wrappers for direct ORC file access without Hive dependencies. It handles ORC file structure and metadata and provides the foundation for higher-level reading and writing operations.

Capabilities

ORC No-Hive Shim

Shim implementation for ORC operations without Hive dependencies, providing record readers and batch management.

/**
 * Shim for ORC reader without Hive dependencies
 * Implements OrcShim interface for ORC file operations using standalone ORC library
 */
public class OrcNoHiveShim implements OrcShim<VectorizedRowBatch> {
    
    /**
     * Create ORC record reader for specified file and split
     * @param conf Hadoop configuration for ORC settings
     * @param schema ORC type description for the file schema
     * @param selectedFields Array of field indices to read (column projection)
     * @param conjunctPredicates List of filter predicates for pushdown
     * @param path Path to the ORC file to read
     * @param splitStart Byte offset where split starts in file
     * @param splitLength Number of bytes to read in this split
     * @return ORC RecordReader configured for the specified parameters
     * @throws IOException if reader creation fails
     */
    public RecordReader createRecordReader(
        Configuration conf,
        TypeDescription schema,
        int[] selectedFields,
        List<OrcFilters.Predicate> conjunctPredicates,
        org.apache.flink.core.fs.Path path,
        long splitStart,
        long splitLength
    ) throws IOException;
    
    /**
     * Create batch wrapper for ORC vectorized row batches
     * @param schema ORC type description for creating appropriately sized batch
     * @param batchSize Maximum number of rows per batch
     * @return Batch wrapper containing initialized VectorizedRowBatch
     */
    public OrcNoHiveBatchWrapper createBatchWrapper(TypeDescription schema, int batchSize);
    
    /**
     * Read next batch of data from ORC record reader
     * @param reader ORC record reader to read from
     * @param rowBatch Vectorized row batch to populate with data
     * @return true if batch was populated with data, false if end of data
     * @throws IOException if read operation fails
     */
    public boolean nextBatch(RecordReader reader, VectorizedRowBatch rowBatch) throws IOException;
}

Batch Wrapper

Wrapper class for ORC VectorizedRowBatch that provides size information and batch access.

/**
 * Wrapper for ORC VectorizedRowBatch providing additional functionality
 * Implements OrcVectorizedBatchWrapper interface for batch management
 */
public class OrcNoHiveBatchWrapper implements OrcVectorizedBatchWrapper<VectorizedRowBatch> {
    
    /**
     * Create batch wrapper for the given VectorizedRowBatch
     * @param batch ORC vectorized row batch to wrap
     */
    public OrcNoHiveBatchWrapper(VectorizedRowBatch batch);
    
    /**
     * Get the wrapped ORC vectorized row batch
     * @return VectorizedRowBatch instance
     */
    public VectorizedRowBatch getBatch();
    
    /**
     * Get the number of rows currently in the batch
     * @return Current row count in the batch
     */
    public int size();
}

Usage Examples:

import org.apache.flink.orc.OrcFilters;
import org.apache.flink.orc.nohive.shim.OrcNoHiveShim;
import org.apache.flink.orc.nohive.vector.OrcNoHiveBatchWrapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
// VectorizedRowBatch and LongColumnVector come from the storage-api that the
// "no-hive" ORC core relocates under org.apache.orc.storage
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import java.util.Arrays;
import java.util.List;

// Create ORC schema
TypeDescription schema = TypeDescription.fromString(
    "struct<id:bigint,name:string,email:string,age:int,salary:decimal(10,2)>"
);

// Configure Hadoop settings
Configuration conf = new Configuration();
conf.set("orc.compress", "ZLIB");
conf.setBoolean("orc.use.zerocopy", true);

// Create shim instance
OrcNoHiveShim shim = new OrcNoHiveShim();

// Create record reader for entire file
org.apache.flink.core.fs.Path filePath = new org.apache.flink.core.fs.Path("hdfs://cluster/data/users.orc");
int[] selectedFields = {0, 1, 2, 3, 4}; // Read all fields
List<OrcFilters.Predicate> predicates = Arrays.asList(
    OrcFilters.greaterThan("age", 18),
    OrcFilters.isNotNull("email")
);

RecordReader reader = shim.createRecordReader(
    conf,
    schema,
    selectedFields,
    predicates,
    filePath,
    0,                    // Start at beginning
    Long.MAX_VALUE       // Read entire file
);

// Create batch wrapper
OrcNoHiveBatchWrapper batchWrapper = shim.createBatchWrapper(schema, 2048);
VectorizedRowBatch batch = batchWrapper.getBatch();

// Read data in batches
while (shim.nextBatch(reader, batch)) {
    System.out.println("Read batch with " + batch.size + " rows");
    
    // Process batch data
    for (int i = 0; i < batch.size; i++) {
        if (!batch.cols[0].isNull[i]) {
            long id = ((LongColumnVector) batch.cols[0]).vector[i];
            // Process row...
        }
    }
    
    // Reset batch for next read
    batch.reset();
}

reader.close();

Record Reader Operations

The shim creates ORC RecordReader instances with advanced configuration:

// Create reader with split-specific configuration
public RecordReader createAdvancedReader(
    Configuration conf,
    TypeDescription schema,
    org.apache.flink.core.fs.Path filePath,
    long splitStart,
    long splitLength) throws IOException {
    
    OrcNoHiveShim shim = new OrcNoHiveShim();
    
    // Configure column projection (read only columns 0, 2, 4)
    int[] selectedFields = {0, 2, 4};
    
    // Configure predicate pushdown (startTime and endTime are assumed to be supplied by the caller)
    List<OrcFilters.Predicate> predicates = Arrays.asList(
        OrcFilters.between("timestamp_col", startTime, endTime),
        OrcFilters.in("status", Arrays.asList("ACTIVE", "PENDING"))
    );
    
    return shim.createRecordReader(
        conf, schema, selectedFields, predicates,
        filePath, splitStart, splitLength
    );
}

Batch Processing with Shim

import org.apache.flink.orc.nohive.vector.OrcNoHiveBatchWrapper;

// Process ORC file with custom batch size and error handling
public long processOrcFile(Path filePath, TypeDescription schema) throws IOException {
    OrcNoHiveShim shim = new OrcNoHiveShim();
    long totalRows = 0;
    
    // Create reader (null selectedFields / predicates = read all columns, no pushdown)
    RecordReader reader = shim.createRecordReader(
        new Configuration(), schema, null, null,
        filePath, 0, Long.MAX_VALUE
    );
    
    try {
        // Create larger batch for better throughput
        OrcNoHiveBatchWrapper wrapper = shim.createBatchWrapper(schema, 4096);
        VectorizedRowBatch batch = wrapper.getBatch();
        
        // Process all batches
        while (shim.nextBatch(reader, batch)) {
            totalRows += batch.size;
            
            // Log progress as each 100K-row threshold is crossed
            if (totalRows / 100000 > (totalRows - batch.size) / 100000) {
                System.out.println("Processed " + totalRows + " rows");
            }
            
            // Process batch data here
            processBatch(batch);
            
            // Reset for next batch
            batch.reset();
        }
        
    } catch (IOException e) {
        System.err.println("Error processing ORC file: " + e.getMessage());
        throw e;
    } finally {
        // Close the reader even if a batch fails mid-read
        reader.close();
    }
    
    return totalRows;
}

ORC File Structure and Metadata

Schema Handling

import org.apache.orc.TypeDescription;

// Parse ORC schema from string
TypeDescription schema = TypeDescription.fromString(
    "struct<" +
        "user_id:bigint," +
        "profile:struct<name:string,age:int>," +
        "tags:array<string>," +
        "metrics:map<string,double>" +
    ">"
);

// Inspect schema structure
System.out.println("Root type: " + schema.getCategory());
System.out.println("Field count: " + schema.getChildren().size());

for (int i = 0; i < schema.getChildren().size(); i++) {
    TypeDescription field = schema.getChildren().get(i);
    String fieldName = schema.getFieldNames().get(i);
    System.out.println("Field " + i + ": " + fieldName + " (" + field.getCategory() + ")");
}
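The same schema can also be assembled programmatically. A minimal sketch using the TypeDescription builder methods (createStruct, addField, createList, createMap), which avoids string parsing when the schema is derived from another type system:

// Equivalent schema built with the TypeDescription builder API
TypeDescription builtSchema = TypeDescription.createStruct()
    .addField("user_id", TypeDescription.createLong())
    .addField("profile", TypeDescription.createStruct()
        .addField("name", TypeDescription.createString())
        .addField("age", TypeDescription.createInt()))
    .addField("tags", TypeDescription.createList(TypeDescription.createString()))
    .addField("metrics", TypeDescription.createMap(
        TypeDescription.createString(), TypeDescription.createDouble()));

// toString() yields the same struct<...> form accepted by fromString()
System.out.println(builtSchema);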

File Split Processing

// Process specific byte range of large ORC file
public void processFileSplit(org.apache.flink.core.fs.Path filePath, long splitStart, long splitLength) throws IOException {
    TypeDescription schema = getSchemaFromFile(filePath);
    OrcNoHiveShim shim = new OrcNoHiveShim();
    
    // Create reader for specific split
    RecordReader reader = shim.createRecordReader(
        new Configuration(),
        schema,
        null,           // Read all columns
        null,           // No predicates
        filePath,
        splitStart,     // Start byte offset
        splitLength     // Bytes to read
    );
    
    OrcNoHiveBatchWrapper wrapper = shim.createBatchWrapper(schema, 1024);
    VectorizedRowBatch batch = wrapper.getBatch();
    
    while (shim.nextBatch(reader, batch)) {
        System.out.println("Split batch: " + batch.size + " rows");
        // Process split data
    }
    
    reader.close();
}
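The getSchemaFromFile call above is not part of the shim API; it stands for whatever mechanism supplies the file schema. A minimal sketch, assuming the same Flink-to-Hadoop path conversion and the org.apache.orc.OrcFile / org.apache.orc.Reader classes used in the stripe example below, that reads the schema from the file footer:

// Hypothetical helper: read the schema from the ORC file footer
private TypeDescription getSchemaFromFile(org.apache.flink.core.fs.Path filePath) throws IOException {
    Configuration conf = new Configuration();
    Reader orcReader = OrcFile.createReader(
        new org.apache.hadoop.fs.Path(filePath.toUri()),
        OrcFile.readerOptions(conf)
    );
    TypeDescription fileSchema = orcReader.getSchema();
    orcReader.close();
    return fileSchema;
}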

Configuration Options

ORC Reader Configuration

Configuration conf = new Configuration();

// Performance settings
conf.setBoolean("orc.use.zerocopy", true);        // Enable zero-copy reads
conf.setInt("orc.row.batch.size", 2048);          // Rows per batch
conf.setBoolean("orc.skip.corrupt.data", false);  // Fail on corrupt data
conf.setBoolean("orc.tolerate.missing.schema", false); // Strict schema validation

// Compression settings
conf.set("orc.compress", "ZLIB");                 // Compression algorithm
conf.setInt("orc.compress.size", 262144);         // 256KB compression blocks

// Memory settings  
conf.setLong("orc.max.file.length", 1024 * 1024 * 1024L); // 1GB max file size
conf.setInt("orc.buffer.size", 262144);           // 256KB I/O buffer

// Create shim with configuration
OrcNoHiveShim shim = new OrcNoHiveShim();
RecordReader reader = shim.createRecordReader(conf, schema, /* other params */);

Predicate Configuration

import org.apache.flink.orc.OrcFilters;

// Configure complex predicates for pushdown
List<OrcFilters.Predicate> complexPredicates = Arrays.asList(
    // Date range filter
    OrcFilters.between("created_date", 
        Date.valueOf("2023-01-01"), 
        Date.valueOf("2023-12-31")),
    
    // Numeric comparisons
    OrcFilters.and(
        OrcFilters.greaterThanEquals("age", 18),
        OrcFilters.lessThan("age", 65)
    ),
    
    // String operations
    OrcFilters.or(
        OrcFilters.startsWith("email", "admin@"),
        OrcFilters.in("role", Arrays.asList("admin", "moderator"))
    ),
    
    // Null handling
    OrcFilters.isNotNull("last_login"),
    
    // Complex logical combinations
    OrcFilters.or(
        OrcFilters.and(
            OrcFilters.equals("status", "premium"),
            OrcFilters.greaterThan("subscription_end", new Date())
        ),
        OrcFilters.equals("status", "free")
    )
);
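These predicates are handed to the shim as the conjunctPredicates argument of createRecordReader, where they are combined conjunctively for pushdown. A short sketch, assuming the schema, selectedFields, and filePath variables from the earlier usage example:

// Pass the predicate list to the reader so non-matching stripes/row groups can be skipped
OrcNoHiveShim shim = new OrcNoHiveShim();
RecordReader reader = shim.createRecordReader(
    new Configuration(),
    schema,
    selectedFields,       // column projection as before
    complexPredicates,    // evaluated as a conjunction during pushdown
    filePath,
    0,
    Long.MAX_VALUE
);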

Stripe and Split Management

Stripe-Level Processing

import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.StripeInformation;

// Analyze file stripes for optimal split planning
public void analyzeOrcStripes(Path filePath) throws IOException {
    Configuration conf = new Configuration();
    
    // Open ORC file reader
    Reader orcReader = OrcFile.createReader(
        new org.apache.hadoop.fs.Path(filePath.toUri()),
        OrcFile.readerOptions(conf)
    );
    
    // Examine stripe structure
    List<StripeInformation> stripes = orcReader.getStripes();
    System.out.println("File has " + stripes.size() + " stripes");
    
    for (int i = 0; i < stripes.size(); i++) {
        StripeInformation stripe = stripes.get(i);
        System.out.println("Stripe " + i + ":");
        System.out.println("  Offset: " + stripe.getOffset());
        System.out.println("  Length: " + stripe.getLength());  
        System.out.println("  Rows: " + stripe.getNumberOfRows());
        System.out.println("  Data Length: " + stripe.getDataLength());
    }
    
    orcReader.close();
}
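Stripe boundaries are the natural unit for split planning, since a reader never has to start in the middle of a stripe. A minimal sketch that turns each stripe into one split and feeds it to the processFileSplit method defined earlier (in practice the splits would be distributed across parallel subtasks rather than processed in a loop):

// One split per stripe: offsets and lengths come straight from the stripe metadata
public void processFileByStripes(org.apache.flink.core.fs.Path filePath) throws IOException {
    Configuration conf = new Configuration();
    Reader orcReader = OrcFile.createReader(
        new org.apache.hadoop.fs.Path(filePath.toUri()),
        OrcFile.readerOptions(conf)
    );
    List<StripeInformation> stripes = orcReader.getStripes();
    orcReader.close();
    
    for (StripeInformation stripe : stripes) {
        // Each call reads exactly one stripe's byte range
        processFileSplit(filePath, stripe.getOffset(), stripe.getLength());
    }
}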

Error Handling and Recovery

Robust Reading Pattern

import java.util.concurrent.TimeUnit;

public class RobustOrcReader {
    private static final int MAX_RETRIES = 3;
    private static final long RETRY_DELAY_MS = 1000;
    
    public long readOrcFileWithRetry(org.apache.flink.core.fs.Path filePath, TypeDescription schema) {
        OrcNoHiveShim shim = new OrcNoHiveShim();
        long totalRows = 0;
        int retryCount = 0;
        
        while (retryCount < MAX_RETRIES) {
            try {
                totalRows = 0; // Restart the count so a failed partial read is not double-counted
                RecordReader reader = shim.createRecordReader(
                    new Configuration(), schema, null, null,
                    filePath, 0, Long.MAX_VALUE
                );
                
                OrcNoHiveBatchWrapper wrapper = shim.createBatchWrapper(schema, 1024);
                VectorizedRowBatch batch = wrapper.getBatch();
                
                while (shim.nextBatch(reader, batch)) {
                    totalRows += batch.size;
                    batch.reset();
                }
                
                reader.close();
                return totalRows; // Success
                
            } catch (IOException e) {
                retryCount++;
                System.err.println("Read attempt " + retryCount + " failed: " + e.getMessage());
                
                if (retryCount >= MAX_RETRIES) {
                    throw new RuntimeException("Failed to read ORC file after " + MAX_RETRIES + " attempts", e);
                }
                
                // Wait before retry
                try {
                    TimeUnit.MILLISECONDS.sleep(RETRY_DELAY_MS * retryCount);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted during retry delay", ie);
                }
            }
        }
        
        return totalRows;
    }
}
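A possible call site, assuming the user schema and HDFS path from the first usage example:

RobustOrcReader robustReader = new RobustOrcReader();
TypeDescription schema = TypeDescription.fromString(
    "struct<id:bigint,name:string,email:string,age:int,salary:decimal(10,2)>"
);
long rows = robustReader.readOrcFileWithRetry(
    new org.apache.flink.core.fs.Path("hdfs://cluster/data/users.orc"), schema
);
System.out.println("Read " + rows + " rows with retry handling");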

Schema Validation

// Validate schema compatibility before reading
public boolean validateSchema(Path filePath, TypeDescription expectedSchema) {
    try {
        Configuration conf = new Configuration();
        Reader orcReader = OrcFile.createReader(
            new org.apache.hadoop.fs.Path(filePath.toUri()),
            OrcFile.readerOptions(conf)
        );
        
        TypeDescription fileSchema = orcReader.getSchema();
        
        // Compare schemas
        boolean compatible = isSchemaCompatible(fileSchema, expectedSchema);
        if (!compatible) {
            System.err.println("Schema mismatch:");
            System.err.println("Expected: " + expectedSchema);
            System.err.println("Found: " + fileSchema);
        }
        
        // Close the reader on both the match and mismatch paths
        orcReader.close();
        return compatible;
        
    } catch (IOException e) {
        System.err.println("Failed to read schema from file: " + e.getMessage());  
        return false;
    }
}

private boolean isSchemaCompatible(TypeDescription fileSchema, TypeDescription expectedSchema) {
    // Simplified check: exact string equality; a real implementation might allow compatible schema evolution
    return fileSchema.toString().equals(expectedSchema.toString());
}
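A typical guard before kicking off a read, assuming filePath and expectedSchema are defined as in the earlier examples:

// Skip files whose footer schema does not match what the job expects
if (validateSchema(filePath, expectedSchema)) {
    long rows = processOrcFile(filePath, expectedSchema);
    System.out.println("Processed " + rows + " rows");
} else {
    System.err.println("Skipping file with incompatible schema: " + filePath);
}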

Performance Optimization

Batch Size Tuning

// Optimize batch size based on data characteristics
public int calculateOptimalBatchSize(TypeDescription schema, long availableMemory) {
    // Estimate bytes per row based on schema
    long estimatedBytesPerRow = estimateRowSize(schema);
    
    // Target 10% of available memory for batch
    long targetBatchMemory = availableMemory / 10;
    
    // Calculate optimal batch size
    int optimalBatchSize = (int) (targetBatchMemory / estimatedBytesPerRow);
    
    // Clamp to reasonable bounds
    return Math.max(512, Math.min(optimalBatchSize, 8192));
}

private long estimateRowSize(TypeDescription schema) {
    // Simplified row size estimation
    long size = 0;
    for (TypeDescription child : schema.getChildren()) {
        switch (child.getCategory()) {
            case BOOLEAN:
            case BYTE:
                size += 1;
                break;
            case SHORT:
                size += 2;
                break;
            case INT:
            case FLOAT:
                size += 4;
                break;
            case LONG:
            case DOUBLE:
            case DATE:
            case TIMESTAMP:
                size += 8;
                break;
            case STRING:
            case VARCHAR:
            case CHAR:
                size += 50; // Average string length estimate
                break;
            case DECIMAL:
                size += 16; // Decimal storage estimate
                break;
            default:
                size += 32; // Complex type estimate
        }
    }
    return size;
}
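One way to wire the estimate into batch creation, treating the JVM's currently free heap as the available budget (an assumption for illustration; a Flink job would normally derive the budget from its configured memory):

// Size the batch wrapper from the estimate instead of a hard-coded constant
long availableMemory = Runtime.getRuntime().freeMemory();
int batchSize = calculateOptimalBatchSize(schema, availableMemory);

OrcNoHiveShim shim = new OrcNoHiveShim();
OrcNoHiveBatchWrapper wrapper = shim.createBatchWrapper(schema, batchSize);
System.out.println("Using batch size: " + batchSize);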