0
# Apache Flink Table Planner Blink
1
2
Apache Flink's Table API Planner Blink module provides a sophisticated SQL and Table API query planning and execution engine that bridges high-level declarative queries with Flink's streaming and batch runtime execution. Built primarily in Scala with extensive Java integration, it provides advanced query optimization capabilities including cost-based optimization, code generation for high-performance execution, and comprehensive support for both streaming and batch workloads.
3
4
## Package Information
5
6
- **Package Name**: flink-table-planner-blink_2.12
7
- **Package Type**: Maven
8
- **Group ID**: org.apache.flink
9
- **Version**: 1.13.6
10
- **Language**: Scala/Java
11
- **Installation**:
12
```xml
13
<dependency>
14
<groupId>org.apache.flink</groupId>
15
<artifactId>flink-table-planner-blink_2.12</artifactId>
16
<version>1.13.6</version>
17
</dependency>
18
```
19
20
## Core Imports
21
22
```java
23
// Factory for creating planner instances
24
import org.apache.flink.table.planner.delegation.BlinkPlannerFactory;
25
import org.apache.flink.table.planner.delegation.BlinkExecutorFactory;
26
27
// Main planner classes
28
import org.apache.flink.table.planner.delegation.StreamPlanner;
29
import org.apache.flink.table.planner.delegation.BatchPlanner;
30
```
31
32
## Basic Usage
33
34
```java
35
import org.apache.flink.table.api.*;
36
import org.apache.flink.table.planner.delegation.BlinkPlannerFactory;
37
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
38
39
// Create table environment with Blink planner for streaming
40
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
41
EnvironmentSettings settings = EnvironmentSettings.newInstance()
42
.useBlinkPlanner()
43
.inStreamingMode()
44
.build();
45
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
46
47
// Execute SQL queries
48
Table result = tableEnv.sqlQuery("SELECT * FROM source_table WHERE value > 100");
49
```
50
51
## Architecture
52
53
The Flink Table Planner Blink is built around several key components:
54
55
- **Planner Factories**: Service provider interface (SPI) entry points that create planner instances based on configuration
56
- **Planner Implementations**: StreamPlanner for streaming mode, BatchPlanner for batch mode
57
- **Query Optimization**: Multi-phase optimization pipeline with cost-based optimization using Apache Calcite
58
- **Code Generation**: Dynamic code generation for high-performance execution using Janino compiler
59
- **Execution Graph**: Translation from logical plans to Flink's execution graph representation
60
- **Apache Calcite Integration**: Deep integration with Calcite for SQL parsing, validation, and optimization
61
62
## Capabilities
63
64
### Planner Factory System
65
66
Service provider interface (SPI) factories for creating and configuring planner instances. These factories are the main entry points for integrating the Blink planner with Flink's Table API.
67
68
```java { .api }
69
public final class BlinkPlannerFactory implements PlannerFactory {
70
public Planner create(Map<String, String> properties, Executor executor,
71
TableConfig tableConfig, FunctionCatalog functionCatalog,
72
CatalogManager catalogManager);
73
}
74
75
public final class BlinkExecutorFactory implements ExecutorFactory {
76
public Executor create(Map<String, String> properties);
77
}
78
```
79
80
[Planner Factory System](./planner-factory.md)
81
82
### Query Planning and Optimization
83
84
Core query planning capabilities including logical plan creation, optimization rule application, and cost-based optimization. Supports both streaming and batch execution modes with different optimization strategies.
85
86
```scala { .api }
87
abstract class PlannerBase extends Planner {
88
def translateToRel(modifyOperation: ModifyOperation): RelNode
89
def optimize(relNodes: Seq[RelNode]): Seq[RelNode]
90
def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
91
}
92
93
class StreamPlanner extends PlannerBase
94
class BatchPlanner extends PlannerBase
95
```
96
97
[Query Planning](./query-planning.md)
98
99
### Code Generation
100
101
Dynamic code generation system that creates efficient Java code for query execution. Generates specialized code for calculations, aggregations, projections, and other operations to achieve high performance.
102
103
```scala { .api }
104
object CalcCodeGenerator {
105
def generateCalcOperator(
106
ctx: CodeGeneratorContext,
107
inputTransform: Transformation[RowData],
108
outputType: RowType,
109
projection: Seq[RexNode],
110
condition: Option[RexNode],
111
retainHeader: Boolean = false,
112
opName: String
113
): CodeGenOperatorFactory[RowData]
114
}
115
116
object ProjectionCodeGenerator {
117
def generateProjection(
118
ctx: CodeGeneratorContext,
119
name: String,
120
inType: RowType,
121
outType: RowType,
122
inputMapping: Array[Int],
123
outClass: Class[_ <: RowData] = classOf[BinaryRowData]
124
): GeneratedProjection
125
}
126
```
127
128
[Code Generation](./code-generation.md)
129
130
### Function Integration
131
132
Comprehensive support for user-defined functions (UDFs) including scalar functions, aggregate functions, and table functions. Provides utilities for function registration, validation, and SQL integration.
133
134
```scala { .api }
135
object UserDefinedFunctionUtils {
136
def checkForInstantiation(clazz: Class[_]): Unit
137
def checkNotSingleton(clazz: Class[_]): Unit
138
def getEvalMethodSignature(function: UserDefinedFunction, expectedTypes: Array[LogicalType]): Method
139
}
140
141
class AggSqlFunction extends SqlFunction
142
class ScalarSqlFunction extends SqlFunction
143
class TableSqlFunction extends SqlFunction
144
```
145
146
[Function Integration](./function-integration.md)
147
148
### Expression System
149
150
Expression handling for SQL expressions and Table API expressions, including validation, type inference, and code generation. Supports complex expressions with nested operations and custom functions.
151
152
```scala { .api }
153
// Expression conversion and validation
154
trait ExpressionConverter {
155
def convertToRexNode(expression: Expression): RexNode
156
def convertToExpression(rexNode: RexNode): Expression
157
}
158
```
159
160
[Expression System](./expression-system.md)
161
162
### Catalog Integration
163
164
Integration with Flink's catalog system for metadata management, table registration, and schema information. Supports multiple catalog types and schema evolution.
165
166
```java { .api }
167
// Catalog integration through standard Flink interfaces
168
// Implemented via CatalogManager and FunctionCatalog parameters
169
```
170
171
[Catalog Integration](./catalog-integration.md)
172
173
## Types
174
175
```java { .api }
176
// Core planner configuration
177
public class EnvironmentSettings {
178
public static final String STREAMING_MODE = "table.exec.mode.streaming";
179
public static final String CLASS_NAME = "table.exec.planner-factory";
180
}
181
182
// Execution graph representation
183
public class ExecNodeGraph {
184
public List<ExecNode<?>> getRootNodes();
185
public void accept(ExecNodeVisitor visitor);
186
}
187
188
// Transformation representation
189
public abstract class Transformation<T> {
190
public String getName();
191
public TypeInformation<T> getOutputType();
192
}
193
```