CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-orc

Apache Flink ORC format connector for reading and writing ORC (Optimized Row Columnar) data files

Pending
Overview
Eval results
Files

docs/columnar-reading.md

Columnar Reading

The ORC format provides high-performance columnar reading through the OrcColumnarRowInputFormat, enabling vectorized processing with partition support, column projection, and statistics reporting.

Input Format

/**
 * API listing (signatures only, no bodies) of the ORC input format that produces
 * {@code RowData} records in vectorized batches.
 *
 * <p>{@code BatchT} is the ORC batch type handled by the {@code OrcShim}; {@code SplitT} is the
 * file split type being read.
 */
public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit> 
        extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> 
        implements FileBasedStatisticsReportableInputFormat {
    
    /**
     * Low-level constructor; the usage examples on this page go through
     * {@code createPartitionedFormat} instead.
     *
     * <p>NOTE(review): {@code selectedFields} presumably indexes into {@code schema} (the ORC
     * schema) rather than the table schema — confirm against the Flink sources.
     */
    public OrcColumnarRowInputFormat(
        OrcShim<BatchT> shim,
        Configuration hadoopConfig,
        TypeDescription schema,
        int[] selectedFields,
        List<OrcFilters.Predicate> conjunctPredicates,
        int batchSize,
        ColumnBatchFactory<BatchT, SplitT> batchFactory,
        TypeInformation<RowData> producedTypeInfo
    );
    
    /** Creates the reader batch wrapping {@code orcBatch} for the given split. */
    public OrcReaderBatch<RowData, BatchT> createReaderBatch(
        SplitT split,
        OrcVectorizedBatchWrapper<BatchT> orcBatch,
        Pool.Recycler<OrcReaderBatch<RowData, BatchT>> recycler,
        int batchSize
    );
    
    /** Returns the type information of the rows this format produces. */
    public TypeInformation<RowData> getProducedType();
    /** Reports table statistics for the given ORC files. */
    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}

Factory Methods

/**
 * Static factory on {@code OrcColumnarRowInputFormat} (signature only).
 *
 * <p>{@code tableType} is the full table row type, {@code partitionKeys} names the partition
 * columns, and {@code selectedFields} lists the projected field indices. The last argument is a
 * function from the projected {@code RowType} to {@code TypeInformation<RowData>} — it must not
 * return a {@code DataType}.
 *
 * <p>NOTE(review): {@code extractor} is passed as {@code null} in the non-partitioned examples
 * on this page; presumably it is only used for partition columns — confirm.
 */
public static <SplitT extends FileSourceSplit> OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> 
    createPartitionedFormat(
        OrcShim<VectorizedRowBatch> shim,
        Configuration hadoopConfig,
        RowType tableType,
        List<String> partitionKeys,
        PartitionFieldExtractor<SplitT> extractor,
        int[] selectedFields,
        List<OrcFilters.Predicate> conjunctPredicates,
        int batchSize,
        Function<RowType, TypeInformation<RowData>> rowTypeInfoFactory
    );

Usage Examples

Basic Columnar Reading

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.orc.OrcColumnarRowInputFormat;
import org.apache.flink.orc.OrcFilters;
import org.apache.flink.orc.shim.OrcShim;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

// Define table schema
RowType tableType = RowType.of(
    new LogicalType[] {
        new BigIntType(),      // id
        new VarCharType(255),  // name
        new IntType(),         // age
        new BooleanType()      // active
    },
    new String[] {"id", "name", "age", "active"}
);

// Configure reading
Configuration hadoopConfig = new Configuration();
int[] selectedFields = {0, 1, 2, 3}; // All fields
List<OrcFilters.Predicate> predicates = new ArrayList<>(); // No filters
int batchSize = 1024;

// Create input format
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat = 
    OrcColumnarRowInputFormat.createPartitionedFormat(
        OrcShim.defaultShim(),
        hadoopConfig,
        tableType,
        Collections.emptyList(), // No partitions
        null, // No partition extractor needed because there are no partition keys
        selectedFields,
        predicates,
        batchSize,
        // The factory must map RowType -> TypeInformation<RowData>.
        // TypeConversions::fromLogicalToDataType returns a DataType and does not fit.
        InternalTypeInfo::of
    );

Reading with Column Projection

import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

// Read only specific columns (id, name)
int[] projectedFields = {0, 1}; // Only id and name columns

// Shape of the rows the projected format produces (shown for reference only;
// the factory derives it from tableType and projectedFields).
RowType projectedType = RowType.of(
    new LogicalType[] {
        new BigIntType(),      // id
        new VarCharType(255)   // name
    },
    new String[] {"id", "name"}
);

OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> projectedFormat = 
    OrcColumnarRowInputFormat.createPartitionedFormat(
        OrcShim.defaultShim(),
        hadoopConfig,
        tableType, // Full table type
        Collections.emptyList(),
        null,
        projectedFields, // Only selected fields
        predicates,
        batchSize,
        // RowType -> TypeInformation<RowData>; a DataType-returning function does not fit here
        InternalTypeInfo::of
    );

Reading Partitioned Data

import org.apache.flink.connector.file.table.PartitionFieldExtractor;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

// Define partitioned table
List<String> partitionKeys = Arrays.asList("year", "month");

RowType partitionedTableType = RowType.of(
    new LogicalType[] {
        new BigIntType(),      // id
        new VarCharType(255),  // name
        new IntType(),         // age
        new BooleanType(),     // active
        new IntType(),         // year (partition)
        new IntType()          // month (partition)
    },
    new String[] {"id", "name", "age", "active", "year", "month"}
);

// Select the data columns AND the partition columns. Selecting only {0, 1, 2, 3}
// would leave year/month out of the produced rows.
int[] partitionedFields = {0, 1, 2, 3, 4, 5};

// Partition field extractor
PartitionFieldExtractor<FileSourceSplit> extractor = (split, fieldName, fieldType) -> {
    // Extract partition values from file path
    String path = split.path().toString();
    if ("year".equals(fieldName)) {
        // Extract year from path like /data/year=2023/month=01/file.orc
        // (extractYearFromPath / extractMonthFromPath are user-supplied helpers)
        return extractYearFromPath(path);
    } else if ("month".equals(fieldName)) {
        return extractMonthFromPath(path);
    }
    return null;
};

// Create partitioned format
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> partitionedFormat = 
    OrcColumnarRowInputFormat.createPartitionedFormat(
        OrcShim.defaultShim(),
        hadoopConfig,
        partitionedTableType,
        partitionKeys,
        extractor,
        partitionedFields,
        predicates,
        batchSize,
        // RowType -> TypeInformation<RowData>; a DataType-returning function does not fit here
        InternalTypeInfo::of
    );

Reading with Predicates

import org.apache.flink.orc.OrcFilters;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

// Create filter predicates.
// Named filterPredicates so it does not clash with the empty `predicates` list
// declared in the basic example.
List<OrcFilters.Predicate> filterPredicates = Arrays.asList(
    // age > 25, expressed as NOT (age <= 25).
    // The literal must be a Long: PredicateLeaf.Type.LONG rejects an Integer literal.
    new OrcFilters.Not(new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25L)),
    // active = true
    new OrcFilters.Equals("active", PredicateLeaf.Type.BOOLEAN, true),
    // name IS NOT NULL
    new OrcFilters.Not(new OrcFilters.IsNull("name", PredicateLeaf.Type.STRING))
);

// Use predicates in format
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> filteredFormat = 
    OrcColumnarRowInputFormat.createPartitionedFormat(
        OrcShim.defaultShim(),
        hadoopConfig,
        tableType,
        Collections.emptyList(),
        null,
        selectedFields,
        filterPredicates, // Apply filters
        batchSize,
        // RowType -> TypeInformation<RowData>; a DataType-returning function does not fit here
        InternalTypeInfo::of
    );

Column Batch Factory

/**
 * Builds the Flink {@code VectorizedColumnBatch} that wraps an ORC batch for a given split.
 *
 * @param <BatchT> type of the ORC batch being wrapped
 * @param <SplitT> type of the file split the batch was read from
 */
@FunctionalInterface
public interface ColumnBatchFactory<BatchT, SplitT extends FileSourceSplit> {
    /** Creates the column batch for {@code batch}, which was read from {@code split}. */
    VectorizedColumnBatch create(SplitT split, BatchT batch);
}

Custom batch factory for specialized processing:

// Batch factory for a non-partitioned table whose ORC column order matches tableType.
ColumnBatchFactory<VectorizedRowBatch, FileSourceSplit> customFactory = 
    (split, orcBatch) -> {
        // Create Flink column vectors from ORC vectors.
        // ColumnVector here is Flink's column vector type, not Hive's class of the same name.
        ColumnVector[] vectors = new ColumnVector[selectedFields.length];
        
        for (int i = 0; i < vectors.length; i++) {
            int fieldIndex = selectedFields[i];
            LogicalType fieldType = tableType.getTypeAt(fieldIndex);
            
            // Index the ORC batch by the column's position in the schema, not by the
            // projection position i; the two differ whenever selectedFields is not 0..n-1.
            vectors[i] = AbstractOrcColumnVector.createFlinkVector(
                orcBatch.cols[fieldIndex],
                fieldType
            );
        }
        
        return new VectorizedColumnBatch(vectors);
    };

Statistics Reporting

public TableStats reportStatistics(List<Path> files, DataType producedDataType);

Extract statistics from ORC files:

import org.apache.flink.core.fs.Path;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;

List<Path> orcFiles = Arrays.asList(
    new Path("/data/file1.orc"),
    new Path("/data/file2.orc")
);

// Row data type of the table being read, derived from the logical row type
DataType dataType = TypeConversions.fromLogicalToDataType(tableType);

// Get statistics from ORC metadata
TableStats stats = inputFormat.reportStatistics(orcFiles, dataType);

System.out.println("Row count: " + stats.getRowCount());
System.out.println("Column stats: " + stats.getColumnStats());

OrcFormatStatisticsReportUtil

Utility class for extracting comprehensive table statistics from ORC files.

/** API listing (signatures only): utility for extracting table statistics from ORC files. */
public class OrcFormatStatisticsReportUtil {
    /** Computes statistics for {@code files}; takes no explicit Hadoop configuration. */
    public static TableStats getTableStatistics(
        List<Path> files,
        DataType producedDataType
    );
    
    /** Same as above, with a caller-supplied Hadoop configuration. */
    public static TableStats getTableStatistics(
        List<Path> files,
        DataType producedDataType,
        Configuration hadoopConfig
    );
}

Usage Examples:

import org.apache.flink.orc.util.OrcFormatStatisticsReportUtil;
import org.apache.hadoop.conf.Configuration;

// ORC files whose metadata is inspected
List<Path> orcFiles = Arrays.asList(
    new Path("/warehouse/users/part1.orc"),
    new Path("/warehouse/users/part2.orc")
);

// Variant 1: no explicit Hadoop configuration
TableStats stats =
    OrcFormatStatisticsReportUtil.getTableStatistics(orcFiles, producedDataType);

// Variant 2: caller-supplied Hadoop configuration
Configuration customConfig = new Configuration();
customConfig.set("orc.read.buffer.size", "262144");
customConfig.setBoolean("orc.read.include.file.footer", true);

TableStats detailedStats =
    OrcFormatStatisticsReportUtil.getTableStatistics(orcFiles, producedDataType, customConfig);

// Print the total row count, then null count and min/max for every column
System.out.println("Total row count: " + stats.getRowCount());
stats.getColumnStats().forEach((columnName, colStats) -> {
    System.out.println(columnName + " - Null count: " + colStats.getNullCount());
    System.out.println(columnName + " - Min: " + colStats.getMin());
    System.out.println(columnName + " - Max: " + colStats.getMax());
});

Reader Batch Processing

/**
 * A recyclable reader batch that exposes the contents of an ORC batch as records.
 *
 * @param <RowDataT> record type handed out by the iterator
 * @param <BatchT> type of the underlying ORC batch
 */
public abstract class OrcReaderBatch<RowDataT, BatchT> implements Pool.Recyclable {
    /**
     * Converts {@code orcBatch} and returns an iterator giving row-by-row access to it.
     *
     * <p>NOTE(review): {@code startingOffset} is presumably the position of the first record
     * in the batch, used for checkpointing — confirm against the Flink sources.
     */
    public abstract RecordIterator<RowDataT> convertAndGetIterator(
        OrcVectorizedBatchWrapper<BatchT> orcBatch, 
        long startingOffset
    );
}

The reader processes data in vectorized batches for optimal performance:

  1. Batch Creation: createReaderBatch() creates a vectorized column batch
  2. Vector Conversion: ORC vectors are converted to Flink column vectors
  3. Iterator Generation: convertAndGetIterator() provides row-by-row access
  4. Memory Management: Batches are recyclable for memory efficiency

Performance Configuration

Batch Size Tuning

// Smaller batches for memory-constrained environments
int smallBatchSize = 512;

// Larger batches for high-throughput scenarios
int largeBatchSize = 4096;

// Default ORC batch size. This is the ORC/Hive VectorizedRowBatch constant;
// Flink's VectorizedColumnBatch.DEFAULT_SIZE is a separate constant with a different value.
int defaultBatchSize = VectorizedRowBatch.DEFAULT_SIZE; // 1024

Hadoop Configuration

Configuration hadoopConfig = new Configuration();

// ORC reader settings
// NOTE(review): these keys are passed through as plain strings; unknown keys are
// presumably ignored silently — verify each key against the ORC version in use.
hadoopConfig.setBoolean("orc.use.zerocopy", true);
hadoopConfig.setInt("orc.read.buffer.size", 262144); // 256KB
hadoopConfig.setBoolean("orc.read.include.file.footer", true);

// Compression buffer settings (64KB each)
hadoopConfig.setInt("io.compression.codec.lzo.buffersize", 65536);
hadoopConfig.setInt("io.compression.codec.snappy.buffersize", 65536);

Integration with DataStream API

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create file source with ORC format
FileSource<RowData> orcSource = FileSource
    .forBulkFileFormat(inputFormat, new Path("/path/to/orc/files"))
    .build();

DataStream<RowData> orcStream = env.fromSource(
    orcSource, 
    WatermarkStrategy.noWatermarks(), 
    "ORC Source"
);

// Process the stream
orcStream
    // age > 25; check for NULL first because getInt() on a null field is undefined
    .filter(row -> !row.isNullAt(2) && row.getInt(2) > 25)
    // Extract name as a java.lang.String rather than the internal StringData.
    // Assumes name is non-null; add an isNullAt(1) check if the column is nullable.
    .map(row -> row.getString(1).toString())
    .print();

// Nothing runs until the job is submitted
env.execute("ORC columnar read");

Split Reader Utilities

OrcSplitReaderUtil

Core utility class for ORC split reading operations and type conversions.

/**
 * API listing (signatures only): utility for ORC split reading operations and type conversions.
 *
 * <p>NOTE(review): this listing cannot be checked from this page. Verify parameter and return
 * types against the {@code OrcSplitReaderUtil} Javadoc of the Flink version in use before
 * relying on them.
 */
public class OrcSplitReaderUtil {
    /** Generates a columnar row reader that also handles partition columns. */
    public static <SplitT extends FileSourceSplit> Function<SplitT, RecordIterator<RowData>> 
        genPartColumnarRowReader(
            Configuration hadoopConfig,
            String[] fullFieldNames,
            DataType[] fullFieldTypes,
            TypeInformation<RowData> typeInfo,
            int[] selectedFields,
            List<OrcFilters.Predicate> conjunctPredicates,
            int batchSize,
            OrcShim<VectorizedRowBatch> shim,
            List<String> partitionKeys,
            PartitionFieldExtractor<SplitT> extractor
        );
    
    /** Returns the selected ORC field indices for the given table-level selection. */
    public static int[] getSelectedOrcFields(
        RowType tableType,
        int[] selectedFields,
        List<String> partitionKeys
    );
    
    /** Returns the names from {@code fullNames} that are not partition keys. */
    public static String[] getNonPartNames(
        String[] fullNames,
        List<String> partitionKeys
    );
    
    /** Returns the field names of {@code rowType} that are not partition keys. */
    public static String[] getNonPartNames(
        RowType rowType,
        List<String> partitionKeys
    );
    
    /** Converts a Flink row type to an ORC schema, taking partition keys into account. */
    public static TypeDescription convertToOrcTypeWithPart(
        RowType rowType,
        List<String> partitionKeys
    );
    
    /** Same conversion, driven by parallel arrays of field names and field types. */
    public static TypeDescription convertToOrcTypeWithPart(
        String[] fieldNames,
        DataType[] fieldTypes,
        List<String> partitionKeys
    );
    
    /** Converts a single Flink logical type to the corresponding ORC type. */
    public static TypeDescription logicalTypeToOrcType(LogicalType logicalType);
}

Usage Examples:

import org.apache.flink.orc.OrcSplitReaderUtil;

// Derive the ORC schema from the Flink row type and its partition keys
TypeDescription orcSchema =
    OrcSplitReaderUtil.convertToOrcTypeWithPart(tableType, partitionKeys);

// Translate the table-level field selection into ORC field indices
int[] selectedOrcFields =
    OrcSplitReaderUtil.getSelectedOrcFields(tableType, selectedFields, partitionKeys);

// Map one logical type (VARCHAR(255)) to its ORC counterpart
LogicalType stringType = new VarCharType(255);
TypeDescription orcStringType =
    OrcSplitReaderUtil.logicalTypeToOrcType(stringType);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-orc

docs

bulk-writing.md

columnar-reading.md

index.md

predicate-pushdown.md

table-api.md

vector-processing.md

tile.json