# Apache Flink Table Planner

Apache Flink Table Planner is the core component that translates and optimizes Table API and SQL programs into Flink execution pipelines. It bridges high-level table operations and the underlying Flink runtime engine, leveraging Apache Calcite for query optimization.

## Package Information

- **Package Name**: flink-table-planner_2.12
- **Package Type**: Maven (JAR)
- **Language**: Java/Scala
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-table-planner_2.12
- **Installation**: Add to Maven/Gradle dependencies

## Maven Dependency

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.12</artifactId>
  <version>2.1.0</version>
</dependency>
```

## Gradle Dependency

```kotlin
implementation("org.apache.flink:flink-table-planner_2.12:2.1.0")
```

## Core Imports

```java
// Lineage API (primary public API)
import org.apache.flink.table.planner.lineage.TableSourceLineageVertex;
import org.apache.flink.table.planner.lineage.TableSinkLineageVertex;
import org.apache.flink.table.planner.lineage.TableLineageDataset;
import org.apache.flink.table.operations.ModifyType;

// SQL functions (secondary API)
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
```

## Architecture Overview

**Important:** This module is primarily designed for internal use by the Flink Table API framework. The vast majority of its classes are marked with `@Internal` annotations and are not part of the stable public API contract.

The module handles:
- SQL parsing and validation using Apache Calcite
- Query optimization with cost-based and rule-based transformations
- Physical execution plan generation
- Code generation for optimized operators
- Integration between the Table API and the Flink DataStream runtime
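
These phases can be observed from user code via `TableEnvironment.explainSql`, which prints the abstract syntax tree and the optimized plans produced by the planner. A minimal sketch, assuming `flink-table-planner` and the built-in `datagen` connector are on the classpath (the table and column names are illustrative):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainPlan {
    public static void main(String[] args) {
        // The planner in this module is invoked transparently by the TableEnvironment
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Illustrative source table backed by the datagen connector
        tEnv.executeSql(
                "CREATE TABLE orders (id BIGINT, amount DOUBLE) "
                        + "WITH ('connector' = 'datagen')");

        // Prints the AST and the optimized plans chosen by the planner
        System.out.println(
                tEnv.explainSql("SELECT id, SUM(amount) FROM orders GROUP BY id"));
    }
}
```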

## Usage Guidance

### For End Users

**Do NOT directly depend on this module.** Instead:

- Use the Table API from `flink-table-api-java` or `flink-table-api-scala`
- Access SQL functionality through `TableEnvironment`
- Implement custom functions using the APIs in `flink-table-common`
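
For instance, a typical end-user program never references planner classes directly; it goes through `TableEnvironment`, which picks up this planner from the classpath. A minimal sketch, assuming `flink-table-planner` is available at runtime (the view and column names are illustrative):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class EndUserExample {
    public static void main(String[] args) {
        // flink-table-planner only needs to be on the classpath;
        // the TableEnvironment locates it via factory discovery
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Illustrative in-memory table built from literal values
        Table table = tEnv.fromValues(1, 2, 3).as("n");
        tEnv.createTemporaryView("numbers", table);

        // The SQL below is parsed, validated, and optimized by the planner
        tEnv.executeSql("SELECT SUM(n) FROM numbers").print();
    }
}
```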

### For Connector Developers

- Implement factory interfaces from `flink-table-common`
- Use the `flink-table-api-*` modules for development and testing
- Avoid direct dependencies on planner internals

### For Framework Developers

- May interact with internal APIs (at your own risk of breaking changes)
- Prefer extension points in other Flink Table modules when possible
- Must handle internal API changes across Flink versions

## Capabilities

### Data Lineage Tracking

The primary public API provides data lineage tracking capabilities for table operations.

#### Table Source Lineage

Represents source vertices in the data lineage graph.

```java { .api }
public interface TableSourceLineageVertex extends SourceLineageVertex {
    // Inherits all methods from SourceLineageVertex
}
```

#### Table Sink Lineage

Represents sink vertices in the data lineage graph with modification type information.

```java { .api }
public interface TableSinkLineageVertex extends LineageVertex {
    /**
     * Returns the modification type for this sink operation.
     *
     * @return the modify type (INSERT, UPDATE, DELETE, etc.)
     */
    ModifyType modifyType();
}
```

**Usage Example:**

```java
import org.apache.flink.table.planner.lineage.TableSinkLineageVertex;
import org.apache.flink.table.operations.ModifyType;

// Access lineage information during table operation processing
public void processLineage(TableSinkLineageVertex sinkVertex) {
    ModifyType modifyType = sinkVertex.modifyType();

    switch (modifyType) {
        case INSERT:
            // Handle insert operation lineage
            break;
        case UPDATE:
            // Handle update operation lineage
            break;
        case DELETE:
            // Handle delete operation lineage
            break;
        default:
            // Other modification types
            break;
    }
}
```

#### Table Lineage Dataset

Provides catalog context and table information for lineage tracking.

```java { .api }
/**
 * Basic table lineage dataset that carries the catalog context and table.
 * Note: this interface lacks a @PublicEvolving annotation in the source code
 * but is considered part of the public lineage API.
 */
public interface TableLineageDataset extends LineageDataset {
    /**
     * Returns the catalog context for this table.
     *
     * @return the catalog context
     */
    CatalogContext catalogContext();

    /**
     * Returns the table reference.
     *
     * @return the catalog base table
     */
    CatalogBaseTable table();

    /**
     * Returns the object path (database and table name).
     *
     * @return the object path containing database and table identifiers
     */
    ObjectPath objectPath();
}
```

**Usage Example:**

```java
import org.apache.flink.table.planner.lineage.TableLineageDataset;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.listener.CatalogContext;

// Extract table information from a lineage dataset
public void analyzeTableLineage(TableLineageDataset dataset) {
    CatalogContext context = dataset.catalogContext();
    ObjectPath path = dataset.objectPath();

    String databaseName = path.getDatabaseName();
    String tableName = path.getObjectName();

    System.out.println("Lineage for table: "
            + context.getName() + "." + databaseName + "." + tableName);
}
```

### SQL Operator Functions

Provides access to Flink-specific SQL functions and operators.

```java { .api }
public class FlinkSqlOperatorTable {
    /**
     * Returns the Flink SQL operator table instance.
     *
     * @param isBatchMode whether to return batch-mode or streaming-mode operators
     * @return the operator table instance
     */
    public static FlinkSqlOperatorTable instance(boolean isBatchMode);

    /**
     * Returns dynamic functions available for the specified execution mode.
     *
     * @param isBatchMode whether to return batch-mode or streaming-mode functions
     * @return the list of SQL functions
     */
    public static List<SqlFunction> dynamicFunctions(boolean isBatchMode);
}
```

**Usage Example:**

```java
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
import org.apache.calcite.sql.SqlFunction;
import java.util.List;

// Access Flink SQL operators for custom query processing
FlinkSqlOperatorTable batchOperators = FlinkSqlOperatorTable.instance(true);
FlinkSqlOperatorTable streamOperators = FlinkSqlOperatorTable.instance(false);

// Get dynamic functions for streaming mode
List<SqlFunction> streamingFunctions = FlinkSqlOperatorTable.dynamicFunctions(false);
```

## Types

### Lineage Types

```java { .api }
// Base lineage interfaces (defined in other Flink modules)
interface LineageVertex {
    // Base lineage vertex functionality
}

interface SourceLineageVertex extends LineageVertex {
    // Source-specific lineage functionality
}

interface LineageDataset {
    // Base dataset lineage functionality
}

// Modification types for sink operations (from org.apache.flink.table.operations)
enum ModifyType {
    INSERT,
    UPDATE,
    DELETE,
    // Additional modification types as defined in Flink
}
```

### Catalog Types

```java { .api }
// Catalog context (from flink-table-api-java, catalog listener package)
interface CatalogContext {
    /**
     * Returns the name of the catalog.
     *
     * @return the catalog name
     */
    String getName();
    // Additional catalog context methods
}

// Object path for database.table identification
class ObjectPath {
    /**
     * Returns the database name.
     *
     * @return the database name
     */
    String getDatabaseName();

    /**
     * Returns the table/object name.
     *
     * @return the object name
     */
    String getObjectName();
    // Additional path methods
}

// Base table interface
interface CatalogBaseTable {
    // Table metadata and schema information
}
```
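
`ObjectPath` is a small value class from `flink-table-common`; it can be constructed from explicit names or parsed from a dotted string. A minimal sketch, assuming `flink-table-common` is on the classpath (the database and table names are illustrative):

```java
import org.apache.flink.table.catalog.ObjectPath;

public class ObjectPathExample {
    public static void main(String[] args) {
        // Construct from explicit database and object names
        ObjectPath path = new ObjectPath("my_db", "my_table");
        System.out.println(path.getDatabaseName());
        System.out.println(path.getObjectName());

        // Parse from a "database.table" string
        ObjectPath parsed = ObjectPath.fromString("my_db.my_table");
        System.out.println(parsed.getFullName());
    }
}
```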

## API Stability Guarantees

### @PublicEvolving APIs

- **TableSourceLineageVertex, TableSinkLineageVertex**: may change in minor releases, but with deprecation warnings
- Changes are communicated through release notes
- Backward compatibility is maintained where possible

### Unmarked APIs

- **FlinkSqlOperatorTable**: no explicit stability guarantees; may change without notice in any release
- **TableLineageDataset**: lacks the @PublicEvolving annotation but is functionally part of the public lineage API
- Use with caution and test thoroughly across Flink version upgrades

### @Internal APIs

- All other classes in this module are internal implementation details
- They may change without notice and should not be used directly
- No API compatibility guarantees

## Error Handling

When working with the public APIs, be prepared for:

- **Catalog exceptions** when accessing table metadata
- **Runtime exceptions** during lineage processing
- **Calcite-related exceptions** when working with SQL operators

Typical error handling pattern:

```java
public void handleLineage(TableLineageDataset dataset) {
    try {
        ObjectPath path = dataset.objectPath();
        // Process lineage information
    } catch (Exception e) {
        // Handle catalog or runtime exceptions
        logger.error("Failed to process table lineage", e);
    }
}
```

## Service Provider Interface

This module automatically registers three factory implementations through Java's `ServiceLoader` mechanism:

- `DefaultExecutorFactory` - Creates execution runtime bridges
- `DefaultParserFactory` - Creates SQL parsers
- `DefaultPlannerFactory` - Creates query planners

These factories are auto-discovered by the Flink Table API framework and should not be instantiated directly by user code.
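
The discovery pattern itself is plain `java.util.ServiceLoader`: implementations are listed in `META-INF/services` (or module descriptors) and looked up by interface. The runnable sketch below illustrates the mechanism against a JDK service interface rather than Flink's factory interfaces, purely to show how such lookups work:

```java
import java.nio.file.spi.FileSystemProvider;
import java.util.ServiceLoader;

public class ServiceLoaderDemo {
    public static void main(String[] args) {
        // Flink locates the planner's Default*Factory classes the same way:
        // ServiceLoader scans service registrations on the classpath.
        ServiceLoader<FileSystemProvider> loader =
                ServiceLoader.load(FileSystemProvider.class);
        for (FileSystemProvider provider : loader) {
            System.out.println("Discovered provider: " + provider.getScheme());
        }
    }
}
```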

## Dependencies

Key module dependencies:

- `flink-table-api-java` - Core Table API interfaces
- `flink-table-common` - Common table utilities and types
- `flink-streaming-java` - Streaming runtime integration
- `calcite-core` - Query optimization engine (shaded)
- Various Flink runtime modules

## Important Notes

1. **Internal Module**: This is primarily an internal implementation module for the Flink Table API
2. **Limited Public API**: Only 3 interfaces and 1 class constitute the stable public API
3. **Calcite Integration**: Heavily relies on Apache Calcite (shaded to avoid conflicts)
4. **Code Generation**: Performs extensive runtime code generation for optimized execution
5. **Version Compatibility**: Internal APIs change frequently; stick to the public APIs for stability

For comprehensive table processing capabilities, use the higher-level Table API modules rather than depending directly on this planner implementation.