Enums and Constants

Public enums and constants provide standardized values for execution strategies, table operation traits, and configuration options. These enumerations ensure type safety and consistency across the planner's optimization and execution processes.

Package Information

import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
import org.apache.flink.table.planner.plan.trait.UpdateKind;
import org.apache.flink.table.planner.plan.trait.ModifyKind;
import org.apache.flink.table.planner.plan.trait.MiniBatchMode;
import org.apache.flink.table.planner.plan.utils.OperatorType;
import org.apache.flink.table.planner.utils.InternalConfigOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

Capabilities

AggregatePhaseStrategy

Defines strategies for executing aggregation operations, determining whether aggregations should be performed in single or multiple phases for optimization.

public enum AggregatePhaseStrategy {
    
    /**
     * No special enforcement of the aggregate stage. Whether to use one-stage or
     * two-stage aggregation is decided by cost.
     */
    AUTO,
    
    /**
     * Enforce one-stage aggregation, which has only a CompleteGlobalAggregate.
     */
    ONE_PHASE,
    
    /**
     * Enforce two-stage aggregation, which has a localAggregate and a globalAggregate.
     * NOTE: if an aggregate call cannot be split into two phases, one-stage
     * aggregation is still used.
     */
    TWO_PHASE
}

Usage Scenarios:

  • ONE_PHASE: Small datasets, low cardinality group-by keys, memory-constrained environments
  • TWO_PHASE: Large datasets, high cardinality group-by keys, distributed processing scenarios

Usage Example:

import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;

// Configure aggregation strategy based on data characteristics
public AggregatePhaseStrategy chooseAggregationStrategy(
    long estimatedInputRows,
    int groupByCardinality
) {
    // Use two-phase for large datasets with high cardinality
    if (estimatedInputRows > 1_000_000 && groupByCardinality > 10_000) {
        return AggregatePhaseStrategy.TWO_PHASE;
    } else {
        return AggregatePhaseStrategy.ONE_PHASE;
    }
}

// Apply strategy in query planning
AggregatePhaseStrategy strategy = chooseAggregationStrategy(rowCount, cardinality);
if (strategy == AggregatePhaseStrategy.TWO_PHASE) {
    // Configure multi-phase aggregation
    configurePreAggregation();
    configureFinalAggregation();
} else {
    // Configure single-phase aggregation  
    configureSinglePhaseAggregation();
}
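
In practice, the strategy is usually supplied through configuration rather than computed in application code. A minimal sketch, assuming the table.optimizer.agg-phase-strategy option key (the enum constants map to the string values AUTO, ONE_PHASE, and TWO_PHASE):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Force two-phase (local/global) aggregation for all queries in this session
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.getConfig().getConfiguration()
    .setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");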

UpdateKind

Specifies the type of updates that a streaming operator or table can handle, crucial for streaming table semantics and changelog processing.

public enum UpdateKind {
    
    /**
     * Only update-after records are supported.
     * Suitable for append-only streams and simple transformations.
     */
    ONLY_UPDATE_AFTER,
    
    /**
     * Both update-before and update-after records are supported.
     * Required for complex operations like joins, aggregations with retractions.
     */
    BEFORE_AND_AFTER;
    
    /**
     * Returns whether this update kind supports retraction (update-before) messages.
     */
    public boolean supportsRetractions() {
        return this == BEFORE_AND_AFTER;
    }
}

Key Concepts:

  • UPDATE_AFTER: Represents the new value after an update operation
  • UPDATE_BEFORE: Represents the old value before an update operation (retraction)
  • Changelog Streams: Streams that contain insert, update, and delete operations

Usage Example:

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupAggregate;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.trait.UpdateKind;

// Determine update kind based on operator requirements
public UpdateKind determineUpdateKind(StreamExecNode<?> node) {
    // Aggregations and joins typically need retractions
    if (node instanceof StreamExecGroupAggregate || 
        node instanceof StreamExecJoin) {
        return UpdateKind.BEFORE_AND_AFTER;
    }
    
    // Simple transformations can work with append-only
    return UpdateKind.ONLY_UPDATE_AFTER;
}

// Configure changelog mode based on update kind
UpdateKind updateKind = determineUpdateKind(execNode);
ChangelogMode changelogMode;

if (updateKind.supportsRetractions()) {
    changelogMode = ChangelogMode.all(); // INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE
} else {
    changelogMode = ChangelogMode.insertOnly(); // INSERT only
}
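
UpdateKind corresponds directly to the public RowKind flags carried by changelog rows. As a sketch using the stable ChangelogMode builder API, a mode equivalent to ONLY_UPDATE_AFTER (upsert-style, no retractions) can be assembled explicitly:

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

// Upsert-style changelog: inserts, new values after updates, and deletes,
// but no UPDATE_BEFORE (retraction) messages
ChangelogMode upsertLike = ChangelogMode.newBuilder()
    .addContainedKind(RowKind.INSERT)
    .addContainedKind(RowKind.UPDATE_AFTER)
    .addContainedKind(RowKind.DELETE)
    .build();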

ModifyKind

Defines the types of modification operations that can be performed on tables, essential for DML operation handling and sink compatibility.

public enum ModifyKind {
    
    /**
     * Insertion operation.
     */
    INSERT,
    
    /**
     * Update operation.
     */
    UPDATE,
    
    /**
     * Deletion operation.
     */
    DELETE
}

Usage Example:

import org.apache.flink.table.planner.plan.trait.ModifyKind;

// Validate sink compatibility with modify operations
public void validateSinkCompatibility(
    DynamicTableSink sink, 
    Set<ModifyKind> requiredOperations
) {
    // NOTE: supportsInsert/supportsUpdate/supportsDelete are illustrative
    // helpers; the real DynamicTableSink expresses its abilities through
    // getChangelogMode(...) and its ability interfaces.
    for (ModifyKind operation : requiredOperations) {
        switch (operation) {
            case INSERT:
                if (!sink.supportsInsert()) {
                    throw new ValidationException("Sink doesn't support INSERT operations");
                }
                break;
            case UPDATE:
                if (!sink.supportsUpdate()) {
                    throw new ValidationException("Sink doesn't support UPDATE operations");
                }
                break;
            case DELETE:
                if (!sink.supportsDelete()) {
                    throw new ValidationException("Sink doesn't support DELETE operations");
                }
                break;
        }
    }
}

// Determine required operations from SQL statement
Set<ModifyKind> operations = analyzeModifyOperations(sqlStatement);
validateSinkCompatibility(tableSink, operations);
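
ModifyKind is coarser-grained than RowKind: the update-before and update-after flags both collapse into UPDATE. A hypothetical helper (toModifyKind is not part of the planner API) makes the mapping explicit:

import org.apache.flink.table.planner.plan.trait.ModifyKind;
import org.apache.flink.types.RowKind;

// Hypothetical helper: collapse row-level change flags into a ModifyKind
public static ModifyKind toModifyKind(RowKind rowKind) {
    switch (rowKind) {
        case INSERT:
            return ModifyKind.INSERT;
        case UPDATE_BEFORE: // old value of an update
        case UPDATE_AFTER:  // new value of an update
            return ModifyKind.UPDATE;
        case DELETE:
            return ModifyKind.DELETE;
        default:
            throw new IllegalArgumentException("Unknown RowKind: " + rowKind);
    }
}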

MiniBatchMode

Controls mini-batch optimization for streaming operations, enabling higher throughput by batching multiple records for processing.

public enum MiniBatchMode {
    
    /**
     * An operator in ProcTime mode requires mini-batch watermarks emitted at a
     * processing-time interval, e.g., an unbounded group aggregate with
     * mini-batch enabled.
     */
    ProcTime,
    
    /**
     * An operator in RowTime mode requires watermarks extracted from elements and
     * emitted at a row-time interval, e.g., windows and window joins.
     */
    RowTime,
    
    /**
     * Default value, meaning no mini-batch interval is required.
     */
    None
}

Benefits of Mini-batching:

  • Higher Throughput: Reduces per-record processing overhead
  • Better Resource Utilization: Amortizes fixed costs across multiple records
  • Improved Latency Control: Configurable batch sizes and timeouts

Usage Example:

import org.apache.flink.table.planner.plan.trait.MiniBatchMode;

import java.time.Duration;

// Configure mini-batch settings for streaming operations
// NOTE: the node setters below are illustrative, not planner API;
// user-facing control goes through the table.exec.mini-batch.* options.
public void configureMiniBatch(
    StreamExecNode<?> node,
    MiniBatchMode mode,
    Duration batchInterval
) {
    if (mode != MiniBatchMode.None) {
        // Enable mini-batch with specified interval
        node.setMiniBatchInterval(batchInterval);
        node.setMiniBatchSize(1000); // Max records per batch
        
        // Configure buffer settings
        node.enableMiniBatchBuffer();
    } else {
        // Disable mini-batch for low-latency processing
        node.disableMiniBatch();
    }
}

// Determine mini-batch mode based on requirements
MiniBatchMode mode = requiresLowLatency ?
    MiniBatchMode.None : MiniBatchMode.ProcTime;

configureMiniBatch(streamNode, mode, Duration.ofMillis(100));
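
In user code, mini-batching is enabled through configuration rather than by touching exec nodes. A sketch using the documented table.exec.mini-batch.* option keys (when batching is enabled, both the latency and size options must be set):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Buffer records for up to 500 ms or 1000 rows before processing
Configuration conf = tEnv.getConfig().getConfiguration();
conf.setString("table.exec.mini-batch.enabled", "true");
conf.setString("table.exec.mini-batch.allow-latency", "500 ms");
conf.setString("table.exec.mini-batch.size", "1000");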

OperatorType

Categorizes different types of operators in the query execution plan, used for optimization decisions and resource allocation.

public enum OperatorType {
    
    // Source and Sink Operators
    TABLE_SOURCE_SCAN,
    TABLE_SINK,
    
    // Join Operators  
    HASH_JOIN,
    SORT_MERGE_JOIN,
    NESTED_LOOP_JOIN,
    TEMPORAL_JOIN,
    
    // Aggregation Operators
    GROUP_AGGREGATE,
    WINDOW_AGGREGATE,
    OVER_AGGREGATE,
    
    // Window Operators
    TUMBLING_WINDOW,
    SLIDING_WINDOW, 
    SESSION_WINDOW,
    
    // Transformation Operators
    CALC,           // Projection and filtering
    CORRELATE,      // Table function operations
    UNION,          // Set union operations
    SORT,           // Sorting operations
    LIMIT,          // Top-N operations
    
    // Exchange Operators
    EXCHANGE,       // Data redistribution
    PARTITION_BY_HASH,
    PARTITION_BY_RANGE;
    
    /**
     * Returns whether this operator performs join operations.
     */
    public boolean isJoin() {
        return this == HASH_JOIN || this == SORT_MERGE_JOIN || 
               this == NESTED_LOOP_JOIN || this == TEMPORAL_JOIN;
    }
    
    /**
     * Returns whether this operator performs aggregation.
     */
    public boolean isAggregate() {
        return this == GROUP_AGGREGATE || this == WINDOW_AGGREGATE || 
               this == OVER_AGGREGATE;
    }
    
    /**
     * Returns whether this operator handles windowing.
     */
    public boolean isWindow() {
        return this == TUMBLING_WINDOW || this == SLIDING_WINDOW || 
               this == SESSION_WINDOW || this == WINDOW_AGGREGATE;
    }
}

Usage Example:

import org.apache.flink.table.planner.plan.utils.OperatorType;

// Optimize resource allocation based on operator type
public void configureOperatorResources(ExecNode<?> node, OperatorType operatorType) {
    switch (operatorType) {
        case HASH_JOIN:
        case GROUP_AGGREGATE:
            // Memory-intensive operators need more managed memory
            node.setManagedMemoryFraction(0.4);
            break;
            
        case SORT:
        case SORT_MERGE_JOIN:
            // Sort operations benefit from spill-to-disk capability
            node.enableSpilling(true);
            node.setManagedMemoryFraction(0.3);
            break;
            
        case TABLE_SOURCE_SCAN:
            // I/O intensive operations
            node.setIOIntensive(true);
            break;
            
        default:
            // Default resource allocation
            node.setManagedMemoryFraction(0.1);
    }
}

// Analyze operator characteristics for optimization
public OptimizationHints analyzeOperator(OperatorType operatorType) {
    OptimizationHints hints = new OptimizationHints();
    
    if (operatorType.isJoin()) {
        hints.setRequiresShuffle(true);
        hints.setMemoryIntensive(true);
    }
    
    if (operatorType.isAggregate()) {
        hints.setSupportsPreAggregation(true);
        hints.setRequiresGrouping(true);
    }
    
    if (operatorType.isWindow()) {
        hints.setRequiresEventTime(true);
        hints.setStateful(true);
    }
    
    return hints;
}

Internal Configuration Options

InternalConfigOptions

Configuration constants used internally by the planner for query execution and optimization.

public final class InternalConfigOptions {
    
    /**
     * Query start time in epoch milliseconds.
     * Used for time-based operations and temporal queries.
     */
    public static final ConfigOption<Long> TABLE_QUERY_START_EPOCH_TIME = 
        ConfigOptions.key("table.query.start.epoch-time")
            .longType()
            .noDefaultValue()
            .withDescription("Query start time in epoch milliseconds");
    
    /**
     * Query start time in local time zone.
     * Used for local time calculations and display.
     */  
    public static final ConfigOption<String> TABLE_QUERY_START_LOCAL_TIME =
        ConfigOptions.key("table.query.start.local-time")
            .stringType()
            .noDefaultValue()
            .withDescription("Query start time in local time zone");
    
    /**
     * Maximum number of optimization passes.
     * Controls the depth of Calcite rule-based optimization.
     */
    public static final ConfigOption<Integer> TABLE_OPTIMIZER_MAX_ITERATIONS =
        ConfigOptions.key("table.optimizer.max-iterations")
            .intType()
            .defaultValue(100)
            .withDescription("Maximum number of optimizer iterations");
    
    /**
     * Whether to enable statistics-based optimization.
     */
    public static final ConfigOption<Boolean> TABLE_OPTIMIZER_STATISTICS_ENABLED =
        ConfigOptions.key("table.optimizer.statistics.enabled")
            .booleanType()
            .defaultValue(true)
            .withDescription("Enable statistics-based optimization");
}

Usage Example:

import org.apache.flink.table.planner.utils.InternalConfigOptions;
import org.apache.flink.configuration.Configuration;

import java.time.LocalDateTime;

// Configure planner with internal options
public void configurePlannerInternals(Configuration config) {
    // Set query start time for temporal operations
    long queryStartTime = System.currentTimeMillis();
    config.setLong(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME, queryStartTime);
    
    // Set local time for display purposes
    LocalDateTime localTime = LocalDateTime.now();
    config.setString(InternalConfigOptions.TABLE_QUERY_START_LOCAL_TIME, 
                    localTime.toString());
    
    // Configure optimization limits
    config.setInteger(InternalConfigOptions.TABLE_OPTIMIZER_MAX_ITERATIONS, 150);
    config.setBoolean(InternalConfigOptions.TABLE_OPTIMIZER_STATISTICS_ENABLED, true);
}

// Access internal configuration in planner
public void setupPlannerContext(PlannerConfiguration plannerConfig) {
    Configuration config = plannerConfig.getConfiguration();
    
    // Get query start time for temporal operations
    Long startEpochTime = config.get(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME);
    if (startEpochTime != null) {
        setupTemporalQueries(startEpochTime);
    }
    
    // Configure optimizer based on settings
    boolean statisticsEnabled = config.get(InternalConfigOptions.TABLE_OPTIMIZER_STATISTICS_ENABLED);
    if (statisticsEnabled) {
        enableStatisticsBasedOptimization();
    }
}

Enum Combination Patterns

Comprehensive Operation Configuration

// Configure streaming operation with multiple traits
public class StreamOperationConfig {
    private final UpdateKind updateKind;
    private final ModifyKind modifyKind;
    private final MiniBatchMode miniBatchMode;
    private final AggregatePhaseStrategy aggregateStrategy;
    
    public StreamOperationConfig(
        UpdateKind updateKind,
        ModifyKind modifyKind, 
        MiniBatchMode miniBatchMode,
        AggregatePhaseStrategy aggregateStrategy
    ) {
        this.updateKind = updateKind;
        this.modifyKind = modifyKind;
        this.miniBatchMode = miniBatchMode;
        this.aggregateStrategy = aggregateStrategy;
    }
    
    public boolean requiresStateBackend() {
        return updateKind.supportsRetractions() ||
               aggregateStrategy == AggregatePhaseStrategy.TWO_PHASE;
    }
    
    public boolean supportsLowLatency() {
        return miniBatchMode == MiniBatchMode.None &&
               aggregateStrategy == AggregatePhaseStrategy.ONE_PHASE;
    }
}

// Create optimized configuration
StreamOperationConfig config = new StreamOperationConfig(
    UpdateKind.BEFORE_AND_AFTER,      // Support retractions
    ModifyKind.UPDATE,                // Update operations
    MiniBatchMode.ProcTime,           // Enable processing-time batching
    AggregatePhaseStrategy.TWO_PHASE  // Multi-phase aggregation
);

Validation Using Enums

// Comprehensive validation using enum combinations
public class OperationValidator {
    
    public ValidationResult validate(
        OperatorType operatorType,
        UpdateKind updateKind,
        ModifyKind modifyKind,
        MiniBatchMode miniBatchMode
    ) {
        List<String> errors = new ArrayList<>();
        
        // Validate join operators  
        if (operatorType.isJoin() && updateKind == UpdateKind.ONLY_UPDATE_AFTER) {
            errors.add("Join operators require BEFORE_AND_AFTER update kind for correctness");
        }
        
        // Validate aggregation operators
        if (operatorType.isAggregate() && !updateKind.supportsRetractions()) {
            errors.add("Aggregate operators need retraction support for accurate results");
        }
        
        // Validate modify operations
        if ((modifyKind == ModifyKind.UPDATE || modifyKind == ModifyKind.DELETE)
                && updateKind == UpdateKind.ONLY_UPDATE_AFTER) {
            errors.add("Update/Delete operations require retraction support");
        }
        
        // Validate mini-batch compatibility
        if (miniBatchMode != MiniBatchMode.None && operatorType.isWindow()) {
            errors.add("Mini-batch mode may interfere with window semantics");
        }
        
        return errors.isEmpty() ? 
            ValidationResult.success() : 
            ValidationResult.failure(errors);
    }
}
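
A quick usage sketch of the validator above, assuming ValidationResult exposes isSuccess() and getErrors() accessors:

// Example: validating a streaming join pipeline
OperationValidator validator = new OperationValidator();
ValidationResult result = validator.validate(
    OperatorType.HASH_JOIN,          // join operator
    UpdateKind.BEFORE_AND_AFTER,     // retractions available
    ModifyKind.INSERT,               // sink receives inserts
    MiniBatchMode.None               // low-latency, no batching
);

if (!result.isSuccess()) {
    result.getErrors().forEach(System.err::println);
}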