Apache Flink ORC format connector for reading and writing ORC (Optimized Row Columnar) data files.
—
The ORC format provides high-performance columnar reading through the OrcColumnarRowInputFormat, enabling vectorized processing with partition support, column projection, and statistics reporting.
// Vectorized ORC input format that produces Flink RowData records.
// Supports column projection (selectedFields), conjunctive predicate pushdown
// (conjunctPredicates), and file-footer statistics reporting.
public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT>
implements FileBasedStatisticsReportableInputFormat {
// Constructor: schema is the ORC file schema; batchFactory adapts the
// ORC-native batch type BatchT to Flink's columnar batch.
public OrcColumnarRowInputFormat(
OrcShim<BatchT> shim,
Configuration hadoopConfig,
TypeDescription schema,
int[] selectedFields,
List<OrcFilters.Predicate> conjunctPredicates,
int batchSize,
ColumnBatchFactory<BatchT, SplitT> batchFactory,
TypeInformation<RowData> producedTypeInfo
);
// Creates a reusable reader batch for one split; the recycler returns the
// batch to its pool once it has been consumed.
public OrcReaderBatch<RowData, BatchT> createReaderBatch(
SplitT split,
OrcVectorizedBatchWrapper<BatchT> orcBatch,
Pool.Recycler<OrcReaderBatch<RowData, BatchT>> recycler,
int batchSize
);
// Type information of the emitted records.
public TypeInformation<RowData> getProducedType();
// Derives TableStats from the metadata of the listed ORC files.
public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
public static <SplitT extends FileSourceSplit> OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT>
// Convenience factory for partitioned tables reading into Hive's VectorizedRowBatch.
// Partition-column values are supplied by the extractor (typically parsed from
// the split path), not read from the ORC file itself.
createPartitionedFormat(
OrcShim<VectorizedRowBatch> shim,
Configuration hadoopConfig,
RowType tableType, // full table row type, including partition columns
List<String> partitionKeys, // names of the partition columns
PartitionFieldExtractor<SplitT> extractor, // yields partition values per split
int[] selectedFields, // indices into tableType to produce
List<OrcFilters.Predicate> conjunctPredicates, // ANDed filters pushed to ORC
int batchSize,
Function<RowType, TypeInformation<RowData>> rowTypeInfoFactory // e.g. InternalTypeInfo::of
);
import org.apache.flink.orc.OrcColumnarRowInputFormat;
import org.apache.flink.orc.shim.OrcShim;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;
// Define table schema: four physical columns, no partition columns.
RowType tableType = RowType.of(
new LogicalType[] {
new BigIntType(), // id
new VarCharType(255), // name
new IntType(), // age
new BooleanType() // active
},
new String[] {"id", "name", "age", "active"}
);
// Configure reading
Configuration hadoopConfig = new Configuration();
int[] selectedFields = {0, 1, 2, 3}; // all fields
List<OrcFilters.Predicate> predicates = new ArrayList<>(); // no pushdown filters
int batchSize = 1024; // rows per vectorized batch
// Create input format. The last argument must be a
// Function<RowType, TypeInformation<RowData>>; InternalTypeInfo::of satisfies it.
// (TypeConversions::fromLogicalToDataType returns a DataType and would not compile.)
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
OrcColumnarRowInputFormat.createPartitionedFormat(
OrcShim.defaultShim(),
hadoopConfig,
tableType,
Collections.emptyList(), // no partitions
null, // no partition extractor
selectedFields,
predicates,
batchSize,
InternalTypeInfo::of
);
// Read only specific columns (id, name)
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
int[] projectedFields = {0, 1}; // only id and name columns
// Projected output type (for reference); the actual projection is driven by
// projectedFields against the full tableType passed to the factory below.
RowType projectedType = RowType.of(
new LogicalType[] {
new BigIntType(), // id
new VarCharType(255) // name
},
new String[] {"id", "name"}
);
// InternalTypeInfo::of matches the required
// Function<RowType, TypeInformation<RowData>> factory parameter.
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> projectedFormat =
OrcColumnarRowInputFormat.createPartitionedFormat(
OrcShim.defaultShim(),
hadoopConfig,
tableType, // full table type
Collections.emptyList(),
null,
projectedFields, // only selected fields
predicates,
batchSize,
InternalTypeInfo::of
);
import org.apache.flink.connector.file.table.PartitionFieldExtractor;
// Define partitioned table
List<String> partitionKeys = Arrays.asList("year", "month");
RowType partitionedTableType = RowType.of(
new LogicalType[] {
new BigIntType(), // id
new VarCharType(255), // name
new IntType(), // age
new BooleanType(), // active
new IntType(), // year (partition)
new IntType() // month (partition)
},
new String[] {"id", "name", "age", "active", "year", "month"}
);
// Partition field extractor
PartitionFieldExtractor<FileSourceSplit> extractor = (split, fieldName, fieldType) -> {
// Extract partition values from file path
String path = split.path().toString();
if ("year".equals(fieldName)) {
// Extract year from path like /data/year=2023/month=01/file.orc
return extractYearFromPath(path);
} else if ("month".equals(fieldName)) {
return extractMonthFromPath(path);
}
return null;
};
// Create partitioned format
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> partitionedFormat =
OrcColumnarRowInputFormat.createPartitionedFormat(
OrcShim.defaultShim(),
hadoopConfig,
partitionedTableType,
partitionKeys,
extractor,
selectedFields,
predicates,
batchSize,
TypeConversions::fromLogicalToDataType
);
import org.apache.flink.orc.OrcFilters;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
// Create filter predicates; the list is applied as a conjunction (AND).
List<OrcFilters.Predicate> predicates = Arrays.asList(
// age > 25, expressed as NOT(age <= 25)
new OrcFilters.Not(new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25)),
// active = true
new OrcFilters.Equals("active", PredicateLeaf.Type.BOOLEAN, true),
// name IS NOT NULL
new OrcFilters.Not(new OrcFilters.IsNull("name", PredicateLeaf.Type.STRING))
);
// Use predicates in format. InternalTypeInfo::of matches the required
// Function<RowType, TypeInformation<RowData>> factory parameter.
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> filteredFormat =
OrcColumnarRowInputFormat.createPartitionedFormat(
OrcShim.defaultShim(),
hadoopConfig,
tableType,
Collections.emptyList(),
null,
selectedFields,
predicates, // apply filters
batchSize,
InternalTypeInfo::of
);
@FunctionalInterface
// Factory that adapts the ORC-native batch type BatchT to Flink's
// VectorizedColumnBatch for a given split.
public interface ColumnBatchFactory<BatchT, SplitT extends FileSourceSplit> {
VectorizedColumnBatch create(SplitT split, BatchT batch);
}
Custom batch factory for specialized processing:
// Custom batch factory: wraps each ORC ColumnVector in a Flink column vector.
ColumnBatchFactory<VectorizedRowBatch, FileSourceSplit> customFactory =
(split, orcBatch) -> {
// Create Flink column vectors from ORC vectors
ColumnVector[] vectors = new ColumnVector[selectedFields.length];
for (int i = 0; i < vectors.length; i++) {
int fieldIndex = selectedFields[i];
LogicalType fieldType = tableType.getTypeAt(fieldIndex);
// NOTE(review): cols is indexed by i rather than fieldIndex — correct only if
// the ORC batch already contains just the selected columns in order; verify
// against the reader's schema/projection setup.
vectors[i] = AbstractOrcColumnVector.createFlinkVector(
orcBatch.cols[i],
fieldType
);
}
return new VectorizedColumnBatch(vectors);
};
public TableStats reportStatistics(List<Path> files, DataType producedDataType);
Extract statistics from ORC files:
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.plan.stats.TableStats;
// ORC files whose metadata is consulted for statistics.
List<Path> orcFiles = Arrays.asList(
new Path("/data/file1.orc"),
new Path("/data/file2.orc")
);
// Get statistics from ORC metadata
// (dataType is the produced DataType of the format, defined elsewhere).
TableStats stats = inputFormat.reportStatistics(orcFiles, dataType);
System.out.println("Row count: " + stats.getRowCount());
System.out.println("Column stats: " + stats.getColumnStats());
Utility class for extracting comprehensive table statistics from ORC files.
// Utility for deriving TableStats (row counts, per-column stats) from ORC files.
public class OrcFormatStatisticsReportUtil {
// Uses a default Hadoop configuration.
public static TableStats getTableStatistics(
List<Path> files,
DataType producedDataType
);
// Overload accepting an explicit Hadoop configuration for reader tuning.
public static TableStats getTableStatistics(
List<Path> files,
DataType producedDataType,
Configuration hadoopConfig
);
}
Usage Examples:
import org.apache.flink.orc.util.OrcFormatStatisticsReportUtil;
import org.apache.hadoop.conf.Configuration;
// Get statistics with default configuration
List<Path> orcFiles = Arrays.asList(
new Path("/warehouse/users/part1.orc"),
new Path("/warehouse/users/part2.orc")
);
TableStats stats = OrcFormatStatisticsReportUtil.getTableStatistics(
orcFiles,
producedDataType
);
// Get statistics with custom Hadoop configuration
Configuration customConfig = new Configuration();
customConfig.set("orc.read.buffer.size", "262144"); // 256 KB read buffer
customConfig.setBoolean("orc.read.include.file.footer", true);
TableStats detailedStats = OrcFormatStatisticsReportUtil.getTableStatistics(
orcFiles,
producedDataType,
customConfig
);
// Access statistics information (total row count plus per-column stats)
System.out.println("Total row count: " + stats.getRowCount());
Map<String, ColumnStats> columnStats = stats.getColumnStats();
for (Map.Entry<String, ColumnStats> entry : columnStats.entrySet()) {
String columnName = entry.getKey();
ColumnStats colStats = entry.getValue();
System.out.println(columnName + " - Null count: " + colStats.getNullCount());
System.out.println(columnName + " - Min: " + colStats.getMin());
System.out.println(columnName + " - Max: " + colStats.getMax());
}
public abstract class OrcReaderBatch<RowDataT, BatchT> implements Pool.Recyclable {
// Converts the filled ORC batch into an iterator over individual rows.
// startingOffset: offset of this batch's first record within the split.
public abstract RecordIterator<RowDataT> convertAndGetIterator(
OrcVectorizedBatchWrapper<BatchT> orcBatch,
long startingOffset
);
}
The reader processes data in vectorized batches for optimal performance:
createReaderBatch() creates the vectorized column batch; convertAndGetIterator() provides row-by-row access.
// Smaller batches for memory-constrained environments
int smallBatchSize = 512; // fewer rows per batch -> lower peak memory
// Larger batches for high-throughput scenarios
int largeBatchSize = 4096; // amortizes per-batch overhead over more rows
// Default ORC batch size
int defaultBatchSize = VectorizedColumnBatch.DEFAULT_SIZE; // 1024
Configuration hadoopConfig = new Configuration();
// ORC reader settings
hadoopConfig.setBoolean("orc.use.zerocopy", true); // zero-copy reads where the filesystem supports it
hadoopConfig.setInt("orc.read.buffer.size", 262144); // 256KB
hadoopConfig.setBoolean("orc.read.include.file.footer", true); // see ORC config docs for footer caching semantics
// Compression buffer settings
hadoopConfig.setInt("io.compression.codec.lzo.buffersize", 65536);
hadoopConfig.setInt("io.compression.codec.snappy.buffersize", 65536);
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.file.src.FileSource;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create file source with ORC format (bulk format: reads whole batches per fetch)
FileSource<RowData> orcSource = FileSource
.forBulkFileFormat(inputFormat, new Path("/path/to/orc/files"))
.build();
DataStream<RowData> orcStream = env.fromSource(
orcSource,
WatermarkStrategy.noWatermarks(),
"ORC Source"
);
// Process the stream
orcStream
.filter(row -> row.getInt(2) > 25) // age (field index 2) > 25
.map(row -> row.getString(1)) // extract name (field index 1)
.print();
Core utility class for ORC split reading operations and type conversions.
// Utilities for ORC split reading: building partition-aware readers and
// converting between Flink logical types and ORC TypeDescriptions.
public class OrcSplitReaderUtil {
// Builds a per-split reader function producing RowData, merging partition
// values (via the extractor) with columns read from the ORC file.
public static <SplitT extends FileSourceSplit> Function<SplitT, RecordIterator<RowData>>
genPartColumnarRowReader(
Configuration hadoopConfig,
String[] fullFieldNames,
DataType[] fullFieldTypes,
TypeInformation<RowData> typeInfo,
int[] selectedFields,
List<OrcFilters.Predicate> conjunctPredicates,
int batchSize,
OrcShim<VectorizedRowBatch> shim,
List<String> partitionKeys,
PartitionFieldExtractor<SplitT> extractor
);
// Maps table-level selected indices to ORC-file indices
// (partition columns are not stored in the file).
public static int[] getSelectedOrcFields(
RowType tableType,
int[] selectedFields,
List<String> partitionKeys
);
// Names of the non-partition (physical) columns.
public static String[] getNonPartNames(
String[] fullNames,
List<String> partitionKeys
);
public static String[] getNonPartNames(
RowType rowType,
List<String> partitionKeys
);
// Builds the ORC schema for the physical columns of a partitioned table.
public static TypeDescription convertToOrcTypeWithPart(
RowType rowType,
List<String> partitionKeys
);
public static TypeDescription convertToOrcTypeWithPart(
String[] fieldNames,
DataType[] fieldTypes,
List<String> partitionKeys
);
// Converts a single Flink logical type to its ORC TypeDescription.
public static TypeDescription logicalTypeToOrcType(LogicalType logicalType);
}
Usage Examples:
import org.apache.flink.orc.OrcSplitReaderUtil;
// Get selected ORC field indices (partition columns are excluded from the file)
int[] selectedOrcFields = OrcSplitReaderUtil.getSelectedOrcFields(
tableType,
selectedFields,
partitionKeys
);
// Convert Flink types to ORC schema
TypeDescription orcSchema = OrcSplitReaderUtil.convertToOrcTypeWithPart(
tableType,
partitionKeys
);
// Convert individual logical type
LogicalType stringType = new VarCharType(255);
TypeDescription orcStringType = OrcSplitReaderUtil.logicalTypeToOrcType(stringType);
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-orc