# Enums and Constants

Public enums and constants provide standardized values for execution strategies, table operation traits, and configuration options. These enumerations ensure type safety and consistency across the planner's optimization and execution processes.

## Package Information

```java
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
import org.apache.flink.table.planner.plan.trait.UpdateKind;
import org.apache.flink.table.planner.plan.trait.ModifyKind;
import org.apache.flink.table.planner.plan.trait.MiniBatchMode;
import org.apache.flink.table.planner.plan.utils.OperatorType;
import org.apache.flink.table.planner.utils.InternalConfigOptions;
import org.apache.flink.configuration.ConfigOption;
```
## Capabilities

### AggregatePhaseStrategy

Defines strategies for executing aggregation operations, determining whether aggregations should be performed in one or two phases for optimization.

```java { .api }
public enum AggregatePhaseStrategy {

    /**
     * No special enforcer for the aggregate stage. Whether to choose a two-phase or
     * a one-phase aggregate depends on cost.
     */
    AUTO,

    /**
     * Enforce a one-phase aggregate, which only has a complete global aggregate.
     */
    ONE_PHASE,

    /**
     * Enforce a two-phase aggregate, which has a local aggregate and a global aggregate.
     * NOTE: If an aggregate call cannot be split into two phases, a one-phase aggregate
     * is still used.
     */
    TWO_PHASE
}
```
**Usage Scenarios:**

- **ONE_PHASE**: Small datasets, low-cardinality group-by keys, memory-constrained environments
- **TWO_PHASE**: Large datasets, high-cardinality group-by keys, distributed processing scenarios

**Usage Example:**

```java
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;

// Choose an aggregation strategy based on data characteristics.
public AggregatePhaseStrategy chooseAggregationStrategy(
        long estimatedInputRows,
        int groupByCardinality) {
    // Use two-phase for large datasets with high cardinality.
    if (estimatedInputRows > 1_000_000 && groupByCardinality > 10_000) {
        return AggregatePhaseStrategy.TWO_PHASE;
    } else {
        return AggregatePhaseStrategy.ONE_PHASE;
    }
}

// Apply the strategy in query planning. The enum declares no helper methods,
// so compare against the constants directly.
AggregatePhaseStrategy strategy = chooseAggregationStrategy(rowCount, cardinality);
if (strategy == AggregatePhaseStrategy.TWO_PHASE) {
    // Configure two-phase aggregation.
    configurePreAggregation();
    configureFinalAggregation();
} else {
    // Configure single-phase aggregation.
    configureSinglePhaseAggregation();
}
```
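To see why the two-phase split pays off, the following standalone sketch (stdlib-only; not planner code, all names are illustrative) pre-aggregates per parallel subtask before merging globally, so only one partial row per key per subtask crosses the network instead of every input record:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoPhaseSketch {

    /** Phase 1: each subtask pre-aggregates its local slice of (key, value) pairs. */
    static Map<String, Long> localAggregate(List<Map.Entry<String, Long>> slice) {
        Map<String, Long> partial = new HashMap<>();
        for (Map.Entry<String, Long> e : slice) {
            partial.merge(e.getKey(), e.getValue(), Long::sum);
        }
        return partial;
    }

    /** Phase 2: the global task merges the per-subtask partials. */
    static Map<String, Long> globalAggregate(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((k, v) -> result.merge(k, v, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two subtasks, each seeing part of the input.
        List<Map.Entry<String, Long>> slice1 =
                List.of(Map.entry("a", 1L), Map.entry("a", 2L), Map.entry("b", 3L));
        List<Map.Entry<String, Long>> slice2 =
                List.of(Map.entry("a", 4L), Map.entry("b", 5L));

        Map<String, Long> merged = globalAggregate(
                List.of(localAggregate(slice1), localAggregate(slice2)));

        // Same totals as a single-phase pass over all five records,
        // but only four partial rows were "shuffled" instead of five.
        if (merged.get("a") != 7L || merged.get("b") != 8L) {
            throw new AssertionError("unexpected totals: " + merged);
        }
        System.out.println(merged);
    }
}
```

This also shows why not every aggregate call can be split: the merge step requires a combinable function (like SUM here), which is exactly the caveat noted on `TWO_PHASE`.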
### UpdateKind

Specifies the type of updates that a streaming operator or table can handle, crucial for streaming table semantics and changelog processing.

```java { .api }
public enum UpdateKind {

    /**
     * Only update-after records are supported.
     * Suitable for append-only streams and simple transformations.
     */
    ONLY_UPDATE_AFTER,

    /**
     * Both update-before and update-after records are supported.
     * Required for complex operations such as joins and aggregations with retractions.
     */
    BEFORE_AND_AFTER;

    /**
     * Returns whether this update kind supports retraction (update-before) messages.
     */
    public boolean supportsRetractions() {
        return this == BEFORE_AND_AFTER;
    }
}
```
**Key Concepts:**

- **UPDATE_AFTER**: Represents the new value after an update operation
- **UPDATE_BEFORE**: Represents the old value before an update operation (retraction)
- **Changelog Streams**: Streams that contain insert, update, and delete operations

**Usage Example:**

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.planner.plan.trait.UpdateKind;

// Determine the update kind based on operator requirements.
public UpdateKind determineUpdateKind(StreamExecNode<?> node) {
    // Aggregations and joins typically need retractions.
    if (node instanceof StreamExecGroupAggregate
            || node instanceof StreamExecJoin) {
        return UpdateKind.BEFORE_AND_AFTER;
    }

    // Simple transformations can work with update-after only.
    return UpdateKind.ONLY_UPDATE_AFTER;
}

// Configure the changelog mode based on the update kind.
UpdateKind updateKind = determineUpdateKind(execNode);
ChangelogMode changelogMode;

if (updateKind.supportsRetractions()) {
    changelogMode = ChangelogMode.all(); // INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE
} else {
    changelogMode = ChangelogMode.insertOnly(); // INSERT only
}
```
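To make the BEFORE/AFTER semantics concrete, here is a standalone, stdlib-only sketch (the `Kind` enum and `Row` record are local stand-ins, not planner or `RowKind` classes) that folds a retraction-style changelog into per-key state. An update travels as an UPDATE_BEFORE/UPDATE_AFTER pair, which is what lets downstream state retract the old value before applying the new one:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogFold {

    // Local stand-in for changelog row kinds, to keep the sketch runnable on its own.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record Row(Kind kind, String key, long value) {}

    /** Folds a changelog into materialized per-key state. */
    static Map<String, Long> materialize(List<Row> changelog) {
        Map<String, Long> state = new HashMap<>();
        for (Row row : changelog) {
            switch (row.kind()) {
                case INSERT, UPDATE_AFTER -> state.put(row.key(), row.value());
                case UPDATE_BEFORE, DELETE -> state.remove(row.key());
            }
        }
        return state;
    }

    public static void main(String[] args) {
        // In BEFORE_AND_AFTER mode, an update is encoded as a retraction pair.
        List<Row> changelog = List.of(
                new Row(Kind.INSERT, "a", 1),
                new Row(Kind.UPDATE_BEFORE, "a", 1), // retract the old value
                new Row(Kind.UPDATE_AFTER, "a", 5),  // emit the new value
                new Row(Kind.INSERT, "b", 2),
                new Row(Kind.DELETE, "b", 2));

        Map<String, Long> state = materialize(changelog);
        if (!Map.of("a", 5L).equals(state)) {
            throw new AssertionError("unexpected state: " + state);
        }
    }
}
```

In ONLY_UPDATE_AFTER mode the UPDATE_BEFORE row is absent, which is only safe when the consumer can locate the old row by key on its own (e.g. an upsert sink).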
### ModifyKind

Defines the types of modification operations that can be performed on tables, essential for DML operation handling and sink compatibility.

```java { .api }
public enum ModifyKind {

    /**
     * Insertion operation.
     */
    INSERT,

    /**
     * Update operation.
     */
    UPDATE,

    /**
     * Deletion operation.
     */
    DELETE
}
```
**Usage Example:**

```java
import java.util.Set;

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.planner.plan.trait.ModifyKind;

// Validate sink compatibility with the required modify operations. Note that the
// enum has exactly three constants (INSERT, UPDATE, DELETE) — there is no UPSERT.
// (The supportsInsert/supportsUpdate/supportsDelete helpers are illustrative;
// they are not part of the DynamicTableSink interface.)
public void validateSinkCompatibility(
        DynamicTableSink sink,
        Set<ModifyKind> requiredOperations) {
    for (ModifyKind operation : requiredOperations) {
        switch (operation) {
            case INSERT:
                if (!sink.supportsInsert()) {
                    throw new ValidationException("Sink doesn't support INSERT operations");
                }
                break;
            case UPDATE:
                if (!sink.supportsUpdate()) {
                    throw new ValidationException("Sink doesn't support UPDATE operations");
                }
                break;
            case DELETE:
                if (!sink.supportsDelete()) {
                    throw new ValidationException("Sink doesn't support DELETE operations");
                }
                break;
        }
    }
}

// Determine the required operations from the SQL statement.
Set<ModifyKind> operations = analyzeModifyOperations(sqlStatement);
validateSinkCompatibility(tableSink, operations);
```
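The `analyzeModifyOperations` step above can be approximated by scanning the row kinds a plan produces. This standalone sketch (stdlib-only; both enums are local stand-ins with the same constants as shown in this document) derives the set of modifications a sink must support:

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class RequiredModifyKinds {

    // Local stand-ins for the planner enums, to keep the sketch self-contained.
    enum ModifyKind { INSERT, UPDATE, DELETE }
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Derives which table modifications a sink must support for a given changelog. */
    static Set<ModifyKind> requiredKinds(List<RowKind> rowKinds) {
        EnumSet<ModifyKind> required = EnumSet.noneOf(ModifyKind.class);
        for (RowKind kind : rowKinds) {
            switch (kind) {
                case INSERT -> required.add(ModifyKind.INSERT);
                case UPDATE_BEFORE, UPDATE_AFTER -> required.add(ModifyKind.UPDATE);
                case DELETE -> required.add(ModifyKind.DELETE);
            }
        }
        return required;
    }

    public static void main(String[] args) {
        Set<ModifyKind> kinds = requiredKinds(
                List.of(RowKind.INSERT, RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER));
        if (!kinds.equals(EnumSet.of(ModifyKind.INSERT, ModifyKind.UPDATE))) {
            throw new AssertionError(kinds.toString());
        }
    }
}
```

Using `EnumSet` here is deliberate: it is the idiomatic, allocation-light way to represent a subset of enum constants for this kind of capability check.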
### MiniBatchMode

Controls mini-batch optimization for streaming operations, enabling higher throughput by batching multiple records for processing.

```java { .api }
public enum MiniBatchMode {

    /**
     * An operator in ProcTime mode requires watermarks emitted in the proctime interval,
     * i.e., an unbounded group aggregate with mini-batch enabled.
     */
    ProcTime,

    /**
     * An operator in RowTime mode requires watermarks extracted from elements and emitted
     * in the rowtime interval, e.g., window, window join, ...
     */
    RowTime,

    /**
     * Default value, meaning no mini-batch interval is required.
     */
    None
}
```
**Benefits of Mini-batching:**

- **Higher Throughput**: Reduces per-record processing overhead
- **Better Resource Utilization**: Amortizes fixed costs across multiple records
- **Improved Latency Control**: Configurable batch sizes and timeouts

**Usage Example:**

```java
import java.time.Duration;

import org.apache.flink.table.planner.plan.trait.MiniBatchMode;

// Configure mini-batch settings for streaming operations. The enum has no
// isEnabled() helper, so compare against MiniBatchMode.None directly.
// (The setMiniBatchInterval/setMiniBatchSize/... methods are illustrative.)
public void configureMiniBatch(
        StreamExecNode<?> node,
        MiniBatchMode mode,
        Duration batchInterval) {
    if (mode != MiniBatchMode.None) {
        // Enable mini-batch with the specified interval.
        node.setMiniBatchInterval(batchInterval);
        node.setMiniBatchSize(1000); // Max records per batch

        // Configure buffer settings.
        node.enableMiniBatchBuffer();
    } else {
        // Disable mini-batch for low-latency processing.
        node.disableMiniBatch();
    }
}

// Determine the mini-batch mode based on requirements. The constants are
// None, ProcTime, and RowTime; there is no ENABLED constant.
MiniBatchMode mode = requiresLowLatency
        ? MiniBatchMode.None
        : MiniBatchMode.ProcTime;

configureMiniBatch(streamNode, mode, Duration.ofMillis(100));
```
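The mechanics behind the size/interval trigger can be sketched with a stdlib-only buffer (illustrative, not the actual operator; all names are invented for this sketch). Records are buffered and the batch is handed downstream when it is full; in the real operator the trailing partial batch is also flushed when the interval timer or a watermark fires:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class MiniBatchBuffer {

    private final int maxBatchSize;
    private final Consumer<List<String>> downstream;
    private final List<String> buffer = new ArrayList<>();

    MiniBatchBuffer(int maxBatchSize, Consumer<List<String>> downstream) {
        this.maxBatchSize = maxBatchSize;
        this.downstream = downstream;
    }

    /** Buffers a record; flushes when the batch is full. */
    void add(String record) {
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** In the real operator this is also triggered by the interval timer/watermark. */
    void flush() {
        if (!buffer.isEmpty()) {
            downstream.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        MiniBatchBuffer op = new MiniBatchBuffer(3, batches::add);
        for (int i = 0; i < 7; i++) {
            op.add("r" + i);
        }
        op.flush(); // interval timer fires: emit the trailing partial batch
        if (batches.size() != 3 || batches.get(2).size() != 1) {
            throw new AssertionError(batches.toString());
        }
    }
}
```

The throughput/latency trade-off is visible here: downstream is invoked once per batch instead of once per record, at the cost of up to one interval of added latency.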
### OperatorType

Categorizes the different types of operators in the query execution plan, used for optimization decisions and resource allocation.

```java { .api }
public enum OperatorType {

    // Source and sink operators
    TABLE_SOURCE_SCAN,
    TABLE_SINK,

    // Join operators
    HASH_JOIN,
    SORT_MERGE_JOIN,
    NESTED_LOOP_JOIN,
    TEMPORAL_JOIN,

    // Aggregation operators
    GROUP_AGGREGATE,
    WINDOW_AGGREGATE,
    OVER_AGGREGATE,

    // Window operators
    TUMBLING_WINDOW,
    SLIDING_WINDOW,
    SESSION_WINDOW,

    // Transformation operators
    CALC,      // Projection and filtering
    CORRELATE, // Table function operations
    UNION,     // Set union operations
    SORT,      // Sorting operations
    LIMIT,     // Top-N operations

    // Exchange operators
    EXCHANGE,  // Data redistribution
    PARTITION_BY_HASH,
    PARTITION_BY_RANGE;

    /**
     * Returns whether this operator performs join operations.
     */
    public boolean isJoin() {
        return this == HASH_JOIN || this == SORT_MERGE_JOIN
                || this == NESTED_LOOP_JOIN || this == TEMPORAL_JOIN;
    }

    /**
     * Returns whether this operator performs aggregation.
     */
    public boolean isAggregate() {
        return this == GROUP_AGGREGATE || this == WINDOW_AGGREGATE
                || this == OVER_AGGREGATE;
    }

    /**
     * Returns whether this operator handles windowing.
     */
    public boolean isWindow() {
        return this == TUMBLING_WINDOW || this == SLIDING_WINDOW
                || this == SESSION_WINDOW || this == WINDOW_AGGREGATE;
    }
}
```
**Usage Example:**

```java
import org.apache.flink.table.planner.plan.utils.OperatorType;

// Tune resource allocation based on the operator type.
// (setManagedMemoryFraction/enableSpilling/setIOIntensive are illustrative
// helpers, as are ExecNode's methods and the OptimizationHints class below.)
public void configureOperatorResources(ExecNode<?> node, OperatorType operatorType) {
    switch (operatorType) {
        case HASH_JOIN:
        case GROUP_AGGREGATE:
            // Memory-intensive operators need more managed memory.
            node.setManagedMemoryFraction(0.4);
            break;

        case SORT:
        case SORT_MERGE_JOIN:
            // Sort operations benefit from spill-to-disk capability.
            node.enableSpilling(true);
            node.setManagedMemoryFraction(0.3);
            break;

        case TABLE_SOURCE_SCAN:
            // I/O-intensive operations.
            node.setIOIntensive(true);
            break;

        default:
            // Default resource allocation.
            node.setManagedMemoryFraction(0.1);
    }
}

// Analyze operator characteristics for optimization.
public OptimizationHints analyzeOperator(OperatorType operatorType) {
    OptimizationHints hints = new OptimizationHints();

    if (operatorType.isJoin()) {
        hints.setRequiresShuffle(true);
        hints.setMemoryIntensive(true);
    }

    if (operatorType.isAggregate()) {
        hints.setSupportsPreAggregation(true);
        hints.setRequiresGrouping(true);
    }

    if (operatorType.isWindow()) {
        hints.setRequiresEventTime(true);
        hints.setStateful(true);
    }

    return hints;
}
```
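The chained `==` checks in `isJoin()`/`isAggregate()` can alternatively be expressed with `EnumSet`, which keeps the category definition in one place and makes membership a constant-time lookup. A stdlib-only sketch with a local stand-in enum (a subset of the constants above):

```java
import java.util.EnumSet;
import java.util.Set;

public class OperatorCategories {

    // Local stand-in with a subset of the OperatorType constants, for a runnable sketch.
    enum Op {
        TABLE_SOURCE_SCAN, HASH_JOIN, SORT_MERGE_JOIN, NESTED_LOOP_JOIN,
        TEMPORAL_JOIN, GROUP_AGGREGATE, WINDOW_AGGREGATE, OVER_AGGREGATE, CALC;

        // Each category is declared once, instead of being spread across == chains.
        static final Set<Op> JOINS =
                EnumSet.of(HASH_JOIN, SORT_MERGE_JOIN, NESTED_LOOP_JOIN, TEMPORAL_JOIN);
        static final Set<Op> AGGREGATES =
                EnumSet.of(GROUP_AGGREGATE, WINDOW_AGGREGATE, OVER_AGGREGATE);

        boolean isJoin() { return JOINS.contains(this); }
        boolean isAggregate() { return AGGREGATES.contains(this); }
    }

    public static void main(String[] args) {
        if (!Op.HASH_JOIN.isJoin() || Op.CALC.isJoin() || !Op.OVER_AGGREGATE.isAggregate()) {
            throw new AssertionError("categorization broken");
        }
    }
}
```

Adding a new constant to a category then means editing one `EnumSet.of(...)` line rather than every helper method.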
## Internal Configuration Options

### InternalConfigOptions

Configuration constants used internally by the planner for query execution and optimization.

```java { .api }
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public final class InternalConfigOptions {

    /**
     * Query start time in epoch milliseconds.
     * Used for time-based operations and temporal queries.
     */
    public static final ConfigOption<Long> TABLE_QUERY_START_EPOCH_TIME =
            ConfigOptions.key("table.query.start.epoch-time")
                    .longType()
                    .noDefaultValue()
                    .withDescription("Query start time in epoch milliseconds");

    /**
     * Query start time in the local time zone.
     * Used for local time calculations and display.
     */
    public static final ConfigOption<String> TABLE_QUERY_START_LOCAL_TIME =
            ConfigOptions.key("table.query.start.local-time")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Query start time in the local time zone");

    /**
     * Maximum number of optimization passes.
     * Controls the depth of Calcite rule-based optimization.
     */
    public static final ConfigOption<Integer> TABLE_OPTIMIZER_MAX_ITERATIONS =
            ConfigOptions.key("table.optimizer.max-iterations")
                    .intType()
                    .defaultValue(100)
                    .withDescription("Maximum number of optimizer iterations");

    /**
     * Whether to enable statistics-based optimization.
     */
    public static final ConfigOption<Boolean> TABLE_OPTIMIZER_STATISTICS_ENABLED =
            ConfigOptions.key("table.optimizer.statistics.enabled")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Enable statistics-based optimization");
}
```
**Usage Example:**

```java
import java.time.LocalDateTime;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.planner.utils.InternalConfigOptions;

// Configure the planner with internal options.
public void configurePlannerInternals(Configuration config) {
    // Set the query start time for temporal operations.
    long queryStartTime = System.currentTimeMillis();
    config.set(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME, queryStartTime);

    // Set the local time for display purposes.
    LocalDateTime localTime = LocalDateTime.now();
    config.set(InternalConfigOptions.TABLE_QUERY_START_LOCAL_TIME, localTime.toString());

    // Configure optimization limits.
    config.set(InternalConfigOptions.TABLE_OPTIMIZER_MAX_ITERATIONS, 150);
    config.set(InternalConfigOptions.TABLE_OPTIMIZER_STATISTICS_ENABLED, true);
}

// Access the internal configuration in the planner.
public void setupPlannerContext(PlannerConfiguration plannerConfig) {
    Configuration config = plannerConfig.getConfiguration();

    // Get the query start time for temporal operations.
    Long startEpochTime = config.get(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME);
    if (startEpochTime != null) {
        setupTemporalQueries(startEpochTime);
    }

    // Configure the optimizer based on settings.
    boolean statisticsEnabled =
            config.get(InternalConfigOptions.TABLE_OPTIMIZER_STATISTICS_ENABLED);
    if (statisticsEnabled) {
        enableStatisticsBasedOptimization();
    }
}
```
## Enum Combination Patterns

### Comprehensive Operation Configuration

```java
// Configure a streaming operation with multiple traits.
public class StreamOperationConfig {
    private final UpdateKind updateKind;
    private final ModifyKind modifyKind;
    private final MiniBatchMode miniBatchMode;
    private final AggregatePhaseStrategy aggregateStrategy;

    public StreamOperationConfig(
            UpdateKind updateKind,
            ModifyKind modifyKind,
            MiniBatchMode miniBatchMode,
            AggregatePhaseStrategy aggregateStrategy) {
        this.updateKind = updateKind;
        this.modifyKind = modifyKind;
        this.miniBatchMode = miniBatchMode;
        this.aggregateStrategy = aggregateStrategy;
    }

    public boolean requiresStateBackend() {
        return updateKind.supportsRetractions()
                || aggregateStrategy == AggregatePhaseStrategy.TWO_PHASE;
    }

    public boolean supportsLowLatency() {
        return miniBatchMode == MiniBatchMode.None
                && aggregateStrategy == AggregatePhaseStrategy.ONE_PHASE;
    }
}

// Create an optimized configuration. Note that ModifyKind has no UPSERT constant
// and MiniBatchMode has no ENABLED constant; use the constants defined above.
StreamOperationConfig config = new StreamOperationConfig(
        UpdateKind.BEFORE_AND_AFTER,       // Support retractions
        ModifyKind.UPDATE,                 // Update operations
        MiniBatchMode.ProcTime,            // Enable proc-time batching
        AggregatePhaseStrategy.TWO_PHASE); // Two-phase aggregation
```
### Validation Using Enums

```java
import java.util.ArrayList;
import java.util.List;

// Comprehensive validation using enum combinations.
// (ValidationResult is an illustrative result holder, not a planner class.)
public class OperationValidator {

    public ValidationResult validate(
            OperatorType operatorType,
            UpdateKind updateKind,
            ModifyKind modifyKind,
            MiniBatchMode miniBatchMode) {
        List<String> errors = new ArrayList<>();

        // Validate join operators.
        if (operatorType.isJoin() && updateKind == UpdateKind.ONLY_UPDATE_AFTER) {
            errors.add("Join operators require BEFORE_AND_AFTER update kind for correctness");
        }

        // Validate aggregation operators.
        if (operatorType.isAggregate() && !updateKind.supportsRetractions()) {
            errors.add("Aggregate operators need retraction support for accurate results");
        }

        // Validate modify operations (ModifyKind declares no isModification() helper,
        // so check against INSERT directly).
        if (modifyKind != ModifyKind.INSERT && updateKind == UpdateKind.ONLY_UPDATE_AFTER) {
            errors.add("Update/Delete operations require retraction support");
        }

        // Validate mini-batch compatibility.
        if (miniBatchMode != MiniBatchMode.None && operatorType.isWindow()) {
            errors.add("Mini-batch mode may interfere with window semantics");
        }

        return errors.isEmpty()
                ? ValidationResult.success()
                : ValidationResult.failure(errors);
    }
}
```