Configuration classes control Calcite behavior and planner settings. Through a builder-pattern API, the configuration system lets you customize SQL parsing, query optimization, and operator handling.
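To make the builder idea concrete, here is a minimal, self-contained Java sketch. MiniConfig and MiniConfigBuilder are hypothetical stand-ins invented for this illustration, not Flink classes, and the string values stand in for real parser configurations and operator tables. The sketch only shows the general shape: "replace" overwrites a setting, "add" appends to it, a builder can be seeded from an existing configuration, and build() produces an immutable result.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in types for illustration only; these are NOT Flink classes.
final class MiniConfig {
    private final String parserDialect;        // null means "not customized"
    private final List<String> operatorTables; // custom tables, in insertion order

    MiniConfig(String parserDialect, List<String> operatorTables) {
        this.parserDialect = parserDialect;
        // Defensive copy so later builder changes cannot leak into this config
        this.operatorTables = Collections.unmodifiableList(new ArrayList<>(operatorTables));
    }

    Optional<String> getParserDialect() { return Optional.ofNullable(parserDialect); }

    List<String> getOperatorTables() { return operatorTables; }
}

final class MiniConfigBuilder {
    private String parserDialect;
    private final List<String> operatorTables = new ArrayList<>();

    MiniConfigBuilder() {}

    // Seed the builder from an existing configuration
    MiniConfigBuilder(MiniConfig base) {
        this.parserDialect = base.getParserDialect().orElse(null);
        this.operatorTables.addAll(base.getOperatorTables());
    }

    // "replace" overwrites whatever was set before
    MiniConfigBuilder replaceParserDialect(String dialect) {
        this.parserDialect = dialect;
        return this;
    }

    // "add" keeps existing entries and appends one more
    MiniConfigBuilder addOperatorTable(String table) {
        operatorTables.add(table);
        return this;
    }

    // "replace" on a collection discards existing entries first
    MiniConfigBuilder replaceOperatorTable(String table) {
        operatorTables.clear();
        operatorTables.add(table);
        return this;
    }

    MiniConfig build() { return new MiniConfig(parserDialect, operatorTables); }
}

final class BuilderSketch {
    public static void main(String[] args) {
        MiniConfig base = new MiniConfigBuilder()
            .replaceParserDialect("mysql")
            .addOperatorTable("geo")
            .build();

        // Derive a variation without touching the base configuration
        MiniConfig derived = new MiniConfigBuilder(base)
            .addOperatorTable("ml")
            .build();

        System.out.println(base.getOperatorTables());    // prints [geo]
        System.out.println(derived.getOperatorTables()); // prints [geo, ml]
    }
}
```

Returning the builder from every setter is what allows the chained call style; copying the list in the MiniConfig constructor is what keeps a built configuration stable if the builder is reused afterwards.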
import org.apache.flink.table.planner.calcite.{CalciteConfig, CalciteConfigBuilder}
import org.apache.flink.table.planner.delegation.PlannerConfiguration
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.config.CalciteConnectionConfig

import org.apache.flink.table.planner.calcite.CalciteConfig;
import org.apache.flink.table.planner.delegation.PlannerConfiguration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.Configuration;

Core configuration trait for customizing Apache Calcite behavior within the Flink planner.
trait CalciteConfig {
  // SQL Parser Configuration
  def getSqlParserConfig: Option[SqlParser.Config]

  // SQL Operator Tables
  def getSqlOperatorTable: Option[SqlOperatorTable]

  // Optimization Programs
  def getBatchProgram: Option[FlinkChainedProgram[BatchOptimizeContext]]
  def getStreamProgram: Option[FlinkChainedProgram[StreamOptimizeContext]]

  // Converter and Connection Configuration
  def getSqlToRelConverterConfig: Option[SqlToRelConverter.Config]
  def getCalciteConnectionConfig: Option[CalciteConnectionConfig]
}

The CalciteConfig trait provides access to all major customization points in the Calcite integration: the SQL parser configuration, the SQL operator table, the batch and stream optimization programs, and the SqlToRelConverter and Calcite connection configurations. Each accessor returns an Option.
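Because every accessor on the trait is Option-valued, a consumer has to decide what to do when a customization point is absent. The following self-contained Java sketch illustrates one plausible reading, namely that an empty value means "keep the default". PlannerOverrides and EffectiveSettings are hypothetical stand-ins written for this example (they are not Flink or Calcite types), and the default strings are placeholders.

```java
import java.util.Optional;

// Hypothetical stand-in for illustration; NOT Flink's CalciteConfig.
// It mirrors only the shape of the accessors: each customization point is optional.
interface PlannerOverrides {
    Optional<String> getSqlParserConfig();
    Optional<String> getSqlOperatorTable();
}

final class EffectiveSettings {
    // Placeholder defaults, named for this sketch only
    static final String DEFAULT_PARSER = "default-parser";
    static final String DEFAULT_OPERATORS = "built-in-operators";

    // An empty Optional is treated as "no override, use the default"
    static String parser(PlannerOverrides overrides) {
        return overrides.getSqlParserConfig().orElse(DEFAULT_PARSER);
    }

    static String operators(PlannerOverrides overrides) {
        return overrides.getSqlOperatorTable().orElse(DEFAULT_OPERATORS);
    }
}

final class OverridesDemo {
    public static void main(String[] args) {
        // Only the parser is customized; the operator table is left unset
        PlannerOverrides onlyParser = new PlannerOverrides() {
            @Override public Optional<String> getSqlParserConfig() {
                return Optional.of("mysql-parser");
            }
            @Override public Optional<String> getSqlOperatorTable() {
                return Optional.empty();
            }
        };

        System.out.println(EffectiveSettings.parser(onlyParser));    // prints mysql-parser
        System.out.println(EffectiveSettings.operators(onlyParser)); // prints built-in-operators
    }
}
```

Keeping the fallback logic in one place means callers never need to check for absent values themselves.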
Factory methods and builder for creating CalciteConfig instances.
object CalciteConfig {
  def createBuilder(): CalciteConfigBuilder
  def createBuilder(calciteConfig: CalciteConfig): CalciteConfigBuilder
}

class CalciteConfigBuilder {
  // SQL Parser Customization
  def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder

  // SQL Operator Table Customization
  def replaceSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder
  def addSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder

  // Optimization Program Customization
  def replaceBatchProgram(program: FlinkChainedProgram[BatchOptimizeContext]): CalciteConfigBuilder
  def replaceStreamProgram(program: FlinkChainedProgram[StreamOptimizeContext]): CalciteConfigBuilder

  // Converter Configuration
  def replaceSqlToRelConverterConfig(config: SqlToRelConverter.Config): CalciteConfigBuilder
  def replaceCalciteConnectionConfig(config: CalciteConnectionConfig): CalciteConfigBuilder

  // Build final configuration
  def build(): CalciteConfig
}

Key Builder Methods:
- replaceSqlParserConfig(...): Completely replaces the default parser configuration
- addSqlOperatorTable(...): Adds custom operators to the built-in operator table
- replaceSqlOperatorTable(...): Completely replaces the built-in operator table
- replaceBatchProgram(...): Replaces the default batch optimization program
- replaceStreamProgram(...): Replaces the default stream optimization program

Usage Example:
import org.apache.calcite.config.Lex
import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.sql.parser.SqlParser
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.planner.calcite.CalciteConfig

// Create custom parser configuration
val parserConfig = SqlParser.Config.DEFAULT
  .withLex(Lex.MYSQL)
  .withIdentifierMaxLength(256)

// Create custom operator table (placeholder: supply your own SqlOperatorTable)
val customOperatorTable: SqlOperatorTable = ???

// Build CalciteConfig with customizations
// (myCustomBatchProgram is assumed to be a FlinkChainedProgram[BatchOptimizeContext] defined elsewhere)
val calciteConfig = CalciteConfig.createBuilder()
  .replaceSqlParserConfig(parserConfig)
  .addSqlOperatorTable(customOperatorTable)
  .replaceBatchProgram(myCustomBatchProgram)
  .build()

// Use with table environment
val tableConfig = new TableConfig()
tableConfig.setPlannerConfig(calciteConfig)

Unified configuration access for the planner module, implementing Flink's configuration system.
public final class PlannerConfiguration implements ReadableConfig {
    // Constructor
    public PlannerConfiguration(
        Configuration configuration,
        ClassLoader classLoader,
        ModuleManager moduleManager,
        CatalogManager catalogManager,
        FunctionCatalog functionCatalog
    );

    // Configuration access methods
    public <T> T get(ConfigOption<T> option);
    public <T> Optional<T> getOptional(ConfigOption<T> option);
    public Configuration getConfiguration();

    // Component access methods
    public ClassLoader getClassLoader();
    public ModuleManager getModuleManager();
    public CatalogManager getCatalogManager();
    public FunctionCatalog getFunctionCatalog();
}

Key Features:
- Typed access to configuration values through ConfigOption<T>

Usage Example:
import java.time.Duration;
import java.util.Optional;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.planner.delegation.PlannerConfiguration;

// Create configuration
Configuration config = new Configuration();
config.setString(TableConfigOptions.TABLE_SQL_DIALECT, "hive");

// Create planner configuration
// (classLoader, moduleManager, catalogManager, and functionCatalog are assumed to exist already)
PlannerConfiguration plannerConfig = new PlannerConfiguration(
    config,
    classLoader,
    moduleManager,
    catalogManager,
    functionCatalog
);

// Access configuration values
String sqlDialect = plannerConfig.get(TableConfigOptions.TABLE_SQL_DIALECT);
// The source idle timeout option is defined in ExecutionConfigOptions
Optional<Duration> idleTimeout =
    plannerConfig.getOptional(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);

// Access components (distinct names avoid redeclaring the constructor arguments)
CatalogManager resolvedCatalogManager = plannerConfig.getCatalogManager();
FunctionCatalog resolvedFunctionCatalog = plannerConfig.getFunctionCatalog();

import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.avatica.util.Casing
import org.apache.calcite.config.Lex
import org.apache.flink.table.planner.calcite.CalciteConfig

// Configure parser for different SQL dialects
val mysqlParserConfig = SqlParser.Config.DEFAULT
  .withLex(Lex.MYSQL)
  .withUnquotedCasing(Casing.UNCHANGED)
  .withQuotedCasing(Casing.UNCHANGED)
  .withCaseSensitive(false)

val calciteConfig = CalciteConfig.createBuilder()
  .replaceSqlParserConfig(mysqlParserConfig)
  .build()

import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.util.ChainedSqlOperatorTable

// Create custom operator table with additional functions
val customOperatorTable = ChainedSqlOperatorTable.of(
  SqlStdOperatorTable.instance(),
  myCustomFunctions
)

val calciteConfig = CalciteConfig.createBuilder()
  .addSqlOperatorTable(customOperatorTable)
  .build()

import org.apache.flink.table.planner.plan.optimize.program._
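// Create custom optimization program for batch processing.
// buildProgram returns the default chained program for a given configuration
// (the exact parameter type varies across Flink versions);
// tableConfig is assumed to be an existing TableConfig.
val customBatchProgram = FlinkBatchProgram.buildProgram(tableConfig.getConfiguration)

// Add custom optimization rules as an extra phase.
// myCustomOptimizationProgram is a placeholder FlinkOptimizeProgram[BatchOptimizeContext].
customBatchProgram.addLast("custom_rules", myCustomOptimizationProgram)

val calciteConfig = CalciteConfig.createBuilder()
  .replaceBatchProgram(customBatchProgram)
  .build()

The configuration integrates with Flink's TableConfig for global table environment settings: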
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.calcite.CalciteConfig;

// Build the planner configuration (customParserConfig is assumed to be defined elsewhere)
CalciteConfig calciteConfig = CalciteConfig.createBuilder()
    .replaceSqlParserConfig(customParserConfig)
    .build();

// Create the table environment
TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().build()
);

// Set the planner configuration on the environment's TableConfig.
// The planner config is held by the TableConfig object itself, not by its
// underlying Configuration, so copying only getConfiguration() would drop it.
tableEnv.getConfig().setPlannerConfig(calciteConfig);

// Build configuration once and reuse
val baseConfig = CalciteConfig.createBuilder()
  .replaceSqlParserConfig(standardParserConfig)
  .build()

// Create variations from base configuration
val batchConfig = CalciteConfig.createBuilder(baseConfig)
  .replaceBatchProgram(optimizedBatchProgram)
  .build()

val streamConfig = CalciteConfig.createBuilder(baseConfig)
  .replaceStreamProgram(optimizedStreamProgram)
  .build()

val configBuilder = CalciteConfig.createBuilder()
// Add configurations conditionally
if (enableCustomFunctions) {
  configBuilder.addSqlOperatorTable(myCustomFunctions)
}
if (useOptimizedBatchRules) {
  configBuilder.replaceBatchProgram(optimizedBatchProgram)
}

val calciteConfig = configBuilder.build()

// Development configuration
CalciteConfig devConfig = CalciteConfig.createBuilder()
    .replaceSqlParserConfig(lenientParserConfig) // More permissive parsing
    .build();

// Production configuration
CalciteConfig prodConfig = CalciteConfig.createBuilder()
    .replaceSqlParserConfig(strictParserConfig) // Strict parsing
    .replaceBatchProgram(highlyOptimizedProgram) // Aggressive optimization
    .build();

// Use appropriate config based on environment
CalciteConfig config = isProduction ? prodConfig : devConfig;

The configuration system includes validation to ensure consistency:
try {
    CalciteConfig config = CalciteConfig.createBuilder()
        .replaceSqlParserConfig(parserConfig)
        .build();
} catch (IllegalArgumentException e) {
    // Handle invalid configuration
    logger.error("Invalid parser configuration: " + e.getMessage());
}

Configuration validation covers: