Flink Table Planner connects Table/SQL API and runtime, responsible for translating and optimizing table programs into Flink pipelines.
Configuration classes provide sophisticated control over Calcite behavior and planner settings. The configuration system allows customization of SQL parsing, query optimization, and operator handling through a builder pattern API.
import org.apache.flink.table.planner.calcite.{CalciteConfig, CalciteConfigBuilder}
import org.apache.flink.table.planner.delegation.PlannerConfiguration
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.config.CalciteConnectionConfigimport org.apache.flink.table.planner.calcite.CalciteConfig;
import org.apache.flink.table.planner.delegation.PlannerConfiguration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.Configuration;Core configuration trait for customizing Apache Calcite behavior within the Flink planner.
trait CalciteConfig {
// SQL Parser Configuration
def getSqlParserConfig: Option[SqlParser.Config]
// SQL Operator Tables
def getSqlOperatorTable: Option[SqlOperatorTable]
// Optimization Programs
def getBatchProgram: Option[FlinkChainedProgram[BatchOptimizeContext]]
def getStreamProgram: Option[FlinkChainedProgram[StreamOptimizeContext]]
// Connection Configuration
def getSqlToRelConverterConfig: Option[SqlToRelConverter.Config]
def getCalciteConnectionConfig: Option[CalciteConnectionConfig]
}The CalciteConfig trait provides access to all major customization points in the Calcite integration:
Factory methods and builder for creating CalciteConfig instances.
object CalciteConfig {
def createBuilder(): CalciteConfigBuilder
def createBuilder(calciteConfig: CalciteConfig): CalciteConfigBuilder
}
class CalciteConfigBuilder {
// SQL Parser Customization
def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder
// SQL Operator Table Customization
def replaceSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder
def addSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder
// Optimization Program Customization
def replaceBatchProgram(program: FlinkChainedProgram[BatchOptimizeContext]): CalciteConfigBuilder
def replaceStreamProgram(program: FlinkChainedProgram[StreamOptimizeContext]): CalciteConfigBuilder
// Converter Configuration
def replaceSqlToRelConverterConfig(config: SqlToRelConverter.Config): CalciteConfigBuilder
def replaceCalciteConnectionConfig(config: CalciteConnectionConfig): CalciteConfigBuilder
// Build final configuration
def build(): CalciteConfig
}Key Builder Methods:
replaceSqlParserConfig(...): Completely replaces the default parser configurationaddSqlOperatorTable(...): Adds custom operators to the built-in operator tablereplaceSqlOperatorTable(...): Completely replaces the built-in operator tablereplaceBatchProgram(...): Replaces the default batch optimization programreplaceStreamProgram(...): Replaces the default stream optimization programUsage Example:
import org.apache.flink.table.planner.calcite.CalciteConfig
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.fun.SqlStdOperatorTable
// Create custom parser configuration
val parserConfig = SqlParser.Config.DEFAULT
.withLex(Lex.MYSQL)
.withIdentifierMaxLength(256)
// Create custom operator table
val customOperatorTable = // your custom operator table
// Build CalciteConfig with customizations
val calciteConfig = CalciteConfig.createBuilder()
.replaceSqlParserConfig(parserConfig)
.addSqlOperatorTable(customOperatorTable)
.replaceBatchProgram(myCustomBatchProgram)
.build()
// Use with table environment
val tableConfig = new TableConfig()
tableConfig.setPlannerConfig(calciteConfig)Unified configuration access for the planner module, implementing Flink's configuration system.
public final class PlannerConfiguration implements ReadableConfig {
// Constructor
public PlannerConfiguration(
Configuration configuration,
ClassLoader classLoader,
ModuleManager moduleManager,
CatalogManager catalogManager,
FunctionCatalog functionCatalog
);
// Configuration access methods
public <T> T get(ConfigOption<T> option);
public <T> Optional<T> getOptional(ConfigOption<T> option);
public Configuration getConfiguration();
// Component access methods
public ClassLoader getClassLoader();
public ModuleManager getModuleManager();
public CatalogManager getCatalogManager();
public FunctionCatalog getFunctionCatalog();
}Key Features:
ConfigOption<T>Usage Example:
import org.apache.flink.table.planner.delegation.PlannerConfiguration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.config.TableConfigOptions;
// Create configuration
Configuration config = new Configuration();
config.setString(TableConfigOptions.TABLE_SQL_DIALECT, "hive");
// Create planner configuration
PlannerConfiguration plannerConfig = new PlannerConfiguration(
config,
classLoader,
moduleManager,
catalogManager,
functionCatalog
);
// Access configuration values
String sqlDialect = plannerConfig.get(TableConfigOptions.TABLE_SQL_DIALECT);
Optional<Duration> idleTimeout = plannerConfig.getOptional(TableConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
// Access components
CatalogManager catalogManager = plannerConfig.getCatalogManager();
FunctionCatalog functionCatalog = plannerConfig.getFunctionCatalog();import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.avatica.util.Casing
import org.apache.calcite.sql.parser.SqlParser.Config
// Configure parser for different SQL dialects
val mysqlParserConfig = SqlParser.Config.DEFAULT
.withLex(Lex.MYSQL)
.withUnquotedCasing(Casing.UNCHANGED)
.withQuotedCasing(Casing.UNCHANGED)
.withCaseSensitive(false)
val calciteConfig = CalciteConfig.createBuilder()
.replaceSqlParserConfig(mysqlParserConfig)
.build()import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.util.ChainedSqlOperatorTable
// Create custom operator table with additional functions
val customOperatorTable = ChainedSqlOperatorTable.of(
SqlStdOperatorTable.instance(),
myCustomFunctions
)
val calciteConfig = CalciteConfig.createBuilder()
.addSqlOperatorTable(customOperatorTable)
.build()import org.apache.flink.table.planner.plan.optimize.program._
// Create custom optimization program for batch processing
val customBatchProgram = FlinkBatchProgram.buildProgram(
// Add custom optimization rules
Seq(
FlinkBatchProgram.OPTIMIZE_REWRITE,
FlinkBatchProgram.OPTIMIZE_JOIN_REORDER,
myCustomOptimizationRules
)
)
val calciteConfig = CalciteConfig.createBuilder()
.replaceBatchProgram(customBatchProgram)
.build()The configuration integrates with Flink's TableConfig for global table environment settings:
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.planner.calcite.CalciteConfig;
// Set planner configuration on table config
TableConfig tableConfig = new TableConfig();
CalciteConfig calciteConfig = CalciteConfig.createBuilder()
.replaceSqlParserConfig(customParserConfig)
.build();
tableConfig.setPlannerConfig(calciteConfig);
// Use with table environment
TableEnvironment tableEnv = TableEnvironment.create(
EnvironmentSettings.newInstance()
.withConfiguration(tableConfig.getConfiguration())
.build()
);// Build configuration once and reuse
val baseConfig = CalciteConfig.createBuilder()
.replaceSqlParserConfig(standardParserConfig)
.build()
// Create variations from base configuration
val batchConfig = CalciteConfig.createBuilder(baseConfig)
.replaceBatchProgram(optimizedBatchProgram)
.build()
val streamConfig = CalciteConfig.createBuilder(baseConfig)
.replaceStreamProgram(optimizedStreamProgram)
.build()val configBuilder = CalciteConfig.createBuilder()
// Add configurations conditionally
if (enableCustomFunctions) {
configBuilder.addSqlOperatorTable(myCustomFunctions)
}
if (useOptimizedBatchRules) {
configBuilder.replaceBatchProgram(optimizedBatchProgram)
}
val calciteConfig = configBuilder.build()// Development configuration
CalciteConfig devConfig = CalciteConfig.createBuilder()
.replaceSqlParserConfig(lenientParserConfig) // More permissive parsing
.build();
// Production configuration
CalciteConfig prodConfig = CalciteConfig.createBuilder()
.replaceSqlParserConfig(strictParserConfig) // Strict parsing
.replaceBatchProgram(highlyOptimizedProgram) // Aggressive optimization
.build();
// Use appropriate config based on environment
CalciteConfig config = isProduction ? prodConfig : devConfig;The configuration system includes validation to ensure consistency:
try {
CalciteConfig config = CalciteConfig.createBuilder()
.replaceSqlParserConfig(parserConfig)
.build();
} catch (IllegalArgumentException e) {
// Handle invalid configuration
logger.error("Invalid parser configuration: " + e.getMessage());
}Configuration validation covers:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-planner-2-11