Configuration and Builders

Configuration classes control how the Flink planner uses Apache Calcite and which planner settings are in effect. Through a builder-pattern API they let you customize SQL parsing, query optimization, and operator handling.

Package Information

// Scala
import org.apache.flink.table.planner.calcite.{CalciteConfig, CalciteConfigBuilder}
import org.apache.flink.table.planner.delegation.PlannerConfiguration
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.config.CalciteConnectionConfig

// Java
import org.apache.flink.table.planner.calcite.CalciteConfig;
import org.apache.flink.table.planner.delegation.PlannerConfiguration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.Configuration;

Capabilities

CalciteConfig

Core configuration trait for customizing Apache Calcite behavior within the Flink planner.

trait CalciteConfig {
  
  // SQL Parser Configuration
  def getSqlParserConfig: Option[SqlParser.Config]
  
  // SQL Operator Tables
  def getSqlOperatorTable: Option[SqlOperatorTable]
  
  // Optimization Programs
  def getBatchProgram: Option[FlinkChainedProgram[BatchOptimizeContext]]
  def getStreamProgram: Option[FlinkChainedProgram[StreamOptimizeContext]]
  
  // Connection Configuration
  def getSqlToRelConverterConfig: Option[SqlToRelConverter.Config]
  def getCalciteConnectionConfig: Option[CalciteConnectionConfig]
}

The CalciteConfig trait provides access to all major customization points in the Calcite integration:

  • Parser Configuration: Customize SQL dialect, keywords, and parsing behavior
  • Operator Tables: Add custom functions and operators beyond built-in ones
  • Optimization Programs: Define custom optimization rule sequences for batch and streaming
  • Converter Configuration: Control SQL-to-relational algebra conversion settings
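
Because every accessor returns an Option, code that consumes the configuration has to decide what happens when a setting was not customized. The following is a minimal sketch of that fall-back pattern in plain Java. `SketchConfig` and the string-valued settings are hypothetical stand-ins for the real Flink and Calcite types, so this illustrates the idea and is not the planner's actual code.

```java
import java.util.Optional;

public class OptionalConfigSketch {

    /** Hypothetical stand-in for CalciteConfig: each setting may be absent. */
    public interface SketchConfig {
        Optional<String> getSqlParserConfig();
        Optional<String> getBatchProgram();
    }

    /** Resolves a setting, falling back to a default when it was not customized. */
    public static String resolve(Optional<String> customized, String defaultValue) {
        return customized.orElse(defaultValue);
    }

    public static void main(String[] args) {
        // Only the parser is customized; the batch program is left unset.
        SketchConfig config = new SketchConfig() {
            @Override public Optional<String> getSqlParserConfig() { return Optional.of("mysql-lex"); }
            @Override public Optional<String> getBatchProgram() { return Optional.empty(); }
        };

        System.out.println(resolve(config.getSqlParserConfig(), "default-parser"));
        System.out.println(resolve(config.getBatchProgram(), "default-batch-program"));
    }
}
```

The customized parser setting wins, while the unset batch program resolves to the supplied default.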

CalciteConfig Builder

Factory methods and builder for creating CalciteConfig instances.

object CalciteConfig {
  def createBuilder(): CalciteConfigBuilder
  def createBuilder(calciteConfig: CalciteConfig): CalciteConfigBuilder
}

class CalciteConfigBuilder {
  
  // SQL Parser Customization
  def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder
  
  // SQL Operator Table Customization  
  def replaceSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder
  def addSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder
  
  // Optimization Program Customization
  def replaceBatchProgram(program: FlinkChainedProgram[BatchOptimizeContext]): CalciteConfigBuilder
  def replaceStreamProgram(program: FlinkChainedProgram[StreamOptimizeContext]): CalciteConfigBuilder
  
  // Converter Configuration
  def replaceSqlToRelConverterConfig(config: SqlToRelConverter.Config): CalciteConfigBuilder
  def replaceCalciteConnectionConfig(config: CalciteConnectionConfig): CalciteConfigBuilder
  
  // Build final configuration
  def build(): CalciteConfig
}

Key Builder Methods:

  • replaceSqlParserConfig(...): Completely replaces the default parser configuration
  • addSqlOperatorTable(...): Adds custom operators to the built-in operator table
  • replaceSqlOperatorTable(...): Completely replaces the built-in operator table
  • replaceBatchProgram(...): Replaces the default batch optimization program
  • replaceStreamProgram(...): Replaces the default stream optimization program
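
The difference between the "add" and "replace" methods can be sketched with a simplified model. The class below is a hypothetical stand-in that represents operator tables as plain names; the lookup order it produces is illustrative only and does not claim to match how Flink chains tables internally.

```java
import java.util.ArrayList;
import java.util.List;

public class OperatorTableBuilderSketch {

    // Hypothetical model: operator tables are represented by their names.
    private final List<String> customTables = new ArrayList<>();
    private boolean replacesBuiltIn = false;

    /** Discards anything configured so far and drops the built-in table. */
    public OperatorTableBuilderSketch replaceSqlOperatorTable(String table) {
        customTables.clear();
        customTables.add(table);
        replacesBuiltIn = true;
        return this;
    }

    /** Keeps what is already configured and appends one more table. */
    public OperatorTableBuilderSketch addSqlOperatorTable(String table) {
        customTables.add(table);
        return this;
    }

    /** Returns the effective list of operator tables. */
    public List<String> build() {
        List<String> effective = new ArrayList<>();
        if (!replacesBuiltIn) {
            effective.add("built-in");
        }
        effective.addAll(customTables);
        return effective;
    }

    public static void main(String[] args) {
        System.out.println(new OperatorTableBuilderSketch().addSqlOperatorTable("custom").build());
        System.out.println(new OperatorTableBuilderSketch().replaceSqlOperatorTable("custom").build());
    }
}
```

In this model, adding keeps the built-in table available, whereas replacing leaves only the custom table, so any built-in function a query relies on must then be provided by the replacement.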

Usage Example:

import org.apache.calcite.config.Lex
import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.sql.parser.SqlParser
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.planner.calcite.CalciteConfig
import org.apache.flink.table.planner.plan.optimize.program._

// Create custom parser configuration
val parserConfig = SqlParser.Config.DEFAULT
  .withLex(Lex.MYSQL)
  .withIdentifierMaxLength(256)

// Placeholders: supply your own operator table and batch program here
val customOperatorTable: SqlOperatorTable = ???
val myCustomBatchProgram: FlinkChainedProgram[BatchOptimizeContext] = ???

// Build CalciteConfig with customizations
val calciteConfig = CalciteConfig.createBuilder()
  .replaceSqlParserConfig(parserConfig)
  .addSqlOperatorTable(customOperatorTable)
  .replaceBatchProgram(myCustomBatchProgram)
  .build()

// Use with table environment
val tableConfig = new TableConfig()
tableConfig.setPlannerConfig(calciteConfig)

PlannerConfiguration

Unified configuration access for the planner module, implementing Flink's configuration system.

public final class PlannerConfiguration implements ReadableConfig {
    
    // Constructor
    public PlannerConfiguration(
        Configuration configuration,
        ClassLoader classLoader,
        ModuleManager moduleManager,
        CatalogManager catalogManager,
        FunctionCatalog functionCatalog
    );
    
    // Configuration access methods
    public <T> T get(ConfigOption<T> option);
    public <T> Optional<T> getOptional(ConfigOption<T> option);
    public Configuration getConfiguration();
    
    // Component access methods  
    public ClassLoader getClassLoader();
    public ModuleManager getModuleManager();
    public CatalogManager getCatalogManager();
    public FunctionCatalog getFunctionCatalog();
}

Key Features:

  • Unified Access: Single point of access for all planner configuration
  • Type Safety: Strongly typed configuration options through ConfigOption<T>
  • Component Integration: Direct access to catalog, modules, and function registry
  • Immutable Design: Configuration objects are immutable after creation
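
To make the type-safety point concrete, here is a minimal sketch of typed option lookup in plain Java. The nested `Option<T>` class and the option keys are hypothetical stand-ins for Flink's `ConfigOption<T>`; the sketch only shows why callers need no casts and how `get` differs from `getOptional`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class TypedOptionSketch {

    /** Hypothetical stand-in for ConfigOption<T>: a key plus a typed default. */
    public static final class Option<T> {
        final String key;
        final T defaultValue;

        public Option(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    // Illustrative options; the keys are made up for this sketch.
    public static final Option<String> DIALECT = new Option<>("sketch.sql-dialect", "default");
    public static final Option<Integer> PARALLELISM = new Option<>("sketch.parallelism", 1);

    private final Map<String, Object> values = new HashMap<>();

    /** Stores a value; the compiler rejects a value of the wrong type. */
    public <T> TypedOptionSketch set(Option<T> option, T value) {
        values.put(option.key, value);
        return this;
    }

    /** Returns the stored value only if one was set explicitly. */
    @SuppressWarnings("unchecked")
    public <T> Optional<T> getOptional(Option<T> option) {
        return Optional.ofNullable((T) values.get(option.key));
    }

    /** Returns the stored value, or the option's default when unset. */
    public <T> T get(Option<T> option) {
        return getOptional(option).orElse(option.defaultValue);
    }

    public static void main(String[] args) {
        TypedOptionSketch config = new TypedOptionSketch().set(DIALECT, "hive");
        String dialect = config.get(DIALECT);       // typed as String, no cast
        int parallelism = config.get(PARALLELISM);  // unset, so the default applies
        System.out.println(dialect + " " + parallelism);
        System.out.println(config.getOptional(PARALLELISM).isPresent());
    }
}
```

A call such as `config.set(PARALLELISM, "four")` would fail to compile, which is the practical benefit of carrying the value type in the option itself.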

Usage Example:

import java.time.Duration;
import java.util.Optional;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.planner.delegation.PlannerConfiguration;

// Create configuration
Configuration config = new Configuration();
config.set(TableConfigOptions.TABLE_SQL_DIALECT, "hive");

// Create planner configuration
// (classLoader, moduleManager, catalogManager and functionCatalog come from
// the surrounding environment setup)
PlannerConfiguration plannerConfig = new PlannerConfiguration(
    config,
    classLoader,
    moduleManager,
    catalogManager,
    functionCatalog
);

// Access configuration values
String sqlDialect = plannerConfig.get(TableConfigOptions.TABLE_SQL_DIALECT);
Optional<Duration> idleTimeout =
    plannerConfig.getOptional(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);

// Access components (named differently from the constructor arguments above)
CatalogManager plannerCatalogManager = plannerConfig.getCatalogManager();
FunctionCatalog plannerFunctionCatalog = plannerConfig.getFunctionCatalog();

Configuration Examples

Custom SQL Parser Configuration

import org.apache.calcite.avatica.util.Casing
import org.apache.calcite.config.Lex
import org.apache.calcite.sql.parser.SqlParser
import org.apache.flink.table.planner.calcite.CalciteConfig

// Configure the parser for a MySQL-style dialect
val mysqlParserConfig = SqlParser.Config.DEFAULT
  .withLex(Lex.MYSQL)
  .withUnquotedCasing(Casing.UNCHANGED)
  .withQuotedCasing(Casing.UNCHANGED)
  .withCaseSensitive(false)

val calciteConfig = CalciteConfig.createBuilder()
  .replaceSqlParserConfig(mysqlParserConfig)
  .build()

Custom Function Registration

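import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.util.ChainedSqlOperatorTable
import org.apache.flink.table.planner.calcite.CalciteConfig

// Placeholder: the operator table that holds your own functions
val myCustomFunctions: SqlOperatorTable = ???

// Create custom operator table with additional functions
val customOperatorTable = ChainedSqlOperatorTable.of(
  SqlStdOperatorTable.instance(),
  myCustomFunctions
)

// addSqlOperatorTable keeps the built-in operators and adds the custom table
val calciteConfig = CalciteConfig.createBuilder()
  .addSqlOperatorTable(customOperatorTable)
  .build()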

Custom Optimization Programs

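import org.apache.flink.configuration.Configuration
import org.apache.flink.table.planner.calcite.CalciteConfig
import org.apache.flink.table.planner.plan.optimize.program._

// Placeholder: a program that wraps your own optimization rules
val myCustomOptimizationRules: FlinkOptimizeProgram[BatchOptimizeContext] = ???

// Start from the default batch program and append the custom stage.
// Assumption: buildProgram accepts the table configuration and
// FlinkChainedProgram exposes addLast(name, program). Both are internal
// planner APIs, so verify them against the Flink version you build against.
val customBatchProgram = FlinkBatchProgram.buildProgram(new Configuration())
customBatchProgram.addLast("custom_rules", myCustomOptimizationRules)

val calciteConfig = CalciteConfig.createBuilder()
  .replaceBatchProgram(customBatchProgram)
  .build()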

Integration with TableConfig

A CalciteConfig takes effect once it is set as the planner configuration on the table environment's TableConfig:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.calcite.CalciteConfig;

// Build the planner configuration (customParserConfig is defined elsewhere)
CalciteConfig calciteConfig = CalciteConfig.createBuilder()
    .replaceSqlParserConfig(customParserConfig)
    .build();

// Create the table environment, then attach the planner configuration to its
// TableConfig. The planner configuration is held by the TableConfig object
// itself, so passing only tableConfig.getConfiguration() to a new environment
// would not carry it over.
TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().build()
);
tableEnv.getConfig().setPlannerConfig(calciteConfig);

Configuration Best Practices

Immutable Configuration Pattern

// Build configuration once and reuse
val baseConfig = CalciteConfig.createBuilder()
  .replaceSqlParserConfig(standardParserConfig)
  .build()

// Create variations from base configuration  
val batchConfig = CalciteConfig.createBuilder(baseConfig)
  .replaceBatchProgram(optimizedBatchProgram)
  .build()

val streamConfig = CalciteConfig.createBuilder(baseConfig)
  .replaceStreamProgram(optimizedStreamProgram) 
  .build()
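
The reason this pattern is safe is that a builder seeded from an existing configuration copies its values rather than sharing mutable state. The sketch below models that in plain Java; `Config`, `Builder`, and the string-valued settings are hypothetical stand-ins, not Flink's classes.

```java
import java.util.Optional;

public class DerivedConfigSketch {

    /** Hypothetical immutable config: fields are final and set once by the builder. */
    public static final class Config {
        public final Optional<String> parser;
        public final Optional<String> batchProgram;
        public final Optional<String> streamProgram;

        private Config(Builder b) {
            this.parser = b.parser;
            this.batchProgram = b.batchProgram;
            this.streamProgram = b.streamProgram;
        }
    }

    public static final class Builder {
        private Optional<String> parser = Optional.empty();
        private Optional<String> batchProgram = Optional.empty();
        private Optional<String> streamProgram = Optional.empty();

        public Builder() {}

        /** Seeds the builder with the values of an existing config. */
        public Builder(Config base) {
            this.parser = base.parser;
            this.batchProgram = base.batchProgram;
            this.streamProgram = base.streamProgram;
        }

        public Builder replaceParser(String value) { parser = Optional.of(value); return this; }
        public Builder replaceBatchProgram(String value) { batchProgram = Optional.of(value); return this; }
        public Builder replaceStreamProgram(String value) { streamProgram = Optional.of(value); return this; }

        public Config build() { return new Config(this); }
    }

    public static void main(String[] args) {
        Config base = new Builder().replaceParser("standard").build();
        Config batch = new Builder(base).replaceBatchProgram("optimized-batch").build();
        Config stream = new Builder(base).replaceStreamProgram("optimized-stream").build();

        // Both variants inherit the parser; the base itself is unchanged.
        System.out.println(batch.parser.get() + " / " + batch.batchProgram.get());
        System.out.println(stream.parser.get() + " / " + stream.streamProgram.get());
        System.out.println(base.batchProgram.isPresent());
    }
}
```

Each variant inherits the shared parser setting, and deriving a variant never alters the base configuration, so the base can be reused freely.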

Conditional Configuration

val configBuilder = CalciteConfig.createBuilder()

// Add configurations conditionally
if (enableCustomFunctions) {
  configBuilder.addSqlOperatorTable(myCustomFunctions)
}

if (useOptimizedBatchRules) {
  configBuilder.replaceBatchProgram(optimizedBatchProgram)
}

val calciteConfig = configBuilder.build()

Environment-Specific Configuration

// Development configuration
CalciteConfig devConfig = CalciteConfig.createBuilder()
    .replaceSqlParserConfig(lenientParserConfig)  // More permissive parsing
    .build();

// Production configuration  
CalciteConfig prodConfig = CalciteConfig.createBuilder()
    .replaceSqlParserConfig(strictParserConfig)   // Strict parsing
    .replaceBatchProgram(highlyOptimizedProgram)  // Aggressive optimization
    .build();

// Use appropriate config based on environment
CalciteConfig config = isProduction ? prodConfig : devConfig;

Configuration Validation

The configuration system includes validation to ensure consistency:

try {
    CalciteConfig config = CalciteConfig.createBuilder()
        .replaceSqlParserConfig(parserConfig)
        .build();
} catch (IllegalArgumentException e) {
    // Handle invalid configuration
    logger.error("Invalid parser configuration: " + e.getMessage());
}

Configuration validation covers:

  • Parser Compatibility: Ensuring parser config matches expected SQL dialect
  • Operator Consistency: Validating custom operators are compatible
  • Program Validity: Checking optimization programs are well-formed
  • Resource Limits: Ensuring configuration values are within acceptable ranges
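
If you want to fail fast on out-of-range values before handing them to the builder, a small pre-check is one option. The helper below is a hypothetical sketch in plain Java: the method name and the bounds are illustrative assumptions, not limits defined by Flink or Calcite.

```java
public class ConfigValidationSketch {

    /**
     * Hypothetical range check: returns the value when it lies in [min, max],
     * otherwise throws IllegalArgumentException with a descriptive message.
     */
    public static int requireInRange(String name, int value, int min, int max) {
        if (value < min || value > max) {
            throw new IllegalArgumentException(
                name + " must be in [" + min + ", " + max + "] but was " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        // Accepted: the value is inside the illustrative bounds.
        System.out.println(requireInRange("identifierMaxLength", 256, 1, 1024));

        // Rejected: the value is below the lower bound.
        try {
            requireInRange("identifierMaxLength", 0, 1, 1024);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Running the check before calling the builder keeps the error close to the place where the bad value was introduced.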