
# Core Planning Components

Core planner implementations handle the translation and optimization of table programs into Flink execution plans. The planning system supports both streaming and batch processing modes with sophisticated SQL parsing capabilities.

## Package Information

```scala
import org.apache.flink.table.planner.delegation.{PlannerBase, StreamPlanner, BatchPlanner}
import org.apache.flink.table.planner.calcite.{FlinkRelBuilder, FlinkTypeFactory}
import org.apache.flink.table.api.TableEnvironment
```

```java
import org.apache.flink.table.planner.delegation.ParserImpl;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
```

## Capabilities

### PlannerBase (Abstract Base Class)

The foundational abstract class that provides common functionality for both streaming and batch planners.

```scala { .api }
abstract class PlannerBase extends Planner {
  def getTableEnvironment: TableEnvironment
  def getFlinkRelBuilder: FlinkRelBuilder
  def getTypeFactory: FlinkTypeFactory
  def getExecEnv: StreamExecutionEnvironment
  def getTableConfig: TableConfig
  def getFlinkContext: FlinkContext
  def getCatalogManager: CatalogManager
  def getFunctionCatalog: FunctionCatalog

  // Core planning methods
  def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]]
  def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String
  def getCompletionHints(statement: String, position: Int): Array[String]
  def getParser: Parser
}
```

**Key Responsibilities:**

- Manages the planner context and configuration (`PlannerContext`, `PlannerConfiguration`)
- Provides access to Calcite components (`FlinkRelBuilder`, `FlinkTypeFactory`)
- Handles query optimization and execution plan generation
- Integrates with Flink's catalog system and function registry
- Manages the lifecycle of planning operations

**Usage Example:**

```scala
// PlannerBase is typically accessed through concrete implementations
val planner: PlannerBase = ??? // obtained from the planner factory

// Access core components
val tableEnv = planner.getTableEnvironment
val relBuilder = planner.getFlinkRelBuilder
val typeFactory = planner.getTypeFactory

// Translate operations to transformations
val operations: util.List[ModifyOperation] = ??? // parsed operations
val transformations = planner.translate(operations)

// Explain query plans
val explanation = planner.explain(operations, ExplainDetail.ESTIMATED_COST)
```

### StreamPlanner

Concrete planner implementation specialized for streaming workloads.

74

75

```scala { .api }

76

class StreamPlanner(

77

executor: Executor,

78

config: TableConfig,

79

functionCatalog: FunctionCatalog,

80

catalogManager: CatalogManager,

81

isStreamingMode: Boolean

82

) extends PlannerBase {

83

84

// Streaming-specific optimization and translation

85

override def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]]

86

87

// Streaming-specific explain functionality

88

override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String

89

}

90

```

**Key Features:**

- Optimized for continuous stream processing
- Supports streaming-specific operators (windowing, watermarks, state management)
- Handles incremental computation and result updates
- Manages streaming execution semantics (event-time processing, late-data handling)
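The event-time semantics above rest on watermarks: a watermark asserts that no event older than a given timestamp is expected. As a minimal, self-contained sketch (not Flink's implementation; the class and method names here are invented for illustration), bounded-out-of-orderness watermarking can be modeled as the largest timestamp seen so far minus a fixed delay, with events at or below the watermark treated as late:

```java
// Illustrative sketch only — not Flink's implementation; all names are
// invented. The watermark trails the largest event timestamp seen so far
// by a fixed delay; events at or below the current watermark are late.
public class WatermarkSketch {
    private final long maxOutOfOrderness; // allowed out-of-orderness in ms
    private long maxTimestampSeen = Long.MIN_VALUE;

    public WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Observe an event timestamp; report whether the event arrived late.
    public boolean observe(long eventTimestamp) {
        boolean late = eventTimestamp <= currentWatermark();
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
        return late;
    }

    // Watermark = max timestamp seen minus the allowed out-of-orderness.
    public long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampSeen - maxOutOfOrderness;
    }
}
```

The delay is the knob that trades latency (how long windows wait before firing) against completeness (how many out-of-order events still make it in).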

**Usage Example:**

```scala
// StreamPlanner is created through DefaultPlannerFactory for streaming mode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// The planner is internal to the table environment
val streamPlanner: StreamPlanner = ??? // accessed internally

// Process streaming operations
val operations: util.List[ModifyOperation] = ??? // parsed streaming operations
val transformations = streamPlanner.translate(operations)
```

### BatchPlanner

Concrete planner implementation specialized for batch workloads.

```scala { .api }
class BatchPlanner(
    executor: Executor,
    config: TableConfig,
    functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager
) extends PlannerBase {

  // Batch-specific optimization and translation
  override def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]]

  // Batch-specific explain functionality
  override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String
}
```

**Key Features:**

- Optimized for finite dataset processing
- Supports batch-specific optimizations (join reordering, predicate pushdown)
- Handles bounded data processing patterns
- Manages batch execution semantics and resource allocation
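Predicate pushdown, one of the batch optimizations listed above, can be sketched with a self-contained toy (not Flink or Calcite code; the `Plan`, `Scan`, and `Filter` names here are invented): a `Filter` sitting directly on a `Scan` is merged into the scan, so the source skips non-matching rows instead of producing them.

```java
import java.util.function.IntPredicate;

// Illustrative sketch only — not Flink/Calcite code; all names are invented.
// A predicate-pushdown rule merges a Filter that sits directly on a Scan
// into the Scan itself, so the source can skip non-matching rows early.
public class PushdownSketch {
    interface Plan {}

    static class Scan implements Plan {
        final String table;
        final IntPredicate pushedPredicate; // null means no predicate pushed yet
        Scan(String table, IntPredicate pushedPredicate) {
            this.table = table;
            this.pushedPredicate = pushedPredicate;
        }
    }

    static class Filter implements Plan {
        final Plan input;
        final IntPredicate predicate;
        Filter(Plan input, IntPredicate predicate) {
            this.input = input;
            this.predicate = predicate;
        }
    }

    // The rule: Filter(Scan) => Scan with the predicate absorbed.
    static Plan pushDown(Plan plan) {
        if (plan instanceof Filter && ((Filter) plan).input instanceof Scan) {
            Filter f = (Filter) plan;
            Scan s = (Scan) f.input;
            IntPredicate combined = s.pushedPredicate == null
                    ? f.predicate
                    : s.pushedPredicate.and(f.predicate);
            return new Scan(s.table, combined);
        }
        return plan; // rule does not match; leave the plan unchanged
    }
}
```

Real planners express the same idea as optimizer rules over relational operator trees, with sources advertising which predicates they can evaluate.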

**Usage Example:**

```scala
// BatchPlanner is created through DefaultPlannerFactory for batch mode
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val tableEnv = TableEnvironment.create(
  EnvironmentSettings.newInstance()
    .inBatchMode()
    .build()
)

// The planner is internal to the table environment
val batchPlanner: BatchPlanner = ??? // accessed internally

// Process batch operations
val operations: util.List[ModifyOperation] = ??? // parsed batch operations
val transformations = batchPlanner.translate(operations)
```

### ParserImpl

Default SQL parser implementation using Apache Calcite for parsing SQL statements, identifiers, and expressions.

```java { .api }
public class ParserImpl implements Parser {

    // Constructor
    public ParserImpl(
        CatalogManager catalogManager,
        Supplier<FlinkPlannerImpl> validatorSupplier,
        Supplier<CalciteParser> calciteParserSupplier,
        RexNodeToExpressionConverter rexNodeToExpressionConverter
    );

    // Core parsing methods
    public List<Operation> parse(String statement);
    public UnresolvedIdentifier parseIdentifier(String identifier);
    public ResolvedExpression parseSqlExpression(
        String sqlExpression,
        RowType inputRowType,
        LogicalType outputType
    );

    // Completion and validation
    public String[] getCompletionHints(String statement, int position);
}
```

**Key Methods:**

- **`parse(String statement)`**: Parses SQL statements into executable operations
  - Handles DDL (CREATE TABLE, ALTER TABLE, etc.)
  - Handles DML (SELECT, INSERT, UPDATE, DELETE)
  - Handles utility statements (SHOW TABLES, DESCRIBE, etc.)
  - Returns a list of `Operation` objects for execution
- **`parseIdentifier(String identifier)`**: Parses table/column identifiers
  - Supports qualified names (catalog.database.table)
  - Handles quoted and unquoted identifiers
  - Returns an `UnresolvedIdentifier` for catalog resolution
- **`parseSqlExpression(...)`**: Parses SQL expressions within a given context
  - Used for computed columns, filters, and projections
  - Requires the input row type for context
  - Returns a resolved expression with type information
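To make the qualified-name handling concrete, here is a toy splitter (not Flink's `parseIdentifier`; purely illustrative) that separates a dotted identifier into parts while respecting backtick-quoted segments, so a quoted part may itself contain dots:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only — not Flink's parseIdentifier. Splits a possibly
// qualified identifier (catalog.database.table) into parts, treating dots
// inside backtick-quoted segments as part of the name, not as separators.
public class IdentifierSplitter {
    public static List<String> split(String identifier) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean quoted = false;
        for (char c : identifier.toCharArray()) {
            if (c == '`') {
                quoted = !quoted;              // toggle quoting; backticks are stripped
            } else if (c == '.' && !quoted) {
                parts.add(current.toString()); // unquoted dot ends the current part
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        parts.add(current.toString());
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(split("catalog.database.table")); // [catalog, database, table]
        System.out.println(split("`my.catalog`.db.tbl"));    // [my.catalog, db, tbl]
    }
}
```

The real parser additionally validates identifier syntax and escape sequences; this sketch only shows why quoting must be tracked while scanning for separators.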

**Usage Example:**

```java
import org.apache.flink.table.planner.delegation.ParserImpl;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.catalog.UnresolvedIdentifier;

// Parser is typically obtained from PlannerBase
Parser parser = planner.getParser();

// Parse SQL statements
List<Operation> operations = parser.parse("CREATE TABLE my_table (id INT, name STRING)");
List<Operation> queryOps = parser.parse("SELECT id, UPPER(name) FROM my_table WHERE id > 100");

// Parse identifiers
UnresolvedIdentifier tableId = parser.parseIdentifier("catalog.database.table");
UnresolvedIdentifier simpleId = parser.parseIdentifier("my_table");

// Parse expressions
RowType inputType = /* define input row type */ null;
LogicalType outputType = DataTypes.STRING().getLogicalType();
ResolvedExpression expr = parser.parseSqlExpression(
    "UPPER(name)",
    inputType,
    outputType
);

// Get completion hints for IDE integration
String[] hints = parser.getCompletionHints("SELECT * FROM my_ta", 18);
```

## Planning Process Flow

The core planning process follows these stages:

1. **Parsing**: `ParserImpl` converts SQL text into `Operation` objects
2. **Validation**: Operations are validated against the catalog and type system
3. **Optimization**: `PlannerBase` applies Calcite optimization rules
4. **Translation**: Optimized plans are translated to Flink `Transformation` objects
5. **Execution**: `Executor` converts transformations to executable job graphs

```java
// Typical planning flow
Parser parser = planner.getParser();

// 1. Parse SQL to operations
List<Operation> operations = parser.parse(sqlStatement);

// 2. Translate to transformations (includes validation & optimization)
List<Transformation<?>> transformations = planner.translate(operations);

// 3. Execute transformations
JobExecutionResult result = executor.execute(transformations);
```

## Optimization Integration

The planner integrates deeply with Apache Calcite's optimization framework:

- **Rule-Based Optimization**: Applies transformation rules to improve query plans
- **Cost-Based Optimization**: Uses statistics to choose optimal join orders and algorithms
- **Custom Rules**: Flink-specific optimization rules for streaming and batch scenarios
- **Program Chaining**: Supports custom optimization programs via `CalciteConfig`
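The rule-based style can be sketched with a self-contained toy (not Calcite or Flink code; the expression types and rule here are invented): a rule pattern-matches a plan fragment and rewrites it to an equivalent, cheaper one, and the optimizer applies rules bottom-up over the tree.

```java
// Illustrative sketch only — not Calcite/Flink code; all names are invented.
// A toy expression tree with a single rewrite rule (constant folding),
// showing the shape of rule-based optimization: rules match a fragment
// and return an equivalent, cheaper fragment.
public class RuleBasedSketch {
    interface Expr {}

    static class Lit implements Expr {
        final int value;
        Lit(int value) { this.value = value; }
        public String toString() { return Integer.toString(value); }
    }

    static class Add implements Expr {
        final Expr left, right;
        Add(Expr left, Expr right) { this.left = left; this.right = right; }
        public String toString() { return "(" + left + " + " + right + ")"; }
    }

    // Bottom-up pass: optimize children first, then try the rule on the node.
    static Expr optimize(Expr e) {
        if (e instanceof Add) {
            Expr l = optimize(((Add) e).left);
            Expr r = optimize(((Add) e).right);
            if (l instanceof Lit && r instanceof Lit) {
                return new Lit(((Lit) l).value + ((Lit) r).value); // fold constants
            }
            return new Add(l, r);
        }
        return e; // literals (and unmatched nodes) pass through unchanged
    }

    public static void main(String[] args) {
        Expr plan = new Add(new Add(new Lit(1), new Lit(2)), new Lit(3));
        System.out.println(optimize(plan)); // prints "6"
    }
}
```

Calcite generalizes this pattern to relational operator trees, with rule sets organized into programs and a cost model to choose among alternative rewrites.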

## Error Handling

The planning components provide comprehensive error handling:

```java
try {
    List<Operation> operations = parser.parse(sql);
    List<Transformation<?>> transformations = planner.translate(operations);
} catch (SqlParseException e) {
    // Handle SQL syntax errors
} catch (ValidationException e) {
    // Handle semantic validation errors
} catch (TableException e) {
    // Handle table-specific errors
}
```

Common error scenarios:

- **Parse Errors**: Invalid SQL syntax, unrecognized keywords
- **Validation Errors**: Unknown tables/columns, type mismatches, unsupported operations
- **Planning Errors**: Optimization failures, unsupported query patterns
- **Resource Errors**: Insufficient memory, configuration issues