Helper utilities for creating columnar input formats and split readers with partition support for efficient ORC file reading. Provides high-performance vectorized reading with predicate pushdown and column projection capabilities.
Helper class for creating partitioned ORC columnar input formats without Hive dependencies.
/**
* Helper class to create OrcColumnarRowFileInputFormat for no-hive usage
* Provides static factory methods for creating partitioned input formats
*/
public class OrcNoHiveColumnarRowInputFormat {
/**
* Create a partitioned OrcColumnarRowFileInputFormat where partition column
* values are derived from split metadata rather than read from the file
* @param hadoopConfig Hadoop configuration for ORC reading
* @param tableType Row type describing the complete table schema
* @param partitionKeys List of partition column names
* @param extractor Extracts partition values from file splits
* @param selectedFields Array of field indices to read from files
* @param conjunctPredicates List of filter predicates for pushdown
* @param batchSize Number of rows per vectorized batch
* @return Configured columnar input format for partitioned reading
*/
public static <SplitT extends FileSourceSplit>
OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
Configuration hadoopConfig,
RowType tableType,
List<String> partitionKeys,
PartitionFieldExtractor<SplitT> extractor,
int[] selectedFields,
List<OrcFilters.Predicate> conjunctPredicates,
int batchSize
);
}

Usage Examples:
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.table.PartitionFieldExtractor; // package location varies across Flink versions
import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
import org.apache.flink.orc.OrcFilters;
import org.apache.flink.orc.nohive.OrcNoHiveColumnarRowInputFormat;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; // provided by the ORC storage-api dependency
import java.util.Arrays;
import java.util.List;
// Define table schema with partitioned columns
RowType tableType = RowType.of(
new LogicalType[] {
new BigIntType(), // user_id
new VarCharType(255), // name
new VarCharType(100), // email
new IntType(), // age
new VarCharType(50), // country (partition)
new VarCharType(10) // year (partition)
},
new String[] {"user_id", "name", "email", "age", "country", "year"}
);
// Define partition keys
List<String> partitionKeys = Arrays.asList("country", "year");
// Select only specific fields to read (column projection)
int[] selectedFields = {0, 1, 2, 4, 5}; // user_id, name, email, country, year
// Create partition extractor
PartitionFieldExtractor<FileSourceSplit> extractor = (split, fieldName, fieldType) -> {
// Extract partition values from file path like /data/country=US/year=2023/file.orc
String path = split.path().toString();
if (fieldName.equals("country")) {
return extractFromPath(path, "country=");
} else if (fieldName.equals("year")) {
return extractFromPath(path, "year=");
}
return null;
};
// Create filter predicates
List<OrcFilters.Predicate> predicates = Arrays.asList(
new OrcFilters.Equals("age", PredicateLeaf.Type.LONG, 25L),
new OrcFilters.LessThan("user_id", PredicateLeaf.Type.LONG, 10000L)
);
// Create columnar input format
OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
new Configuration(),
tableType,
partitionKeys,
extractor,
selectedFields,
predicates,
1024 // batch size
);
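The returned format implements Flink's BulkFormat interface, so it can be handed directly to a FileSource. A minimal sketch of the wiring, assuming the standard DataStream file connector API (FileSource.forBulkFileFormat, StreamExecutionEnvironment.fromSource):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

// Point the source at the table root; partition directories are discovered beneath it
FileSource<RowData> source =
FileSource.forBulkFileFormat(inputFormat, new Path("/data")).build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "orc-source").print();

Utility for generating ORC split readers with partition support and predicate pushdown.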
/**
* Utility for generating OrcColumnarRowSplitReader instances
* Provides factory methods for creating split readers with partition support
*/
public class OrcNoHiveSplitReaderUtil {
/**
* Generate partitioned columnar row reader for ORC files
* @param conf Hadoop configuration
* @param fullFieldNames Complete array of field names in table schema
* @param fullFieldTypes Complete array of field types in table schema
* @param partitionSpec Map of partition column names to values
* @param selectedFields Array of field indices to read from files
* @param conjunctPredicates List of filter predicates for pushdown
* @param batchSize Number of rows per vectorized batch
* @param path Path to the ORC file to read
* @param splitStart Byte offset where split starts in file
* @param splitLength Number of bytes to read in this split
* @return Configured columnar row split reader
* @throws IOException if reader creation fails
*/
public static OrcColumnarRowSplitReader<VectorizedRowBatch> genPartColumnarRowReader(
Configuration conf,
String[] fullFieldNames,
DataType[] fullFieldTypes,
Map<String, Object> partitionSpec,
int[] selectedFields,
List<OrcFilters.Predicate> conjunctPredicates,
int batchSize,
org.apache.flink.core.fs.Path path,
long splitStart,
long splitLength
) throws IOException;
}

Usage Examples:
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.OrcColumnarRowSplitReader;
import org.apache.flink.orc.OrcFilters;
import org.apache.flink.orc.nohive.OrcNoHiveSplitReaderUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import java.util.*;
// Define complete table schema
String[] fieldNames = {"user_id", "name", "email", "age", "country", "year"};
DataType[] fieldTypes = {
DataTypes.BIGINT(),
DataTypes.VARCHAR(255),
DataTypes.VARCHAR(100),
DataTypes.INT(),
DataTypes.VARCHAR(50), // partition column
DataTypes.VARCHAR(10) // partition column
};
// Define partition values for this split
Map<String, Object> partitionSpec = new HashMap<>();
partitionSpec.put("country", "US");
partitionSpec.put("year", "2023");
// Select fields to read (excluding age for performance)
int[] selectedFields = {0, 1, 2, 4, 5}; // user_id, name, email, country, year
// Create filter predicates
List<OrcFilters.Predicate> predicates = Arrays.asList(
new OrcFilters.LessThan("user_id", PredicateLeaf.Type.LONG, 50000L)
);
// Create split reader
Path filePath = new Path("hdfs://cluster/data/country=US/year=2023/part-00001.orc");
OrcColumnarRowSplitReader<VectorizedRowBatch> reader =
OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(
new Configuration(),
fieldNames,
fieldTypes,
partitionSpec,
selectedFields,
predicates,
2048, // batch size
filePath,
0, // split start
1024 * 1024 // split length (1MB)
);
// Read rows; the reader materializes vectorized batches internally and
// exposes them row by row as columnar RowData
while (!reader.reachedEnd()) {
RowData row = reader.nextRecord(null); // reuse object is ignored by the columnar reader
long userId = row.getLong(0);
String name = row.getString(1).toString();
// ... process row
}
reader.close();

The input format uses a ColumnBatchFactory to create Flink VectorizedColumnBatch instances from ORC VectorizedRowBatch:
/**
* Factory interface for creating column batches from ORC row batches
* Used internally by input formats to convert ORC vectors to Flink vectors
*/
interface ColumnBatchFactory<T, SplitT extends FileSourceSplit> {
/**
* Create VectorizedColumnBatch from ORC VectorizedRowBatch
* @param split File split containing partition metadata
* @param rowBatch ORC vectorized row batch with column data
* @return Flink VectorizedColumnBatch for processing
*/
VectorizedColumnBatch create(SplitT split, T rowBatch);
}
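For illustration, a minimal sketch of a custom factory, assuming a hypothetical toFlinkVector adapter that wraps a single ORC ColumnVector as a Flink ColumnVector (the no-hive module ships such wrappers internally; Flink package paths for the vector classes vary across versions):

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.table.data.columnar.vector.ColumnVector;
import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;

ColumnBatchFactory<VectorizedRowBatch, FileSourceSplit> factory = (split, rowBatch) -> {
// rowBatch.cols holds the ORC column vectors for the current batch
ColumnVector[] vectors = new ColumnVector[rowBatch.cols.length];
for (int i = 0; i < rowBatch.cols.length; i++) {
vectors[i] = toFlinkVector(rowBatch.cols[i]); // hypothetical adapter
}
return new VectorizedColumnBatch(vectors);
};

ORC filters enable predicate pushdown for improved performance: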
import org.apache.flink.orc.OrcFilters;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
// Comparison predicates are built from the nested classes in OrcFilters: each
// takes the column name, the ORC literal type, and the literal value(s).
// Integer columns use PredicateLeaf.Type.LONG; floating-point columns use FLOAT.
List<OrcFilters.Predicate> predicates = Arrays.asList(
new OrcFilters.Equals("status", PredicateLeaf.Type.STRING, "active"),
new OrcFilters.LessThan("age", PredicateLeaf.Type.LONG, 65L),
new OrcFilters.LessThanEquals("score", PredicateLeaf.Type.LONG, 100L),
new OrcFilters.IsNull("deleted_at", PredicateLeaf.Type.TIMESTAMP),
new OrcFilters.Between("created_date", PredicateLeaf.Type.DATE, startDate, endDate)
);
// Greater-than comparisons and IS NOT NULL are expressed with Not
OrcFilters.Predicate salaryAbove = new OrcFilters.Not(
new OrcFilters.LessThanEquals("salary", PredicateLeaf.Type.FLOAT, 50000.0)); // salary > 50000
OrcFilters.Predicate ratingAtLeast = new OrcFilters.Not(
new OrcFilters.LessThan("rating", PredicateLeaf.Type.FLOAT, 4.0)); // rating >= 4.0
OrcFilters.Predicate emailPresent = new OrcFilters.Not(
new OrcFilters.IsNull("email", PredicateLeaf.Type.STRING)); // email IS NOT NULL
// Note: OrcFilters does not provide prefix (startsWith) or IN-list predicates;
// apply such filters downstream after reading.
// Logical combinations
OrcFilters.Predicate combined = new OrcFilters.And(
new OrcFilters.Equals("status", PredicateLeaf.Type.STRING, "active"),
new OrcFilters.Or(
new OrcFilters.Not(new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 18L)), // age > 18
new OrcFilters.IsNull("age", PredicateLeaf.Type.LONG)
)
);

Optimize performance by reading only required columns:
// Table has 10 columns but only need 3
String[] allFields = {"id", "name", "email", "age", "salary", "dept", "manager", "created", "updated", "status"};
// Project only required columns (indices 0, 1, 2, 9)
int[] selectedFields = {0, 1, 2, 9}; // id, name, email, status
// This reduces I/O and memory usage significantly
OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> format =
OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
hadoopConfig,
fullTableType,
partitionKeys,
extractor,
selectedFields, // Only read these columns
predicates,
batchSize
);

Handle partitioned ORC datasets efficiently:
// Partition extractor implementation (imports: java.util.regex.Pattern, java.util.regex.Matcher)
PartitionFieldExtractor<FileSourceSplit> extractor = (split, fieldName, fieldType) -> {
String path = split.path().toString();
// Parse Hive-style partition paths: /table/year=2023/month=12/file.orc
Pattern pattern = Pattern.compile(fieldName + "=([^/]+)");
Matcher matcher = pattern.matcher(path);
if (matcher.find()) {
String value = matcher.group(1);
// Convert string value to appropriate type
switch (fieldType.getTypeRoot()) {
case INTEGER:
return Integer.parseInt(value);
case BIGINT:
return Long.parseLong(value);
case VARCHAR:
return value;
case DATE:
return java.sql.Date.valueOf(value); // expects yyyy-MM-dd partition values
default:
return value;
}
}
return null;
};
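If partition values follow Hive-style key=value paths as above, a custom extractor may be unnecessary: recent Flink versions ship a path-based extractor. A minimal sketch, assuming PartitionFieldExtractor.forFileSystem is available in your version:

// Parses key=value path segments and converts values to the declared
// partition field types; the argument marks the null/default partition
PartitionFieldExtractor<FileSourceSplit> builtIn =
PartitionFieldExtractor.forFileSystem("__DEFAULT_PARTITION__");

Key strategies for optimal columnar reading performance: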
- Project only the columns you need via selectedFields
- Push filters down to the ORC level via conjunctPredicates
- Tune batchSize based on memory and processing requirements

// Optimized configuration
Configuration optimizedConfig = new Configuration();
optimizedConfig.setBoolean("orc.use.zerocopy", true);
optimizedConfig.setInt("orc.row.batch.size", 2048); // Larger batches for better throughput
optimizedConfig.set("orc.compress", "ZSTD"); // Fast compressionHandle common reading errors:
OrcColumnarRowSplitReader<VectorizedRowBatch> reader = null;
try {
reader = OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(/* parameters */);
while (!reader.reachedEnd()) {
RowData row = reader.nextRecord(null);
// Process row
}
} catch (IOException e) {
// Handle file system errors, corrupt files, or read failures
logger.error("Failed to read ORC file: " + path, e);
} catch (IllegalArgumentException e) {
// Handle schema mismatches or invalid column selections
logger.error("Invalid schema or column selection", e);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
logger.warn("Failed to close ORC reader", e);
}
}
}