Apache Flink Table Planner - translates and optimizes table programs into Flink pipelines
```bash
npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-planner_2-12@2.1.0
```

Apache Flink Table Planner is a core component that translates and optimizes Table API and SQL programs into Flink execution pipelines. This module serves as the bridge between high-level table operations and the underlying Flink runtime engine, leveraging Apache Calcite for advanced query optimization.
Maven:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>2.1.0</version>
</dependency>
```

Gradle:

```kotlin
implementation("org.apache.flink:flink-table-planner_2.12:2.1.0")
```

```java
// Lineage API (Primary public API)
import org.apache.flink.table.planner.lineage.TableSourceLineageVertex;
import org.apache.flink.table.planner.lineage.TableSinkLineageVertex;
import org.apache.flink.table.planner.lineage.TableLineageDataset;
import org.apache.flink.table.operations.ModifyType;
// SQL Functions (secondary API)
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
```

Important: This module is primarily designed for internal use by the Flink Table API framework. The vast majority of its classes (~99%) are marked with @Internal annotations and are not part of the stable public API contract.
The module handles:

- SQL parsing and validation
- Query optimization through Apache Calcite
- Translation of table programs into executable Flink pipelines
Do NOT directly depend on this module. Instead:

- Use `flink-table-api-java` or `flink-table-api-scala` and interact with the planner through `TableEnvironment`
- Implement custom connectors and formats against `flink-table-common`
- Use the `flink-table-api-*` modules for development and testing
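The following is a minimal sketch of this recommended path, assuming `flink-table-api-java` and this planner module are on the classpath; the class name, table name, and schema are illustrative only:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RecommendedUsage {
    public static void main(String[] args) {
        // The planner is discovered from the classpath automatically;
        // user code only touches the public Table API.
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Illustrative table definition using the built-in datagen connector.
        tableEnv.executeSql(
                "CREATE TEMPORARY TABLE orders (amount INT) WITH ('connector' = 'datagen')");

        // The planner translates and optimizes this query behind the scenes.
        tableEnv.sqlQuery("SELECT amount * 2 FROM orders");
    }
}
```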
The primary public API provides data lineage tracking capabilities for table operations.

Represents source vertices in the data lineage graph.

```java
public interface TableSourceLineageVertex extends SourceLineageVertex {
    // Inherits all methods from SourceLineageVertex
}
```

Represents sink vertices in the data lineage graph with modification type information.

```java
public interface TableSinkLineageVertex extends LineageVertex {

    /**
     * Returns the modification type for this sink operation.
     *
     * @return the modify type (INSERT, UPDATE, DELETE, etc.)
     */
    ModifyType modifyType();
}
```

Usage Example:

```java
import org.apache.flink.table.planner.lineage.TableSinkLineageVertex;
import org.apache.flink.table.planner.lineage.TableSourceLineageVertex;
import org.apache.flink.table.operations.ModifyType;

// Access lineage information during table operation processing
public void processLineage(TableSinkLineageVertex sinkVertex) {
    ModifyType modifyType = sinkVertex.modifyType();
    switch (modifyType) {
        case INSERT:
            // Handle insert operation lineage
            break;
        case UPDATE:
            // Handle update operation lineage
            break;
        case DELETE:
            // Handle delete operation lineage
            break;
    }
}
```

Provides catalog context and table information for lineage tracking.

```java
/**
 * Basic table lineage dataset which has catalog context and table in it.
 *
 * <p>Note: This interface lacks the @PublicEvolving annotation in the source code
 * but is considered part of the public lineage API.
 */
public interface TableLineageDataset extends LineageDataset {

    /**
     * Returns the catalog context for this table.
     *
     * @return the catalog context
     */
    CatalogContext catalogContext();

    /**
     * Returns the table reference.
     *
     * @return the catalog base table
     */
    CatalogBaseTable table();

    /**
     * Returns the object path (database and table name).
     *
     * @return the object path containing database and table identifiers
     */
    ObjectPath objectPath();
}
```

Usage Example:

```java
import org.apache.flink.table.planner.lineage.TableLineageDataset;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.listener.CatalogContext;

// Extract table information from lineage dataset
public void analyzeTableLineage(TableLineageDataset dataset) {
    CatalogContext context = dataset.catalogContext();
    ObjectPath path = dataset.objectPath();
    String databaseName = path.getDatabaseName();
    String tableName = path.getObjectName();
    System.out.println("Lineage for table: "
            + context.getName() + "." + databaseName + "." + tableName);
}
```

Provides access to Flink-specific SQL functions and operators.

```java
public class FlinkSqlOperatorTable {

    /**
     * Returns the Flink SQL operator table instance.
     *
     * @param isBatchMode whether to return batch-mode or streaming-mode operators
     * @return the operator table instance
     */
    public static FlinkSqlOperatorTable instance(boolean isBatchMode);

    /**
     * Returns dynamic functions available for the specified execution mode.
     *
     * @param isBatchMode whether to return batch-mode or streaming-mode functions
     * @return list of SQL functions
     */
    public static List<SqlFunction> dynamicFunctions(boolean isBatchMode);
}
```

Usage Example:

```java
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
import org.apache.calcite.sql.SqlFunction;
import java.util.List;

// Access Flink SQL operators for custom query processing
FlinkSqlOperatorTable batchOperators = FlinkSqlOperatorTable.instance(true);
FlinkSqlOperatorTable streamOperators = FlinkSqlOperatorTable.instance(false);

// Get dynamic functions for streaming mode
List<SqlFunction> streamingFunctions = FlinkSqlOperatorTable.dynamicFunctions(false);
```
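As an illustrative extension, assuming `FlinkSqlOperatorTable` follows Calcite's `SqlOperatorTable` contract (which exposes `getOperatorList()`), the registered operators can be enumerated. The class name below is hypothetical, and note that Calcite is shaded inside the planner jar, so the actual package names may differ at runtime:

```java
import java.util.List;

import org.apache.calcite.sql.SqlOperator;
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;

public class ListStreamingOperators {
    public static void main(String[] args) {
        // Enumerate every operator registered for streaming mode.
        FlinkSqlOperatorTable operators = FlinkSqlOperatorTable.instance(false);
        List<SqlOperator> all = operators.getOperatorList();
        all.forEach(op -> System.out.println(op.getName()));
    }
}
```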
```java
// Base lineage interfaces (from other modules)
interface LineageVertex {
    // Base lineage vertex functionality
}

interface SourceLineageVertex extends LineageVertex {
    // Source-specific lineage functionality
}

interface LineageDataset {
    // Base dataset lineage functionality
}

// Modification types for sink operations (from org.apache.flink.table.operations)
enum ModifyType {
    INSERT,
    UPDATE,
    DELETE,
    // Additional modification types as defined in Flink
}

// Catalog context (from flink-table-api-java - catalog listener package)
interface CatalogContext {

    /**
     * Returns the name of the catalog.
     *
     * @return the catalog name
     */
    String getName();

    // Additional catalog context methods
}

// Object path for database.table identification
class ObjectPath {

    /**
     * Returns the database name.
     *
     * @return the database name
     */
    String getDatabaseName();

    /**
     * Returns the table/object name.
     *
     * @return the object name
     */
    String getObjectName();

    // Additional path methods
}

// Base table interface
interface CatalogBaseTable {
    // Table metadata and schema information
}
```

When working with the public APIs, be prepared for:

- Evolving interfaces: most planner classes are @Internal and may change between releases
- Catalog and runtime exceptions when resolving tables or processing lineage information
Typical error handling pattern:

```java
try {
    TableLineageDataset dataset = /* ... obtain dataset ... */;
    ObjectPath path = dataset.objectPath();
    // Process lineage information
} catch (Exception e) {
    // Handle catalog or runtime exceptions
    logger.error("Failed to process table lineage", e);
}
```

This module automatically registers three factory implementations through Java's ServiceLoader mechanism:
- `DefaultExecutorFactory` - Creates execution runtime bridges
- `DefaultParserFactory` - Creates SQL parsers
- `DefaultPlannerFactory` - Creates query planners

These factories are auto-discovered by the Flink Table API framework and should not be instantiated directly by user code; the sketch below only illustrates the discovery mechanism.
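A minimal sketch of the general ServiceLoader mechanism, assuming the planner jar is on the classpath and that these factories are registered under the `org.apache.flink.table.factories.Factory` SPI (the class name below is hypothetical):

```java
import java.util.ServiceLoader;

import org.apache.flink.table.factories.Factory;

public class DiscoverPlannerFactories {
    public static void main(String[] args) {
        // The framework performs an equivalent lookup internally; this loop
        // merely shows which factories the classpath provides.
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(
                    factory.factoryIdentifier() + " -> " + factory.getClass().getName());
        }
    }
}
```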
Key module dependencies:
- `flink-table-api-java` - Core Table API interfaces
- `flink-table-common` - Common table utilities and types
- `flink-streaming-java` - Streaming runtime integration
- `calcite-core` - Query optimization engine (shaded)

For comprehensive table processing capabilities, use the higher-level Table API modules rather than depending directly on this planner implementation.