# Query Planning and Optimization

The query planning system optimizes queries for both streaming and batch workloads. It covers logical plan creation, rule-based optimization, cost-based optimization, and execution graph generation, with Apache Calcite as the underlying optimization framework.

## Capabilities

### PlannerBase - Abstract Planner Foundation

Base class providing common functionality for both streaming and batch planners, including query translation, optimization, and execution plan generation.

```scala { .api }
/**
 * Abstract base class for Blink planners providing common functionality
 */
abstract class PlannerBase(
    executor: Executor,
    config: TableConfig,
    functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager,
    isStreamingMode: Boolean
) extends Planner {

  /**
   * Translates modify operation to relational algebra representation
   * @param modifyOperation The operation to translate (INSERT, UPDATE, DELETE)
   * @return RelNode representing the operation in Calcite's algebra
   */
  def translateToRel(modifyOperation: ModifyOperation): RelNode

  /**
   * Optimizes a sequence of relational nodes using optimization rules
   * @param relNodes Sequence of RelNode instances to optimize
   * @return Optimized sequence of RelNode instances
   */
  def optimize(relNodes: Seq[RelNode]): Seq[RelNode]

  /**
   * Translates execution graph to Flink transformations
   * (protected, matching the StreamPlanner and BatchPlanner implementations)
   * @param execGraph Execution node graph to translate
   * @return List of Flink transformations for job execution
   */
  protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]

  /**
   * Generates execution plan explanation for debugging and analysis
   * @param operations List of operations to explain
   * @param extraDetails Additional detail levels for explanation
   * @return String representation of the execution plan
   */
  def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String

  /**
   * Returns trait definitions for relational algebra optimization
   * @return Array of trait definitions used by this planner
   */
  protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]]

  /**
   * Returns optimizer instance for this planner
   * @return Optimizer instance implementing optimization strategies
   */
  protected def getOptimizer: Optimizer

  /**
   * Returns execution node graph processors for post-optimization processing
   * @return Sequence of processors for execution graph transformation
   */
  protected def getExecNodeGraphProcessors: Seq[ExecNodeGraphProcessor]
}
```

### StreamPlanner - Streaming Query Planning

Planner implementation optimized for streaming workloads with support for watermarks, windowing, and continuous query processing.

```scala { .api }
/**
 * Planner implementation for streaming execution mode
 */
class StreamPlanner(
    executor: Executor,
    config: TableConfig,
    functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager
) extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) {

  /**
   * Returns trait definitions specific to streaming execution
   * Includes distribution, mini-batch interval, modify kind, and update kind traits
   */
  protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]] = Array(
    ConventionTraitDef.INSTANCE,
    FlinkRelDistributionTraitDef.INSTANCE,
    MiniBatchIntervalTraitDef.INSTANCE,
    ModifyKindSetTraitDef.INSTANCE,
    UpdateKindTraitDef.INSTANCE
  )

  /**
   * Returns stream-specific optimizer with common sub-graph optimization
   * @return StreamCommonSubGraphBasedOptimizer instance
   */
  protected def getOptimizer: Optimizer = new StreamCommonSubGraphBasedOptimizer(this)

  /**
   * Returns empty processor sequence for streaming (no additional processing needed)
   * @return Empty sequence of processors
   */
  protected def getExecNodeGraphProcessors: Seq[ExecNodeGraphProcessor] = Seq()

  /**
   * Translates execution graph to stream transformations
   * @param execGraph Execution node graph for streaming
   * @return List of streaming transformations
   */
  protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
}
```
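
**Usage Example:**

```scala
import java.util.Collections

import org.apache.flink.table.operations.ModifyOperation
import org.apache.flink.table.planner.delegation.StreamPlanner

// StreamPlanner is typically created via BlinkPlannerFactory
// but can be instantiated directly for advanced use cases
val streamPlanner = new StreamPlanner(executor, tableConfig, functionCatalog, catalogManager)

// parseAndValidateSQL is a placeholder for your own parsing step; it must yield a
// ModifyOperation such as INSERT INTO (`value` is a reserved word, hence the backticks)
val operation: ModifyOperation =
  parseAndValidateSQL("INSERT INTO sink SELECT * FROM source WHERE `value` > 100")

// Translate the operation to Flink transformations via the Planner interface
val transformations = streamPlanner.translate(Collections.singletonList(operation))
```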

### BatchPlanner - Batch Query Planning

Planner implementation optimized for batch workloads with support for batch-specific optimizations and execution strategies.

```scala { .api }
/**
 * Planner implementation for batch execution mode
 */
class BatchPlanner(
    executor: Executor,
    config: TableConfig,
    functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager
) extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = false) {

  /**
   * Returns trait definitions specific to batch execution
   * Focuses on distribution and convention traits without streaming-specific traits
   */
  protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]]

  /**
   * Returns batch-specific optimizer with batch optimization strategies
   * @return BatchCommonSubGraphBasedOptimizer instance
   */
  protected def getOptimizer: Optimizer

  /**
   * Returns batch-specific processors for execution graph optimization
   * @return Sequence of batch-specific processors
   */
  protected def getExecNodeGraphProcessors: Seq[ExecNodeGraphProcessor]

  /**
   * Translates execution graph to batch transformations
   * @param execGraph Execution node graph for batch processing
   * @return List of batch transformations
   */
  protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
}
```

### Optimizer System

The optimization system provides multi-phase query optimization using Apache Calcite's rule-based and cost-based optimization framework.

```scala { .api }
/**
 * Base optimizer interface for query optimization
 */
trait Optimizer {
  /**
   * Optimizes relational algebra expressions using optimization rules
   * @param roots Root nodes of the query plan to optimize
   * @return Optimized execution node graph
   */
  def optimize(roots: Seq[RelNode]): ExecNodeGraph
}

/**
 * Stream-specific optimizer with common sub-graph based optimization
 */
class StreamCommonSubGraphBasedOptimizer(planner: PlannerBase) extends Optimizer {
  /**
   * Applies streaming-specific optimization rules and strategies
   * Includes watermark propagation, mini-batch optimization, and state optimization
   */
  def optimize(roots: Seq[RelNode]): ExecNodeGraph
}

/**
 * Batch-specific optimizer with batch optimization strategies
 */
class BatchCommonSubGraphBasedOptimizer(planner: PlannerBase) extends Optimizer {
  /**
   * Applies batch-specific optimization rules and strategies
   * Includes join reordering, aggregation pushdown, and partition pruning
   */
  def optimize(roots: Seq[RelNode]): ExecNodeGraph
}
```
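
The "common sub-graph" in the optimizer names suggests that sub-plans shared by several query roots are identified so they can be handled once. As a rough illustration only, the following self-contained Java sketch finds sub-plans that occur under more than one root by comparing textual digests. `CommonSubPlanSketch`, `PlanNode`, and the digest format are hypothetical names introduced here; this is not the planner's own algorithm or API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

final class CommonSubPlanSketch {

    /** Hypothetical plan node; a stand-in for a relational node, not Calcite's RelNode. */
    static final class PlanNode {
        final String op;
        final List<PlanNode> inputs;

        PlanNode(String op, PlanNode... inputs) {
            this.op = op;
            this.inputs = Arrays.asList(inputs);
        }

        /** Structural description of this sub-plan: equal digests mean equal sub-plans. */
        String digest() {
            if (inputs.isEmpty()) {
                return op;
            }
            StringJoiner joiner = new StringJoiner(",", op + "(", ")");
            for (PlanNode input : inputs) {
                joiner.add(input.digest());
            }
            return joiner.toString();
        }
    }

    /** Returns digests that occur more than once across all roots, in order of first appearance. */
    static List<String> sharedDigests(List<PlanNode> roots) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (PlanNode root : roots) {
            count(root, counts);
        }
        List<String> shared = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() > 1) {
                shared.add(entry.getKey());
            }
        }
        return shared;
    }

    private static void count(PlanNode node, Map<String, Integer> counts) {
        counts.merge(node.digest(), 1, Integer::sum);
        for (PlanNode input : node.inputs) {
            count(input, counts);
        }
    }

    public static void main(String[] args) {
        // Two sinks that each read the same filtered scan, built as separate objects.
        PlanNode sinkA = new PlanNode("Sink[a]", new PlanNode("Filter[amount>100]", new PlanNode("Scan[orders]")));
        PlanNode sinkB = new PlanNode("Sink[b]", new PlanNode("Filter[amount>100]", new PlanNode("Scan[orders]")));
        System.out.println(sharedDigests(Arrays.asList(sinkA, sinkB)));
    }
}
```

Comparing digests rather than object identity lets the sketch recognise equivalent sub-plans even when each root was built independently.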

### Execution Node Graph

Represents the optimized execution plan as a graph of execution nodes, providing the bridge between logical planning and physical execution.

```java { .api }
/**
 * Graph representation of execution nodes for query execution
 */
public class ExecNodeGraph {
    /**
     * Returns root nodes of the execution graph
     * @return List of root execution nodes
     */
    public List<ExecNode<?>> getRootNodes();

    /**
     * Accepts visitor for traversing the execution graph
     * @param visitor Visitor implementation for graph traversal
     */
    public void accept(ExecNodeVisitor visitor);

    /**
     * Returns all nodes in the execution graph
     * @return Set of all execution nodes in the graph
     */
    public Set<ExecNode<?>> getAllNodes();
}

/**
 * Base interface for execution nodes in the graph
 */
public interface ExecNode<T> {
    /**
     * Translates this execution node to Flink transformation
     * @param planner Planner context for translation
     * @return Flink transformation representing this node
     */
    Transformation<T> translateToPlan(Planner planner);

    /**
     * Returns input nodes of this execution node
     * @return List of input execution nodes
     */
    List<ExecNode<?>> getInputNodes();

    /**
     * Returns output type of this execution node
     * @return Type information for the output
     */
    TypeInformation<T> getOutputType();
}
```
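
To make the relationship between root nodes, input nodes, and the set of all nodes concrete, here is a minimal self-contained sketch of a graph that is addressed by its roots and walked towards its inputs. `ExecGraphSketch` and its nested `Node` are hypothetical stand-ins with no Flink dependency; they only mirror the shape of the interfaces listed above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class ExecGraphSketch {

    /** Hypothetical stand-in for an execution node; not Flink's ExecNode. */
    static final class Node {
        final String name;
        final List<Node> inputs;

        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Collects every node reachable from the roots exactly once, inputs before consumers. */
    static Set<Node> allNodes(List<Node> roots) {
        Set<Node> visited = new LinkedHashSet<>();
        for (Node root : roots) {
            visit(root, visited);
        }
        return visited;
    }

    private static void visit(Node node, Set<Node> visited) {
        if (visited.contains(node)) {
            return; // already reached through another consumer
        }
        for (Node input : node.inputs) {
            visit(input, visited);
        }
        visited.add(node);
    }

    /** Names of all nodes in visiting order. */
    static List<String> names(List<Node> roots) {
        List<String> names = new ArrayList<>();
        for (Node node : allNodes(roots)) {
            names.add(node.name);
        }
        return names;
    }

    public static void main(String[] args) {
        Node source = new Node("source");
        Node calc = new Node("calc", source);
        // Two roots (sinks) share the same upstream calc node.
        List<Node> roots = Arrays.asList(new Node("sinkA", calc), new Node("sinkB", calc));
        System.out.println(names(roots)); // prints [source, calc, sinkA, sinkB]
    }
}
```

Visiting inputs before their consumers yields an order in which each node's inputs have already been handled, which is convenient when every node is translated from the results of its inputs.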

## Optimization Phases

### Phase 1: Logical Planning
- SQL parsing and validation using Calcite
- Logical plan creation with RelNode representation
- Initial semantic validation and type checking

### Phase 2: Rule-Based Optimization
- Application of transformation rules (projection pushdown, filter pushdown, etc.)
- Join reordering and optimization
- Predicate simplification and constant folding
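
As a small illustration of what a rewrite rule such as constant folding does, the sketch below folds `60 + 40` inside a predicate into `100` by rewriting the expression tree bottom-up. The `ConstantFoldingSketch` class and its nested `Expr`, `Lit`, `Col`, and `Bin` types are hypothetical and far simpler than Calcite's expression model; the point is only the match-and-replace shape of a rule.

```java
final class ConstantFoldingSketch {

    /** Hypothetical expression tree; not Calcite's RexNode. */
    interface Expr {}

    static final class Lit implements Expr {
        final int value;
        Lit(int value) { this.value = value; }
    }

    static final class Col implements Expr {
        final String name;
        Col(String name) { this.name = name; }
    }

    static final class Bin implements Expr {
        final String op;
        final Expr left;
        final Expr right;
        Bin(String op, Expr left, Expr right) {
            this.op = op;
            this.left = left;
            this.right = right;
        }
    }

    /** Rewrites bottom-up: a "+" over two literals becomes a single literal. */
    static Expr fold(Expr e) {
        if (!(e instanceof Bin)) {
            return e; // literals and column references are already in simplest form
        }
        Bin b = (Bin) e;
        Expr l = fold(b.left);
        Expr r = fold(b.right);
        if (b.op.equals("+") && l instanceof Lit && r instanceof Lit) {
            return new Lit(((Lit) l).value + ((Lit) r).value);
        }
        return new Bin(b.op, l, r);
    }

    /** Renders an expression as text for inspection. */
    static String show(Expr e) {
        if (e instanceof Lit) {
            return Integer.toString(((Lit) e).value);
        }
        if (e instanceof Col) {
            return ((Col) e).name;
        }
        Bin b = (Bin) e;
        return "(" + show(b.left) + " " + b.op + " " + show(b.right) + ")";
    }

    public static void main(String[] args) {
        Expr predicate = new Bin(">", new Col("amount"), new Bin("+", new Lit(60), new Lit(40)));
        // prints (amount > (60 + 40)) => (amount > 100)
        System.out.println(show(predicate) + " => " + show(fold(predicate)));
    }
}
```

Because the rule only fires when both operands are literals, an expression such as `(a + 1)` passes through unchanged.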

### Phase 3: Cost-Based Optimization
- Statistics-based cost estimation
- Join algorithm selection (hash join, sort-merge join, etc.)
- Access path selection and index usage
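
The idea behind cost-based join algorithm selection can be sketched as: estimate a cost for each candidate from table statistics, then keep the cheapest feasible one. The cost formulas, the `JoinCostSketch` class, and the `buildRowsInMemory` parameter below are invented for illustration and do not reflect the planner's actual cost model.

```java
final class JoinCostSketch {

    enum JoinAlgorithm { BROADCAST_HASH, SHUFFLE_HASH, SORT_MERGE }

    /** Toy cost in "rows touched"; an infeasible choice costs infinity. */
    static double cost(JoinAlgorithm algorithm, long leftRows, long rightRows,
                       int parallelism, long buildRowsInMemory) {
        long small = Math.min(leftRows, rightRows);
        long large = Math.max(leftRows, rightRows);
        switch (algorithm) {
            case BROADCAST_HASH:
                // Every task holds the whole small side, so it must fit in memory.
                if (small > buildRowsInMemory) {
                    return Double.POSITIVE_INFINITY;
                }
                return (double) small * parallelism + large;
            case SHUFFLE_HASH:
                // Each task holds only its partition of the small side.
                if (small / parallelism > buildRowsInMemory) {
                    return Double.POSITIVE_INFINITY;
                }
                return 2.0 * (small + large);
            default: // SORT_MERGE: always feasible, pays for sorting both sides
                return (small + large) + sortCost(small) + sortCost(large);
        }
    }

    /** n * log2(n), with a floor so tiny inputs do not produce zero or negative cost. */
    private static double sortCost(long rows) {
        double n = Math.max(rows, 2);
        return n * (Math.log(n) / Math.log(2));
    }

    /** Picks the algorithm with the lowest estimated cost; ties go to the earlier enum value. */
    static JoinAlgorithm choose(long leftRows, long rightRows, int parallelism, long buildRowsInMemory) {
        JoinAlgorithm best = null;
        double bestCost = Double.POSITIVE_INFINITY;
        for (JoinAlgorithm candidate : JoinAlgorithm.values()) {
            double c = cost(candidate, leftRows, rightRows, parallelism, buildRowsInMemory);
            if (best == null || c < bestCost) {
                best = candidate;
                bestCost = c;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        System.out.println(choose(1_000L, 10_000_000L, 8, 1_000_000L));     // small dimension table
        System.out.println(choose(5_000_000L, 5_000_000L, 8, 1_000_000L));  // two large tables
        System.out.println(choose(5_000_000L, 5_000_000L, 8, 100_000L));    // two large tables, little memory
    }
}
```

With these made-up formulas the three calls select a broadcast hash join, a shuffle hash join, and a sort-merge join respectively, which shows how the same query can get a different physical plan as statistics or resources change.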

### Phase 4: Physical Planning
- Translation to execution nodes (ExecNode graph)
- Operator selection and configuration
- Resource allocation and parallelism planning

### Phase 5: Code Generation
- Dynamic code generation for optimized execution
- Fusion of operators where beneficial
- Memory management optimization

## Configuration and Tuning

Key configuration options for query planning:

```java
// Enable cost-based optimization
// (verify this key against OptimizerConfigOptions for your Flink version)
tableConfig.getConfiguration().setString("table.optimizer.cbo-enabled", "true");

// Enable join reordering
tableConfig.getConfiguration().setString("table.optimizer.join-reorder-enabled", "true");

// Configure mini-batch settings for streaming
// (allow-latency and size must both be positive once mini-batch is enabled)
tableConfig.getConfiguration().setString("table.exec.mini-batch.enabled", "true");
tableConfig.getConfiguration().setString("table.exec.mini-batch.allow-latency", "1s");
tableConfig.getConfiguration().setString("table.exec.mini-batch.size", "5000");
```