or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · datastream-api.md · index.md · table-api.md

docs/configuration.md

0

# Configuration and Advanced Usage

1

2

Comprehensive configuration options for performance tuning and compression settings, plus advanced ORC features — including custom filters, metadata handling, and version compatibility management — for optimal performance in large-scale data processing environments.

3

4

## Capabilities

5

6

### Filter Pushdown

7

8

Advanced predicate pushdown capabilities for optimizing query performance by filtering data at the ORC reader level.

9

10

```java { .api }

11

/**

12

* Utility class for converting Flink expressions to ORC predicates.

13

* Enables filter pushdown optimization for improved query performance.

14

*/

15

public class OrcFilters {

16

/**

17

* Convert a Flink expression to an ORC predicate

18

* @param expression Flink filter expression

19

* @return ORC predicate for pushdown filtering

20

*/

21

public static Predicate toOrcPredicate(Expression expression);

22

23

/** Abstract base class for all ORC predicates */

24

public abstract static class Predicate implements Serializable {

25

public abstract SearchArgument.Builder add(SearchArgument.Builder builder);

26

}

27

28

/** Base class for column-only predicates */

29

public abstract static class ColumnPredicate extends Predicate {

30

/** Column name this predicate applies to */

31

protected final String columnName;

32

33

public ColumnPredicate(String columnName);

34

public String getColumnName();

35

}

36

37

/** Base class for binary predicates (comparison operations) */

38

public abstract static class BinaryPredicate extends ColumnPredicate {

39

/** Literal value for comparison */

40

protected final Serializable literal;

41

/** Type of the literal value */

42

protected final PredicateLeaf.Type literalType;

43

44

public BinaryPredicate(String columnName, PredicateLeaf.Type literalType, Serializable literal);

45

public Serializable getLiteral();

46

public PredicateLeaf.Type getLiteralType();

47

}

48

49

/** Equality predicate: column = literal */

50

public static class Equals extends BinaryPredicate {

51

public Equals(String columnName, PredicateLeaf.Type literalType, Serializable literal);

52

}

53

54

/** Null-safe equality predicate: column <=> literal */

55

public static class NullSafeEquals extends BinaryPredicate {

56

public NullSafeEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal);

57

}

58

59

/** Less than predicate: column < literal */

60

public static class LessThan extends BinaryPredicate {

61

public LessThan(String columnName, PredicateLeaf.Type literalType, Serializable literal);

62

}

63

64

/** Less than or equals predicate: column <= literal */

65

public static class LessThanEquals extends BinaryPredicate {

66

public LessThanEquals(String columnName, PredicateLeaf.Type literalType, Serializable literal);

67

}

68

69

/** Is null predicate: column IS NULL */

70

public static class IsNull extends ColumnPredicate {

71

protected final PredicateLeaf.Type literalType;

72

73

public IsNull(String columnName, PredicateLeaf.Type literalType);

74

public PredicateLeaf.Type getLiteralType();

75

}

76

77

/** Between predicate: column BETWEEN lower AND upper */

78

public static class Between extends ColumnPredicate {

79

protected final PredicateLeaf.Type literalType;

80

protected final Serializable lowerBound;

81

protected final Serializable upperBound;

82

83

public Between(String columnName, PredicateLeaf.Type literalType, Serializable lowerBound, Serializable upperBound);

84

public Serializable getLowerBound();

85

public Serializable getUpperBound();

86

public PredicateLeaf.Type getLiteralType();

87

}

88

89

/** In predicate: column IN (value1, value2, ...) */

90

public static class In extends ColumnPredicate {

91

protected final PredicateLeaf.Type literalType;

92

protected final Serializable[] literals;

93

94

public In(String columnName, PredicateLeaf.Type literalType, Serializable[] literals);

95

public Serializable[] getLiterals();

96

public PredicateLeaf.Type getLiteralType();

97

}

98

99

/** Not predicate: NOT (predicate) */

100

public static class Not extends Predicate {

101

protected final Predicate childPredicate;

102

103

public Not(Predicate childPredicate);

104

public Predicate getChildPredicate();

105

}

106

107

/** Or predicate: predicate1 OR predicate2 */

108

public static class Or extends Predicate {

109

protected final Predicate[] childPredicates;

110

111

public Or(Predicate[] childPredicates);

112

public Predicate[] getChildPredicates();

113

}

114

}

115

```

116

117

**Usage Example:**

118

119

```java

120

import org.apache.flink.orc.OrcFilters;

121

import org.apache.flink.table.expressions.Expression;

122

import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

123

124

// In practice, predicates are typically created by converting Flink expressions

125

// rather than constructing them directly

126

List<Expression> filterExpressions = // ... get from query planner

127

128

List<OrcFilters.Predicate> orcPredicates = new ArrayList<>();

129

for (Expression expr : filterExpressions) {

130

OrcFilters.Predicate pred = OrcFilters.toOrcPredicate(expr);

131

if (pred != null) {

132

orcPredicates.add(pred);

133

}

134

}

135

136

// Apply converted predicates to input format for pushdown filtering

137

OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =

138

new OrcColumnarRowFileInputFormat<>(

139

inputPaths,

140

fieldNames,

141

fieldTypes,

142

selectedFields,

143

orcPredicates, // Predicates for pushdown

144

batchSize,

145

orcConfig,

146

hadoopConfig

147

);

148

149

// Direct predicate construction (advanced usage)

150

List<OrcFilters.Predicate> directPredicates = Arrays.asList(

151

// amount < 100.0 (LessThan matches the constructor semantics in the actual source)

152

new OrcFilters.LessThan("amount", PredicateLeaf.Type.DECIMAL, new BigDecimal("100.0")),

153

// user_id IS NULL

154

new OrcFilters.IsNull("user_id", PredicateLeaf.Type.LONG)

155

);

156

```

157

158

### Version Compatibility Layer

159

160

Compatibility layer for supporting different Hive and ORC versions seamlessly.

161

162

```java { .api }

163

/**

164

* Version compatibility interface for different Hive/ORC versions.

165

* Abstracts version-specific differences in ORC reader implementations.

166

* @param <BATCH> Type of batch used (typically VectorizedRowBatch)

167

*/

168

public interface OrcShim<BATCH> extends Serializable {

169

/**

170

* Create an ORC record reader for the specified file and range

171

* @param conf Hadoop configuration

172

* @param schema ORC schema description

173

* @param selectedFields Indices of fields to read (for projection)

174

* @param conjunctPredicates List of predicates for pushdown filtering

175

* @param path Path to ORC file

176

* @param splitStart Start offset for reading

177

* @param splitLength Length to read

178

* @return Configured ORC record reader

179

* @throws IOException If reader creation fails

180

*/

181

RecordReader createRecordReader(

182

Configuration conf,

183

TypeDescription schema,

184

int[] selectedFields,

185

List<OrcFilters.Predicate> conjunctPredicates,

186

org.apache.flink.core.fs.Path path,

187

long splitStart,

188

long splitLength

189

) throws IOException;

190

191

/**

192

* Create a batch wrapper for the given schema and batch size

193

* @param schema ORC schema description

194

* @param batchSize Expected batch size

195

* @return Wrapped batch for unified processing

196

*/

197

OrcVectorizedBatchWrapper<BATCH> createBatchWrapper(TypeDescription schema, int batchSize);

198

199

/**

200

* Read the next batch from the record reader

201

* @param reader ORC record reader

202

* @param batch Batch to populate

203

* @return true if batch was read, false if end of file

204

* @throws IOException If reading fails

205

*/

206

boolean nextBatch(RecordReader reader, BATCH batch) throws IOException;

207

208

/**

209

* Get the default shim for the current runtime environment

210

* Typically uses Hive 2.3.0+ compatibility

211

* @return Default ORC shim instance

212

*/

213

static OrcShim<VectorizedRowBatch> defaultShim();

214

215

/**

216

* Create a shim for a specific Hive version

217

* @param hiveDependencyVersion Hive version string (e.g., "2.0.0", "2.1.0", "2.3.0")

218

* @return Version-specific ORC shim

219

*/

220

static OrcShim<VectorizedRowBatch> createShim(String hiveDependencyVersion);

221

}

222

```

223

224

**Usage Example:**

225

226

```java

227

// Using specific Hive version compatibility

228

OrcShim<VectorizedRowBatch> shim;

229

230

// Auto-detect and use appropriate shim

231

shim = OrcShim.defaultShim();

232

233

// Or specify exact version for compatibility

234

shim = OrcShim.createShim("2.1.0"); // For Hive 2.1.x compatibility

235

236

// Use shim in reader configuration

237

OrcColumnarRowSplitReader<VectorizedRowBatch> reader =

238

new OrcColumnarRowSplitReader<>(

239

shim,

240

orcConfig,

241

fieldNames,

242

fieldTypes,

243

selectedFields,

244

predicates,

245

batchSize,

246

split,

247

batchGenerator

248

);

249

```

250

251

### Configuration Management

252

253

Utilities for managing Hadoop and ORC configurations in distributed environments.

254

255

```java { .api }

256

/**

257

* Serializable wrapper for Hadoop Configuration objects.

258

* Enables distribution of configuration across Flink cluster nodes.

259

*/

260

public final class SerializableHadoopConfigWrapper implements Serializable {

261

/**

262

* Constructor wrapping a Hadoop Configuration

263

* @param hadoopConfig Hadoop configuration to wrap

264

*/

265

public SerializableHadoopConfigWrapper(Configuration hadoopConfig);

266

267

/**

268

* Get the wrapped Hadoop configuration

269

* @return Hadoop Configuration object

270

*/

271

public Configuration getHadoopConfig();

272

}

273

```

274

275

**Usage Example:**

276

277

```java

278

// Configure ORC and Hadoop settings for optimal performance

279

Configuration orcConfig = new Configuration();

280

281

// Compression settings

282

orcConfig.set("orc.compress", "SNAPPY"); // or ZLIB, LZO, LZ4, ZSTD

283

orcConfig.set("orc.compress.size", "262144"); // 256KB compression blocks

284

285

// Stripe and batch settings for performance tuning

286

orcConfig.set("orc.stripe.size", "67108864"); // 64MB stripes

287

orcConfig.set("orc.row.index.stride", "10000"); // Row index every 10K rows

288

orcConfig.set("orc.create.index", "true"); // Enable indexes

289

290

// Vectorization settings

291

orcConfig.set("orc.row.batch.size", "1024"); // Batch size for vectorized processing

292

293

// Memory settings

294

orcConfig.set("orc.dictionary.key.threshold", "0.8"); // Dictionary encoding threshold

295

296

// Hadoop configuration for HDFS optimization

297

Configuration hadoopConfig = new Configuration();

298

hadoopConfig.set("dfs.client.read.shortcircuit", "true");

299

hadoopConfig.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket");

300

hadoopConfig.set("dfs.client.cache.readahead", "268435456"); // 256MB readahead

301

302

// Create serializable wrapper

303

SerializableHadoopConfigWrapper configWrapper =

304

new SerializableHadoopConfigWrapper(hadoopConfig);

305

306

// Use in ORC input format

307

OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =

308

new OrcColumnarRowFileInputFormat<>(

309

inputPaths,

310

fieldNames,

311

fieldTypes,

312

selectedFields,

313

predicates,

314

1024, // Batch size

315

orcConfig,

316

configWrapper

317

);

318

```

319

320

### Advanced Type Handling

321

322

Utilities for handling complex ORC type conversions and schema evolution.

323

324

```java { .api }

325

/**

326

* Utility methods for timestamp handling across different Hive versions

327

*/

328

public class TimestampUtil {

329

/**

330

* Check if a column vector is a Hive timestamp column vector

331

* @param columnVector Column vector to check

332

* @return true if it's a Hive timestamp vector

333

*/

334

public static boolean isHiveTimestampColumnVector(

335

org.apache.hadoop.hive.ql.exec.vector.ColumnVector columnVector

336

);

337

338

/**

339

* Create a column vector from a constant timestamp value

340

* @param value Constant timestamp value

341

* @param size Size of the vector

342

* @return Timestamp column vector with constant value

343

*/

344

public static org.apache.hadoop.hive.ql.exec.vector.ColumnVector createVectorFromConstant(

345

Object value,

346

int size

347

);

348

}

349

```

350

351

**Configuration Properties Reference:**

352

353

```java

354

// Common ORC configuration properties for SQL/Table API

355

Properties orcProperties = new Properties();

356

357

// Compression options

358

orcProperties.put("orc.compress", "SNAPPY"); // NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD

359

orcProperties.put("orc.compress.size", "262144"); // Compression chunk size

360

361

// File structure

362

orcProperties.put("orc.stripe.size", "67108864"); // Target stripe size (64MB)

363

orcProperties.put("orc.row.index.stride", "10000"); // Rows between index entries

364

orcProperties.put("orc.bloom.filter.columns", "user_id,product_id"); // Bloom filter columns

365

orcProperties.put("orc.bloom.filter.fpp", "0.05"); // False positive probability

366

367

// Performance tuning

368

orcProperties.put("orc.row.batch.size", "1024"); // Vectorized batch size

369

orcProperties.put("orc.dictionary.key.threshold", "0.8"); // Dictionary encoding threshold

370

orcProperties.put("orc.max.merge.distance", "1048576"); // Max merge distance (1MB)

371

372

// Schema evolution

373

orcProperties.put("orc.force.positional.evolution", "false"); // Use column names for evolution

374

orcProperties.put("orc.tolerate.missing.schema", "true"); // Handle missing columns

375

376

// For Table API usage in SQL DDL

377

tEnv.executeSql(

378

"CREATE TABLE optimized_sales (" +

379

" user_id BIGINT," +

380

" product_id BIGINT," +

381

" amount DECIMAL(10,2)," +

382

" purchase_time TIMESTAMP(3)" +

383

") WITH (" +

384

" 'connector' = 'filesystem'," +

385

" 'path' = '/path/to/data'," +

386

" 'format' = 'orc'," +

387

" 'orc.compress' = 'ZSTD'," +

388

" 'orc.stripe.size' = '128MB'," +

389

" 'orc.bloom.filter.columns' = 'user_id'," +

390

" 'orc.bloom.filter.fpp' = '0.01'" +

391

")"

392

);

393

```