Columnar Reading

Helper utilities for creating columnar input formats and split readers with partition support, enabling efficient ORC file reading. They provide high-performance vectorized reads with predicate pushdown and column projection.

Capabilities

Columnar Row Input Format

Helper class for creating partitioned ORC columnar input formats without Hive dependencies.

/**
 * Helper class to create OrcColumnarRowFileInputFormat for no-hive usage
 * Provides static factory methods for creating partitioned input formats
 */
public class OrcNoHiveColumnarRowInputFormat {
    
    /**
     * Create a partitioned OrcColumnarRowFileInputFormat where partition columns 
     * can be generated by split metadata
     * @param hadoopConfig Hadoop configuration for ORC reading
     * @param tableType Row type describing the complete table schema  
     * @param partitionKeys List of partition column names
     * @param extractor Extracts partition values from file splits
     * @param selectedFields Array of field indices to read from files
     * @param conjunctPredicates List of filter predicates for pushdown
     * @param batchSize Number of rows per vectorized batch
     * @return Configured columnar input format for partitioned reading
     */
    public static <SplitT extends FileSourceSplit> 
        OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
            Configuration hadoopConfig,
            RowType tableType,
            List<String> partitionKeys,
            PartitionFieldExtractor<SplitT> extractor,
            int[] selectedFields,
            List<OrcFilters.Predicate> conjunctPredicates,
            int batchSize
        );
}

Usage Examples:

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
import org.apache.flink.orc.OrcFilters;
import org.apache.flink.orc.nohive.OrcNoHiveColumnarRowInputFormat;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;

import java.util.Arrays;
import java.util.List;

// Define table schema with partitioned columns
RowType tableType = RowType.of(
    new LogicalType[] {
        new BigIntType(),      // user_id
        new VarCharType(255),  // name  
        new VarCharType(100),  // email
        new IntType(),         // age
        new VarCharType(50),   // country (partition)
        new VarCharType(10)    // year (partition)
    },
    new String[] {"user_id", "name", "email", "age", "country", "year"}
);

// Define partition keys
List<String> partitionKeys = Arrays.asList("country", "year");

// Select only specific fields to read (column projection)
int[] selectedFields = {0, 1, 2, 4, 5}; // user_id, name, email, country, year

// Create partition extractor
PartitionFieldExtractor<FileSourceSplit> extractor = (split, fieldName, fieldType) -> {
    // Extract partition values from a file path like /data/country=US/year=2023/file.orc.
    // extractFromPath is a user-defined helper; a full regex-based extractor is shown
    // in the Partition Handling section below.
    String path = split.path().toString();
    if (fieldName.equals("country")) {
        return extractFromPath(path, "country=");
    } else if (fieldName.equals("year")) {
        return extractFromPath(path, "year=");
    }
    return null;
};

// Create filter predicates
List<OrcFilters.Predicate> predicates = Arrays.asList(
    OrcFilters.equals("age", 25),
    OrcFilters.lessThan("user_id", 10000L)
);

// Create columnar input format
OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat = 
    OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
        new Configuration(),
        tableType,
        partitionKeys, 
        extractor,
        selectedFields,
        predicates,
        1024  // batch size
    );
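
The format created above can then be handed to Flink's FileSource as a bulk format and consumed as a stream of RowData. The following is a minimal sketch under that assumption, using the inputFormat from the previous example; the input path and job wiring are illustrative, not part of this module:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

// Wrap the columnar input format in a FileSource over the table's root directory
FileSource<RowData> source = FileSource
    .forBulkFileFormat(inputFormat, new Path("hdfs://cluster/data"))
    .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RowData> rows =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "orc-columnar-source");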

Split Reader Utility

Utility for generating ORC split readers with partition support and predicate pushdown.

/**
 * Utility for generating OrcColumnarRowSplitReader instances
 * Provides factory methods for creating split readers with partition support
 */
public class OrcNoHiveSplitReaderUtil {
    
    /**
     * Generate partitioned columnar row reader for ORC files
     * @param conf Hadoop configuration
     * @param fullFieldNames Complete array of field names in table schema
     * @param fullFieldTypes Complete array of field types in table schema  
     * @param partitionSpec Map of partition column names to values
     * @param selectedFields Array of field indices to read from files
     * @param conjunctPredicates List of filter predicates for pushdown
     * @param batchSize Number of rows per vectorized batch
     * @param path Path to the ORC file to read
     * @param splitStart Byte offset where split starts in file
     * @param splitLength Number of bytes to read in this split
     * @return Configured columnar row split reader
     * @throws IOException if reader creation fails
     */
    public static OrcColumnarRowSplitReader<VectorizedRowBatch> genPartColumnarRowReader(
        Configuration conf,
        String[] fullFieldNames,
        DataType[] fullFieldTypes,
        Map<String, Object> partitionSpec,
        int[] selectedFields,
        List<OrcFilters.Predicate> conjunctPredicates,
        int batchSize,
        org.apache.flink.core.fs.Path path,
        long splitStart,
        long splitLength
    ) throws IOException;
}

Usage Examples:

import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.OrcColumnarRowSplitReader;
import org.apache.flink.orc.OrcFilters;
import org.apache.flink.orc.nohive.OrcNoHiveSplitReaderUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.conf.Configuration;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Define complete table schema
String[] fieldNames = {"user_id", "name", "email", "age", "country", "year"};
DataType[] fieldTypes = {
    DataTypes.BIGINT(),
    DataTypes.VARCHAR(255),
    DataTypes.VARCHAR(100), 
    DataTypes.INT(),
    DataTypes.VARCHAR(50),   // partition column
    DataTypes.VARCHAR(10)    // partition column
};

// Define partition values for this split
Map<String, Object> partitionSpec = new HashMap<>();
partitionSpec.put("country", "US");
partitionSpec.put("year", "2023");

// Select fields to read (excluding age for performance)
int[] selectedFields = {0, 1, 2, 4, 5}; // user_id, name, email, country, year

// Create filter predicates
List<OrcFilters.Predicate> predicates = Arrays.asList(
    OrcFilters.lessThan("user_id", 50000L)
);

// Create split reader
Path filePath = new Path("hdfs://cluster/data/country=US/year=2023/part-00001.orc");
OrcColumnarRowSplitReader<VectorizedRowBatch> reader = 
    OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(
        new Configuration(),
        fieldNames,
        fieldTypes,
        partitionSpec,
        selectedFields,
        predicates,
        2048,        // batch size
        filePath,
        0,           // split start
        1024 * 1024  // split length (1MB)
    );

// Read rows; the reader exposes the vectorized batches as RowData views
while (!reader.reachedEnd()) {
    RowData row = reader.nextRecord(null);
    // Fields follow the selected-field order: user_id, name, email, country, year
    long userId = row.getLong(0);
    String name = row.getString(1).toString();
    // ... process row
}
reader.close();

Column Batch Factory

The input format uses a ColumnBatchFactory to create Flink VectorizedColumnBatch instances from ORC VectorizedRowBatch:

/**
 * Factory interface for creating column batches from ORC row batches
 * Used internally by input formats to convert ORC vectors to Flink vectors
 */
interface ColumnBatchFactory<T, SplitT extends FileSourceSplit> {
    /**
     * Create VectorizedColumnBatch from ORC VectorizedRowBatch
     * @param split File split containing partition metadata
     * @param rowBatch ORC vectorized row batch with column data
     * @return Flink VectorizedColumnBatch for processing
     */
    VectorizedColumnBatch createBatch(SplitT split, T rowBatch);
}
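
Applications normally do not implement this interface directly, since createPartitionedFormat wires a factory in internally. As a rough sketch of its shape, assuming the interface above (the wrapOrcVectors helper below is hypothetical and stands in for the module's ORC-to-Flink vector adapters):

// Sketch only, assuming the interface shown above. wrapOrcVectors is a
// hypothetical helper that adapts the ORC column vectors in the batch (plus any
// partition constants taken from the split) into Flink ColumnVector instances.
ColumnBatchFactory<VectorizedRowBatch, FileSourceSplit> factory =
    (split, rowBatch) -> {
        ColumnVector[] flinkVectors = wrapOrcVectors(rowBatch, split);
        return new VectorizedColumnBatch(flinkVectors);
    };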

Predicate Pushdown

ORC filters enable predicate pushdown for improved performance:

import org.apache.flink.orc.OrcFilters;

// Comparison predicates
List<OrcFilters.Predicate> predicates = Arrays.asList(
    OrcFilters.equals("status", "active"),
    OrcFilters.lessThan("age", 65),
    OrcFilters.greaterThan("salary", 50000.0),
    OrcFilters.lessThanEquals("score", 100),
    OrcFilters.greaterThanEquals("rating", 4.0),
    OrcFilters.isNull("deleted_at"),
    OrcFilters.isNotNull("email"),
    OrcFilters.between("created_date", startDate, endDate)
);

// String predicates  
List<OrcFilters.Predicate> stringPredicates = Arrays.asList(
    OrcFilters.startsWith("name", "John"),
    OrcFilters.in("country", Arrays.asList("US", "CA", "UK"))
);

// Logical combinations
OrcFilters.Predicate combined = OrcFilters.and(
    OrcFilters.equals("status", "active"),
    OrcFilters.or(
        OrcFilters.greaterThan("age", 18),
        OrcFilters.isNull("age")
    )
);

Column Projection

Optimize performance by reading only required columns:

// Table has 10 columns but only need 3
String[] allFields = {"id", "name", "email", "age", "salary", "dept", "manager", "created", "updated", "status"};

// Project only required columns (indices 0, 1, 2, 9)
int[] selectedFields = {0, 1, 2, 9}; // id, name, email, status

// This reduces I/O and memory usage significantly
OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> format = 
    OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
        hadoopConfig,
        fullTableType,
        partitionKeys,
        extractor, 
        selectedFields,  // Only read these columns
        predicates,
        batchSize
    );

Partition Handling

Handle partitioned ORC datasets efficiently:

// Partition extractor implementation (uses java.util.regex.Pattern/Matcher and java.sql.Date)
PartitionFieldExtractor<FileSourceSplit> extractor = (split, fieldName, fieldType) -> {
    String path = split.path().toString();
    
    // Parse Hive-style partition paths: /table/year=2023/month=12/file.orc
    Pattern pattern = Pattern.compile(fieldName + "=([^/]+)");
    Matcher matcher = pattern.matcher(path);
    
    if (matcher.find()) {
        String value = matcher.group(1);
        
        // Convert string value to appropriate type
        switch (fieldType.getTypeRoot()) {
            case INTEGER:
                return Integer.parseInt(value);
            case BIGINT:
                return Long.parseLong(value);
            case VARCHAR:
                return value;
            case DATE:
                return Date.valueOf(value);
            default:
                return value;
        }
    }
    return null;
};

Performance Optimization

Key strategies for optimal columnar reading performance:

  1. Column Projection: Only read required columns using selectedFields
  2. Predicate Pushdown: Use conjunctPredicates to filter at the ORC level
  3. Batch Size Tuning: Adjust batchSize based on memory and processing requirements
  4. Partition Pruning: Let Flink's partition pruning eliminate unnecessary splits
  5. Compression: Configure ORC compression for better I/O performance

// Optimized configuration
Configuration optimizedConfig = new Configuration();
optimizedConfig.setBoolean("orc.use.zerocopy", true);
optimizedConfig.setInt("orc.row.batch.size", 2048);  // Larger batches for better throughput
optimizedConfig.set("orc.compress", "ZSTD");          // Fast compression

Error Handling

Handle common reading errors:

OrcColumnarRowSplitReader<VectorizedRowBatch> reader = null;
try {
    reader = OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(/* parameters */);

    while (!reader.reachedEnd()) {
        RowData row = reader.nextRecord(null);
        // Process row
    }
} catch (IOException e) {
    // Handle file system errors, corrupt files, or read failures
    logger.error("Failed to read ORC file: " + path, e);
} catch (IllegalArgumentException e) {
    // Handle schema mismatches or invalid column selections
    logger.error("Invalid schema or column selection", e);
} finally {
    if (reader != null) {
        try {
            reader.close();
        } catch (IOException e) {
            logger.warn("Failed to close ORC reader", e);
        }
    }
}
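
When the reader is confined to a single scope, a try-with-resources statement keeps the cleanup implicit, since the reader's close() is invoked automatically:

try (OrcColumnarRowSplitReader<VectorizedRowBatch> reader =
         OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(/* parameters */)) {
    while (!reader.reachedEnd()) {
        RowData row = reader.nextRecord(null);
        // ... process row
    }
} catch (IOException e) {
    logger.error("Failed to read ORC split", e);
}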