Apache Flink's Table API Planner Blink module provides a sophisticated SQL and Table API query planning and execution engine with advanced query optimization, code generation, and comprehensive support for both streaming and batch workloads.
```
npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-planner-blink-2-12@1.13.0
```

The planner bridges high-level declarative queries and Flink's streaming and batch runtime execution. Built primarily in Scala with extensive Java integration, it offers cost-based optimization, code generation for high-performance execution, and different optimization strategies for streaming and batch workloads.
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.13.6</version>
</dependency>
```

```java
// Factories for creating planner instances
import org.apache.flink.table.planner.delegation.BlinkPlannerFactory;
import org.apache.flink.table.planner.delegation.BlinkExecutorFactory;

// Main planner classes
import org.apache.flink.table.planner.delegation.StreamPlanner;
import org.apache.flink.table.planner.delegation.BatchPlanner;
```

```java
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Create a table environment with the Blink planner for streaming
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// Execute SQL queries (`value` is quoted because it is a reserved keyword)
Table result = tableEnv.sqlQuery("SELECT * FROM source_table WHERE `value` > 100");
```

The Flink Table Planner Blink is built around several key components:
Service provider interface (SPI) factories for creating and configuring planner instances. These factories are the main entry points for integrating the Blink planner with Flink's Table API.
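In practice these factories are rarely instantiated directly; they are discovered through Flink's SPI when a table environment is created. A minimal batch-mode configuration using the standard Flink 1.13 Table API looks like this (a sketch; it assumes the dependency above is on the classpath):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Selecting the Blink planner in batch mode; Flink's SPI locates
// BlinkPlannerFactory and BlinkExecutorFactory behind the scenes.
EnvironmentSettings batchSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inBatchMode()
    .build();
TableEnvironment batchEnv = TableEnvironment.create(batchSettings);
```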
```java
public final class BlinkPlannerFactory implements PlannerFactory {
    public Planner create(Map<String, String> properties, Executor executor,
        TableConfig tableConfig, FunctionCatalog functionCatalog,
        CatalogManager catalogManager);
}

public final class BlinkExecutorFactory implements ExecutorFactory {
    public Executor create(Map<String, String> properties);
}
```

Core query planning capabilities include logical plan creation, optimization rule application, and cost-based optimization. Both streaming and batch execution modes are supported, each with its own optimization strategies.
```scala
abstract class PlannerBase extends Planner {
  def translateToRel(modifyOperation: ModifyOperation): RelNode
  def optimize(relNodes: Seq[RelNode]): Seq[RelNode]
  def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
}

class StreamPlanner extends PlannerBase
class BatchPlanner extends PlannerBase
```

The dynamic code generation system creates efficient Java code for query execution. It generates specialized code for calculations, aggregations, projections, and other operations to achieve high performance.
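To give a flavor of what generated operators look like, here is a hand-written, self-contained sketch (hypothetical names, no Flink dependencies) of the kind of specialized class a projection generator emits; the real generated code operates on `RowData` rather than plain arrays:

```java
import java.util.Arrays;

// The field mapping is hard-coded into the generated method instead of
// being interpreted from a schema at runtime -- that is the point of
// code generation.
public class GeneratedProjectionSketch {

    // Corresponds to inputMapping = {2, 0}: output field 0 is input
    // field 2, output field 1 is input field 0.
    static Object[] project(Object[] inputRow) {
        return new Object[] { inputRow[2], inputRow[0] };
    }

    public static void main(String[] args) {
        Object[] row = { "alice", 42, 3.14 };
        System.out.println(Arrays.toString(project(row)));  // prints [3.14, alice]
    }
}
```

Because the mapping is baked in at generation time, the JIT can inline and optimize the projection with no per-row schema lookups.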
```scala
object CalcCodeGenerator {
  def generateCalcOperator(
      ctx: CodeGeneratorContext,
      inputTransform: Transformation[RowData],
      outputType: RowType,
      projection: Seq[RexNode],
      condition: Option[RexNode],
      retainHeader: Boolean = false,
      opName: String
  ): CodeGenOperatorFactory[RowData]
}

object ProjectionCodeGenerator {
  def generateProjection(
      ctx: CodeGeneratorContext,
      name: String,
      inType: RowType,
      outType: RowType,
      inputMapping: Array[Int],
      outClass: Class[_ <: RowData] = classOf[BinaryRowData]
  ): GeneratedProjection
}
```

The module provides comprehensive support for user-defined functions (UDFs), including scalar functions, aggregate functions, and table functions, along with utilities for function registration, validation, and SQL integration.
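User code defines functions against the public Table API rather than the planner-internal wrappers. A minimal scalar function, registered and used in SQL, might look like this (a sketch assuming a `tableEnv` and a registered `source_table` with a `name` column, as in the quick-start above):

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.functions.ScalarFunction;

// A simple scalar UDF; when referenced from SQL, the planner wraps it
// in its internal SQL-function representation.
public static class HashCode extends ScalarFunction {
    public int eval(String s) {
        return s == null ? 0 : s.hashCode();
    }
}

// Registration and use:
tableEnv.createTemporarySystemFunction("HashCode", HashCode.class);
Table hashed = tableEnv.sqlQuery("SELECT HashCode(name) FROM source_table");
```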
```scala
object UserDefinedFunctionUtils {
  def checkForInstantiation(clazz: Class[_]): Unit
  def checkNotSingleton(clazz: Class[_]): Unit
  def getEvalMethodSignature(
      function: UserDefinedFunction,
      expectedTypes: Array[LogicalType]): Method
}

class AggSqlFunction extends SqlFunction
class ScalarSqlFunction extends SqlFunction
class TableSqlFunction extends SqlFunction
```

Expression handling covers both SQL expressions and Table API expressions, including validation, type inference, and code generation. Complex expressions with nested operations and custom functions are supported.
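The translation idea can be illustrated with a deliberately tiny, self-contained model (all names here are hypothetical; the real converter works against Flink `Expression` and Calcite `RexNode` trees):

```java
// Toy model of rendering an expression tree into SQL-like text, standing
// in for the planner's expression-to-RexNode conversion machinery.
interface Expr { String toSql(); }

record Field(String name) implements Expr {
    public String toSql() { return "`" + name + "`"; }
}

record Lit(Object value) implements Expr {
    public String toSql() { return String.valueOf(value); }
}

record Call(String op, Expr left, Expr right) implements Expr {
    public String toSql() { return "(" + left.toSql() + " " + op + " " + right.toSql() + ")"; }
}

public class ExpressionSketch {
    public static void main(String[] args) {
        // WHERE `value` > 100, as an expression tree
        Expr predicate = new Call(">", new Field("value"), new Lit(100));
        System.out.println(predicate.toSql());  // prints (`value` > 100)
    }
}
```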
```scala
// Expression conversion and validation
trait ExpressionConverter {
  def convertToRexNode(expression: Expression): RexNode
  def convertToExpression(rexNode: RexNode): Expression
}
```

Integration with Flink's catalog system provides metadata management, table registration, and schema information. Multiple catalog types and schema evolution are supported.
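For example, registering and activating an in-memory catalog uses the standard Table API (a sketch assuming a `tableEnv` from the quick-start above):

```java
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

// Register a catalog and make it the current one; subsequent table
// registrations and SQL queries resolve against it.
tableEnv.registerCatalog("my_catalog", new GenericInMemoryCatalog("my_catalog"));
tableEnv.useCatalog("my_catalog");
```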
Catalog integration is exposed through standard Flink interfaces, via the CatalogManager and FunctionCatalog parameters passed to BlinkPlannerFactory.create.

```java
// Core planner configuration
public class EnvironmentSettings {
    public static final String STREAMING_MODE = "table.exec.mode.streaming";
    public static final String CLASS_NAME = "table.exec.planner-factory";
}

// Execution graph representation
public class ExecNodeGraph {
    public List<ExecNode<?>> getRootNodes();
    public void accept(ExecNodeVisitor visitor);
}

// Transformation representation
public abstract class Transformation<T> {
    public String getName();
    public TypeInformation<T> getOutputType();
}
```