Public enums and constants provide standardized values for execution strategies, table operation traits, and configuration options. These enumerations ensure type safety and consistency across the planner's optimization and execution processes.
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
import org.apache.flink.table.planner.plan.trait.UpdateKind;
import org.apache.flink.table.planner.plan.trait.ModifyKind;
import org.apache.flink.table.planner.plan.trait.MiniBatchMode;
import org.apache.flink.table.planner.plan.utils.OperatorType;
import org.apache.flink.table.planner.utils.InternalConfigOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

Defines strategies for executing aggregation operations, determining whether an aggregation should be performed in a single phase or split into local and global phases for optimization.
public enum AggregatePhaseStrategy {
/**
* No enforced strategy. Whether to use one-stage or two-stage aggregation is decided
* by the cost-based optimizer.
*/
AUTO,
/**
* Enforce one-stage aggregation, which consists only of a complete global aggregate.
*/
ONE_PHASE,
/**
* Enforce two-stage aggregation, which consists of a local aggregate followed by a global aggregate.
* NOTE: if an aggregate call cannot be split into two phases, one-stage aggregation is used instead.
*/
TWO_PHASE
}

Usage Scenarios:
- AUTO: the default; lets the cost-based optimizer choose the cheaper plan.
- ONE_PHASE: favors low latency and small inputs, where local pre-aggregation adds little value.
- TWO_PHASE: favors large or skewed inputs, where local pre-aggregation reduces the data shuffled to the global aggregate.
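In practice the strategy is usually supplied through configuration rather than constructed directly; a minimal sketch using the optimizer option table.optimizer.agg-phase-strategy (assuming a standard TableEnvironment):
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
// Enforce two-phase (local + global) aggregation; valid values: AUTO, ONE_PHASE, TWO_PHASE
tEnv.getConfig().getConfiguration()
        .setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");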
Usage Example:
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
// Configure aggregation strategy based on data characteristics
public AggregatePhaseStrategy chooseAggregationStrategy(
long estimatedInputRows,
int groupByCardinality
) {
// Use two-phase for large datasets with high cardinality
if (estimatedInputRows > 1_000_000 && groupByCardinality > 10_000) {
return AggregatePhaseStrategy.TWO_PHASE;
} else {
return AggregatePhaseStrategy.ONE_PHASE;
}
}
// Apply strategy in query planning
AggregatePhaseStrategy strategy = chooseAggregationStrategy(rowCount, cardinality);
if (strategy == AggregatePhaseStrategy.TWO_PHASE) {
// Configure multi-phase aggregation
configurePreAggregation();
configureFinalAggregation();
} else {
// Configure single-phase aggregation
configureSinglePhaseAggregation();
}

Specifies the type of updates that a streaming operator or table can handle, which is crucial for streaming table semantics and changelog processing.
public enum UpdateKind {
/**
* Only update-after records are supported.
* Suitable for append-only streams and simple transformations.
*/
ONLY_UPDATE_AFTER,
/**
* Both update-before and update-after records are supported.
* Required for complex operations like joins, aggregations with retractions.
*/
BEFORE_AND_AFTER;
/**
* Returns whether this update kind supports retraction (update-before) messages.
*/
public boolean supportsRetractions() {
return this == BEFORE_AND_AFTER;
}
}

Key Concepts:
- An update can be encoded as a single update-after record (upsert style, resolved by a unique key) or as a retraction pair of update-before and update-after records.
- ONLY_UPDATE_AFTER requires the downstream operator or sink to resolve updates by key; BEFORE_AND_AFTER carries enough information to retract the old value explicitly.
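The two kinds map naturally onto Flink's ChangelogMode, which is assembled from RowKinds; a minimal sketch using the standard connector API:
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;
// ONLY_UPDATE_AFTER corresponds to an upsert-style changelog without UPDATE_BEFORE
ChangelogMode upsertLike = ChangelogMode.newBuilder()
        .addContainedKind(RowKind.INSERT)
        .addContainedKind(RowKind.UPDATE_AFTER)
        .addContainedKind(RowKind.DELETE)
        .build();
// BEFORE_AND_AFTER corresponds to a full changelog including UPDATE_BEFORE
ChangelogMode full = ChangelogMode.all();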
Usage Example:
import org.apache.flink.table.planner.plan.trait.UpdateKind;
// Determine update kind based on operator requirements
public UpdateKind determineUpdateKind(StreamExecNode<?> node) {
// Aggregations and joins typically need retractions
if (node instanceof StreamExecGroupAggregate ||
node instanceof StreamExecJoin) {
return UpdateKind.BEFORE_AND_AFTER;
}
// Simple transformations can work with append-only
return UpdateKind.ONLY_UPDATE_AFTER;
}
// Configure changelog mode based on update kind
UpdateKind updateKind = determineUpdateKind(execNode);
ChangelogMode changelogMode;
if (updateKind.supportsRetractions()) {
changelogMode = ChangelogMode.all(); // INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE
} else {
changelogMode = ChangelogMode.insertOnly(); // INSERT only
}

Defines the types of modification operations that can be performed on tables, essential for DML operation handling and sink compatibility.
public enum ModifyKind {
/**
* Insertion operation.
*/
INSERT,
/**
* Update operation.
*/
UPDATE,
/**
* Deletion operation.
*/
DELETE
}

Usage Example:
import org.apache.flink.table.planner.plan.trait.ModifyKind;
// Validate sink compatibility with modify operations
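// Note: supportsInsert/supportsUpdate/supportsDelete below are illustrative capability
// checks, not methods of the real DynamicTableSink interface (which exposes getChangelogMode)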
public void validateSinkCompatibility(
DynamicTableSink sink,
Set<ModifyKind> requiredOperations
) {
for (ModifyKind operation : requiredOperations) {
switch (operation) {
case INSERT:
if (!sink.supportsInsert()) {
throw new ValidationException("Sink doesn't support INSERT operations");
}
break;
case UPDATE:
if (!sink.supportsUpdate()) {
throw new ValidationException("Sink doesn't support UPDATE operations");
}
break;
case DELETE:
if (!sink.supportsDelete()) {
throw new ValidationException("Sink doesn't support DELETE operations");
}
break;
}
}
}
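One plausible way to derive the required operations is from the row kinds a statement produces; a hypothetical sketch of such a mapping (illustrative only, not planner API):
import org.apache.flink.types.RowKind;
// Map each RowKind emitted by the statement onto the ModifyKind it implies
public static ModifyKind toModifyKind(RowKind rowKind) {
    switch (rowKind) {
        case INSERT:
            return ModifyKind.INSERT;
        case UPDATE_BEFORE:
        case UPDATE_AFTER:
            return ModifyKind.UPDATE;
        case DELETE:
            return ModifyKind.DELETE;
        default:
            throw new IllegalArgumentException("Unexpected row kind: " + rowKind);
    }
}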
// Determine required operations from SQL statement
Set<ModifyKind> operations = analyzeModifyOperations(sqlStatement);
validateSinkCompatibility(tableSink, operations);

Controls mini-batch optimization for streaming operations, enabling higher throughput by batching multiple records for processing.
public enum MiniBatchMode {
/**
* An operator in ProcTime mode requires watermarks emitted at the processing-time interval,
* e.g., an unbounded group aggregation with mini-batch enabled.
*/
ProcTime,
/**
* An operator in RowTime mode requires watermarks extracted from elements and emitted
* at the rowtime interval, e.g., windows and window joins.
*/
RowTime,
/**
* Default value, meaning no mini-batch interval is required.
*/
None
}

Benefits of Mini-batching:
- Higher throughput: state is read and written once per key per batch instead of once per record.
- Fewer state-backend accesses and fewer intermediate output records for frequently updated keys.
- Trade-off: results are delayed by up to the configured batch interval, so it is unsuitable for strict low-latency requirements.
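Mini-batching is normally enabled through the documented table.exec.mini-batch.* options rather than configured on individual nodes; a minimal sketch (tEnv is assumed to be an existing TableEnvironment):
import org.apache.flink.configuration.Configuration;
Configuration conf = tEnv.getConfig().getConfiguration(); // tEnv: an existing TableEnvironment
conf.setString("table.exec.mini-batch.enabled", "true");         // turn mini-batching on
conf.setString("table.exec.mini-batch.allow-latency", "500 ms"); // maximum buffering delay
conf.setString("table.exec.mini-batch.size", "1000");            // maximum records per batch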
Usage Example:
import org.apache.flink.table.planner.plan.trait.MiniBatchMode;
import java.time.Duration;
// Configure mini-batch settings for streaming operations
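// Note: the node-level mini-batch setters below are illustrative, not real StreamExecNode API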
public void configureMiniBatch(
StreamExecNode<?> node,
MiniBatchMode mode,
Duration batchInterval
) {
if (mode != MiniBatchMode.None) {
// Enable mini-batch with specified interval
node.setMiniBatchInterval(batchInterval);
node.setMiniBatchSize(1000); // Max records per batch
// Configure buffer settings
node.enableMiniBatchBuffer();
} else {
// Disable mini-batch for low-latency processing
node.disableMiniBatch();
}
}
// Determine mini-batch mode based on requirements
MiniBatchMode mode = requiresLowLatency ?
    MiniBatchMode.None : MiniBatchMode.ProcTime;
configureMiniBatch(streamNode, mode, Duration.ofMillis(100));

Categorizes different types of operators in the query execution plan, used for optimization decisions and resource allocation.
public enum OperatorType {
// Source and Sink Operators
TABLE_SOURCE_SCAN,
TABLE_SINK,
// Join Operators
HASH_JOIN,
SORT_MERGE_JOIN,
NESTED_LOOP_JOIN,
TEMPORAL_JOIN,
// Aggregation Operators
GROUP_AGGREGATE,
WINDOW_AGGREGATE,
OVER_AGGREGATE,
// Window Operators
TUMBLING_WINDOW,
SLIDING_WINDOW,
SESSION_WINDOW,
// Transformation Operators
CALC, // Projection and filtering
CORRELATE, // Table function operations
UNION, // Set union operations
SORT, // Sorting operations
LIMIT, // Top-N operations
// Exchange Operators
EXCHANGE, // Data redistribution
PARTITION_BY_HASH,
PARTITION_BY_RANGE;
/**
* Returns whether this operator performs join operations.
*/
public boolean isJoin() {
return this == HASH_JOIN || this == SORT_MERGE_JOIN ||
this == NESTED_LOOP_JOIN || this == TEMPORAL_JOIN;
}
/**
* Returns whether this operator performs aggregation.
*/
public boolean isAggregate() {
return this == GROUP_AGGREGATE || this == WINDOW_AGGREGATE ||
this == OVER_AGGREGATE;
}
/**
* Returns whether this operator handles windowing.
*/
public boolean isWindow() {
return this == TUMBLING_WINDOW || this == SLIDING_WINDOW ||
this == SESSION_WINDOW || this == WINDOW_AGGREGATE;
}
}

Usage Example:
import org.apache.flink.table.planner.plan.utils.OperatorType;
// Optimize resource allocation based on operator type
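// Note: setManagedMemoryFraction/enableSpilling/setIOIntensive are illustrative resource
// hooks for this example, not methods of the real ExecNode interface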
public void configureOperatorResources(ExecNode<?> node, OperatorType operatorType) {
switch (operatorType) {
case HASH_JOIN:
case GROUP_AGGREGATE:
// Memory-intensive operators need more managed memory
node.setManagedMemoryFraction(0.4);
break;
case SORT:
case SORT_MERGE_JOIN:
// Sort operations benefit from spill-to-disk capability
node.enableSpilling(true);
node.setManagedMemoryFraction(0.3);
break;
case TABLE_SOURCE_SCAN:
// I/O intensive operations
node.setIOIntensive(true);
break;
default:
// Default resource allocation
node.setManagedMemoryFraction(0.1);
}
}
// Analyze operator characteristics for optimization
public OptimizationHints analyzeOperator(OperatorType operatorType) {
OptimizationHints hints = new OptimizationHints();
if (operatorType.isJoin()) {
hints.setRequiresShuffle(true);
hints.setMemoryIntensive(true);
}
if (operatorType.isAggregate()) {
hints.setSupportsPreAggregation(true);
hints.setRequiresGrouping(true);
}
if (operatorType.isWindow()) {
hints.setRequiresEventTime(true);
hints.setStateful(true);
}
return hints;
}

Configuration constants used internally by the planner for query execution and optimization.
public final class InternalConfigOptions {
/**
* Query start time in epoch milliseconds.
* Used for time-based operations and temporal queries.
*/
public static final ConfigOption<Long> TABLE_QUERY_START_EPOCH_TIME =
ConfigOptions.key("table.query.start.epoch-time")
.longType()
.noDefaultValue()
.withDescription("Query start time in epoch milliseconds");
/**
* Query start time in local time zone.
* Used for local time calculations and display.
*/
public static final ConfigOption<String> TABLE_QUERY_START_LOCAL_TIME =
ConfigOptions.key("table.query.start.local-time")
.stringType()
.noDefaultValue()
.withDescription("Query start time in local time zone");
/**
* Maximum number of optimization passes.
* Controls the depth of Calcite rule-based optimization.
*/
public static final ConfigOption<Integer> TABLE_OPTIMIZER_MAX_ITERATIONS =
ConfigOptions.key("table.optimizer.max-iterations")
.intType()
.defaultValue(100)
.withDescription("Maximum number of optimizer iterations");
/**
* Whether to enable statistics-based optimization.
*/
public static final ConfigOption<Boolean> TABLE_OPTIMIZER_STATISTICS_ENABLED =
ConfigOptions.key("table.optimizer.statistics.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Enable statistics-based optimization");
}

Usage Example:
import org.apache.flink.table.planner.utils.InternalConfigOptions;
import org.apache.flink.configuration.Configuration;
import java.time.LocalDateTime;
// Configure planner with internal options
public void configurePlannerInternals(Configuration config) {
// Set query start time for temporal operations
long queryStartTime = System.currentTimeMillis();
config.setLong(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME, queryStartTime);
// Set local time for display purposes
LocalDateTime localTime = LocalDateTime.now();
config.setString(InternalConfigOptions.TABLE_QUERY_START_LOCAL_TIME,
localTime.toString());
// Configure optimization limits
config.setInteger(InternalConfigOptions.TABLE_OPTIMIZER_MAX_ITERATIONS, 150);
config.setBoolean(InternalConfigOptions.TABLE_OPTIMIZER_STATISTICS_ENABLED, true);
}
// Access internal configuration in planner
public void setupPlannerContext(PlannerConfiguration plannerConfig) {
Configuration config = plannerConfig.getConfiguration();
// Get query start time for temporal operations
Long startEpochTime = config.get(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME);
if (startEpochTime != null) {
setupTemporalQueries(startEpochTime);
}
// Configure optimizer based on settings
boolean statisticsEnabled = config.get(InternalConfigOptions.TABLE_OPTIMIZER_STATISTICS_ENABLED);
if (statisticsEnabled) {
enableStatisticsBasedOptimization();
}
}

Combined Usage Example:
// Configure streaming operation with multiple traits
public class StreamOperationConfig {
private final UpdateKind updateKind;
private final ModifyKind modifyKind;
private final MiniBatchMode miniBatchMode;
private final AggregatePhaseStrategy aggregateStrategy;
public StreamOperationConfig(
UpdateKind updateKind,
ModifyKind modifyKind,
MiniBatchMode miniBatchMode,
AggregatePhaseStrategy aggregateStrategy
) {
this.updateKind = updateKind;
this.modifyKind = modifyKind;
this.miniBatchMode = miniBatchMode;
this.aggregateStrategy = aggregateStrategy;
}
public boolean requiresStateBackend() {
    return updateKind.supportsRetractions() ||
        aggregateStrategy == AggregatePhaseStrategy.TWO_PHASE;
}
public boolean supportsLowLatency() {
    return miniBatchMode == MiniBatchMode.None &&
        aggregateStrategy == AggregatePhaseStrategy.ONE_PHASE;
}
}
// Create optimized configuration
StreamOperationConfig config = new StreamOperationConfig(
    UpdateKind.BEFORE_AND_AFTER,      // Support retractions
    ModifyKind.UPDATE,                // Update operations
    MiniBatchMode.ProcTime,           // Enable proc-time mini-batching
    AggregatePhaseStrategy.TWO_PHASE  // Multi-phase aggregation
);

Validation Example:
// Comprehensive validation using enum combinations
public class OperationValidator {
public ValidationResult validate(
OperatorType operatorType,
UpdateKind updateKind,
ModifyKind modifyKind,
MiniBatchMode miniBatchMode
) {
List<String> errors = new ArrayList<>();
// Validate join operators
if (operatorType.isJoin() && updateKind == UpdateKind.ONLY_UPDATE_AFTER) {
errors.add("Join operators require BEFORE_AND_AFTER update kind for correctness");
}
// Validate aggregation operators
if (operatorType.isAggregate() && !updateKind.supportsRetractions()) {
errors.add("Aggregate operators need retraction support for accurate results");
}
// Validate modify operations
if (modifyKind != ModifyKind.INSERT && updateKind == UpdateKind.ONLY_UPDATE_AFTER) {
errors.add("Update/Delete operations require retraction support");
}
// Validate mini-batch compatibility
if (miniBatchMode != MiniBatchMode.None && operatorType.isWindow()) {
errors.add("Mini-batch mode may interfere with window semantics");
}
return errors.isEmpty() ?
ValidationResult.success() :
ValidationResult.failure(errors);
}
}
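A hypothetical invocation of the validator (the isSuccess/getErrors accessors are assumed to match the ValidationResult factory methods used above):
// Validate a streaming group-aggregate configuration (illustrative values)
OperationValidator validator = new OperationValidator();
ValidationResult result = validator.validate(
    OperatorType.GROUP_AGGREGATE,
    UpdateKind.BEFORE_AND_AFTER,
    ModifyKind.UPDATE,
    MiniBatchMode.ProcTime);
// isSuccess()/getErrors() are assumed accessors on the illustrative ValidationResult type
if (!result.isSuccess()) {
    result.getErrors().forEach(System.err::println);
}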