# Apache Flink Table Planner Blink

Apache Flink's Table Planner Blink module provides a SQL and Table API query planning and execution engine that bridges high-level declarative queries with Flink's streaming and batch runtime. Built primarily in Scala with extensive Java integration, it offers cost-based optimization, code generation for high-performance execution, and comprehensive support for both streaming and batch workloads.

## Package Information

- **Package Name**: flink-table-planner-blink_2.12
- **Package Type**: Maven
- **Group ID**: org.apache.flink
- **Version**: 1.13.6
- **Language**: Scala/Java
- **Installation**:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.13.6</version>
</dependency>
```

## Core Imports

```java
// Factory for creating planner instances
import org.apache.flink.table.planner.delegation.BlinkPlannerFactory;
import org.apache.flink.table.planner.delegation.BlinkExecutorFactory;

// Main planner classes
import org.apache.flink.table.planner.delegation.StreamPlanner;
import org.apache.flink.table.planner.delegation.BatchPlanner;
```

## Basic Usage

```java
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Create a table environment with the Blink planner for streaming
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// Execute SQL queries
Table result = tableEnv.sqlQuery("SELECT * FROM source_table WHERE value > 100");
```

## Architecture

The Flink Table Planner Blink is built around several key components:

- **Planner Factories**: Service provider interface (SPI) entry points that create planner instances based on configuration
- **Planner Implementations**: StreamPlanner for streaming mode, BatchPlanner for batch mode
- **Query Optimization**: Multi-phase optimization pipeline with cost-based optimization using Apache Calcite
- **Code Generation**: Dynamic code generation for high-performance execution using the Janino compiler
- **Execution Graph**: Translation from logical plans to Flink's execution graph representation
- **Apache Calcite Integration**: Deep integration with Calcite for SQL parsing, validation, and optimization

## Capabilities

### Planner Factory System

Service provider interface (SPI) factories for creating and configuring planner instances. These factories are the main entry points for integrating the Blink planner with Flink's Table API.

```java { .api }
public final class BlinkPlannerFactory implements PlannerFactory {
    public Planner create(Map<String, String> properties, Executor executor,
        TableConfig tableConfig, FunctionCatalog functionCatalog,
        CatalogManager catalogManager);
}

public final class BlinkExecutorFactory implements ExecutorFactory {
    public Executor create(Map<String, String> properties);
}
```

[Planner Factory System](./planner-factory.md)
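The lookup performed through these factories can be pictured as property matching: each factory declares the context it supports, and the first factory whose required properties are contained in the configuration wins. The sketch below illustrates only that selection idea — the interface and class names are hypothetical stand-ins, not Flink's actual SPI types:

```java
import java.util.List;
import java.util.Map;

public class FactoryLookup {
    // Hypothetical stand-in for a planner factory SPI
    interface PlannerFactory {
        // Properties a configuration must contain for this factory to match
        Map<String, String> requiredContext();
        String create(); // stands in for building a real Planner
    }

    static class StreamFactory implements PlannerFactory {
        public Map<String, String> requiredContext() {
            return Map.of("planner", "blink", "mode", "streaming");
        }
        public String create() { return "StreamPlanner"; }
    }

    static class BatchFactory implements PlannerFactory {
        public Map<String, String> requiredContext() {
            return Map.of("planner", "blink", "mode", "batch");
        }
        public String create() { return "BatchPlanner"; }
    }

    // Pick the first factory whose required context is a subset of the config
    public static String createPlanner(Map<String, String> config) {
        List<PlannerFactory> factories = List.of(new StreamFactory(), new BatchFactory());
        for (PlannerFactory f : factories) {
            if (config.entrySet().containsAll(f.requiredContext().entrySet())) {
                return f.create();
            }
        }
        throw new IllegalArgumentException("No matching planner factory");
    }
}
```

In the real module the candidate factories are discovered via `java.util.ServiceLoader` rather than a hard-coded list.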

### Query Planning and Optimization

Core query planning capabilities including logical plan creation, optimization rule application, and cost-based optimization. Supports both streaming and batch execution modes with different optimization strategies.

```scala { .api }
abstract class PlannerBase extends Planner {
  def translateToRel(modifyOperation: ModifyOperation): RelNode
  def optimize(relNodes: Seq[RelNode]): Seq[RelNode]
  def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
}

class StreamPlanner extends PlannerBase
class BatchPlanner extends PlannerBase
```

[Query Planning](./query-planning.md)
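A core idea in the `optimize` phase is driving rewrite rules to a fixpoint: rules are applied repeatedly until a pass changes nothing. The toy sketch below shows that loop with a single rule (removing a filter whose predicate is always true); it is a simplified stand-in for Calcite's heuristic rule driver, and the `Rel`/`Filter`/`Scan` types are hypothetical:

```java
public class RuleDriver {
    // Minimal relational-expression stand-ins (hypothetical, not Calcite's RelNode)
    interface Rel {}
    record Scan(String table) implements Rel {}
    record Filter(boolean alwaysTrue, Rel input) implements Rel {}

    // One pass of a single rule: a filter whose predicate is literally TRUE is removed
    public static Rel applyOnce(Rel rel) {
        if (rel instanceof Filter f) {
            Rel input = applyOnce(f.input());
            return f.alwaysTrue() ? input : new Filter(false, input);
        }
        return rel;
    }

    // Drive the rule to a fixpoint: stop when a pass changes nothing
    public static Rel optimize(Rel rel) {
        Rel prev;
        do {
            prev = rel;
            rel = applyOnce(rel);
        } while (!rel.equals(prev));
        return rel;
    }
}
```

The real pipeline interleaves many such rule sets with cost-based phases; this only illustrates the fixpoint mechanism.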

### Code Generation

Dynamic code generation system that creates efficient Java code for query execution. Generates specialized code for calculations, aggregations, projections, and other operations to achieve high performance.

```scala { .api }
object CalcCodeGenerator {
  def generateCalcOperator(
      ctx: CodeGeneratorContext,
      inputTransform: Transformation[RowData],
      outputType: RowType,
      projection: Seq[RexNode],
      condition: Option[RexNode],
      retainHeader: Boolean = false,
      opName: String
  ): CodeGenOperatorFactory[RowData]
}

object ProjectionCodeGenerator {
  def generateProjection(
      ctx: CodeGeneratorContext,
      name: String,
      inType: RowType,
      outType: RowType,
      inputMapping: Array[Int],
      outClass: Class[_ <: RowData] = classOf[BinaryRowData]
  ): GeneratedProjection
}
```

[Code Generation](./code-generation.md)
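At its core, code generation assembles Java source text specialized to the query, which Flink then compiles at runtime with Janino. The sketch below only builds such a source string for a field projection (output position `i` reads input position `inputMapping[i]`, mirroring the `inputMapping` parameter above); the method name and the shape of the emitted code are illustrative, not what the actual generator produces:

```java
public class ProjectionGen {
    // Emit Java source for a method that copies selected input fields
    // to the output row according to the given index mapping
    public static String generateProjection(String name, int[] inputMapping) {
        StringBuilder sb = new StringBuilder();
        sb.append("public Object[] ").append(name).append("(Object[] in) {\n");
        sb.append("  Object[] out = new Object[").append(inputMapping.length).append("];\n");
        for (int i = 0; i < inputMapping.length; i++) {
            // Each field copy becomes a straight-line assignment: no loops,
            // no per-field branching in the generated code
            sb.append("  out[").append(i).append("] = in[").append(inputMapping[i]).append("];\n");
        }
        sb.append("  return out;\n");
        sb.append("}\n");
        return sb.toString();
    }
}
```

Specializing the code per query is what removes interpretation overhead: the generated method is straight-line field copies that the JIT can optimize aggressively.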

### Function Integration

Comprehensive support for user-defined functions (UDFs) including scalar functions, aggregate functions, and table functions. Provides utilities for function registration, validation, and SQL integration.

```scala { .api }
object UserDefinedFunctionUtils {
  def checkForInstantiation(clazz: Class[_]): Unit
  def checkNotSingleton(clazz: Class[_]): Unit
  def getEvalMethodSignature(function: UserDefinedFunction, expectedTypes: Array[LogicalType]): Method
}

class AggSqlFunction extends SqlFunction
class ScalarSqlFunction extends SqlFunction
class TableSqlFunction extends SqlFunction
```

[Function Integration](./function-integration.md)
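Locating a UDF's `eval` overload for a given set of argument types boils down to reflective method lookup, which is the spirit of `getEvalMethodSignature`. This sketch shows the idea with a hypothetical `ScalarFn`; it is not the actual implementation, which matches against Flink's `LogicalType`s rather than Java classes:

```java
public class UdfLookup {
    // Hypothetical example UDF with two eval overloads
    public static class ScalarFn {
        public int eval(int a, int b) { return a + b; }
        public String eval(String s) { return s.toUpperCase(); }
    }

    // Resolve the eval overload matching the expected parameter types and invoke it
    public static Object callEval(Object udf, Class<?>[] types, Object... args) {
        try {
            return udf.getClass().getMethod("eval", types).invoke(udf, args);
        } catch (ReflectiveOperationException e) {
            // No matching overload, or the call itself failed
            throw new RuntimeException(e);
        }
    }
}
```

The planner performs this resolution once at plan time and bakes the chosen method into generated code, so no reflection happens per record.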

### Expression System

Expression handling for SQL expressions and Table API expressions, including validation, type inference, and code generation. Supports complex expressions with nested operations and custom functions.

```scala { .api }
// Expression conversion and validation
trait ExpressionConverter {
  def convertToRexNode(expression: Expression): RexNode
  def convertToExpression(rexNode: RexNode): Expression
}
```

[Expression System](./expression-system.md)
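Before an expression can be converted to a `RexNode`, its type must be inferred bottom-up and validated. The toy expression tree below sketches that inference for a comparison; the node and type names are hypothetical stand-ins for the planner's much richer type system:

```java
public class ExprTypes {
    public enum Type { INT, BOOLEAN }

    // A typed expression node: type inference walks the tree bottom-up
    public interface Expr { Type inferType(); }

    public record IntLit(int v) implements Expr {
        public Type inferType() { return Type.INT; }
    }

    public record Gt(Expr left, Expr right) implements Expr {
        public Type inferType() {
            // Validation: a comparison requires INT operands and yields BOOLEAN
            if (left.inferType() != Type.INT || right.inferType() != Type.INT) {
                throw new IllegalStateException("type mismatch in comparison");
            }
            return Type.BOOLEAN;
        }
    }
}
```

A type error surfaces here, at plan time, rather than during execution — which is why validation precedes code generation.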

### Catalog Integration

Integration with Flink's catalog system for metadata management, table registration, and schema information. Supports multiple catalog types and schema evolution.

```java { .api }
// Catalog integration through standard Flink interfaces
// Implemented via CatalogManager and FunctionCatalog parameters
```

[Catalog Integration](./catalog-integration.md)
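Conceptually, what the planner needs from the catalog during validation is the ability to resolve a table name to its schema. A minimal in-memory sketch of that contract (class and method names hypothetical, not Flink's `CatalogManager` API):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

public class MiniCatalog {
    // table name -> ordered column names (a stand-in for a full schema)
    private final Map<String, List<String>> tables = new HashMap<>();

    public void registerTable(String name, List<String> columns) {
        tables.put(name, columns);
    }

    // Resolve a table's schema, as the planner does when validating a query
    public List<String> resolveSchema(String name) {
        List<String> cols = tables.get(name);
        if (cols == null) {
            throw new NoSuchElementException("Unknown table: " + name);
        }
        return cols;
    }
}
```

Real catalogs add namespaces (catalog/database), statistics, and persistence, but the planner's dependency is essentially this resolution step.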

## Types

```java { .api }
// Core planner configuration
public class EnvironmentSettings {
    public static final String STREAMING_MODE = "table.exec.mode.streaming";
    public static final String CLASS_NAME = "table.exec.planner-factory";
}

// Execution graph representation
public class ExecNodeGraph {
    public List<ExecNode<?>> getRootNodes();
    public void accept(ExecNodeVisitor visitor);
}

// Transformation representation
public abstract class Transformation<T> {
    public String getName();
    public TypeInformation<T> getOutputType();
}
```
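The `accept(ExecNodeVisitor)` method follows the classic visitor pattern over the node graph. A traversal that visits each node's inputs before the node itself can be sketched as follows (the `Node` and `Visitor` types here are hypothetical simplifications of `ExecNode` and `ExecNodeVisitor`):

```java
import java.util.List;

public class GraphVisit {
    // Hypothetical execution node: a name plus its input nodes
    public record Node(String name, List<Node> inputs) {}

    public interface Visitor { void visit(Node n); }

    // Depth-first traversal from the roots: inputs are visited before the node,
    // so the visitor sees nodes in data-flow order (sources first)
    public static void accept(List<Node> roots, Visitor v) {
        for (Node r : roots) {
            for (Node in : r.inputs()) {
                accept(List.of(in), v);
            }
            v.visit(r);
        }
    }
}
```

Visiting in data-flow order is what lets the planner translate each node into a `Transformation` after all of its inputs have already been translated.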