CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-orc-2-12

Apache Flink SQL ORC format connector providing comprehensive support for reading and writing ORC files in Flink's Table API and SQL environments.

Pending
Overview
Eval results
Files

configuration.mddocs/

Configuration and Advanced Usage

Comprehensive configuration options for performance tuning and compression, plus advanced ORC features — custom filters, metadata handling, and version-compatibility management — for large-scale data processing environments.

Capabilities

Filter Pushdown

Advanced predicate pushdown capabilities for optimizing query performance by filtering data at the ORC reader level.

/**
 * Utility class for converting Flink expressions to ORC predicates.
 * Enables filter pushdown optimization for improved query performance.
 */
public class OrcFilters {
    /**
     * Convert a Flink expression to an ORC predicate
     * @param expression Flink filter expression
     * @return ORC predicate for pushdown filtering
     */
    public static Predicate toOrcPredicate(Expression expression);
    
    /** Abstract base class for all ORC predicates */
    public abstract static class Predicate implements Serializable {
        public abstract SearchArgument.Builder add(SearchArgument.Builder builder);
    }
    
    /** Base class for column-only predicates */
    public abstract static class ColumnPredicate extends Predicate {
        /** Column name this predicate applies to */
        protected final String columnName;
        
        public ColumnPredicate(String columnName);
        public String getColumnName();
    }
    
    /** Base class for binary predicates (comparison operations) */
    public abstract static class BinaryPredicate extends ColumnPredicate {
        /** Literal value for comparison */
        protected final Serializable literal;
        /** Type of the literal value */
        protected final PredicateLeaf.Type literalType;
        
        public BinaryPredicate(String columnName, PredicateLeaf.Type literalType, Serializable literal);
        public Serializable getLiteral();
        public PredicateLeaf.Type getLiteralType();
    }
    
    /** Equality predicate: column = literal */
    public static class Equals extends BinaryPredicate {
        public Equals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    }
    
    /** Null-safe equality predicate: column <=> literal */
    public static class NullSafeEquals extends BinaryPredicate {
        public NullSafeEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    }
    
    /** Less than predicate: column < literal */
    public static class LessThan extends BinaryPredicate {
        public LessThan(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    }
    
    /** Less than or equals predicate: column <= literal */
    public static class LessThanEquals extends BinaryPredicate {
        public LessThanEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    }
    
    /** Is null predicate: column IS NULL */
    public static class IsNull extends ColumnPredicate {
        protected final PredicateLeaf.Type literalType;
        
        public IsNull(String columnName, PredicateLeaf.Type literalType);
        public PredicateLeaf.Type getLiteralType();
    }
    
    /** Between predicate: column BETWEEN lower AND upper */
    public static class Between extends ColumnPredicate {
        protected final PredicateLeaf.Type literalType;
        protected final Serializable lowerBound;
        protected final Serializable upperBound;
        
        public Between(String columnName, PredicateLeaf.Type literalType, Serializable lowerBound, Serializable upperBound);
        public Serializable getLowerBound();
        public Serializable getUpperBound();
        public PredicateLeaf.Type getLiteralType();
    }
    
    /** In predicate: column IN (value1, value2, ...) */
    public static class In extends ColumnPredicate {
        protected final PredicateLeaf.Type literalType;
        protected final Serializable[] literals;
        
        public In(String columnName, PredicateLeaf.Type literalType, Serializable[] literals);
        public Serializable[] getLiterals();
        public PredicateLeaf.Type getLiteralType();
    }
    
    /** Not predicate: NOT (predicate) */
    public static class Not extends Predicate {
        protected final Predicate childPredicate;
        
        public Not(Predicate childPredicate);
        public Predicate getChildPredicate();
    }
    
    /** Or predicate: predicate1 OR predicate2 */
    public static class Or extends Predicate {
        protected final Predicate[] childPredicates;
        
        public Or(Predicate[] childPredicates);
        public Predicate[] getChildPredicates();
    }
}

Usage Example:

import org.apache.flink.orc.OrcFilters;
import org.apache.flink.table.expressions.Expression;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

// In practice, predicates are typically created by converting Flink expressions
// rather than constructing them directly
List<Expression> filterExpressions = ...; // obtained from the query planner

List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();
for (Expression expr : filterExpressions) {
    OrcFilters.Predicate pred = OrcFilters.toOrcPredicate(expr);
    if (pred != null) {
        orcPredicates.add(pred);
    }
}

// Apply converted predicates to input format for pushdown filtering
OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat = 
    new OrcColumnarRowFileInputFormat<>(
        inputPaths,
        fieldNames,
        fieldTypes,
        selectedFields,
        orcPredicates, // Predicates for pushdown
        batchSize,
        orcConfig,
        hadoopConfig
    );

// Direct predicate construction (advanced usage)
List<OrcFilters.Predicate> directPredicates = Arrays.asList(
    // amount < 100.0 (LessThan; note: constructor parameters from actual source)
    new OrcFilters.LessThan("amount", PredicateLeaf.Type.DECIMAL, new BigDecimal("100.0")),
    // user_id IS NULL
    new OrcFilters.IsNull("user_id", PredicateLeaf.Type.LONG)
);

Version Compatibility Layer

Compatibility layer for supporting different Hive and ORC versions seamlessly.

/**
 * Version compatibility interface for different Hive/ORC versions.
 * Abstracts version-specific differences in ORC reader implementations.
 * @param <BATCH> Type of batch used (typically VectorizedRowBatch)
 */
public interface OrcShim<BATCH> extends Serializable {
    /**
     * Create an ORC record reader for the specified file and range
     * @param conf Hadoop configuration
     * @param schema ORC schema description
     * @param selectedFields Indices of fields to read (for projection)
     * @param conjunctPredicates List of predicates for pushdown filtering
     * @param path Path to ORC file
     * @param splitStart Start offset for reading
     * @param splitLength Length to read
     * @return Configured ORC record reader
     * @throws IOException If reader creation fails
     */
    RecordReader createRecordReader(
        Configuration conf,
        TypeDescription schema,
        int[] selectedFields,
        List<OrcFilters.Predicate> conjunctPredicates,
        org.apache.flink.core.fs.Path path,
        long splitStart,
        long splitLength
    ) throws IOException;
    
    /**
     * Create a batch wrapper for the given schema and batch size
     * @param schema ORC schema description
     * @param batchSize Expected batch size
     * @return Wrapped batch for unified processing
     */
    OrcVectorizedBatchWrapper<BATCH> createBatchWrapper(TypeDescription schema, int batchSize);
    
    /**
     * Read the next batch from the record reader
     * @param reader ORC record reader
     * @param batch Batch to populate
     * @return true if batch was read, false if end of file
     * @throws IOException If reading fails
     */
    boolean nextBatch(RecordReader reader, BATCH batch) throws IOException;
    
    /**
     * Get the default shim for the current runtime environment
     * Typically uses Hive 2.3.0+ compatibility
     * @return Default ORC shim instance
     */
    static OrcShim<VectorizedRowBatch> defaultShim();
    
    /**
     * Create a shim for a specific Hive version
     * @param hiveDependencyVersion Hive version string (e.g., "2.0.0", "2.1.0", "2.3.0")
     * @return Version-specific ORC shim
     */
    static OrcShim<VectorizedRowBatch> createShim(String hiveDependencyVersion);
}

Usage Example:

// Using specific Hive version compatibility
OrcShim<VectorizedRowBatch> shim;

// Auto-detect and use appropriate shim
shim = OrcShim.defaultShim();

// Or specify exact version for compatibility
shim = OrcShim.createShim("2.1.0"); // For Hive 2.1.x compatibility

// Use shim in reader configuration
OrcColumnarRowSplitReader<VectorizedRowBatch> reader = 
    new OrcColumnarRowSplitReader<>(
        shim,
        orcConfig,
        fieldNames,
        fieldTypes,
        selectedFields,
        predicates,
        batchSize,
        split,
        batchGenerator
    );

Configuration Management

Utilities for managing Hadoop and ORC configurations in distributed environments.

/**
 * Serializable wrapper for Hadoop Configuration objects.
 * Enables distribution of configuration across Flink cluster nodes.
 */
public final class SerializableHadoopConfigWrapper implements Serializable {
    /**
     * Constructor wrapping a Hadoop Configuration
     * @param hadoopConfig Hadoop configuration to wrap
     */
    public SerializableHadoopConfigWrapper(Configuration hadoopConfig);
    
    /**
     * Get the wrapped Hadoop configuration
     * @return Hadoop Configuration object
     */
    public Configuration getHadoopConfig();
}

Usage Example:

// Configure ORC and Hadoop settings for optimal performance
Configuration orcConfig = new Configuration();

// Compression settings
orcConfig.set("orc.compress", "SNAPPY");           // or ZLIB, LZO, LZ4, ZSTD
orcConfig.set("orc.compress.size", "262144");      // 256KB compression blocks

// Stripe and batch settings for performance tuning
orcConfig.set("orc.stripe.size", "67108864");      // 64MB stripes
orcConfig.set("orc.row.index.stride", "10000");    // Row index every 10K rows
orcConfig.set("orc.create.index", "true");         // Enable indexes

// Vectorization settings
orcConfig.set("orc.row.batch.size", "1024");       // Batch size for vectorized processing

// Memory settings
orcConfig.set("orc.dictionary.key.threshold", "0.8"); // Dictionary encoding threshold

// Hadoop configuration for HDFS optimization
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("dfs.client.read.shortcircuit", "true");
hadoopConfig.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket");
hadoopConfig.set("dfs.client.cache.readahead", "268435456"); // 256MB readahead

// Create serializable wrapper
SerializableHadoopConfigWrapper configWrapper = 
    new SerializableHadoopConfigWrapper(hadoopConfig);

// Use in ORC input format
OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat = 
    new OrcColumnarRowFileInputFormat<>(
        inputPaths,
        fieldNames,
        fieldTypes,
        selectedFields,
        predicates,
        1024, // Batch size
        orcConfig,
        configWrapper
    );

Advanced Type Handling

Utilities for handling complex ORC type conversions and schema evolution.

/**
 * Utility methods for timestamp handling across different Hive versions
 */
public class TimestampUtil {
    /**
     * Check if a column vector is a Hive timestamp column vector
     * @param columnVector Column vector to check
     * @return true if it's a Hive timestamp vector
     */
    public static boolean isHiveTimestampColumnVector(
        org.apache.hadoop.hive.ql.exec.vector.ColumnVector columnVector
    );
    
    /**
     * Create a column vector from a constant timestamp value
     * @param value Constant timestamp value
     * @param size Size of the vector
     * @return Timestamp column vector with constant value
     */
    public static org.apache.hadoop.hive.ql.exec.vector.ColumnVector createVectorFromConstant(
        Object value,
        int size
    );
}

Configuration Properties Reference:

// Common ORC configuration properties for SQL/Table API
Properties orcProperties = new Properties();

// Compression options
orcProperties.put("orc.compress", "SNAPPY");        // NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD
orcProperties.put("orc.compress.size", "262144");   // Compression chunk size

// File structure
orcProperties.put("orc.stripe.size", "67108864");   // Target stripe size (64MB)
orcProperties.put("orc.row.index.stride", "10000"); // Rows between index entries
orcProperties.put("orc.bloom.filter.columns", "user_id,product_id"); // Bloom filter columns
orcProperties.put("orc.bloom.filter.fpp", "0.05");  // False positive probability

// Performance tuning
orcProperties.put("orc.row.batch.size", "1024");    // Vectorized batch size
orcProperties.put("orc.dictionary.key.threshold", "0.8"); // Dictionary encoding threshold
orcProperties.put("orc.max.merge.distance", "1048576");   // Max merge distance (1MB)

// Schema evolution
orcProperties.put("orc.force.positional.evolution", "false"); // Use column names for evolution
orcProperties.put("orc.tolerate.missing.schema", "true");     // Handle missing columns

// For Table API usage in SQL DDL
tEnv.executeSql(
    "CREATE TABLE optimized_sales (" +
    "  user_id BIGINT," +
    "  product_id BIGINT," +
    "  amount DECIMAL(10,2)," +
    "  purchase_time TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/path/to/data'," +
    "  'format' = 'orc'," +
    "  'orc.compress' = 'ZSTD'," +
    "  'orc.stripe.size' = '128MB'," +
    "  'orc.bloom.filter.columns' = 'user_id'," +
    "  'orc.bloom.filter.fpp' = '0.01'" +
    ")"
);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-orc-2-12

docs

configuration.md

datastream-api.md

index.md

table-api.md

tile.json