# Configuration and Advanced Usage

Comprehensive configuration options for performance tuning, compression settings, and advanced ORC features — including custom filters, metadata handling, and version compatibility management — for optimal performance in large-scale data processing environments.

## Capabilities

### Filter Pushdown

Advanced predicate pushdown capabilities for optimizing query performance by filtering data at the ORC reader level.

```java { .api }
/**
 * Utility class for converting Flink expressions to ORC predicates.
 * Enables filter pushdown optimization for improved query performance.
 */
public class OrcFilters {
    /**
     * Convert a Flink expression to an ORC predicate
     * @param expression Flink filter expression
     * @return ORC predicate for pushdown filtering
     */
    public static Predicate toOrcPredicate(Expression expression);

    /** Abstract base class for all ORC predicates */
    public abstract static class Predicate implements Serializable {
        public abstract SearchArgument.Builder add(SearchArgument.Builder builder);
    }

    /** Base class for column-only predicates */
    public abstract static class ColumnPredicate extends Predicate {
        /** Column name this predicate applies to */
        protected final String columnName;

        public ColumnPredicate(String columnName);
        public String getColumnName();
    }

    /** Base class for binary predicates (comparison operations) */
    public abstract static class BinaryPredicate extends ColumnPredicate {
        /** Literal value for comparison */
        protected final Serializable literal;
        /** Type of the literal value */
        protected final PredicateLeaf.Type literalType;

        public BinaryPredicate(String columnName, PredicateLeaf.Type literalType, Serializable literal);
        public Serializable getLiteral();
        public PredicateLeaf.Type getLiteralType();
    }

    /** Equality predicate: column = literal */
    public static class Equals extends BinaryPredicate {
        public Equals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    }

    /** Null-safe equality predicate: column <=> literal */
    public static class NullSafeEquals extends BinaryPredicate {
        public NullSafeEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    }

    /** Less than predicate: column < literal */
    public static class LessThan extends BinaryPredicate {
        public LessThan(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    }

    /** Less than or equals predicate: column <= literal */
    public static class LessThanEquals extends BinaryPredicate {
        public LessThanEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal);
    }

    /** Is null predicate: column IS NULL */
    public static class IsNull extends ColumnPredicate {
        protected final PredicateLeaf.Type literalType;

        public IsNull(String columnName, PredicateLeaf.Type literalType);
        public PredicateLeaf.Type getLiteralType();
    }

    /** Between predicate: column BETWEEN lower AND upper */
    public static class Between extends ColumnPredicate {
        protected final PredicateLeaf.Type literalType;
        protected final Serializable lowerBound;
        protected final Serializable upperBound;

        public Between(String columnName, PredicateLeaf.Type literalType, Serializable lowerBound, Serializable upperBound);
        public Serializable getLowerBound();
        public Serializable getUpperBound();
        public PredicateLeaf.Type getLiteralType();
    }

    /** In predicate: column IN (value1, value2, ...) */
    public static class In extends ColumnPredicate {
        protected final PredicateLeaf.Type literalType;
        protected final Serializable[] literals;

        public In(String columnName, PredicateLeaf.Type literalType, Serializable[] literals);
        public Serializable[] getLiterals();
        public PredicateLeaf.Type getLiteralType();
    }

    /** Not predicate: NOT (predicate) */
    public static class Not extends Predicate {
        protected final Predicate childPredicate;

        public Not(Predicate childPredicate);
        public Predicate getChildPredicate();
    }

    /** Or predicate: predicate1 OR predicate2 */
    public static class Or extends Predicate {
        protected final Predicate[] childPredicates;

        public Or(Predicate[] childPredicates);
        public Predicate[] getChildPredicates();
    }
}
```

**Usage Example:**

```java
import org.apache.flink.orc.OrcFilters;
import org.apache.flink.table.expressions.Expression;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

// In practice, predicates are typically created by converting Flink expressions
// rather than constructing them directly
List<Expression> filterExpressions = // ... get from query planner

List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();
for (Expression expr : filterExpressions) {
    OrcFilters.Predicate pred = OrcFilters.toOrcPredicate(expr);
    if (pred != null) {
        orcPredicates.add(pred);
    }
}

// Apply converted predicates to input format for pushdown filtering
OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
    new OrcColumnarRowFileInputFormat<>(
        inputPaths,
        fieldNames,
        fieldTypes,
        selectedFields,
        orcPredicates, // Predicates for pushdown
        batchSize,
        orcConfig,
        hadoopConfig
    );

// Direct predicate construction (advanced usage)
List<OrcFilters.Predicate> directPredicates = Arrays.asList(
    // amount < 100.0 (note: constructor parameters from actual source)
    new OrcFilters.LessThan("amount", PredicateLeaf.Type.DECIMAL, new BigDecimal("100.0")),
    // user_id IS NULL
    new OrcFilters.IsNull("user_id", PredicateLeaf.Type.LONG)
);
```

### Version Compatibility Layer

Compatibility layer for supporting different Hive and ORC versions seamlessly.

```java { .api }
/**
 * Version compatibility interface for different Hive/ORC versions.
 * Abstracts version-specific differences in ORC reader implementations.
 * @param <BATCH> Type of batch used (typically VectorizedRowBatch)
 */
public interface OrcShim<BATCH> extends Serializable {
    /**
     * Create an ORC record reader for the specified file and range
     * @param conf Hadoop configuration
     * @param schema ORC schema description
     * @param selectedFields Indices of fields to read (for projection)
     * @param conjunctPredicates List of predicates for pushdown filtering
     * @param path Path to ORC file
     * @param splitStart Start offset for reading
     * @param splitLength Length to read
     * @return Configured ORC record reader
     * @throws IOException If reader creation fails
     */
    RecordReader createRecordReader(
        Configuration conf,
        TypeDescription schema,
        int[] selectedFields,
        List<OrcFilters.Predicate> conjunctPredicates,
        org.apache.flink.core.fs.Path path,
        long splitStart,
        long splitLength
    ) throws IOException;

    /**
     * Create a batch wrapper for the given schema and batch size
     * @param schema ORC schema description
     * @param batchSize Expected batch size
     * @return Wrapped batch for unified processing
     */
    OrcVectorizedBatchWrapper<BATCH> createBatchWrapper(TypeDescription schema, int batchSize);

    /**
     * Read the next batch from the record reader
     * @param reader ORC record reader
     * @param batch Batch to populate
     * @return true if batch was read, false if end of file
     * @throws IOException If reading fails
     */
    boolean nextBatch(RecordReader reader, BATCH batch) throws IOException;

    /**
     * Get the default shim for the current runtime environment.
     * Typically uses Hive 2.3.0+ compatibility.
     * @return Default ORC shim instance
     */
    static OrcShim<VectorizedRowBatch> defaultShim();

    /**
     * Create a shim for a specific Hive version
     * @param hiveDependencyVersion Hive version string (e.g., "2.0.0", "2.1.0", "2.3.0")
     * @return Version-specific ORC shim
     */
    static OrcShim<VectorizedRowBatch> createShim(String hiveDependencyVersion);
}
```

**Usage Example:**

```java
// Using specific Hive version compatibility
OrcShim<VectorizedRowBatch> shim;

// Auto-detect and use appropriate shim
shim = OrcShim.defaultShim();

// Or specify exact version for compatibility
shim = OrcShim.createShim("2.1.0"); // For Hive 2.1.x compatibility

// Use shim in reader configuration
OrcColumnarRowSplitReader<VectorizedRowBatch> reader =
    new OrcColumnarRowSplitReader<>(
        shim,
        orcConfig,
        fieldNames,
        fieldTypes,
        selectedFields,
        predicates,
        batchSize,
        split,
        batchGenerator
    );
```

### Configuration Management

Utilities for managing Hadoop and ORC configurations in distributed environments.

```java { .api }
/**
 * Serializable wrapper for Hadoop Configuration objects.
 * Enables distribution of configuration across Flink cluster nodes.
 */
public final class SerializableHadoopConfigWrapper implements Serializable {
    /**
     * Constructor wrapping a Hadoop Configuration
     * @param hadoopConfig Hadoop configuration to wrap
     */
    public SerializableHadoopConfigWrapper(Configuration hadoopConfig);

    /**
     * Get the wrapped Hadoop configuration
     * @return Hadoop Configuration object
     */
    public Configuration getHadoopConfig();
}
```

**Usage Example:**

```java
// Configure ORC and Hadoop settings for optimal performance
Configuration orcConfig = new Configuration();

// Compression settings
orcConfig.set("orc.compress", "SNAPPY"); // or ZLIB, LZO, LZ4, ZSTD
orcConfig.set("orc.compress.size", "262144"); // 256KB compression blocks

// Stripe and batch settings for performance tuning
orcConfig.set("orc.stripe.size", "67108864"); // 64MB stripes
orcConfig.set("orc.row.index.stride", "10000"); // Row index every 10K rows
orcConfig.set("orc.create.index", "true"); // Enable indexes

// Vectorization settings
orcConfig.set("orc.row.batch.size", "1024"); // Batch size for vectorized processing

// Memory settings
orcConfig.set("orc.dictionary.key.threshold", "0.8"); // Dictionary encoding threshold

// Hadoop configuration for HDFS optimization
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("dfs.client.read.shortcircuit", "true");
hadoopConfig.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket");
hadoopConfig.set("dfs.client.cache.readahead", "268435456"); // 256MB readahead

// Create serializable wrapper
SerializableHadoopConfigWrapper configWrapper =
    new SerializableHadoopConfigWrapper(hadoopConfig);

// Use in ORC input format
OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
    new OrcColumnarRowFileInputFormat<>(
        inputPaths,
        fieldNames,
        fieldTypes,
        selectedFields,
        predicates,
        1024, // Batch size
        orcConfig,
        configWrapper
    );
```

### Advanced Type Handling

Utilities for handling complex ORC type conversions and schema evolution.

```java { .api }
/**
 * Utility methods for timestamp handling across different Hive versions
 */
public class TimestampUtil {
    /**
     * Check if a column vector is a Hive timestamp column vector
     * @param columnVector Column vector to check
     * @return true if it's a Hive timestamp vector
     */
    public static boolean isHiveTimestampColumnVector(
        org.apache.hadoop.hive.ql.exec.vector.ColumnVector columnVector
    );

    /**
     * Create a column vector from a constant timestamp value
     * @param value Constant timestamp value
     * @param size Size of the vector
     * @return Timestamp column vector with constant value
     */
    public static org.apache.hadoop.hive.ql.exec.vector.ColumnVector createVectorFromConstant(
        Object value,
        int size
    );
}
```

**Configuration Properties Reference:**

```java
// Common ORC configuration properties for SQL/Table API
Properties orcProperties = new Properties();

// Compression options
orcProperties.put("orc.compress", "SNAPPY"); // NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD
orcProperties.put("orc.compress.size", "262144"); // Compression chunk size

// File structure
orcProperties.put("orc.stripe.size", "67108864"); // Target stripe size (64MB)
orcProperties.put("orc.row.index.stride", "10000"); // Rows between index entries
orcProperties.put("orc.bloom.filter.columns", "user_id,product_id"); // Bloom filter columns
orcProperties.put("orc.bloom.filter.fpp", "0.05"); // False positive probability

// Performance tuning
orcProperties.put("orc.row.batch.size", "1024"); // Vectorized batch size
orcProperties.put("orc.dictionary.key.threshold", "0.8"); // Dictionary encoding threshold
orcProperties.put("orc.max.merge.distance", "1048576"); // Max merge distance (1MB)

// Schema evolution
orcProperties.put("orc.force.positional.evolution", "false"); // Use column names for evolution
orcProperties.put("orc.tolerate.missing.schema", "true"); // Handle missing columns

// For Table API usage in SQL DDL
tEnv.executeSql(
    "CREATE TABLE optimized_sales (" +
    " user_id BIGINT," +
    " product_id BIGINT," +
    " amount DECIMAL(10,2)," +
    " purchase_time TIMESTAMP(3)" +
    ") WITH (" +
    " 'connector' = 'filesystem'," +
    " 'path' = '/path/to/data'," +
    " 'format' = 'orc'," +
    " 'orc.compress' = 'ZSTD'," +
    " 'orc.stripe.size' = '128MB'," +
    " 'orc.bloom.filter.columns' = 'user_id'," +
    " 'orc.bloom.filter.fpp' = '0.01'" +
    ")"
);
```