0
# Predicate Pushdown
1
2
The ORC format supports predicate pushdown through the `OrcFilters` class. Filters are evaluated inside the ORC reader, so stripes and row groups that cannot contain matching rows are skipped before their data is loaded into Flink. This reduces I/O and processing overhead and can significantly improve query performance.
3
4
## OrcFilters Utility
5
6
```java { .api }
7
public class OrcFilters {
8
public static Predicate toOrcPredicate(Expression expression);
9
}
10
```
11
12
## Base Predicate Class
13
14
```java { .api }
15
public abstract static class Predicate implements Serializable {
16
public abstract SearchArgument.Builder add(SearchArgument.Builder builder);
17
}
18
```
19
20
## Comparison Predicates
21
22
### Equality Predicates
23
24
```java { .api }
25
public static class Equals extends BinaryPredicate {
26
public Equals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
27
public SearchArgument.Builder add(SearchArgument.Builder builder);
28
}
29
30
public static class NullSafeEquals extends BinaryPredicate {
31
public NullSafeEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
32
public SearchArgument.Builder add(SearchArgument.Builder builder);
33
}
34
```
35
36
### Ordering Predicates
37
38
```java { .api }
39
public static class LessThan extends BinaryPredicate {
40
public LessThan(String columnName, PredicateLeaf.Type literalType, Serializable literal);
41
public SearchArgument.Builder add(SearchArgument.Builder builder);
42
}
43
44
public static class LessThanEquals extends BinaryPredicate {
45
public LessThanEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
46
public SearchArgument.Builder add(SearchArgument.Builder builder);
47
}
48
```
49
50
### Null Predicates
51
52
```java { .api }
53
public static class IsNull extends ColumnPredicate {
54
public IsNull(String columnName, PredicateLeaf.Type literalType);
55
public SearchArgument.Builder add(SearchArgument.Builder builder);
56
}
57
```
58
59
### Range and Set Predicates
60
61
```java { .api }
62
public static class Between extends ColumnPredicate {
63
public Between(
64
String columnName,
65
PredicateLeaf.Type literalType,
66
Serializable lowerBound,
67
Serializable upperBound
68
);
69
public SearchArgument.Builder add(SearchArgument.Builder builder);
70
}
71
72
public static class In extends ColumnPredicate {
73
public In(String columnName, PredicateLeaf.Type literalType, Serializable... literals);
74
public SearchArgument.Builder add(SearchArgument.Builder builder);
75
}
76
```
77
78
## Logical Predicates
79
80
```java { .api }
81
public static class Not extends Predicate {
82
public Not(Predicate predicate);
83
public SearchArgument.Builder add(SearchArgument.Builder builder);
84
protected Predicate child();
85
}
86
87
public static class Or extends Predicate {
88
public Or(Predicate... predicates);
89
public SearchArgument.Builder add(SearchArgument.Builder builder);
90
protected Iterable<Predicate> children();
91
}
92
93
public static class And extends Predicate {
94
public And(Predicate... predicates);
95
public SearchArgument.Builder add(SearchArgument.Builder builder);
96
}
97
```
98
99
## Usage Examples
100
101
### Basic Equality Filters
102
103
```java
104
import org.apache.flink.orc.OrcFilters;
105
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
106
107
// age = 30
108
OrcFilters.Predicate agePredicate = new OrcFilters.Equals(
109
"age",
110
PredicateLeaf.Type.LONG,
111
30
112
);
113
114
// name = "Alice"
115
OrcFilters.Predicate namePredicate = new OrcFilters.Equals(
116
"name",
117
PredicateLeaf.Type.STRING,
118
"Alice"
119
);
120
121
// active = true
122
OrcFilters.Predicate activePredicate = new OrcFilters.Equals(
123
"active",
124
PredicateLeaf.Type.BOOLEAN,
125
true
126
);
127
```
128
129
### Comparison Filters
130
131
```java
132
// age > 25 (implemented as NOT age <= 25)
133
OrcFilters.Predicate ageGreaterThan = new OrcFilters.Not(
134
new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25)
135
);
136
137
// salary >= 50000.00
138
OrcFilters.Predicate salaryGreaterOrEqual = new OrcFilters.Not(
139
new OrcFilters.LessThan("salary", PredicateLeaf.Type.DECIMAL, new BigDecimal("50000.00"))
140
);
141
142
// created_date < '2023-01-01'
143
OrcFilters.Predicate dateBeforePredicate = new OrcFilters.LessThan(
144
"created_date",
145
PredicateLeaf.Type.DATE,
146
Date.valueOf("2023-01-01")
147
);
148
```
149
150
### Null and Range Filters
151
152
```java
153
// name IS NOT NULL
154
OrcFilters.Predicate nameNotNull = new OrcFilters.Not(
155
new OrcFilters.IsNull("name", PredicateLeaf.Type.STRING)
156
);
157
158
// age BETWEEN 25 AND 65
159
OrcFilters.Predicate ageBetween = new OrcFilters.Between(
160
"age",
161
PredicateLeaf.Type.LONG,
162
25,
163
65
164
);
165
166
// department IN ('Engineering', 'Sales', 'Marketing')
167
OrcFilters.Predicate departmentIn = new OrcFilters.In(
168
"department",
169
PredicateLeaf.Type.STRING,
170
"Engineering",
171
"Sales",
172
"Marketing"
173
);
174
```
175
176
### Complex Logical Combinations
177
178
```java
179
// (age > 25 AND salary >= 50000) OR department = 'Executive'
180
OrcFilters.Predicate ageFilter = new OrcFilters.Not(
181
new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25)
182
);
183
184
OrcFilters.Predicate salaryFilter = new OrcFilters.Not(
185
new OrcFilters.LessThan("salary", PredicateLeaf.Type.DECIMAL, new BigDecimal("50000"))
186
);
187
188
OrcFilters.Predicate departmentFilter = new OrcFilters.Equals(
189
"department",
190
PredicateLeaf.Type.STRING,
191
"Executive"
192
);
193
194
OrcFilters.Predicate complexPredicate = new OrcFilters.Or(
195
new OrcFilters.And(ageFilter, salaryFilter),
196
departmentFilter
197
);
198
```
199
200
## Automatic Filter Conversion
201
202
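`OrcFilters.toOrcPredicate` converts a Flink `Expression` into an ORC predicate. It returns `null` when the expression cannot be converted, in which case the filter is not pushed down and must be applied by Flink after the data is read: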
203
204
```java
205
import org.apache.flink.table.expressions.Expression;
206
import org.apache.flink.table.expressions.CallExpression;
207
208
// Convert Flink expression to ORC predicate
209
Expression flinkExpression = // ... from Table API filter
210
OrcFilters.Predicate orcPredicate = OrcFilters.toOrcPredicate(flinkExpression);
211
212
if (orcPredicate != null) {
213
// Predicate can be pushed down to ORC
214
List<OrcFilters.Predicate> predicates = Arrays.asList(orcPredicate);
215
} else {
216
// Predicate cannot be converted, will be applied at Flink level
217
System.out.println("Unsupported predicate for pushdown: " + flinkExpression);
218
}
219
```
220
221
### Supported Expression Types
222
223
The following Flink expressions can be automatically converted:
224
225
- **Comparison**: `EQUALS`, `NOT_EQUALS`, `GREATER_THAN`, `GREATER_THAN_OR_EQUAL`, `LESS_THAN`, `LESS_THAN_OR_EQUAL`
226
- **Null checks**: `IS_NULL`, `IS_NOT_NULL`
227
- **Logical**: `AND`, `OR`, `NOT`
228
229
Example automatic conversion:
230
231
```java
232
// Table API filter automatically converted
233
Table result = table
234
.filter($("age").isGreater(25)
235
.and($("department").isEqual("Engineering"))
236
.and($("salary").isNotNull()))
237
.select($("name"), $("age"), $("salary"));
238
```
239
240
## Data Type Support
241
242
### Supported Types for Predicates
243
244
```java
245
// Supported PredicateLeaf.Type values
246
PredicateLeaf.Type.LONG // TINYINT, SMALLINT, INTEGER, BIGINT
247
PredicateLeaf.Type.FLOAT // FLOAT, DOUBLE
248
PredicateLeaf.Type.STRING // CHAR, VARCHAR
249
PredicateLeaf.Type.BOOLEAN // BOOLEAN
250
PredicateLeaf.Type.DATE // DATE
251
PredicateLeaf.Type.TIMESTAMP // TIMESTAMP
252
PredicateLeaf.Type.DECIMAL // DECIMAL
253
```
254
255
### Type Conversion Examples
256
257
```java
258
// Numeric types
259
new OrcFilters.Equals("byte_col", PredicateLeaf.Type.LONG, (byte) 10);
260
new OrcFilters.Equals("short_col", PredicateLeaf.Type.LONG, (short) 100);
261
new OrcFilters.Equals("int_col", PredicateLeaf.Type.LONG, 1000);
262
new OrcFilters.Equals("long_col", PredicateLeaf.Type.LONG, 10000L);
263
264
// Floating point
265
new OrcFilters.Equals("float_col", PredicateLeaf.Type.FLOAT, 3.14f);
266
new OrcFilters.Equals("double_col", PredicateLeaf.Type.FLOAT, 3.14159);
267
268
// Temporal types
269
new OrcFilters.Equals("date_col", PredicateLeaf.Type.DATE, Date.valueOf("2023-01-01"));
270
new OrcFilters.Equals("timestamp_col", PredicateLeaf.Type.TIMESTAMP,
271
Timestamp.valueOf("2023-01-01 12:00:00"));
272
273
// Decimal
274
new OrcFilters.Equals("decimal_col", PredicateLeaf.Type.DECIMAL,
275
new BigDecimal("12345.67"));
276
```
277
278
## Integration with Input Format
279
280
```java
281
import org.apache.flink.orc.OrcColumnarRowInputFormat;
282
283
// Create predicates
284
List<OrcFilters.Predicate> predicates = Arrays.asList(
285
new OrcFilters.Equals("department", PredicateLeaf.Type.STRING, "Engineering"),
286
new OrcFilters.Not(new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25))
287
);
288
289
// Use in input format
290
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
291
OrcColumnarRowInputFormat.createPartitionedFormat(
292
OrcShim.defaultShim(),
293
hadoopConfig,
294
tableType,
295
Collections.emptyList(),
296
null,
297
selectedFields,
298
predicates, // Apply predicates
299
batchSize,
300
TypeConversions::fromLogicalToDataType
301
);
302
```
303
304
## Performance Benefits
305
306
### Stripe-Level Filtering
307
308
ORC predicates are evaluated against stripe-level column statistics (such as minimum and maximum values), allowing entire stripes that cannot contain matching rows to be skipped:
309
310
```java
311
// This predicate can skip entire stripes where max(age) <= 25
312
OrcFilters.Predicate ageFilter = new OrcFilters.Not(
313
new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25)
314
);
315
```
316
317
### Row Group Filtering
318
319
Within a stripe, predicates are also evaluated at the row group level, so individual row groups that cannot contain matching rows are skipped:
320
321
```java
322
// Row groups where all salary values are < 50000 will be skipped
323
OrcFilters.Predicate salaryFilter = new OrcFilters.Not(
324
new OrcFilters.LessThan("salary", PredicateLeaf.Type.DECIMAL, new BigDecimal("50000"))
325
);
326
```
327
328
### Bloom Filter Integration
329
330
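When an ORC file was written with bloom filters for a column, equality predicates on that column can use them to rule out row groups that do not contain the requested value: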
331
332
```java
333
// Bloom filters help with equality predicates
334
OrcFilters.Predicate nameFilter = new OrcFilters.Equals(
335
"customer_id",
336
PredicateLeaf.Type.STRING,
337
"CUST-12345"
338
);
339
```
340
341
## Best Practices
342
343
1. **Use Simple Predicates**: Simple equality and comparison predicates are the most efficient to evaluate.
343
2. **Combine with Projection**: Use column projection together with predicate pushdown for maximum benefit.
344
3. **Consider Data Distribution**: Pushdown works best when the predicate is selective, so that many stripes and row groups can be skipped.
345
4. **Test Performance**: Measure the performance impact of different predicate combinations.
346
5. **Monitor Skipped Data**: Use ORC statistics to verify that predicates actually allow stripes and row groups to be skipped.