Apache Flink SQL ORC format connector providing comprehensive support for reading and writing ORC files in Flink's Table API and SQL environments.
—
Comprehensive configuration options for performance tuning, compression settings, and advanced ORC features including custom filters, metadata handling, and version compatibility management for optimal performance in large-scale data processing environments.
Advanced predicate pushdown capabilities for optimizing query performance by filtering data at the ORC reader level.
/**
* Utility class for converting Flink expressions to ORC predicates.
* Enables filter pushdown optimization for improved query performance.
*/
public class OrcFilters {
/**
* Convert a Flink expression to an ORC predicate
* @param expression Flink filter expression
* @return ORC predicate for pushdown filtering, or null if the expression cannot be converted
*/
public static Predicate toOrcPredicate(Expression expression);
/** Abstract base class for all ORC predicates */
public abstract static class Predicate implements Serializable {
public abstract SearchArgument.Builder add(SearchArgument.Builder builder);
}
/** Base class for column-only predicates */
public abstract static class ColumnPredicate extends Predicate {
/** Column name this predicate applies to */
protected final String columnName;
public ColumnPredicate(String columnName);
public String getColumnName();
}
/** Base class for binary predicates (comparison operations) */
public abstract static class BinaryPredicate extends ColumnPredicate {
/** Literal value for comparison */
protected final Serializable literal;
/** Type of the literal value */
protected final PredicateLeaf.Type literalType;
public BinaryPredicate(String columnName, PredicateLeaf.Type literalType, Serializable literal);
public Serializable getLiteral();
public PredicateLeaf.Type getLiteralType();
}
/** Equality predicate: column = literal */
public static class Equals extends BinaryPredicate {
public Equals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
}
/** Null-safe equality predicate: column <=> literal */
public static class NullSafeEquals extends BinaryPredicate {
public NullSafeEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
}
/** Less than predicate: column < literal */
public static class LessThan extends BinaryPredicate {
public LessThan(String columnName, PredicateLeaf.Type literalType, Serializable literal);
}
/** Less than or equals predicate: column <= literal */
public static class LessThanEquals extends BinaryPredicate {
public LessThanEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
}
/** Is null predicate: column IS NULL */
public static class IsNull extends ColumnPredicate {
protected final PredicateLeaf.Type literalType;
public IsNull(String columnName, PredicateLeaf.Type literalType);
public PredicateLeaf.Type getLiteralType();
}
/** Between predicate: column BETWEEN lower AND upper */
public static class Between extends ColumnPredicate {
protected final PredicateLeaf.Type literalType;
protected final Serializable lowerBound;
protected final Serializable upperBound;
public Between(String columnName, PredicateLeaf.Type literalType, Serializable lowerBound, Serializable upperBound);
public Serializable getLowerBound();
public Serializable getUpperBound();
public PredicateLeaf.Type getLiteralType();
}
/** In predicate: column IN (value1, value2, ...) */
public static class In extends ColumnPredicate {
protected final PredicateLeaf.Type literalType;
protected final Serializable[] literals;
public In(String columnName, PredicateLeaf.Type literalType, Serializable[] literals);
public Serializable[] getLiterals();
public PredicateLeaf.Type getLiteralType();
}
/** Not predicate: NOT (predicate) */
public static class Not extends Predicate {
protected final Predicate childPredicate;
public Not(Predicate childPredicate);
public Predicate getChildPredicate();
}
/** Or predicate: predicate1 OR predicate2 */
public static class Or extends Predicate {
protected final Predicate[] childPredicates;
public Or(Predicate[] childPredicates);
public Predicate[] getChildPredicates();
}
}

Usage Example:
import org.apache.flink.orc.OrcFilters;
import org.apache.flink.table.expressions.Expression;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
// In practice, predicates are typically created by converting Flink expressions
// rather than constructing them directly
List<Expression> filterExpressions = // ... get from query planner
List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();
for (Expression expr : filterExpressions) {
OrcFilters.Predicate pred = OrcFilters.toOrcPredicate(expr);
if (pred != null) {
orcPredicates.add(pred);
}
}
// Apply converted predicates to input format for pushdown filtering
OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
new OrcColumnarRowFileInputFormat<>(
inputPaths,
fieldNames,
fieldTypes,
selectedFields,
orcPredicates, // Predicates for pushdown
batchSize,
orcConfig,
hadoopConfig
);
// Direct predicate construction (advanced usage)
List<OrcFilters.Predicate> directPredicates = Arrays.asList(
// amount < 100.0 — LessThan builds "column < literal"; for amount > 100.0,
// wrap as Not(LessThanEquals(...)) since no GreaterThan predicate class exists
new OrcFilters.LessThan("amount", PredicateLeaf.Type.DECIMAL, new BigDecimal("100.0")),
// user_id IS NULL
new OrcFilters.IsNull("user_id", PredicateLeaf.Type.LONG)
);

Compatibility layer for supporting different Hive and ORC versions seamlessly.
/**
* Version compatibility interface for different Hive/ORC versions.
* Abstracts version-specific differences in ORC reader implementations.
* @param <BATCH> Type of batch used (typically VectorizedRowBatch)
*/
public interface OrcShim<BATCH> extends Serializable {
/**
* Create an ORC record reader for the specified file and range
* @param conf Hadoop configuration
* @param schema ORC schema description
* @param selectedFields Indices of fields to read (for projection)
* @param conjunctPredicates List of predicates for pushdown filtering
* @param path Path to ORC file
* @param splitStart Start offset for reading
* @param splitLength Length to read
* @return Configured ORC record reader
* @throws IOException If reader creation fails
*/
RecordReader createRecordReader(
Configuration conf,
TypeDescription schema,
int[] selectedFields,
List<OrcFilters.Predicate> conjunctPredicates,
org.apache.flink.core.fs.Path path,
long splitStart,
long splitLength
) throws IOException;
/**
* Create a batch wrapper for the given schema and batch size
* @param schema ORC schema description
* @param batchSize Expected batch size
* @return Wrapped batch for unified processing
*/
OrcVectorizedBatchWrapper<BATCH> createBatchWrapper(TypeDescription schema, int batchSize);
/**
* Read the next batch from the record reader
* @param reader ORC record reader
* @param batch Batch to populate
* @return true if batch was read, false if end of file
* @throws IOException If reading fails
*/
boolean nextBatch(RecordReader reader, BATCH batch) throws IOException;
/**
* Get the default shim for the current runtime environment
* Typically uses Hive 2.3.0+ compatibility
* @return Default ORC shim instance
*/
static OrcShim<VectorizedRowBatch> defaultShim();
/**
* Create a shim for a specific Hive version
* @param hiveDependencyVersion Hive version string (e.g., "2.0.0", "2.1.0", "2.3.0")
* @return Version-specific ORC shim
*/
static OrcShim<VectorizedRowBatch> createShim(String hiveDependencyVersion);
}

Usage Example:
// Using specific Hive version compatibility
OrcShim<VectorizedRowBatch> shim;
// Auto-detect and use appropriate shim
shim = OrcShim.defaultShim();
// Or specify exact version for compatibility
shim = OrcShim.createShim("2.1.0"); // For Hive 2.1.x compatibility
// Use shim in reader configuration
OrcColumnarRowSplitReader<VectorizedRowBatch> reader =
new OrcColumnarRowSplitReader<>(
shim,
orcConfig,
fieldNames,
fieldTypes,
selectedFields,
predicates,
batchSize,
split,
batchGenerator
);

Utilities for managing Hadoop and ORC configurations in distributed environments.
/**
* Serializable wrapper for Hadoop Configuration objects.
* Enables distribution of configuration across Flink cluster nodes.
*/
public final class SerializableHadoopConfigWrapper implements Serializable {
/**
* Constructor wrapping a Hadoop Configuration
* @param hadoopConfig Hadoop configuration to wrap
*/
public SerializableHadoopConfigWrapper(Configuration hadoopConfig);
/**
* Get the wrapped Hadoop configuration
* @return Hadoop Configuration object
*/
public Configuration getHadoopConfig();
}

Usage Example:
// Configure ORC and Hadoop settings for optimal performance
Configuration orcConfig = new Configuration();
// Compression settings
orcConfig.set("orc.compress", "SNAPPY"); // or ZLIB, LZO, LZ4, ZSTD
orcConfig.set("orc.compress.size", "262144"); // 256KB compression blocks
// Stripe and batch settings for performance tuning
orcConfig.set("orc.stripe.size", "67108864"); // 64MB stripes
orcConfig.set("orc.row.index.stride", "10000"); // Row index every 10K rows
orcConfig.set("orc.create.index", "true"); // Enable indexes
// Vectorization settings
orcConfig.set("orc.row.batch.size", "1024"); // Batch size for vectorized processing
// Memory settings
orcConfig.set("orc.dictionary.key.threshold", "0.8"); // Dictionary encoding threshold
// Hadoop configuration for HDFS optimization
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("dfs.client.read.shortcircuit", "true");
hadoopConfig.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket");
hadoopConfig.set("dfs.client.cache.readahead", "268435456"); // 256MB readahead
// Create serializable wrapper
SerializableHadoopConfigWrapper configWrapper =
new SerializableHadoopConfigWrapper(hadoopConfig);
// Use in ORC input format
OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
new OrcColumnarRowFileInputFormat<>(
inputPaths,
fieldNames,
fieldTypes,
selectedFields,
predicates,
1024, // Batch size
orcConfig,
configWrapper
);

Utilities for handling complex ORC type conversions and schema evolution.
/**
* Utility methods for timestamp handling across different Hive versions
*/
public class TimestampUtil {
/**
* Check if a column vector is a Hive timestamp column vector
* @param columnVector Column vector to check
* @return true if it's a Hive timestamp vector
*/
public static boolean isHiveTimestampColumnVector(
org.apache.hadoop.hive.ql.exec.vector.ColumnVector columnVector
);
/**
* Create a column vector from a constant timestamp value
* @param value Constant timestamp value
* @param size Size of the vector
* @return Timestamp column vector with constant value
*/
public static org.apache.hadoop.hive.ql.exec.vector.ColumnVector createVectorFromConstant(
Object value,
int size
);
}

Configuration Properties Reference:
// Common ORC configuration properties for SQL/Table API
Properties orcProperties = new Properties();
// Compression options
orcProperties.put("orc.compress", "SNAPPY"); // NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD
orcProperties.put("orc.compress.size", "262144"); // Compression chunk size
// File structure
orcProperties.put("orc.stripe.size", "67108864"); // Target stripe size (64MB)
orcProperties.put("orc.row.index.stride", "10000"); // Rows between index entries
orcProperties.put("orc.bloom.filter.columns", "user_id,product_id"); // Bloom filter columns
orcProperties.put("orc.bloom.filter.fpp", "0.05"); // False positive probability
// Performance tuning
orcProperties.put("orc.row.batch.size", "1024"); // Vectorized batch size
orcProperties.put("orc.dictionary.key.threshold", "0.8"); // Dictionary encoding threshold
orcProperties.put("orc.max.merge.distance", "1048576"); // Max merge distance (1MB)
// Schema evolution
orcProperties.put("orc.force.positional.evolution", "false"); // Use column names for evolution
orcProperties.put("orc.tolerate.missing.schema", "true"); // Handle missing columns
// For Table API usage in SQL DDL
tEnv.executeSql(
"CREATE TABLE optimized_sales (" +
" user_id BIGINT," +
" product_id BIGINT," +
" amount DECIMAL(10,2)," +
" purchase_time TIMESTAMP(3)" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = '/path/to/data'," +
" 'format' = 'orc'," +
" 'orc.compress' = 'ZSTD'," +
" 'orc.stripe.size' = '128MB'," +
" 'orc.bloom.filter.columns' = 'user_id'," +
" 'orc.bloom.filter.fpp' = '0.01'" +
")"
);

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-orc-2-12