Apache Flink ORC format connector for reading and writing ORC (Optimized Row Columnar) data files
—
The ORC format supports predicate pushdown through the `OrcFilters` class. Filters are evaluated against the ORC files themselves — at the stripe and row-group level, and with bloom filters when the files contain them — so data that cannot match is skipped before it is loaded into Flink. This improves query performance by reducing I/O and processing overhead.
// OrcFilters API summary: signatures only, method bodies omitted (not compilable as-is).
// The predicate classes below are used as OrcFilters.<Name> in the examples
// (e.g. new OrcFilters.Equals(...)). Their base classes ColumnPredicate and
// BinaryPredicate are referenced but not listed in this summary.
public class OrcFilters {
    /**
     * Converts a Flink filter expression into an ORC predicate.
     *
     * @param expression the Flink filter expression to convert
     * @return the ORC predicate, or {@code null} if the expression cannot be converted
     *     (such a filter is then applied at the Flink level instead)
     */
    public static Predicate toOrcPredicate(Expression expression);
}

/** Base class of all ORC predicates. */
public abstract static class Predicate implements Serializable {
    /**
     * Adds this predicate to the given ORC {@code SearchArgument} builder.
     *
     * @param builder the builder to add this predicate to
     * @return a builder (presumably {@code builder} itself, for chaining — confirm)
     */
    public abstract SearchArgument.Builder add(SearchArgument.Builder builder);
}

/** {@code columnName = literal}, where {@code literal} is interpreted as {@code literalType}. */
public static class Equals extends BinaryPredicate {
    public Equals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

/**
 * Null-safe equality between {@code columnName} and {@code literal}.
 * NOTE(review): presumably SQL {@code <=>} semantics (NULL matches NULL) — confirm.
 */
public static class NullSafeEquals extends BinaryPredicate {
    public NullSafeEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

/** {@code columnName < literal}. */
public static class LessThan extends BinaryPredicate {
    public LessThan(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

/** {@code columnName <= literal}. */
public static class LessThanEquals extends BinaryPredicate {
    public LessThanEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

/** {@code columnName IS NULL}; IS NOT NULL is written as {@code new Not(new IsNull(...))}. */
public static class IsNull extends ColumnPredicate {
    public IsNull(String columnName, PredicateLeaf.Type literalType);
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

/** {@code columnName BETWEEN lowerBound AND upperBound}. */
public static class Between extends ColumnPredicate {
    public Between(
            String columnName,
            PredicateLeaf.Type literalType,
            Serializable lowerBound,
            Serializable upperBound
    );
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

/** {@code columnName IN (literals...)}; the literals are passed as varargs. */
public static class In extends ColumnPredicate {
    public In(String columnName, PredicateLeaf.Type literalType, Serializable... literals);
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

/**
 * Negation of another predicate. No GreaterThan/GreaterThanEquals classes are listed, so
 * {@code x > v} is written as {@code Not(LessThanEquals(x, v))} and
 * {@code x >= v} as {@code Not(LessThan(x, v))}.
 */
public static class Not extends Predicate {
    public Not(Predicate predicate);
    public SearchArgument.Builder add(SearchArgument.Builder builder);
    /** Returns the negated predicate. */
    protected Predicate child();
}

/** Disjunction (OR) of the given predicates. */
public static class Or extends Predicate {
    public Or(Predicate... predicates);
    public SearchArgument.Builder add(SearchArgument.Builder builder);
    /** Returns the OR-ed predicates. */
    protected Iterable<Predicate> children();
}

/** Conjunction (AND) of the given predicates. */
public static class And extends Predicate {
    public And(Predicate... predicates);
    public SearchArgument.Builder add(SearchArgument.Builder builder);
}

import org.apache.flink.orc.OrcFilters;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
// Equality predicates: each takes the column name, the ORC literal type,
// and the literal value to compare against.
// age = 30 (an int literal is used with PredicateLeaf.Type.LONG)
OrcFilters.Predicate agePredicate = new OrcFilters.Equals(
"age",
PredicateLeaf.Type.LONG,
30
);
// name = "Alice"
OrcFilters.Predicate namePredicate = new OrcFilters.Equals(
"name",
PredicateLeaf.Type.STRING,
"Alice"
);
// active = true
OrcFilters.Predicate activePredicate = new OrcFilters.Equals(
"active",
PredicateLeaf.Type.BOOLEAN,
true
);// age > 25 (implemented as NOT age <= 25)
// The API summary lists no GreaterThan/GreaterThanEquals predicate: `>` is written
// as Not(LessThanEquals) and `>=` as Not(LessThan).
OrcFilters.Predicate ageGreaterThan = new OrcFilters.Not(
new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25)
);
// salary >= 50000.00 (i.e. NOT salary < 50000.00); BigDecimal is java.math.BigDecimal
OrcFilters.Predicate salaryGreaterOrEqual = new OrcFilters.Not(
new OrcFilters.LessThan("salary", PredicateLeaf.Type.DECIMAL, new BigDecimal("50000.00"))
);
// created_date < '2023-01-01'
// Date.valueOf(String) here is presumably java.sql.Date (imports not shown)
OrcFilters.Predicate dateBeforePredicate = new OrcFilters.LessThan(
"created_date",
PredicateLeaf.Type.DATE,
Date.valueOf("2023-01-01")
);// name IS NOT NULL
// IS NOT NULL: the API summary lists no IsNotNull class, so IsNull is wrapped in Not.
OrcFilters.Predicate nameNotNull = new OrcFilters.Not(
new OrcFilters.IsNull("name", PredicateLeaf.Type.STRING)
);
// age BETWEEN 25 AND 65
// Arguments: column, literal type, lower bound, upper bound.
OrcFilters.Predicate ageBetween = new OrcFilters.Between(
"age",
PredicateLeaf.Type.LONG,
25,
65
);
// department IN ('Engineering', 'Sales', 'Marketing')
// The IN-list literals are passed as trailing varargs.
OrcFilters.Predicate departmentIn = new OrcFilters.In(
"department",
PredicateLeaf.Type.STRING,
"Engineering",
"Sales",
"Marketing"
);// (age > 25 AND salary >= 50000) OR department = 'Executive'
// Build each leaf predicate first, then combine them with And/Or.
// age > 25
OrcFilters.Predicate ageFilter = new OrcFilters.Not(
        new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25)
);
// salary >= 50000
OrcFilters.Predicate salaryFilter = new OrcFilters.Not(
        new OrcFilters.LessThan("salary", PredicateLeaf.Type.DECIMAL, new BigDecimal("50000"))
);
// department = 'Executive'
OrcFilters.Predicate departmentFilter = new OrcFilters.Equals(
        "department",
        PredicateLeaf.Type.STRING,
        "Executive"
);
// (ageFilter AND salaryFilter) OR departmentFilter
OrcFilters.Predicate complexPredicate = new OrcFilters.Or(
        new OrcFilters.And(ageFilter, salaryFilter),
        departmentFilter
);

The ORC format can automatically convert Flink filter expressions into ORC predicates:
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.CallExpression;
import java.util.ArrayList;
import java.util.List;

// Convert Flink filter expressions to ORC predicates.
// `filters` is assumed to be in scope and to hold the filter expressions of a
// Table API / SQL query (e.g. the resolved filters offered to the format for pushdown).
List<OrcFilters.Predicate> predicates = new ArrayList<>();
for (Expression flinkExpression : filters) {
    // toOrcPredicate returns null when the expression cannot be converted.
    OrcFilters.Predicate orcPredicate = OrcFilters.toOrcPredicate(flinkExpression);
    if (orcPredicate != null) {
        // Predicate can be pushed down to ORC
        predicates.add(orcPredicate);
    } else {
        // Predicate cannot be converted, will be applied at Flink level
        System.out.println("Unsupported predicate for pushdown: " + flinkExpression);
    }
}

The following Flink expressions can be automatically converted:
- Comparisons: EQUALS, NOT_EQUALS, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL
- Null checks: IS_NULL, IS_NOT_NULL
- Logical operators: AND, OR, NOT

Example automatic conversion:
// Table API filter automatically converted
// NOTE(review): `$` is presumably the static import org.apache.flink.table.api.Expressions.$
// The three conditions (GREATER_THAN, EQUALS, IS_NOT_NULL) are AND-ed into one filter expression.
Table result = table
.filter($("age").isGreater(25)
.and($("department").isEqual("Engineering"))
.and($("salary").isNotNull()))
.select($("name"), $("age"), $("salary"));// Supported PredicateLeaf.Type values
// Reference listing (not executable code): ORC literal type on the left,
// the Flink SQL type(s) it is used for in the trailing comment.
PredicateLeaf.Type.LONG // TINYINT, SMALLINT, INTEGER, BIGINT
PredicateLeaf.Type.FLOAT // FLOAT, DOUBLE
PredicateLeaf.Type.STRING // CHAR, VARCHAR
PredicateLeaf.Type.BOOLEAN // BOOLEAN
PredicateLeaf.Type.DATE // DATE
PredicateLeaf.Type.TIMESTAMP // TIMESTAMP
PredicateLeaf.Type.DECIMAL // DECIMAL
// Numeric types
// Type.LONG is used with byte, short, int and long literals alike.
new OrcFilters.Equals("byte_col", PredicateLeaf.Type.LONG, (byte) 10);
new OrcFilters.Equals("short_col", PredicateLeaf.Type.LONG, (short) 100);
new OrcFilters.Equals("int_col", PredicateLeaf.Type.LONG, 1000);
new OrcFilters.Equals("long_col", PredicateLeaf.Type.LONG, 10000L);
// Floating point: Type.FLOAT is used with both float and double literals.
new OrcFilters.Equals("float_col", PredicateLeaf.Type.FLOAT, 3.14f);
new OrcFilters.Equals("double_col", PredicateLeaf.Type.FLOAT, 3.14159);
// Temporal types (presumably java.sql.Date / java.sql.Timestamp, given the valueOf(String) calls)
new OrcFilters.Equals("date_col", PredicateLeaf.Type.DATE, Date.valueOf("2023-01-01"));
new OrcFilters.Equals("timestamp_col", PredicateLeaf.Type.TIMESTAMP,
        Timestamp.valueOf("2023-01-01 12:00:00"));
// Decimal (java.math.BigDecimal, built from a String to keep the exact value)
new OrcFilters.Equals("decimal_col", PredicateLeaf.Type.DECIMAL,
        new BigDecimal("12345.67"));

import org.apache.flink.orc.OrcColumnarRowInputFormat;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

// Create predicates.
// NOTE(review): the list is presumably combined with AND (the factory parameter holds
// conjunctive predicates) — confirm against the createPartitionedFormat Javadoc.
List<OrcFilters.Predicate> predicates = Arrays.asList(
        new OrcFilters.Equals("department", PredicateLeaf.Type.STRING, "Engineering"),
        new OrcFilters.Not(new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25))
);
// Use in input format.
// Assumed to be in scope: hadoopConfig (Hadoop Configuration), tableType (RowType of the
// table), selectedFields (int[] of projected field indexes), batchSize (int).
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
        OrcColumnarRowInputFormat.createPartitionedFormat(
                OrcShim.defaultShim(),
                hadoopConfig,
                tableType,
                Collections.emptyList(), // no partition keys
                null, // partition field extractor; presumably unused when there are no partition keys
                selectedFields,
                predicates, // Apply predicates
                batchSize,
                // factory RowType -> TypeInformation<RowData> for the produced rows
                InternalTypeInfo::of
        );

ORC predicates are evaluated at the stripe level, allowing entire stripes to be skipped:
// This predicate can skip entire stripes where max(age) <= 25
OrcFilters.Predicate ageFilter = new OrcFilters.Not(
        new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25)
);

Predicates are also applied at the row group level within stripes:

// Row groups where all salary values are < 50000 will be skipped
OrcFilters.Predicate salaryFilter = new OrcFilters.Not(
        new OrcFilters.LessThan("salary", PredicateLeaf.Type.DECIMAL, new BigDecimal("50000"))
);

When ORC files contain bloom filters, predicates can use them as well:

// Bloom filters help with equality predicates
OrcFilters.Predicate customerIdFilter = new OrcFilters.Equals(
        "customer_id",
        PredicateLeaf.Type.STRING,
        "CUST-12345"
);

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-orc