Flink Table Planner connects the Table/SQL API to the runtime and is responsible for translating and optimizing table programs into Flink pipelines.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-planner_2-11@1.14.0

This module bridges Flink's declarative Table/SQL API and the runtime execution engine. It uses Apache Calcite for query planning, optimization, and code generation, and serves both streaming and batch workloads.
pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.14.6</version>
</dependency>

Scala:

import org.apache.flink.table.planner.delegation.{DefaultPlannerFactory, DefaultParserFactory}
import org.apache.flink.table.planner.calcite.CalciteConfig
import org.apache.flink.table.delegation.PlannerFactory
import org.apache.flink.table.factories.ParserFactory

Java:

import org.apache.flink.table.planner.delegation.DefaultPlannerFactory;
import org.apache.flink.table.planner.delegation.DefaultParserFactory;
import org.apache.flink.table.planner.calcite.CalciteConfig;

import org.apache.flink.table.planner.delegation.DefaultPlannerFactory;
import org.apache.flink.table.planner.calcite.CalciteConfig;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.api.*;

// Create planner factory
PlannerFactory plannerFactory = new DefaultPlannerFactory();

// Create a custom Calcite configuration.
// customParserConfig is a caller-supplied SqlParser.Config; it is not defined in this snippet.
CalciteConfig calciteConfig = CalciteConfig.createBuilder()
    .replaceSqlParserConfig(customParserConfig)
    .build();

// Use with TableEnvironment (typically handled automatically)
TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .build()
);

The flink-table-planner module consists of several key architectural components:
Entry points for creating planner components through the Service Provider Interface (SPI).
public final class DefaultPlannerFactory implements PlannerFactory {
    public String factoryIdentifier();
    public Planner create(Context context);
}

public class DefaultParserFactory implements ParserFactory {
    public String factoryIdentifier();
    public Parser create(Context context);
}

public final class DefaultExecutorFactory implements ExecutorFactory {
    public Executor create(Context context);
}

Factory Classes and Service Providers
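The factory classes above each expose a `factoryIdentifier()`, which suggests that a caller picks one implementation by type and identifier. The following is a minimal, self-contained sketch of that selection step only. Everything in it is a hypothetical stand-in and not a Flink class: the nested `Factory`, `PlannerFactory` and `ParserFactory` interfaces, the `Sketch*` implementations, the `discover` helper, and the `"default"` identifier string. It also assumes the candidate list would normally come from `java.util.ServiceLoader`; here the candidates are listed by hand so the example runs without a `META-INF/services` file.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FactoryDiscoverySketch {

    /** Hypothetical stand-in for an SPI base interface; not Flink's Factory. */
    public interface Factory {
        String factoryIdentifier();
    }

    /** Hypothetical marker interfaces for the two kinds of factory. */
    public interface PlannerFactory extends Factory {}

    public interface ParserFactory extends Factory {}

    public static final class SketchPlannerFactory implements PlannerFactory {
        @Override
        public String factoryIdentifier() {
            return "default"; // illustrative identifier
        }
    }

    public static final class SketchParserFactory implements ParserFactory {
        @Override
        public String factoryIdentifier() {
            return "default"; // illustrative identifier
        }
    }

    /** Returns the single candidate that has the requested type and identifier. */
    public static <T extends Factory> T discover(
            List<? extends Factory> candidates, Class<T> type, String identifier) {
        List<T> matches = candidates.stream()
                .filter(type::isInstance)
                .map(type::cast)
                .filter(f -> f.factoryIdentifier().equals(identifier))
                .collect(Collectors.toList());
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one " + type.getSimpleName()
                            + " for identifier '" + identifier
                            + "' but found " + matches.size());
        }
        return matches.get(0);
    }

    /** Hand-written candidate list; an SPI-based setup would load these instead. */
    public static List<Factory> candidates() {
        return Arrays.asList(new SketchPlannerFactory(), new SketchParserFactory());
    }

    public static void main(String[] args) {
        PlannerFactory planner = discover(candidates(), PlannerFactory.class, "default");
        ParserFactory parser = discover(candidates(), ParserFactory.class, "default");
        System.out.println(planner.getClass().getSimpleName()); // prints "SketchPlannerFactory"
        System.out.println(parser.getClass().getSimpleName());  // prints "SketchParserFactory"
    }
}
```

Filtering by type before identifier lets two factories of different kinds share the same identifier, as the planner and parser stand-ins do here. Failing when zero or several candidates match keeps a misconfigured classpath from silently picking the wrong implementation.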
Core planner implementations for streaming and batch processing modes with SQL parsing capabilities.
abstract class PlannerBase extends Planner {
  def getTableEnvironment: TableEnvironment
  def getFlinkRelBuilder: FlinkRelBuilder
  def getTypeFactory: FlinkTypeFactory
}

class StreamPlanner extends PlannerBase
class BatchPlanner extends PlannerBase

public class ParserImpl implements Parser {
    public List<Operation> parse(String statement);
    public UnresolvedIdentifier parseIdentifier(String identifier);
    public ResolvedExpression parseSqlExpression(String sqlExpression, RowType inputRowType, LogicalType outputType);
}

Configuration classes for customizing Calcite behavior and planner settings.
trait CalciteConfig {
  def getSqlParserConfig: Option[SqlParser.Config]
  def getSqlOperatorTable: Option[SqlOperatorTable]
  def getBatchProgram: Option[FlinkChainedProgram[BatchOptimizeContext]]
  def getStreamProgram: Option[FlinkChainedProgram[StreamOptimizeContext]]
}

object CalciteConfig {
  def createBuilder(): CalciteConfigBuilder
}

class CalciteConfigBuilder {
  def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder
  def replaceSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder
  def addSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder
  def replaceBatchProgram(program: FlinkChainedProgram[BatchOptimizeContext]): CalciteConfigBuilder
  def replaceStreamProgram(program: FlinkChainedProgram[StreamOptimizeContext]): CalciteConfigBuilder
  def build(): CalciteConfig
}

Interfaces and utilities for integrating custom table sources and sinks with the planner.
public interface TransformationScanProvider extends ScanTableSource.ScanRuntimeProvider {
    Transformation<RowData> createTransformation(Context context);
}

public interface TransformationSinkProvider extends DynamicTableSink.SinkRuntimeProvider {
    Transformation<?> createTransformation(Context context);
}

public final class DynamicSourceUtils {
    public static RelNode convertDataStreamToRel(StreamTableEnvironment tableEnv, DataStream<RowData> dataStream, List<String> fieldNames);
    public static RelNode convertSourceToRel(FlinkOptimizeContext optimizeContext, RelOptTable relOptTable, DynamicTableSource tableSource, FlinkStatistic statistic);
}

ExecNode hierarchy and translator interfaces for query execution plan generation.
public interface ExecNode<T> extends ExecNodeTranslator<T> {
    int getId();
    String getDescription();
    LogicalType getOutputType();
    List<InputProperty> getInputProperties();
    List<ExecEdge> getInputEdges();
    void setInputEdges(List<ExecEdge> inputEdges);
    void replaceInputEdge(int index, ExecEdge newInputEdge);
    void accept(ExecNodeVisitor visitor);
}

public interface StreamExecNode<T> extends ExecNode<T> {}
public interface BatchExecNode<T> extends ExecNode<T> {}

Type system bridging between Flink and Calcite with expression handling utilities.
public final class RexNodeExpression implements ResolvedExpression {
    public RexNode getRexNode();
    public DataType getOutputDataType();
    public List<ResolvedExpression> getResolvedChildren();
    public <R> R accept(ExpressionVisitor<R> visitor);
}

public final class PlannerTypeInferenceUtilImpl implements PlannerTypeInferenceUtil {
    public static final PlannerTypeInferenceUtilImpl INSTANCE;
    public TypeInference runTypeInference(CallExpression callExpression, CallContext callContext);
}

Public enums for execution strategies, traits, and configuration constants.
public enum AggregatePhaseStrategy {
    AUTO, ONE_PHASE, TWO_PHASE
}

public enum UpdateKind {
    ONLY_UPDATE_AFTER, BEFORE_AND_AFTER
}

public enum ModifyKind {
    INSERT, UPDATE, DELETE
}

public enum MiniBatchMode {
    ProcTime, RowTime, None
}

The module registers factories via Java SPI in META-INF/services/org.apache.flink.table.factories.Factory:
- DefaultPlannerFactory for planner creation
- DefaultParserFactory for parser creation
- DefaultExecutorFactory for executor creation

The module makes extensive use of Apache Calcite for query planning, optimization, and code generation.

It bridges Flink's type system and Calcite's type system through:

- FlinkTypeFactory for type creation and conversion
- RexNodeExpression for expression bridging