Core planner implementations handle the translation and optimization of table programs into Flink execution plans. The planning system supports both streaming and batch execution modes, with SQL parsing built on Apache Calcite.
The main classes involved:

```scala
import org.apache.flink.table.planner.delegation.{PlannerBase, StreamPlanner, BatchPlanner}
import org.apache.flink.table.planner.calcite.{FlinkRelBuilder, FlinkTypeFactory}
import org.apache.flink.table.api.TableEnvironment
```

```java
import org.apache.flink.table.planner.delegation.ParserImpl;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
```

PlannerBase is the foundational abstract class that provides common functionality for both streaming and batch planners.
```scala
abstract class PlannerBase extends Planner {
  def getTableEnvironment: TableEnvironment
  def getFlinkRelBuilder: FlinkRelBuilder
  def getTypeFactory: FlinkTypeFactory
  def getExecEnv: StreamExecutionEnvironment
  def getTableConfig: TableConfig
  def getFlinkContext: FlinkContext
  def getCatalogManager: CatalogManager
  def getFunctionCatalog: FunctionCatalog

  // Core planning methods
  def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]]
  def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String
  def getCompletionHints(statement: String, position: Int): Array[String]
  def getParser: Parser
}
```

Key Responsibilities:
- Manages the planner context and configuration (PlannerContext, PlannerConfiguration)
- Provides the Calcite integration layer (FlinkRelBuilder, FlinkTypeFactory)

Usage Example:
```scala
// PlannerBase is typically accessed through concrete implementations
val planner: PlannerBase = ??? // obtained from the planner factory

// Access core components
val tableEnv = planner.getTableEnvironment
val relBuilder = planner.getFlinkRelBuilder
val typeFactory = planner.getTypeFactory

// Translate modify operations (e.g. INSERT INTO) to transformations
val modifyOperations: util.List[ModifyOperation] = ??? // parsed operations
val transformations = planner.translate(modifyOperations)

// Explain query plans
val operations: util.List[Operation] = ??? // operations to explain
val explanation = planner.explain(operations, ExplainDetail.ESTIMATED_COST)
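```

In application code, the explain path is normally reached through TableEnvironment.explainSql, which delegates to the planner's explain method. A minimal sketch (the query and table names are illustrative):

```scala
import org.apache.flink.table.api.ExplainDetail

// explainSql delegates to PlannerBase.explain internally; extra details
// control what the plan output includes (CHANGELOG_MODE applies to streaming plans)
val plan = tableEnv.explainSql(
  "SELECT id, UPPER(name) FROM my_table WHERE id > 100",
  ExplainDetail.ESTIMATED_COST,
  ExplainDetail.CHANGELOG_MODE)
println(plan)
```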
StreamPlanner is the concrete planner implementation specialized for streaming workloads.

```scala
class StreamPlanner(
    executor: Executor,
    config: TableConfig,
    functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager,
    isStreamingMode: Boolean
) extends PlannerBase {

  // Streaming-specific optimization and translation
  override def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]]

  // Streaming-specific explain functionality
  override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String
}
```

Key Features:
- Streaming-specific optimization and translation of modify operations
- Streaming-specific explain output for continuous queries
Usage Example:
```scala
// StreamPlanner is created through DefaultPlannerFactory for streaming mode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// The planner is internal to the table environment
val streamPlanner: StreamPlanner = ??? // accessed internally

// Process streaming operations
val operations: util.List[ModifyOperation] = ??? // parsed streaming operations
val transformations = streamPlanner.translate(operations)
```
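Since the planner itself is internal, streaming jobs normally exercise the StreamPlanner indirectly through SQL on the table environment. A sketch using the built-in datagen connector (the table name and schema are illustrative):

```scala
// Registering a source and explaining a continuous query drives the
// StreamPlanner's parse -> optimize -> translate path internally
tableEnv.executeSql(
  """CREATE TEMPORARY TABLE clicks (
    |  user_id INT,
    |  url STRING
    |) WITH ('connector' = 'datagen')""".stripMargin)

println(tableEnv.explainSql("SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id"))
```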
BatchPlanner is the concrete planner implementation specialized for batch workloads.

```scala
class BatchPlanner(
    executor: Executor,
    config: TableConfig,
    functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager
) extends PlannerBase {

  // Batch-specific optimization and translation
  override def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]]

  // Batch-specific explain functionality
  override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String
}
```

Key Features:
- Batch-specific optimization and translation of modify operations
- Batch-specific explain output for bounded queries
Usage Example:
```scala
// BatchPlanner is created through DefaultPlannerFactory for batch mode
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val tableEnv = TableEnvironment.create(
  EnvironmentSettings.newInstance()
    .inBatchMode()
    .build()
)

// The planner is internal to the table environment
val batchPlanner: BatchPlanner = ??? // accessed internally

// Process batch operations
val operations: util.List[ModifyOperation] = ??? // parsed batch operations
val transformations = batchPlanner.translate(operations)
```
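The equivalent public-API route for batch is to register a bounded source and run a query to completion. A sketch (the connector options are illustrative):

```scala
// In batch mode the BatchPlanner translates the query into a bounded job;
// results can be collected directly once the job finishes
tableEnv.executeSql(
  """CREATE TEMPORARY TABLE words (
    |  word STRING
    |) WITH ('connector' = 'datagen', 'number-of-rows' = '100')""".stripMargin)

val result = tableEnv.executeSql("SELECT word, COUNT(*) FROM words GROUP BY word")
result.collect().forEachRemaining(row => println(row))
```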
ParserImpl is the default SQL parser implementation, using Apache Calcite to parse SQL statements, identifiers, and expressions.

```java
public class ParserImpl implements Parser {

    // Constructor
    public ParserImpl(
            CatalogManager catalogManager,
            Supplier<FlinkPlannerImpl> validatorSupplier,
            Supplier<CalciteParser> calciteParserSupplier,
            RexNodeToExpressionConverter rexNodeToExpressionConverter);

    // Core parsing methods
    public List<Operation> parse(String statement);

    public UnresolvedIdentifier parseIdentifier(String identifier);

    public ResolvedExpression parseSqlExpression(
            String sqlExpression,
            RowType inputRowType,
            LogicalType outputType);

    // Completion and validation
    public String[] getCompletionHints(String statement, int position);
}
```

Key Methods:
- parse(String statement): Parses SQL statements into executable Operation objects
- parseIdentifier(String identifier): Parses table/column identifiers into an UnresolvedIdentifier for catalog resolution
- parseSqlExpression(...): Parses a SQL expression within the context of a given input row type and expected output type
Usage Example:
```java
import java.util.List;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

// Parser is typically obtained from PlannerBase
Parser parser = planner.getParser();

// Parse SQL statements
List<Operation> operations = parser.parse("CREATE TABLE my_table (id INT, name STRING)");
List<Operation> queryOps = parser.parse("SELECT id, UPPER(name) FROM my_table WHERE id > 100");

// Parse identifiers
UnresolvedIdentifier tableId = parser.parseIdentifier("catalog.database.table");
UnresolvedIdentifier simpleId = parser.parseIdentifier("my_table");

// Parse expressions against an input row type (id INT, name STRING)
RowType inputType = RowType.of(
        new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
        new String[] {"id", "name"});
LogicalType outputType = DataTypes.STRING().getLogicalType();
ResolvedExpression expr = parser.parseSqlExpression("UPPER(name)", inputType, outputType);

// Get completion hints for IDE integration
String[] hints = parser.getCompletionHints("SELECT * FROM my_ta", 18);
```
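Note that DML statements such as INSERT parse into ModifyOperation instances, which is exactly the input type that PlannerBase.translate expects. A sketch continuing from the parser above (sink_table is illustrative):

```scala
import org.apache.flink.table.operations.ModifyOperation

// INSERT statements produce ModifyOperation instances,
// the input type consumed by PlannerBase.translate
val ops = parser.parse("INSERT INTO sink_table SELECT id, name FROM my_table")
val modifyOp = ops.get(0).asInstanceOf[ModifyOperation]
```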
The core planning process follows these stages:

1. Parsing: ParserImpl converts SQL text into Operation objects
2. Optimization: PlannerBase applies Calcite optimization rules to the logical plan
3. Translation: the optimized plan is converted into Transformation objects
4. Execution: the Executor turns transformations into executable job graphs

```java
// Typical planning flow
Parser parser = planner.getParser();

// 1. Parse SQL to operations
List<Operation> operations = parser.parse(sqlStatement);

// 2. Translate to transformations (includes validation & optimization)
List<Transformation<?>> transformations = planner.translate(operations);

// 3. Execute transformations
JobExecutionResult result = executor.execute(transformations);
```
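In practice, TableEnvironment wires these stages together: a single executeSql call runs parsing, validation, optimization, translation, and job submission. A minimal sketch (connectors are illustrative):

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
tEnv.executeSql("CREATE TABLE src (x INT) WITH ('connector' = 'datagen')")
tEnv.executeSql("CREATE TABLE snk (x INT) WITH ('connector' = 'blackhole')")

// The INSERT runs the full pipeline: parse -> validate -> optimize
// -> translate -> submit; the TableResult tracks the submitted job
val tableResult = tEnv.executeSql("INSERT INTO snk SELECT x FROM src")
```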
The planner integrates deeply with Apache Calcite's optimization framework:

- Custom optimization rules and parser settings can be injected through a CalciteConfig supplied via the table configuration
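As a rough illustration of the CalciteConfig hook, here is a sketch in the style of the legacy planner's CalciteConfigBuilder; the builder methods and their package locations differ between Flink versions and planners, so treat every name below as an assumption to verify against your version:

```scala
// Assumption: CalciteConfigBuilder with replaceLogicalOptRuleSet
// is available in this Flink version (legacy planner style)
import org.apache.calcite.tools.RuleSets
import org.apache.flink.table.calcite.{CalciteConfig, CalciteConfigBuilder}

val calciteConfig: CalciteConfig = new CalciteConfigBuilder()
  .replaceLogicalOptRuleSet(RuleSets.ofList(/* custom RelOptRule instances */))
  .build()

// The PlannerConfig hook on TableConfig picks up the custom rule sets
tableEnv.getConfig.setPlannerConfig(calciteConfig)
```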
The planning components provide comprehensive error handling:

```java
try {
    List<Operation> operations = parser.parse(sql);
    List<Transformation<?>> transformations = planner.translate(operations);
} catch (SqlParserException e) {
    // Handle SQL syntax errors
} catch (ValidationException e) {
    // Handle semantic validation errors
} catch (TableException e) {
    // Handle general table and planner errors
}
```

Common error scenarios:
- Syntax errors in the SQL text, raised during parsing
- References to unresolved tables, columns, or functions, raised during validation
- Type mismatches between expressions and expected schemas
- Operations not supported in the selected execution mode
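For example, referencing a table that is not registered in the catalog typically surfaces as a ValidationException before any job is submitted; a minimal sketch:

```scala
import org.apache.flink.table.api.ValidationException

try {
  // Fails validation: `does_not_exist` is not registered in any catalog
  tableEnv.explainSql("SELECT * FROM does_not_exist")
} catch {
  case e: ValidationException => println(s"Validation failed: ${e.getMessage}")
}
```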