or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdconnector-integration.mdcore-planning.mdenums-constants.mdexecution-nodes.mdfactory-classes.mdindex.mdtype-system.md

index.mddocs/

0

# Flink Table Planner

1

2

Flink Table Planner connects Table/SQL API and runtime, responsible for translating and optimizing table programs into Flink pipelines. This module serves as the critical bridge between Flink's declarative Table/SQL API and the runtime execution engine, leveraging Apache Calcite for sophisticated query planning, optimization, and code generation capabilities for both streaming and batch processing workloads.

3

4

## Package Information

5

6

- **Package Name**: org.apache.flink:flink-table-planner_2.11

7

- **Package Type**: maven

8

- **Language**: Scala/Java

9

- **Version**: 1.14.6

10

- **Installation**: Add dependency to your Maven `pom.xml`:

11

12

```xml

13

<dependency>

14

<groupId>org.apache.flink</groupId>

15

<artifactId>flink-table-planner_2.11</artifactId>

16

<version>1.14.6</version>

17

</dependency>

18

```

19

20

## Core Imports

21

22

```scala

23

import org.apache.flink.table.planner.delegation.{DefaultPlannerFactory, DefaultParserFactory}

24

import org.apache.flink.table.planner.calcite.CalciteConfig

25

import org.apache.flink.table.factories.{PlannerFactory, ParserFactory}

26

```

27

28

```java

29

import org.apache.flink.table.planner.delegation.DefaultPlannerFactory;

30

import org.apache.flink.table.planner.delegation.DefaultParserFactory;

31

import org.apache.flink.table.planner.calcite.CalciteConfig;

32

```

33

34

## Basic Usage

35

36

```java

37

import org.apache.flink.table.planner.delegation.DefaultPlannerFactory;

38

import org.apache.flink.table.planner.calcite.CalciteConfig;

39

import org.apache.flink.table.api.*;

40

41

// Create planner factory

42

PlannerFactory plannerFactory = new DefaultPlannerFactory();

43

44

// Create planner with custom configuration

45

CalciteConfig calciteConfig = CalciteConfig.createBuilder()

46

.replaceSqlParserConfig(customParserConfig)

47

.build();

48

49

// Use with TableEnvironment (typically handled automatically)

50

TableEnvironment tableEnv = TableEnvironment.create(

51

EnvironmentSettings.newInstance()

52

.useBlinkPlanner()

53

.build()

54

);

55

```

56

57

## Architecture

58

59

The flink-table-planner module consists of several key architectural components:

60

61

- **Factory Layer**: Service provider implementations for creating planners, parsers, and executors

62

- **Planning Layer**: Core planner implementations for streaming and batch processing modes

63

- **Configuration Layer**: Calcite configuration and planner configuration management

64

- **Connector Layer**: Integration interfaces and utilities for table sources and sinks

65

- **Execution Layer**: ExecNode hierarchy and transformation translators

66

- **Type System**: Bridging between Flink and Calcite type systems

67

- **Expression System**: RexNode expressions and type inference utilities

68

69

## Capabilities

70

71

### Factory Classes and Service Providers

72

73

Entry points for creating planner components through the Service Provider Interface (SPI).

74

75

```java { .api }

76

public final class DefaultPlannerFactory implements PlannerFactory {

77

public String factoryIdentifier();

78

public Planner create(Context context);

79

}

80

81

public class DefaultParserFactory implements ParserFactory {

82

public String factoryIdentifier();

83

public Parser create(Context context);

84

}

85

86

public final class DefaultExecutorFactory implements ExecutorFactory {

87

public Executor create(Context context);

88

}

89

```

90

91

[Factory Classes and Service Providers](./factory-classes.md)

92

93

### Core Planning Components

94

95

Core planner implementations for streaming and batch processing modes with SQL parsing capabilities.

96

97

```scala { .api }

98

abstract class PlannerBase extends Planner {

99

def getTableEnvironment: TableEnvironment

100

def getFlinkRelBuilder: FlinkRelBuilder

101

def getTypeFactory: FlinkTypeFactory

102

}

103

104

class StreamPlanner extends PlannerBase

105

class BatchPlanner extends PlannerBase

106

```

107

108

```java { .api }

109

public class ParserImpl implements Parser {

110

public List<Operation> parse(String statement);

111

public UnresolvedIdentifier parseIdentifier(String identifier);

112

public ResolvedExpression parseSqlExpression(String sqlExpression, RowType inputRowType, LogicalType outputType);

113

}

114

```

115

116

[Core Planning Components](./core-planning.md)

117

118

### Configuration and Builders

119

120

Configuration classes for customizing Calcite behavior and planner settings.

121

122

```scala { .api }

123

trait CalciteConfig {

124

def getSqlParserConfig: Option[SqlParser.Config]

125

def getSqlOperatorTable: Option[SqlOperatorTable]

126

def getBatchProgram: Option[FlinkChainedProgram[BatchOptimizeContext]]

127

def getStreamProgram: Option[FlinkChainedProgram[StreamOptimizeContext]]

128

}

129

130

object CalciteConfig {

131

def createBuilder(): CalciteConfigBuilder

132

}

133

134

class CalciteConfigBuilder {

135

def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder

136

def replaceSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder

137

def addSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder

138

def replaceBatchProgram(program: FlinkChainedProgram[BatchOptimizeContext]): CalciteConfigBuilder

139

def replaceStreamProgram(program: FlinkChainedProgram[StreamOptimizeContext]): CalciteConfigBuilder

140

def build(): CalciteConfig

141

}

142

```

143

144

[Configuration and Builders](./configuration.md)

145

146

### Connector Integration

147

148

Interfaces and utilities for integrating custom table sources and sinks with the planner.

149

150

```java { .api }

151

public interface TransformationScanProvider extends ScanTableSource.ScanRuntimeProvider {

152

Transformation<RowData> createTransformation(Context context);

153

}

154

155

public interface TransformationSinkProvider extends DynamicTableSink.SinkRuntimeProvider {

156

Transformation<?> createTransformation(Context context);

157

}

158

159

public final class DynamicSourceUtils {

160

public static RelNode convertDataStreamToRel(StreamTableEnvironment tableEnv, DataStream<RowData> dataStream, List<String> fieldNames);

161

public static RelNode convertSourceToRel(FlinkOptimizeContext optimizeContext, RelOptTable relOptTable, DynamicTableSource tableSource, FlinkStatistic statistic);

162

}

163

```

164

165

[Connector Integration](./connector-integration.md)

166

167

### Execution Nodes

168

169

ExecNode hierarchy and translator interfaces for query execution plan generation.

170

171

```java { .api }

172

public interface ExecNode<T> extends ExecNodeTranslator<T> {

173

int getId();

174

String getDescription();

175

LogicalType getOutputType();

176

List<InputProperty> getInputProperties();

177

List<ExecEdge> getInputEdges();

178

void setInputEdges(List<ExecEdge> inputEdges);

179

void replaceInputEdge(int index, ExecEdge newInputEdge);

180

void accept(ExecNodeVisitor visitor);

181

}

182

183

public interface StreamExecNode<T> extends ExecNode<T> {}

184

185

public interface BatchExecNode<T> extends ExecNode<T> {}

186

```

187

188

[Execution Nodes](./execution-nodes.md)

189

190

### Type System and Expressions

191

192

Type system bridging between Flink and Calcite with expression handling utilities.

193

194

```java { .api }

195

public final class RexNodeExpression implements ResolvedExpression {

196

public RexNode getRexNode();

197

public LogicalType getOutputDataType();

198

public List<ResolvedExpression> getResolvedChildren();

199

public <R> R accept(ExpressionVisitor<R> visitor);

200

}

201

202

public final class PlannerTypeInferenceUtilImpl implements PlannerTypeInferenceUtil {

203

public static final PlannerTypeInferenceUtilImpl INSTANCE;

204

public TypeInference runTypeInference(CallExpression callExpression, CallContext callContext);

205

}

206

```

207

208

[Type System and Expressions](./type-system.md)

209

210

### Enums and Constants

211

212

Public enums for execution strategies, traits, and configuration constants.

213

214

```java { .api }

215

public enum AggregatePhaseStrategy {

216

AUTO, ONE_PHASE, TWO_PHASE

217

}

218

219

public enum UpdateKind {

220

ONLY_UPDATE_AFTER, BEFORE_AND_AFTER

221

}

222

223

public enum ModifyKind {

224

INSERT, UPDATE, DELETE

225

}

226

227

public enum MiniBatchMode {

228

ProcTime, RowTime, None

229

}

230

```

231

232

[Enums and Constants](./enums-constants.md)

233

234

## Integration Points

235

236

### Service Provider Interface (SPI)

237

238

The module registers factories via Java SPI in `META-INF/services/org.apache.flink.table.factories.Factory`:

239

- `DefaultPlannerFactory` for planner creation

240

- `DefaultParserFactory` for parser creation

241

- `DefaultExecutorFactory` for executor creation

242

243

### Calcite Integration

244

245

Extensive use of Apache Calcite for:

246

- SQL parsing with customizable parser configurations

247

- Query optimization through rule-based and cost-based optimization

248

- Relational algebra representation and transformation

249

250

### Type System Bridge

251

252

Seamless integration between Flink's type system and Calcite's type system:

253

- `FlinkTypeFactory` for type creation and conversion

254

- `RexNodeExpression` for expression bridging

255

- Type inference utilities for function calls and expressions