0
# Core Planning Components
1
2
Core planner implementations handle the translation and optimization of table programs into Flink execution plans. The planning system supports both streaming and batch processing modes with sophisticated SQL parsing capabilities.
3
4
## Package Information
5
6
```scala
7
import org.apache.flink.table.planner.delegation.{PlannerBase, StreamPlanner, BatchPlanner}
8
import org.apache.flink.table.planner.calcite.{FlinkRelBuilder, FlinkTypeFactory}
9
import org.apache.flink.table.api.TableEnvironment
10
```
11
12
```java
13
import org.apache.flink.table.planner.delegation.ParserImpl;
14
import org.apache.flink.table.delegation.Parser;
15
import org.apache.flink.table.operations.Operation;
16
import org.apache.flink.table.expressions.ResolvedExpression;
17
import org.apache.flink.table.catalog.UnresolvedIdentifier;
18
```
19
20
## Capabilities
21
22
### PlannerBase (Abstract Base Class)
23
24
The foundational abstract class that provides common functionality for both streaming and batch planners.
25
26
```scala { .api }
27
abstract class PlannerBase extends Planner {
28
def getTableEnvironment: TableEnvironment
29
def getFlinkRelBuilder: FlinkRelBuilder
30
def getTypeFactory: FlinkTypeFactory
31
def getExecEnv: StreamExecutionEnvironment
32
def getTableConfig: TableConfig
33
def getFlinkContext: FlinkContext
34
def getCatalogManager: CatalogManager
35
def getFunctionCatalog: FunctionCatalog
36
37
// Core planning methods
38
def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]]
39
def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String
40
def getCompletionHints(statement: String, position: Int): Array[String]
41
def getParser: Parser
42
}
43
```
44
45
**Key Responsibilities:**
46
- Manages the planner context and configuration (`PlannerContext`, `PlannerConfiguration`)
47
- Provides access to Calcite components (`FlinkRelBuilder`, `FlinkTypeFactory`)
48
- Handles query optimization and execution plan generation
49
- Integrates with Flink's catalog system and function registry
50
- Manages the lifecycle of planning operations
51
52
**Usage Example:**
53
54
```scala
55
// PlannerBase is typically accessed through concrete implementations
56
val planner: PlannerBase = // obtained from factory
57
58
// Access core components
59
val tableEnv = planner.getTableEnvironment
60
val relBuilder = planner.getFlinkRelBuilder
61
val typeFactory = planner.getTypeFactory
62
63
// Translate operations to transformations
64
val operations: util.List[ModifyOperation] = // parsed operations
65
val transformations = planner.translate(operations)
66
67
// Explain query plans
68
val explanation = planner.explain(operations, ExplainDetail.ESTIMATED_COST)
69
```
70
71
### StreamPlanner
72
73
Concrete planner implementation specialized for streaming workloads.
74
75
```scala { .api }
76
class StreamPlanner(
77
executor: Executor,
78
config: TableConfig,
79
functionCatalog: FunctionCatalog,
80
catalogManager: CatalogManager,
81
isStreamingMode: Boolean
82
) extends PlannerBase {
83
84
// Streaming-specific optimization and translation
85
override def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]]
86
87
// Streaming-specific explain functionality
88
override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String
89
}
90
```
91
92
**Key Features:**
93
- Optimized for continuous stream processing
94
- Supports streaming-specific operators (windowing, watermarks, state management)
95
- Handles incremental computation and result updates
96
- Manages streaming execution semantics (event-time processing, late data handling)
97
98
**Usage Example:**
99
100
```scala
101
// StreamPlanner is created through DefaultPlannerFactory for streaming mode
102
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
103
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
104
105
val env = StreamExecutionEnvironment.getExecutionEnvironment
106
val tableEnv = StreamTableEnvironment.create(env)
107
108
// The planner is internal to the table environment
109
val streamPlanner = // accessed internally
110
111
// Process streaming operations
112
val operations = // parsed streaming operations
113
val transformations = streamPlanner.translate(operations)
114
```
115
116
### BatchPlanner
117
118
Concrete planner implementation specialized for batch workloads.
119
120
```scala { .api }
121
class BatchPlanner(
122
executor: Executor,
123
config: TableConfig,
124
functionCatalog: FunctionCatalog,
125
catalogManager: CatalogManager
126
) extends PlannerBase {
127
128
// Batch-specific optimization and translation
129
override def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]]
130
131
// Batch-specific explain functionality
132
override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String
133
}
134
```
135
136
**Key Features:**
137
- Optimized for finite dataset processing
138
- Supports batch-specific optimizations (join reordering, predicate pushdown)
139
- Handles bounded data processing patterns
140
- Manages batch execution semantics and resource allocation
141
142
**Usage Example:**
143
144
```scala
145
// BatchPlanner is created through DefaultPlannerFactory for batch mode
146
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
147
148
val tableEnv = TableEnvironment.create(
149
EnvironmentSettings.newInstance()
150
.inBatchMode()
151
.build()
152
)
153
154
// The planner is internal to the table environment
155
val batchPlanner = // accessed internally
156
157
// Process batch operations
158
val operations = // parsed batch operations
159
val transformations = batchPlanner.translate(operations)
160
```
161
162
### ParserImpl
163
164
Default SQL parser implementation using Apache Calcite for parsing SQL statements, identifiers, and expressions.
165
166
```java { .api }
167
public class ParserImpl implements Parser {
168
169
// Constructor
170
public ParserImpl(
171
CatalogManager catalogManager,
172
Supplier<FlinkPlannerImpl> validatorSupplier,
173
Supplier<CalciteParser> calciteParserSupplier,
174
RexNodeToExpressionConverter rexNodeToExpressionConverter
175
);
176
177
// Core parsing methods
178
public List<Operation> parse(String statement);
179
public UnresolvedIdentifier parseIdentifier(String identifier);
180
public ResolvedExpression parseSqlExpression(
181
String sqlExpression,
182
RowType inputRowType,
183
LogicalType outputType
184
);
185
186
// Completion and validation
187
public String[] getCompletionHints(String statement, int position);
188
}
189
```
190
191
**Key Methods:**
192
193
- **`parse(String statement)`**: Parses SQL statements into executable operations
194
- Handles DDL (CREATE TABLE, ALTER TABLE, etc.)
195
- Handles DML (SELECT, INSERT, UPDATE, DELETE)
196
- Handles utility statements (SHOW TABLES, DESCRIBE, etc.)
197
- Returns list of `Operation` objects for execution
198
199
- **`parseIdentifier(String identifier)`**: Parses table/column identifiers
200
- Supports qualified names (catalog.database.table)
201
- Handles quoted and unquoted identifiers
202
- Returns `UnresolvedIdentifier` for catalog resolution
203
204
- **`parseSqlExpression(...)`**: Parses SQL expressions within contexts
205
- Used for computed columns, filters, projections
206
- Requires input row type for context
207
- Returns resolved expressions with type information
208
209
**Usage Example:**
210
211
```java
212
import org.apache.flink.table.planner.delegation.ParserImpl;
213
import org.apache.flink.table.operations.Operation;
214
import org.apache.flink.table.catalog.UnresolvedIdentifier;
215
216
// Parser is typically obtained from PlannerBase
217
Parser parser = planner.getParser();
218
219
// Parse SQL statements
220
List<Operation> operations = parser.parse("CREATE TABLE my_table (id INT, name STRING)");
221
List<Operation> queryOps = parser.parse("SELECT id, UPPER(name) FROM my_table WHERE id > 100");
222
223
// Parse identifiers
224
UnresolvedIdentifier tableId = parser.parseIdentifier("catalog.database.table");
225
UnresolvedIdentifier simpleId = parser.parseIdentifier("my_table");
226
227
// Parse expressions
228
RowType inputType = // define input row type
229
LogicalType outputType = DataTypes.STRING().getLogicalType();
230
ResolvedExpression expr = parser.parseSqlExpression(
231
"UPPER(name)",
232
inputType,
233
outputType
234
);
235
236
// Get completion hints for IDE integration
237
String[] hints = parser.getCompletionHints("SELECT * FROM my_ta", 18);
238
```
239
240
## Planning Process Flow
241
242
The core planning process follows these stages:
243
244
1. **Parsing**: `ParserImpl` converts SQL text into `Operation` objects
245
2. **Validation**: Operations are validated against catalog and type system
246
3. **Optimization**: `PlannerBase` applies Calcite optimization rules
247
4. **Translation**: Optimized plans are translated to Flink `Transformation` objects
248
5. **Execution**: `Executor` converts transformations to executable job graphs
249
250
```java
251
// Typical planning flow
252
Parser parser = planner.getParser();
253
254
// 1. Parse SQL to operations
255
List<Operation> operations = parser.parse(sqlStatement);
256
257
// 2. Translate to transformations (includes validation & optimization)
258
List<Transformation<?>> transformations = planner.translate(operations);
259
260
// 3. Execute transformations
261
JobExecutionResult result = executor.execute(transformations);
262
```
263
264
## Optimization Integration
265
266
The planner integrates deeply with Apache Calcite's optimization framework:
267
268
- **Rule-Based Optimization**: Applies transformation rules to improve query plans
269
- **Cost-Based Optimization**: Uses statistics to choose optimal join orders and algorithms
270
- **Custom Rules**: Flink-specific optimization rules for streaming and batch scenarios
271
- **Program Chaining**: Supports custom optimization programs via `CalciteConfig`
272
273
## Error Handling
274
275
The planning components provide comprehensive error handling:
276
277
```java
278
try {
279
List<Operation> operations = parser.parse(sql);
280
List<Transformation<?>> transformations = planner.translate(operations);
281
} catch (SqlParseException e) {
282
// Handle SQL syntax errors
283
} catch (ValidationException e) {
284
// Handle semantic validation errors
285
} catch (TableException e) {
286
// Handle table-specific errors
287
}
288
```
289
290
Common error scenarios:
291
- **Parse Errors**: Invalid SQL syntax, unrecognized keywords
292
- **Validation Errors**: Unknown tables/columns, type mismatches, unsupported operations
293
- **Planning Errors**: Optimization failures, unsupported query patterns
294
- **Resource Errors**: Insufficient memory, configuration issues