CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-orc

Apache Flink ORC format connector for reading and writing ORC (Optimized Row Columnar) data files

Pending
Overview
Eval results
Files

docs/predicate-pushdown.md

Predicate Pushdown

The ORC format supports predicate pushdown through the OrcFilters class. Filter conditions are evaluated against ORC file metadata, such as stripe and row-group statistics, so that data which cannot match is skipped before it is loaded into Flink. This improves query performance by reducing I/O and processing overhead.

OrcFilters Utility

/** Utilities for building ORC predicates that Flink can push down into ORC readers. */
public class OrcFilters {
    /**
     * Converts a Flink filter expression into an ORC predicate.
     *
     * @param expression the Flink filter expression to convert, e.g. one taken from a Table API
     *     filter
     * @return the equivalent ORC predicate, or {@code null} if the expression cannot be
     *     converted; in that case the filter is not pushed down and is applied by Flink instead
     */
    public static Predicate toOrcPredicate(Expression expression);
}

Base Predicate Class

/**
 * Base class of all ORC predicates. Each concrete predicate translates itself into ORC
 * search-argument terms via {@link #add}.
 */
public abstract static class Predicate implements Serializable {
    /**
     * Adds this predicate's condition to an ORC search-argument builder.
     *
     * @param builder the builder to extend
     * @return the builder to continue with (presumably {@code builder} itself, enabling
     *     chaining — confirm against the implementation)
     */
    public abstract SearchArgument.Builder add(SearchArgument.Builder builder);
}

Comparison Predicates

Equality Predicates

/** Pushes down {@code columnName = literal}, e.g. {@code age = 30} or {@code name = 'Alice'}. */
public static class Equals extends BinaryPredicate {
    /**
     * @param columnName the column to compare
     * @param literalType the ORC type used for the comparison (e.g. {@code LONG} for integer
     *     columns, {@code STRING} for character columns)
     * @param literal the value the column must equal
     */
    public Equals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    /** Adds the equality condition to {@code builder}. */
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

/**
 * Null-safe equality between a column and a literal. NOTE(review): presumably follows SQL
 * {@code <=>} semantics, where NULL compares equal to NULL — confirm.
 */
public static class NullSafeEquals extends BinaryPredicate {
    /**
     * @param columnName the column to compare
     * @param literalType the ORC type used for the comparison
     * @param literal the value to compare the column against
     */
    public NullSafeEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    /** Adds the null-safe equality condition to {@code builder}. */
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

Ordering Predicates

/**
 * Pushes down {@code columnName < literal}. A {@code >=} filter is written by negating it, e.g.
 * {@code new Not(new LessThan("salary", PredicateLeaf.Type.DECIMAL, new BigDecimal("50000.00")))}.
 */
public static class LessThan extends BinaryPredicate {
    /**
     * @param columnName the column to compare
     * @param literalType the ORC type used for the comparison
     * @param literal the exclusive upper bound
     */
    public LessThan(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    /** Adds the less-than condition to {@code builder}. */
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

/**
 * Pushes down {@code columnName <= literal}. A {@code >} filter is written by negating it, e.g.
 * {@code new Not(new LessThanEquals("age", PredicateLeaf.Type.LONG, 25))} for {@code age > 25}.
 */
public static class LessThanEquals extends BinaryPredicate {
    /**
     * @param columnName the column to compare
     * @param literalType the ORC type used for the comparison
     * @param literal the inclusive upper bound
     */
    public LessThanEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    /** Adds the less-than-or-equal condition to {@code builder}. */
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

Null Predicates

/**
 * Pushes down {@code columnName IS NULL}. {@code IS NOT NULL} is written as
 * {@code new Not(new IsNull(...))}.
 */
public static class IsNull extends ColumnPredicate {
    /**
     * @param columnName the column to test
     * @param literalType the ORC type of the column (e.g. {@code STRING} for a character column)
     */
    public IsNull(String columnName, PredicateLeaf.Type literalType);
    /** Adds the is-null condition to {@code builder}. */
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

Range and Set-Membership Predicates

/**
 * Pushes down {@code columnName BETWEEN lowerBound AND upperBound}, e.g.
 * {@code age BETWEEN 25 AND 65}. NOTE(review): presumably inclusive of both bounds, as in SQL —
 * confirm.
 */
public static class Between extends ColumnPredicate {
    /**
     * @param columnName the column to test
     * @param literalType the ORC type used for both bounds
     * @param lowerBound the lower end of the range
     * @param upperBound the upper end of the range
     */
    public Between(
        String columnName,
        PredicateLeaf.Type literalType,
        Serializable lowerBound,
        Serializable upperBound
    );
    /** Adds the range condition to {@code builder}. */
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

/**
 * Pushes down {@code columnName IN (literals...)}, e.g.
 * {@code department IN ('Engineering', 'Sales', 'Marketing')}.
 */
public static class In extends ColumnPredicate {
    /**
     * @param columnName the column to test
     * @param literalType the ORC type shared by all literals
     * @param literals the accepted values
     */
    public In(String columnName, PredicateLeaf.Type literalType, Serializable... literals);
    /** Adds the set-membership condition to {@code builder}. */
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

Logical Predicates

/**
 * Logical negation of another predicate. Also used to express {@code >}, {@code >=} and
 * {@code IS NOT NULL}, which have no dedicated predicate class in this listing.
 */
public static class Not extends Predicate {
    /** @param predicate the predicate to negate */
    public Not(Predicate predicate);
    /** Adds the negated condition to {@code builder}. */
    public SearchArgument.Builder add(SearchArgument.Builder builder);
    /** Returns the negated predicate (presumably the one passed to the constructor). */
    protected Predicate child();
}

/** Logical disjunction: matches when at least one of the given predicates matches. */
public static class Or extends Predicate {
    /** @param predicates the alternatives to combine */
    public Or(Predicate... predicates);
    /** Adds the disjunction to {@code builder}. */
    public SearchArgument.Builder add(SearchArgument.Builder builder);
    /** Returns the combined predicates (presumably those passed to the constructor). */
    protected Iterable<Predicate> children();
}

/** Logical conjunction: matches only when all of the given predicates match. */
public static class And extends Predicate {
    /** @param predicates the predicates that must all hold */
    public And(Predicate... predicates);
    /** Adds the conjunction to {@code builder}. */
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

Usage Examples

Basic Equality Filters

import org.apache.flink.orc.OrcFilters;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

// Each predicate names a column, the ORC literal type used for the comparison, and the literal.
// Integer columns use Type.LONG, character columns Type.STRING, booleans Type.BOOLEAN.

// age = 30
OrcFilters.Predicate agePredicate = new OrcFilters.Equals(
    "age", 
    PredicateLeaf.Type.LONG, 
    30
);

// name = "Alice"
OrcFilters.Predicate namePredicate = new OrcFilters.Equals(
    "name", 
    PredicateLeaf.Type.STRING, 
    "Alice"
);

// active = true
OrcFilters.Predicate activePredicate = new OrcFilters.Equals(
    "active", 
    PredicateLeaf.Type.BOOLEAN, 
    true
);

Comparison Filters

// The predicate classes listed above include no GreaterThan / GreaterThanEquals:
// ">" and ">=" are written by negating LessThanEquals / LessThan.
// Assumes java.math.BigDecimal and java.sql.Date are imported
// (java.util.Date has no valueOf(String) factory).

// age > 25 (implemented as NOT age <= 25)
OrcFilters.Predicate ageGreaterThan = new OrcFilters.Not(
    new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25)
);

// salary >= 50000.00
OrcFilters.Predicate salaryGreaterOrEqual = new OrcFilters.Not(
    new OrcFilters.LessThan("salary", PredicateLeaf.Type.DECIMAL, new BigDecimal("50000.00"))
);

// created_date < '2023-01-01'
OrcFilters.Predicate dateBeforePredicate = new OrcFilters.LessThan(
    "created_date", 
    PredicateLeaf.Type.DATE, 
    Date.valueOf("2023-01-01")
);

Null and Range Filters

// name IS NOT NULL (expressed as the negation of IsNull)
OrcFilters.Predicate nameNotNull = new OrcFilters.Not(
    new OrcFilters.IsNull("name", PredicateLeaf.Type.STRING)
);

// age BETWEEN 25 AND 65
OrcFilters.Predicate ageBetween = new OrcFilters.Between(
    "age", 
    PredicateLeaf.Type.LONG, 
    25, 
    65
);

// department IN ('Engineering', 'Sales', 'Marketing')
// All IN literals share the single literal type given (STRING here).
OrcFilters.Predicate departmentIn = new OrcFilters.In(
    "department", 
    PredicateLeaf.Type.STRING,
    "Engineering", 
    "Sales", 
    "Marketing"
);

Complex Logical Combinations

// (age > 25 AND salary >= 50000) OR department = 'Executive'
// Build the leaf predicates first, then combine them; And/Or accept any number of predicates.

// age > 25, written as NOT(age <= 25)
OrcFilters.Predicate ageFilter = new OrcFilters.Not(
    new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25)
);

// salary >= 50000, written as NOT(salary < 50000)
OrcFilters.Predicate salaryFilter = new OrcFilters.Not(
    new OrcFilters.LessThan("salary", PredicateLeaf.Type.DECIMAL, new BigDecimal("50000"))
);

OrcFilters.Predicate departmentFilter = new OrcFilters.Equals(
    "department", 
    PredicateLeaf.Type.STRING, 
    "Executive"
);

OrcFilters.Predicate complexPredicate = new OrcFilters.Or(
    new OrcFilters.And(ageFilter, salaryFilter),
    departmentFilter
);

Automatic Filter Conversion

OrcFilters.toOrcPredicate converts a Flink expression into an ORC predicate, returning null when the expression cannot be pushed down:

import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.CallExpression;

// Convert Flink expression to ORC predicate
Expression flinkExpression = // ... from Table API filter
OrcFilters.Predicate orcPredicate = OrcFilters.toOrcPredicate(flinkExpression);

if (orcPredicate != null) {
    // Predicate can be pushed down to ORC
    List<OrcFilters.Predicate> predicates = Arrays.asList(orcPredicate);
} else {
    // Predicate cannot be converted, will be applied at Flink level
    System.out.println("Unsupported predicate for pushdown: " + flinkExpression);
}

Supported Expression Types

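The following Flink expressions can be converted automatically. For any other expression, toOrcPredicate returns null, and the filter is evaluated by Flink after the data has been read: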

  • Comparison: EQUALS, NOT_EQUALS, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL
  • Null checks: IS_NULL, IS_NOT_NULL
  • Logical: AND, OR, NOT

Example automatic conversion:

// Assumes a static import of org.apache.flink.table.api.Expressions.$
// Each conjunct uses an operator from the supported list (GREATER_THAN, EQUALS, IS_NOT_NULL),
// so each is a candidate for conversion to an ORC predicate.
// Table API filter automatically converted
Table result = table
    .filter($("age").isGreater(25)
        .and($("department").isEqual("Engineering"))
        .and($("salary").isNotNull()))
    .select($("name"), $("age"), $("salary"));

Data Type Support

Supported Types for Predicates

// Reference mapping, not executable code:
// each ORC literal type, followed by the Flink SQL column types that use it.
// Supported PredicateLeaf.Type values
PredicateLeaf.Type.LONG        // TINYINT, SMALLINT, INTEGER, BIGINT
PredicateLeaf.Type.FLOAT       // FLOAT, DOUBLE
PredicateLeaf.Type.STRING      // CHAR, VARCHAR
PredicateLeaf.Type.BOOLEAN     // BOOLEAN
PredicateLeaf.Type.DATE        // DATE
PredicateLeaf.Type.TIMESTAMP   // TIMESTAMP
PredicateLeaf.Type.DECIMAL     // DECIMAL

Type Conversion Examples

// Literals are ordinary Java values; the PredicateLeaf.Type selects how ORC compares them.
// Assumes java.math.BigDecimal, java.sql.Date and java.sql.Timestamp are imported.

// Numeric types: byte, short, int and long literals all use Type.LONG
new OrcFilters.Equals("byte_col", PredicateLeaf.Type.LONG, (byte) 10);
new OrcFilters.Equals("short_col", PredicateLeaf.Type.LONG, (short) 100);
new OrcFilters.Equals("int_col", PredicateLeaf.Type.LONG, 1000);
new OrcFilters.Equals("long_col", PredicateLeaf.Type.LONG, 10000L);

// Floating point: both float and double literals use Type.FLOAT
new OrcFilters.Equals("float_col", PredicateLeaf.Type.FLOAT, 3.14f);
new OrcFilters.Equals("double_col", PredicateLeaf.Type.FLOAT, 3.14159);

// Temporal types
new OrcFilters.Equals("date_col", PredicateLeaf.Type.DATE, Date.valueOf("2023-01-01"));
new OrcFilters.Equals("timestamp_col", PredicateLeaf.Type.TIMESTAMP, 
    Timestamp.valueOf("2023-01-01 12:00:00"));

// Decimal
new OrcFilters.Equals("decimal_col", PredicateLeaf.Type.DECIMAL, 
    new BigDecimal("12345.67"));

Integration with Input Format

import org.apache.flink.orc.OrcColumnarRowInputFormat;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

// Create predicates. The list is treated as a conjunction: a row must satisfy all of them.
List<OrcFilters.Predicate> predicates = Arrays.asList(
    new OrcFilters.Equals("department", PredicateLeaf.Type.STRING, "Engineering"),
    // age > 25, written as NOT(age <= 25)
    new OrcFilters.Not(new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25))
);

// Use in input format.
// hadoopConfig, tableType (RowType), selectedFields (int[]) and batchSize (int) are assumed
// to be defined by the surrounding code.
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
    OrcColumnarRowInputFormat.createPartitionedFormat(
        OrcShim.defaultShim(),
        hadoopConfig,
        tableType,
        Collections.emptyList(), // no partition keys
        null,                    // partition field extractor; presumably unused without partition keys — confirm
        selectedFields,
        predicates,              // pushed down to the ORC reader
        batchSize,
        // Factory producing the TypeInformation<RowData> of the produced row type;
        // a RowType -> DataType conversion such as TypeConversions::fromLogicalToDataType does not fit here.
        InternalTypeInfo::of
    );

Performance Benefits

Stripe-Level Filtering

ORC predicates are evaluated against stripe-level column statistics, so a stripe whose statistics show that no row can match is skipped entirely:

// age > 25, written as NOT(age <= 25).
// This predicate can skip entire stripes where max(age) <= 25
OrcFilters.Predicate ageFilter = new OrcFilters.Not(
    new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25)
);

Row Group Filtering

Predicates are also evaluated against row-group statistics within each stripe, so non-matching row groups are skipped as well:

// salary >= 50000, written as NOT(salary < 50000).
// Row groups where all salary values are < 50000 will be skipped
OrcFilters.Predicate salaryFilter = new OrcFilters.Not(
    new OrcFilters.LessThan("salary", PredicateLeaf.Type.DECIMAL, new BigDecimal("50000"))
);

Bloom Filter Integration

When ORC files were written with bloom filters, equality predicates can also use them to skip data that min/max statistics alone cannot rule out:

// Bloom filters help with equality predicates.
// NOTE(review): they only exist if the files were written with bloom filters enabled for this
// column (presumably via ORC's orc.bloom.filter.columns writer property — confirm).
// customer_id = 'CUST-12345'
OrcFilters.Predicate customerIdFilter = new OrcFilters.Equals(
    "customer_id",
    PredicateLeaf.Type.STRING,
    "CUST-12345"
);

Best Practices

  1. Use Simple Predicates: Simple equality and comparison predicates are most efficient
  2. Combine with Projection: Use column projection with predicate pushdown for maximum benefit
  3. Consider Data Distribution: Predicates work best on columns with good selectivity
  4. Test Performance: Measure performance impact of different predicate combinations
  5. Monitor Skipped Data: Use ORC statistics to verify predicate effectiveness

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-orc

docs

bulk-writing.md

columnar-reading.md

index.md

predicate-pushdown.md

table-api.md

vector-processing.md

tile.json