or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdconnector-integration.mdcore-planning.mdenums-constants.mdexecution-nodes.mdfactory-classes.mdindex.mdtype-system.md

enums-constants.mddocs/

0

# Enums and Constants

1

2

Public enums and constants provide standardized values for execution strategies, table operation traits, and configuration options. These enumerations ensure type safety and consistency across the planner's optimization and execution processes.

3

4

## Package Information

5

6

```java

7

import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;

8

import org.apache.flink.table.planner.plan.trait.UpdateKind;

9

import org.apache.flink.table.planner.plan.trait.ModifyKind;

10

import org.apache.flink.table.planner.plan.trait.MiniBatchMode;

11

import org.apache.flink.table.planner.plan.utils.OperatorType;

12

import org.apache.flink.table.planner.utils.InternalConfigOptions;

13

import org.apache.flink.configuration.ConfigOption;

14

```

15

16

## Capabilities

17

18

### AggregatePhaseStrategy

19

20

Defines strategies for executing aggregation operations, determining whether aggregations should be performed in single or multiple phases for optimization.

21

22

```java { .api }

23

public enum AggregatePhaseStrategy {

24

25

/**

26

* No special enforcer for aggregate stage. Whether to choose two stage aggregate or one stage

27

* aggregate depends on cost.

28

*/

29

AUTO,

30

31

/**

32

* Enforce to use one stage aggregate which only has CompleteGlobalAggregate.

33

*/

34

ONE_PHASE,

35

36

/**

37

* Enforce to use two stage aggregate which has localAggregate and globalAggregate.

38

* NOTE: If aggregate call does not support split into two phase, still use one stage aggregate.

39

*/

40

TWO_PHASE

41

}

42

```

43

44

**Usage Scenarios:**

45

- **ONE_PHASE**: Small datasets, low cardinality group-by keys, memory-constrained environments

46

- **TWO_PHASE**: Large datasets, high cardinality group-by keys, distributed processing scenarios

47

48

**Usage Example:**

49

50

```java

51

import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;

52

53

// Configure aggregation strategy based on data characteristics

54

public AggregatePhaseStrategy chooseAggregationStrategy(

55

long estimatedInputRows,

56

int groupByCardinality

57

) {

58

// Use two-phase for large datasets with high cardinality

59

if (estimatedInputRows > 1_000_000 && groupByCardinality > 10_000) {

60

return AggregatePhaseStrategy.TWO_PHASE;

61

} else {

62

return AggregatePhaseStrategy.ONE_PHASE;

63

}

64

}

65

66

// Apply strategy in query planning

67

AggregatePhaseStrategy strategy = chooseAggregationStrategy(rowCount, cardinality);

68

if (strategy.isMultiPhase()) {

69

// Configure multi-phase aggregation

70

configurePreAggregation();

71

configureFinalAggregation();

72

} else {

73

// Configure single-phase aggregation

74

configureSinglePhaseAggregation();

75

}

76

```

77

78

### UpdateKind

79

80

Specifies the type of updates that a streaming operator or table can handle, crucial for streaming table semantics and changelog processing.

81

82

```java { .api }

83

public enum UpdateKind {

84

85

/**

86

* Only update-after records are supported.

87

* Suitable for append-only streams and simple transformations.

88

*/

89

ONLY_UPDATE_AFTER,

90

91

/**

92

* Both update-before and update-after records are supported.

93

* Required for complex operations like joins, aggregations with retractions.

94

*/

95

BEFORE_AND_AFTER;

96

97

/**

98

* Returns whether this update kind supports retraction (update-before) messages.

99

*/

100

public boolean supportsRetractions() {

101

return this == BEFORE_AND_AFTER;

102

}

103

}

104

```

105

106

**Key Concepts:**

107

- **UPDATE_AFTER**: Represents the new value after an update operation

108

- **UPDATE_BEFORE**: Represents the old value before an update operation (retraction)

109

- **Changelog Streams**: Streams that contain insert, update, and delete operations

110

111

**Usage Example:**

112

113

```java

114

import org.apache.flink.table.planner.plan.trait.UpdateKind;

115

116

// Determine update kind based on operator requirements

117

public UpdateKind determineUpdateKind(StreamExecNode<?> node) {

118

// Aggregations and joins typically need retractions

119

if (node instanceof StreamExecGroupAggregate ||

120

node instanceof StreamExecJoin) {

121

return UpdateKind.BEFORE_AND_AFTER;

122

}

123

124

// Simple transformations can work with append-only

125

return UpdateKind.ONLY_UPDATE_AFTER;

126

}

127

128

// Configure changelog mode based on update kind

129

UpdateKind updateKind = determineUpdateKind(execNode);

130

ChangelogMode changelogMode;

131

132

if (updateKind.supportsRetractions()) {

133

changelogMode = ChangelogMode.all(); // INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE

134

} else {

135

changelogMode = ChangelogMode.insertOnly(); // INSERT only

136

}

137

```

138

139

### ModifyKind

140

141

Defines the types of modification operations that can be performed on tables, essential for DML operation handling and sink compatibility.

142

143

```java { .api }

144

public enum ModifyKind {

145

146

/**

147

* Insertion operation.

148

*/

149

INSERT,

150

151

/**

152

* Update operation.

153

*/

154

UPDATE,

155

156

/**

157

* Deletion operation.

158

*/

159

DELETE

160

}

161

```

162

163

**Usage Example:**

164

165

```java

166

import org.apache.flink.table.planner.plan.trait.ModifyKind;

167

168

// Validate sink compatibility with modify operations

169

public void validateSinkCompatibility(

170

DynamicTableSink sink,

171

Set<ModifyKind> requiredOperations

172

) {

173

for (ModifyKind operation : requiredOperations) {

174

switch (operation) {

175

case INSERT:

176

if (!sink.supportsInsert()) {

177

throw new ValidationException("Sink doesn't support INSERT operations");

178

}

179

break;

180

case UPDATE:

181

if (!sink.supportsUpdate()) {

182

throw new ValidationException("Sink doesn't support UPDATE operations");

183

}

184

break;

185

case DELETE:

186

if (!sink.supportsDelete()) {

187

throw new ValidationException("Sink doesn't support DELETE operations");

188

}

189

break;

190

case UPSERT:

191

if (!sink.supportsUpsert()) {

192

throw new ValidationException("Sink doesn't support UPSERT operations");

193

}

194

break;

195

}

196

}

197

}

198

199

// Determine required operations from SQL statement

200

Set<ModifyKind> operations = analyzeModifyOperations(sqlStatement);

201

validateSinkCompatibility(tableSink, operations);

202

```

203

204

### MiniBatchMode

205

206

Controls mini-batch optimization for streaming operations, enabling higher throughput by batching multiple records for processing.

207

208

```java { .api }

209

public enum MiniBatchMode {

210

211

/**

212

* An operator in ProcTime mode requires watermarks emitted in proctime interval, i.e.,

213

* unbounded group agg with mini-batch enabled.

214

*/

215

ProcTime,

216

217

/**

218

* An operator in RowTime mode requires watermarks extracted from elements, and emitted

219

* in rowtime interval, e.g., window, window join...

220

*/

221

RowTime,

222

223

/**

224

* Default value, meaning no mini-batch interval is required.

225

*/

226

None

227

}

228

```

229

230

**Benefits of Mini-batching:**

231

- **Higher Throughput**: Reduces per-record processing overhead

232

- **Better Resource Utilization**: Amortizes fixed costs across multiple records

233

- **Improved Latency Control**: Configurable batch sizes and timeouts

234

235

**Usage Example:**

236

237

```java

238

import org.apache.flink.table.planner.plan.trait.MiniBatchMode;

239

240

// Configure mini-batch settings for streaming operations

241

public void configureMiniBatch(

242

StreamExecNode<?> node,

243

MiniBatchMode mode,

244

Duration batchInterval

245

) {

246

if (mode.isEnabled()) {

247

// Enable mini-batch with specified interval

248

node.setMiniBatchInterval(batchInterval);

249

node.setMiniBatchSize(1000); // Max records per batch

250

251

// Configure buffer settings

252

node.enableMiniBatchBuffer();

253

} else {

254

// Disable mini-batch for low-latency processing

255

node.disableMiniBatch();

256

}

257

}

258

259

// Determine mini-batch mode based on requirements

260

MiniBatchMode mode = requiresLowLatency ?

261

MiniBatchMode.NONE : MiniBatchMode.ENABLED;

262

263

configureMiniBatch(streamNode, mode, Duration.ofMilliseconds(100));

264

```

265

266

### OperatorType

267

268

Categorizes different types of operators in the query execution plan, used for optimization decisions and resource allocation.

269

270

```java { .api }

271

public enum OperatorType {

272

273

// Source and Sink Operators

274

TABLE_SOURCE_SCAN,

275

TABLE_SINK,

276

277

// Join Operators

278

HASH_JOIN,

279

SORT_MERGE_JOIN,

280

NESTED_LOOP_JOIN,

281

TEMPORAL_JOIN,

282

283

// Aggregation Operators

284

GROUP_AGGREGATE,

285

WINDOW_AGGREGATE,

286

OVER_AGGREGATE,

287

288

// Window Operators

289

TUMBLING_WINDOW,

290

SLIDING_WINDOW,

291

SESSION_WINDOW,

292

293

// Transformation Operators

294

CALC, // Projection and filtering

295

CORRELATE, // Table function operations

296

UNION, // Set union operations

297

SORT, // Sorting operations

298

LIMIT, // Top-N operations

299

300

// Exchange Operators

301

EXCHANGE, // Data redistribution

302

PARTITION_BY_HASH,

303

PARTITION_BY_RANGE;

304

305

/**

306

* Returns whether this operator performs join operations.

307

*/

308

public boolean isJoin() {

309

return this == HASH_JOIN || this == SORT_MERGE_JOIN ||

310

this == NESTED_LOOP_JOIN || this == TEMPORAL_JOIN;

311

}

312

313

/**

314

* Returns whether this operator performs aggregation.

315

*/

316

public boolean isAggregate() {

317

return this == GROUP_AGGREGATE || this == WINDOW_AGGREGATE ||

318

this == OVER_AGGREGATE;

319

}

320

321

/**

322

* Returns whether this operator handles windowing.

323

*/

324

public boolean isWindow() {

325

return this == TUMBLING_WINDOW || this == SLIDING_WINDOW ||

326

this == SESSION_WINDOW || this == WINDOW_AGGREGATE;

327

}

328

}

329

```

330

331

**Usage Example:**

332

333

```java

334

import org.apache.flink.table.planner.plan.utils.OperatorType;

335

336

// Optimize resource allocation based on operator type

337

public void configureOperatorResources(ExecNode<?> node, OperatorType operatorType) {

338

switch (operatorType) {

339

case HASH_JOIN:

340

case GROUP_AGGREGATE:

341

// Memory-intensive operators need more managed memory

342

node.setManagedMemoryFraction(0.4);

343

break;

344

345

case SORT:

346

case SORT_MERGE_JOIN:

347

// Sort operations benefit from spill-to-disk capability

348

node.enableSpilling(true);

349

node.setManagedMemoryFraction(0.3);

350

break;

351

352

case TABLE_SOURCE_SCAN:

353

// I/O intensive operations

354

node.setIOIntensive(true);

355

break;

356

357

default:

358

// Default resource allocation

359

node.setManagedMemoryFraction(0.1);

360

}

361

}

362

363

// Analyze operator characteristics for optimization

364

public OptimizationHints analyzeOperator(OperatorType operatorType) {

365

OptimizationHints hints = new OptimizationHints();

366

367

if (operatorType.isJoin()) {

368

hints.setRequiresShuffle(true);

369

hints.setMemoryIntensive(true);

370

}

371

372

if (operatorType.isAggregate()) {

373

hints.setSupportsPreAggregation(true);

374

hints.setRequiresGrouping(true);

375

}

376

377

if (operatorType.isWindow()) {

378

hints.setRequiresEventTime(true);

379

hints.setStateful(true);

380

}

381

382

return hints;

383

}

384

```

385

386

## Internal Configuration Options

387

388

### InternalConfigOptions

389

390

Configuration constants used internally by the planner for query execution and optimization.

391

392

```java { .api }

393

public final class InternalConfigOptions {

394

395

/**

396

* Query start time in epoch milliseconds.

397

* Used for time-based operations and temporal queries.

398

*/

399

public static final ConfigOption<Long> TABLE_QUERY_START_EPOCH_TIME =

400

ConfigOptions.key("table.query.start.epoch-time")

401

.longType()

402

.noDefaultValue()

403

.withDescription("Query start time in epoch milliseconds");

404

405

/**

406

* Query start time in local time zone.

407

* Used for local time calculations and display.

408

*/

409

public static final ConfigOption<String> TABLE_QUERY_START_LOCAL_TIME =

410

ConfigOptions.key("table.query.start.local-time")

411

.stringType()

412

.noDefaultValue()

413

.withDescription("Query start time in local time zone");

414

415

/**

416

* Maximum number of optimization passes.

417

* Controls the depth of Calcite rule-based optimization.

418

*/

419

public static final ConfigOption<Integer> TABLE_OPTIMIZER_MAX_ITERATIONS =

420

ConfigOptions.key("table.optimizer.max-iterations")

421

.intType()

422

.defaultValue(100)

423

.withDescription("Maximum number of optimizer iterations");

424

425

/**

426

* Whether to enable statistics-based optimization.

427

*/

428

public static final ConfigOption<Boolean> TABLE_OPTIMIZER_STATISTICS_ENABLED =

429

ConfigOptions.key("table.optimizer.statistics.enabled")

430

.booleanType()

431

.defaultValue(true)

432

.withDescription("Enable statistics-based optimization");

433

}

434

```

435

436

**Usage Example:**

437

438

```java

439

import org.apache.flink.table.planner.utils.InternalConfigOptions;

440

import org.apache.flink.configuration.Configuration;

441

442

// Configure planner with internal options

443

public void configurePlannerInternals(Configuration config) {

444

// Set query start time for temporal operations

445

long queryStartTime = System.currentTimeMillis();

446

config.setLong(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME, queryStartTime);

447

448

// Set local time for display purposes

449

LocalDateTime localTime = LocalDateTime.now();

450

config.setString(InternalConfigOptions.TABLE_QUERY_START_LOCAL_TIME,

451

localTime.toString());

452

453

// Configure optimization limits

454

config.setInteger(InternalConfigOptions.TABLE_OPTIMIZER_MAX_ITERATIONS, 150);

455

config.setBoolean(InternalConfigOptions.TABLE_OPTIMIZER_STATISTICS_ENABLED, true);

456

}

457

458

// Access internal configuration in planner

459

public void setupPlannerContext(PlannerConfiguration plannerConfig) {

460

Configuration config = plannerConfig.getConfiguration();

461

462

// Get query start time for temporal operations

463

Long startEpochTime = config.get(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME);

464

if (startEpochTime != null) {

465

setupTemporalQueries(startEpochTime);

466

}

467

468

// Configure optimizer based on settings

469

boolean statisticsEnabled = config.get(InternalConfigOptions.TABLE_OPTIMIZER_STATISTICS_ENABLED);

470

if (statisticsEnabled) {

471

enableStatisticsBasedOptimization();

472

}

473

}

474

```

475

476

## Enum Combination Patterns

477

478

### Comprehensive Operation Configuration

479

480

```java

481

// Configure streaming operation with multiple traits

482

public class StreamOperationConfig {

483

private final UpdateKind updateKind;

484

private final ModifyKind modifyKind;

485

private final MiniBatchMode miniBatchMode;

486

private final AggregatePhaseStrategy aggregateStrategy;

487

488

public StreamOperationConfig(

489

UpdateKind updateKind,

490

ModifyKind modifyKind,

491

MiniBatchMode miniBatchMode,

492

AggregatePhaseStrategy aggregateStrategy

493

) {

494

this.updateKind = updateKind;

495

this.modifyKind = modifyKind;

496

this.miniBatchMode = miniBatchMode;

497

this.aggregateStrategy = aggregateStrategy;

498

}

499

500

public boolean requiresStateBackend() {

501

return updateKind.supportsRetractions() ||

502

aggregateStrategy.isMultiPhase();

503

}

504

505

public boolean supportsLowLatency() {

506

return !miniBatchMode.isEnabled() &&

507

aggregateStrategy == AggregatePhaseStrategy.ONE_PHASE;

508

}

509

}

510

511

// Create optimized configuration

512

StreamOperationConfig config = new StreamOperationConfig(

513

UpdateKind.BEFORE_AND_AFTER, // Support retractions

514

ModifyKind.UPSERT, // Upsert operations

515

MiniBatchMode.ENABLED, // Enable batching

516

AggregatePhaseStrategy.TWO_PHASE // Multi-phase aggregation

517

);

518

```

519

520

### Validation Using Enums

521

522

```java

523

// Comprehensive validation using enum combinations

524

public class OperationValidator {

525

526

public ValidationResult validate(

527

OperatorType operatorType,

528

UpdateKind updateKind,

529

ModifyKind modifyKind,

530

MiniBatchMode miniBatchMode

531

) {

532

List<String> errors = new ArrayList<>();

533

534

// Validate join operators

535

if (operatorType.isJoin() && updateKind == UpdateKind.ONLY_UPDATE_AFTER) {

536

errors.add("Join operators require BEFORE_AND_AFTER update kind for correctness");

537

}

538

539

// Validate aggregation operators

540

if (operatorType.isAggregate() && !updateKind.supportsRetractions()) {

541

errors.add("Aggregate operators need retraction support for accurate results");

542

}

543

544

// Validate modify operations

545

if (modifyKind.isModification() && updateKind == UpdateKind.ONLY_UPDATE_AFTER) {

546

errors.add("Update/Delete operations require retraction support");

547

}

548

549

// Validate mini-batch compatibility

550

if (miniBatchMode.isEnabled() && operatorType.isWindow()) {

551

errors.add("Mini-batch mode may interfere with window semantics");

552

}

553

554

return errors.isEmpty() ?

555

ValidationResult.success() :

556

ValidationResult.failure(errors);

557

}

558

}

559

```