0
# Flink Table Planner
1
2
Flink Table Planner connects Table/SQL API and runtime, responsible for translating and optimizing table programs into Flink pipelines. This module serves as the critical bridge between Flink's declarative Table/SQL API and the runtime execution engine, leveraging Apache Calcite for sophisticated query planning, optimization, and code generation capabilities for both streaming and batch processing workloads.
3
4
## Package Information
5
6
- **Package Name**: org.apache.flink:flink-table-planner_2.11
7
- **Package Type**: maven
8
- **Language**: Scala/Java
9
- **Version**: 1.14.6
10
- **Installation**: Add dependency to your Maven `pom.xml`:
11
12
```xml
13
<dependency>
14
<groupId>org.apache.flink</groupId>
15
<artifactId>flink-table-planner_2.11</artifactId>
16
<version>1.14.6</version>
17
</dependency>
18
```
19
20
## Core Imports
21
22
```scala
23
import org.apache.flink.table.planner.delegation.{DefaultPlannerFactory, DefaultParserFactory}
24
import org.apache.flink.table.planner.calcite.CalciteConfig
25
import org.apache.flink.table.factories.{PlannerFactory, ParserFactory}
26
```
27
28
```java
29
import org.apache.flink.table.planner.delegation.DefaultPlannerFactory;
30
import org.apache.flink.table.planner.delegation.DefaultParserFactory;
31
import org.apache.flink.table.planner.calcite.CalciteConfig;
32
```
33
34
## Basic Usage
35
36
```java
37
import org.apache.flink.table.planner.delegation.DefaultPlannerFactory;
38
import org.apache.flink.table.planner.calcite.CalciteConfig;
39
import org.apache.flink.table.api.*;
40
41
// Create planner factory
42
PlannerFactory plannerFactory = new DefaultPlannerFactory();
43
44
// Create planner with custom configuration
45
CalciteConfig calciteConfig = CalciteConfig.createBuilder()
46
.replaceSqlParserConfig(customParserConfig)
47
.build();
48
49
// Use with TableEnvironment (typically handled automatically)
50
TableEnvironment tableEnv = TableEnvironment.create(
51
EnvironmentSettings.newInstance()
52
.useBlinkPlanner()
53
.build()
54
);
55
```
56
57
## Architecture
58
59
The flink-table-planner module consists of several key architectural components:
60
61
- **Factory Layer**: Service provider implementations for creating planners, parsers, and executors
62
- **Planning Layer**: Core planner implementations for streaming and batch processing modes
63
- **Configuration Layer**: Calcite configuration and planner configuration management
64
- **Connector Layer**: Integration interfaces and utilities for table sources and sinks
65
- **Execution Layer**: ExecNode hierarchy and transformation translators
66
- **Type System**: Bridging between Flink and Calcite type systems
67
- **Expression System**: RexNode expressions and type inference utilities
68
69
## Capabilities
70
71
### Factory Classes and Service Providers
72
73
Entry points for creating planner components through the Service Provider Interface (SPI).
74
75
```java { .api }
76
public final class DefaultPlannerFactory implements PlannerFactory {
77
public String factoryIdentifier();
78
public Planner create(Context context);
79
}
80
81
public class DefaultParserFactory implements ParserFactory {
82
public String factoryIdentifier();
83
public Parser create(Context context);
84
}
85
86
public final class DefaultExecutorFactory implements ExecutorFactory {
87
public Executor create(Context context);
88
}
89
```
90
91
[Factory Classes and Service Providers](./factory-classes.md)
92
93
### Core Planning Components
94
95
Core planner implementations for streaming and batch processing modes with SQL parsing capabilities.
96
97
```scala { .api }
98
abstract class PlannerBase extends Planner {
99
def getTableEnvironment: TableEnvironment
100
def getFlinkRelBuilder: FlinkRelBuilder
101
def getTypeFactory: FlinkTypeFactory
102
}
103
104
class StreamPlanner extends PlannerBase
105
class BatchPlanner extends PlannerBase
106
```
107
108
```java { .api }
109
public class ParserImpl implements Parser {
110
public List<Operation> parse(String statement);
111
public UnresolvedIdentifier parseIdentifier(String identifier);
112
public ResolvedExpression parseSqlExpression(String sqlExpression, RowType inputRowType, LogicalType outputType);
113
}
114
```
115
116
[Core Planning Components](./core-planning.md)
117
118
### Configuration and Builders
119
120
Configuration classes for customizing Calcite behavior and planner settings.
121
122
```scala { .api }
123
trait CalciteConfig {
124
def getSqlParserConfig: Option[SqlParser.Config]
125
def getSqlOperatorTable: Option[SqlOperatorTable]
126
def getBatchProgram: Option[FlinkChainedProgram[BatchOptimizeContext]]
127
def getStreamProgram: Option[FlinkChainedProgram[StreamOptimizeContext]]
128
}
129
130
object CalciteConfig {
131
def createBuilder(): CalciteConfigBuilder
132
}
133
134
class CalciteConfigBuilder {
135
def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder
136
def replaceSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder
137
def addSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder
138
def replaceBatchProgram(program: FlinkChainedProgram[BatchOptimizeContext]): CalciteConfigBuilder
139
def replaceStreamProgram(program: FlinkChainedProgram[StreamOptimizeContext]): CalciteConfigBuilder
140
def build(): CalciteConfig
141
}
142
```
143
144
[Configuration and Builders](./configuration.md)
145
146
### Connector Integration
147
148
Interfaces and utilities for integrating custom table sources and sinks with the planner.
149
150
```java { .api }
151
public interface TransformationScanProvider extends ScanTableSource.ScanRuntimeProvider {
152
Transformation<RowData> createTransformation(Context context);
153
}
154
155
public interface TransformationSinkProvider extends DynamicTableSink.SinkRuntimeProvider {
156
Transformation<?> createTransformation(Context context);
157
}
158
159
public final class DynamicSourceUtils {
160
public static RelNode convertDataStreamToRel(StreamTableEnvironment tableEnv, DataStream<RowData> dataStream, List<String> fieldNames);
161
public static RelNode convertSourceToRel(FlinkOptimizeContext optimizeContext, RelOptTable relOptTable, DynamicTableSource tableSource, FlinkStatistic statistic);
162
}
163
```
164
165
[Connector Integration](./connector-integration.md)
166
167
### Execution Nodes
168
169
ExecNode hierarchy and translator interfaces for query execution plan generation.
170
171
```java { .api }
172
public interface ExecNode<T> extends ExecNodeTranslator<T> {
173
int getId();
174
String getDescription();
175
LogicalType getOutputType();
176
List<InputProperty> getInputProperties();
177
List<ExecEdge> getInputEdges();
178
void setInputEdges(List<ExecEdge> inputEdges);
179
void replaceInputEdge(int index, ExecEdge newInputEdge);
180
void accept(ExecNodeVisitor visitor);
181
}
182
183
public interface StreamExecNode<T> extends ExecNode<T> {}
184
185
public interface BatchExecNode<T> extends ExecNode<T> {}
186
```
187
188
[Execution Nodes](./execution-nodes.md)
189
190
### Type System and Expressions
191
192
Type system bridging between Flink and Calcite with expression handling utilities.
193
194
```java { .api }
195
public final class RexNodeExpression implements ResolvedExpression {
196
public RexNode getRexNode();
197
public LogicalType getOutputDataType();
198
public List<ResolvedExpression> getResolvedChildren();
199
public <R> R accept(ExpressionVisitor<R> visitor);
200
}
201
202
public final class PlannerTypeInferenceUtilImpl implements PlannerTypeInferenceUtil {
203
public static final PlannerTypeInferenceUtilImpl INSTANCE;
204
public TypeInference runTypeInference(CallExpression callExpression, CallContext callContext);
205
}
206
```
207
208
[Type System and Expressions](./type-system.md)
209
210
### Enums and Constants
211
212
Public enums for execution strategies, traits, and configuration constants.
213
214
```java { .api }
215
public enum AggregatePhaseStrategy {
216
AUTO, ONE_PHASE, TWO_PHASE
217
}
218
219
public enum UpdateKind {
220
ONLY_UPDATE_AFTER, BEFORE_AND_AFTER
221
}
222
223
public enum ModifyKind {
224
INSERT, UPDATE, DELETE
225
}
226
227
public enum MiniBatchMode {
228
ProcTime, RowTime, None
229
}
230
```
231
232
[Enums and Constants](./enums-constants.md)
233
234
## Integration Points
235
236
### Service Provider Interface (SPI)
237
238
The module registers factories via Java SPI in `META-INF/services/org.apache.flink.table.factories.Factory`:
239
- `DefaultPlannerFactory` for planner creation
240
- `DefaultParserFactory` for parser creation
241
- `DefaultExecutorFactory` for executor creation
242
243
### Calcite Integration
244
245
Extensive use of Apache Calcite for:
246
- SQL parsing with customizable parser configurations
247
- Query optimization through rule-based and cost-based optimization
248
- Relational algebra representation and transformation
249
250
### Type System Bridge
251
252
Seamless integration between Flink's type system and Calcite's type system:
253
- `FlinkTypeFactory` for type creation and conversion
254
- `RexNodeExpression` for expression bridging
255
- Type inference utilities for function calls and expressions