
# Query Planning and Optimization

The query planning system provides comprehensive query optimization for both streaming and batch workloads. It includes logical plan creation, rule-based optimization, cost-based optimization, and execution graph generation, using Apache Calcite as the underlying optimization framework.

## Capabilities

### PlannerBase - Abstract Planner Foundation

Base class providing common functionality for both streaming and batch planners, including query translation, optimization, and execution plan generation.

```scala { .api }
/**
 * Abstract base class for Blink planners providing common functionality
 */
abstract class PlannerBase(
    executor: Executor,
    config: TableConfig,
    functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager,
    isStreamingMode: Boolean
) extends Planner {

  /**
   * Translates a modify operation to its relational algebra representation
   * @param modifyOperation The operation to translate (INSERT, UPDATE, DELETE)
   * @return RelNode representing the operation in Calcite's algebra
   */
  def translateToRel(modifyOperation: ModifyOperation): RelNode

  /**
   * Optimizes a sequence of relational nodes using optimization rules
   * @param relNodes Sequence of RelNode instances to optimize
   * @return Optimized sequence of RelNode instances
   */
  def optimize(relNodes: Seq[RelNode]): Seq[RelNode]

  /**
   * Translates an execution graph to Flink transformations
   * @param execGraph Execution node graph to translate
   * @return List of Flink transformations for job execution
   */
  protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]

  /**
   * Generates an execution plan explanation for debugging and analysis
   * @param operations List of operations to explain
   * @param extraDetails Additional detail levels for the explanation
   * @return String representation of the execution plan
   */
  def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String

  /**
   * Returns trait definitions for relational algebra optimization
   * @return Array of trait definitions used by this planner
   */
  protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]]

  /**
   * Returns the optimizer instance for this planner
   * @return Optimizer implementing this planner's optimization strategies
   */
  protected def getOptimizer: Optimizer

  /**
   * Returns execution node graph processors for post-optimization processing
   * @return Sequence of processors for execution graph transformation
   */
  protected def getExecNodeGraphProcessors: Seq[ExecNodeGraphProcessor]
}
```
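The abstract members above follow the template-method pattern: the base class drives the translate-optimize-generate pipeline, while subclasses supply the traits, optimizer, and processors. A minimal self-contained sketch of that structure (the `Toy*`, `Rel`, and `Xform` names are illustrative stand-ins invented for this sketch, not the real Calcite or Flink API):

```java
import java.util.List;
import java.util.stream.Collectors;

// Toy stand-ins for Calcite's RelNode and Flink's Transformation.
record Rel(String op) {}
record Xform(String desc) {}

abstract class ToyPlannerBase {
    // Template method: the common driver shared by stream and batch planners.
    List<Xform> translate(List<Rel> rels) {
        List<Rel> optimized = optimize(rels);
        return optimized.stream()
                .map(r -> new Xform(mode() + ":" + r.op()))
                .collect(Collectors.toList());
    }

    // Hooks supplied by subclasses, mirroring getOptimizer/getTraitDefs.
    abstract List<Rel> optimize(List<Rel> rels);
    abstract String mode();
}

class ToyStreamPlanner extends ToyPlannerBase {
    List<Rel> optimize(List<Rel> rels) { return rels; } // no-op for the sketch
    String mode() { return "stream"; }
}

public class PlannerSketch {
    public static void main(String[] args) {
        var xforms = new ToyStreamPlanner().translate(List.of(new Rel("scan"), new Rel("filter")));
        System.out.println(xforms); // two stream-prefixed transformations
    }
}
```

The real `PlannerBase` works the same way: `translate` is shared, and the stream/batch split lives entirely in the overridden hooks.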

### StreamPlanner - Streaming Query Planning

Planner implementation optimized for streaming workloads, with support for watermarks, windowing, and continuous query processing.

```scala { .api }
/**
 * Planner implementation for streaming execution mode
 */
class StreamPlanner(
    executor: Executor,
    config: TableConfig,
    functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager
) extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) {

  /**
   * Returns trait definitions specific to streaming execution.
   * Includes distribution, mini-batch interval, modify kind, and update kind traits.
   */
  protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]] = Array(
    ConventionTraitDef.INSTANCE,
    FlinkRelDistributionTraitDef.INSTANCE,
    MiniBatchIntervalTraitDef.INSTANCE,
    ModifyKindSetTraitDef.INSTANCE,
    UpdateKindTraitDef.INSTANCE
  )

  /**
   * Returns the stream-specific optimizer with common sub-graph optimization
   * @return StreamCommonSubGraphBasedOptimizer instance
   */
  protected def getOptimizer: Optimizer = new StreamCommonSubGraphBasedOptimizer(this)

  /**
   * Returns an empty processor sequence for streaming (no additional processing needed)
   * @return Empty sequence of processors
   */
  protected def getExecNodeGraphProcessors: Seq[ExecNodeGraphProcessor] = Seq()

  /**
   * Translates an execution graph to stream transformations
   * @param execGraph Execution node graph for streaming
   * @return List of streaming transformations
   */
  protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
}
```

**Usage Example:**

```scala
import org.apache.flink.table.planner.delegation.StreamPlanner

// StreamPlanner is typically created via BlinkPlannerFactory,
// but it can be instantiated directly for advanced use cases
val streamPlanner = new StreamPlanner(executor, tableConfig, functionCatalog, catalogManager)

// Translate parsed and validated operations into Flink transformations
val operations = parseAndValidateSQL("SELECT * FROM source WHERE value > 100")
val transformations = streamPlanner.translate(operations)
```

### BatchPlanner - Batch Query Planning

Planner implementation optimized for batch workloads, with support for batch-specific optimizations and execution strategies.

```scala { .api }
/**
 * Planner implementation for batch execution mode
 */
class BatchPlanner(
    executor: Executor,
    config: TableConfig,
    functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager
) extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = false) {

  /**
   * Returns trait definitions specific to batch execution.
   * Focuses on convention and distribution traits, without the streaming-specific traits.
   */
  protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]]

  /**
   * Returns the batch-specific optimizer
   * @return BatchCommonSubGraphBasedOptimizer instance
   */
  protected def getOptimizer: Optimizer

  /**
   * Returns batch-specific processors for execution graph optimization
   * @return Sequence of batch-specific processors
   */
  protected def getExecNodeGraphProcessors: Seq[ExecNodeGraphProcessor]

  /**
   * Translates an execution graph to batch transformations
   * @param execGraph Execution node graph for batch processing
   * @return List of batch transformations
   */
  protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
}
```

### Optimizer System

The optimization system provides multi-phase query optimization using Apache Calcite's rule-based and cost-based optimization framework.

```scala { .api }
/**
 * Base optimizer interface for query optimization
 */
trait Optimizer {

  /**
   * Optimizes relational algebra expressions using optimization rules
   * @param roots Root nodes of the query plan to optimize
   * @return Optimized execution node graph
   */
  def optimize(roots: Seq[RelNode]): ExecNodeGraph
}

/**
 * Stream-specific optimizer with common sub-graph based optimization
 */
class StreamCommonSubGraphBasedOptimizer(planner: PlannerBase) extends Optimizer {

  /**
   * Applies streaming-specific optimization rules and strategies,
   * including watermark propagation, mini-batch optimization, and state optimization.
   */
  def optimize(roots: Seq[RelNode]): ExecNodeGraph
}

/**
 * Batch-specific optimizer with batch optimization strategies
 */
class BatchCommonSubGraphBasedOptimizer(planner: PlannerBase) extends Optimizer {

  /**
   * Applies batch-specific optimization rules and strategies,
   * including join reordering, aggregation pushdown, and partition pruning.
   */
  def optimize(roots: Seq[RelNode]): ExecNodeGraph
}
```
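Rule-based optimization of this kind can be pictured as repeatedly applying rewrite rules to a plan tree until a fixpoint is reached. A minimal self-contained sketch (toy `Node` type and a single merge rule invented for illustration; Calcite's actual rule engine is far richer):

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Toy plan node: an operator name with an optional child.
record Node(String op, Node child) {}

public class RuleSketch {
    // One rewrite rule: collapse Filter over Filter into a single Filter,
    // a simplified analogue of Calcite's filter-merge rule.
    static final UnaryOperator<Node> MERGE_FILTERS = n -> {
        if (n.op().equals("filter") && n.child() != null && n.child().op().equals("filter")) {
            return new Node("filter", n.child().child());
        }
        return n;
    };

    // Apply all rules at each node until nothing changes (fixpoint),
    // then recurse into the (possibly new) child.
    static Node optimize(Node n, List<UnaryOperator<Node>> rules) {
        if (n == null) return null;
        Node prev;
        do {
            prev = n;
            for (var rule : rules) n = rule.apply(n);
        } while (!n.equals(prev));
        return new Node(n.op(), optimize(n.child(), rules));
    }

    public static void main(String[] args) {
        Node plan = new Node("filter", new Node("filter", new Node("scan", null)));
        Node opt = optimize(plan, List.of(MERGE_FILTERS));
        System.out.println(opt); // the two filters merged into one over the scan
    }
}
```

The fixpoint loop is the essential idea: rules fire until no rule matches anymore, which is also how Calcite's heuristic (HEP) planner drives its rule sets.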

### Execution Node Graph

Represents the optimized execution plan as a graph of execution nodes, providing the bridge between logical planning and physical execution.

```java { .api }
/**
 * Graph representation of execution nodes for query execution
 */
public class ExecNodeGraph {

    /**
     * Returns the root nodes of the execution graph
     * @return List of root execution nodes
     */
    public List<ExecNode<?>> getRootNodes();

    /**
     * Accepts a visitor for traversing the execution graph
     * @param visitor Visitor implementation for graph traversal
     */
    public void accept(ExecNodeVisitor visitor);

    /**
     * Returns all nodes in the execution graph
     * @return Set of all execution nodes in the graph
     */
    public Set<ExecNode<?>> getAllNodes();
}

/**
 * Base interface for execution nodes in the graph
 */
public interface ExecNode<T> {

    /**
     * Translates this execution node to a Flink transformation
     * @param planner Planner context for translation
     * @return Flink transformation representing this node
     */
    Transformation<T> translateToPlan(Planner planner);

    /**
     * Returns the input nodes of this execution node
     * @return List of input execution nodes
     */
    List<ExecNode<?>> getInputNodes();

    /**
     * Returns the output type of this execution node
     * @return Type information for the output
     */
    TypeInformation<T> getOutputType();
}
```
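Translating such a graph means visiting each node's inputs before the node itself, while visiting shared sub-plans only once. A self-contained sketch of that traversal (the `ToyNode` type and `postOrder` helper are invented for illustration, not the real `ExecNodeVisitor` API):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy execution node: a name plus its input nodes.
record ToyNode(String name, List<ToyNode> inputs) {}

public class GraphSketch {
    // Post-order walk from the roots: inputs first, each node at most once,
    // so common sub-plans are translated a single time.
    static List<String> postOrder(List<ToyNode> roots) {
        Set<ToyNode> seen = new LinkedHashSet<>();
        List<String> order = new ArrayList<>();
        for (ToyNode root : roots) visit(root, seen, order);
        return order;
    }

    static void visit(ToyNode n, Set<ToyNode> seen, List<String> order) {
        if (!seen.add(n)) return;            // already visited: shared sub-plan
        for (ToyNode in : n.inputs()) visit(in, seen, order);
        order.add(n.name());                 // inputs before this node
    }

    public static void main(String[] args) {
        ToyNode scan = new ToyNode("scan", List.of());
        ToyNode filter = new ToyNode("filter", List.of(scan));
        ToyNode agg = new ToyNode("agg", List.of(filter));
        System.out.println(postOrder(List.of(agg))); // [scan, filter, agg]
    }
}
```

This bottom-up order matches how `translateToPlan` must work: a node's transformation can only be built once the transformations of its inputs exist.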

## Optimization Phases

### Phase 1: Logical Planning

- SQL parsing and validation using Calcite
- Logical plan creation with RelNode representation
- Initial semantic validation and type checking

### Phase 2: Rule-Based Optimization

- Application of transformation rules (projection pushdown, filter pushdown, etc.)
- Join reordering and optimization
- Predicate simplification and constant folding

### Phase 3: Cost-Based Optimization

- Statistics-based cost estimation
- Join algorithm selection (hash join, sort-merge join, etc.)
- Access path selection and index usage

### Phase 4: Physical Planning

- Translation to execution nodes (ExecNode graph)
- Operator selection and configuration
- Resource allocation and parallelism planning

### Phase 5: Code Generation

- Dynamic code generation for optimized execution
- Fusion of operators where beneficial
- Memory management optimization
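To make the cost-based phase concrete: the planner chooses among alternatives by estimated cost derived from statistics. A deliberately simplified sketch using a greedy smallest-table-first join order (estimated row counts stand in for real statistics; this is a common textbook heuristic, not Flink's actual algorithm):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class CostSketch {
    // Greedily join the smallest remaining table next, a cheap heuristic
    // used when full dynamic-programming enumeration is too expensive.
    static List<String> greedyJoinOrder(Map<String, Long> estimatedRows) {
        List<String> order = new ArrayList<>(estimatedRows.keySet());
        order.sort(Comparator.comparingLong(estimatedRows::get));
        return order; // smallest estimated input first
    }

    public static void main(String[] args) {
        Map<String, Long> stats =
            Map.of("orders", 1_000_000L, "customers", 10_000L, "nation", 25L);
        System.out.println(greedyJoinOrder(stats)); // [nation, customers, orders]
    }
}
```

The point of the sketch is the shape of the decision, not the heuristic itself: without statistics there is no cost model, which is why cost-based choices degrade to rule-based defaults when table statistics are missing.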

## Configuration and Tuning

Key configuration options for query planning:

```java
// Enable join reordering in the optimizer
tableConfig.getConfiguration().setString("table.optimizer.join-reorder-enabled", "true");

// Configure mini-batch settings for streaming
// (table.exec.mini-batch.size must also be set when mini-batch is enabled)
tableConfig.getConfiguration().setString("table.exec.mini-batch.enabled", "true");
tableConfig.getConfiguration().setString("table.exec.mini-batch.allow-latency", "1s");
tableConfig.getConfiguration().setString("table.exec.mini-batch.size", "1000");
```