Core Planning Components

Core planner implementations translate and optimize table programs into Flink execution plans. The planning system supports both streaming and batch processing modes, with SQL parsing built on Apache Calcite.
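
Which planner is used is decided when the table environment is created. A minimal sketch using the public EnvironmentSettings API (the planner selection itself is internal to the environment):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Streaming mode: queries are planned by the StreamPlanner internally
TableEnvironment streamTableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inStreamingMode().build());

// Batch mode: queries are planned by the BatchPlanner internally
TableEnvironment batchTableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inBatchMode().build());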

Package Information

import org.apache.flink.table.planner.delegation.{PlannerBase, StreamPlanner, BatchPlanner, ParserImpl}
import org.apache.flink.table.planner.calcite.{FlinkRelBuilder, FlinkTypeFactory}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.delegation.Parser
import org.apache.flink.table.operations.Operation
import org.apache.flink.table.expressions.ResolvedExpression
import org.apache.flink.table.catalog.UnresolvedIdentifier

Capabilities

PlannerBase (Abstract Base Class)

The foundational abstract class that provides common functionality for both streaming and batch planners.

abstract class PlannerBase extends Planner {
  def getTableEnvironment: TableEnvironment
  def getFlinkRelBuilder: FlinkRelBuilder  
  def getTypeFactory: FlinkTypeFactory
  def getExecEnv: StreamExecutionEnvironment
  def getTableConfig: TableConfig
  def getFlinkContext: FlinkContext
  def getCatalogManager: CatalogManager
  def getFunctionCatalog: FunctionCatalog
  
  // Core planning methods
  def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]]
  def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String
  def getCompletionHints(statement: String, position: Int): Array[String]
  def getParser: Parser
}

Key Responsibilities:

  • Manages the planner context and configuration (PlannerContext, PlannerConfiguration)
  • Provides access to Calcite components (FlinkRelBuilder, FlinkTypeFactory)
  • Handles query optimization and execution plan generation
  • Integrates with Flink's catalog system and function registry
  • Manages the lifecycle of planning operations

Usage Example:

// PlannerBase is typically accessed through concrete implementations
val planner: PlannerBase = ??? // obtained from a PlannerFactory in practice

// Access core components
val tableEnv = planner.getTableEnvironment
val relBuilder = planner.getFlinkRelBuilder
val typeFactory = planner.getTypeFactory

// Translate modify operations to Flink transformations
val modifyOperations: util.List[ModifyOperation] = ??? // parsed operations
val transformations = planner.translate(modifyOperations)

// Explain query plans
val operations: util.List[Operation] = ??? // operations to explain
val explanation = planner.explain(operations, ExplainDetail.ESTIMATED_COST)

StreamPlanner

Concrete planner implementation specialized for streaming workloads.

class StreamPlanner(
    executor: Executor,
    config: TableConfig,
    functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager,
    isStreamingMode: Boolean
) extends PlannerBase {

  // Streaming-specific optimization and translation
  override def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]]
  
  // Streaming-specific explain functionality  
  override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String
}

Key Features:

  • Optimized for continuous stream processing
  • Supports streaming-specific operators (windowing, watermarks, state management)
  • Handles incremental computation and result updates
  • Manages streaming execution semantics (event-time processing, late data handling)

Usage Example:

// StreamPlanner is created through DefaultPlannerFactory for streaming mode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// The planner is internal to the table environment and not part of the public API
val streamPlanner: StreamPlanner = ??? // accessed internally

// Process streaming operations
val operations: util.List[ModifyOperation] = ??? // parsed streaming operations
val transformations = streamPlanner.translate(operations)

BatchPlanner

Concrete planner implementation specialized for batch workloads.

class BatchPlanner(
    executor: Executor,
    config: TableConfig,  
    functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager
) extends PlannerBase {

  // Batch-specific optimization and translation
  override def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]]
  
  // Batch-specific explain functionality
  override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String
}

Key Features:

  • Optimized for finite dataset processing
  • Supports batch-specific optimizations (join reordering, predicate pushdown)
  • Handles bounded data processing patterns
  • Manages batch execution semantics and resource allocation

Usage Example:

// BatchPlanner is created through DefaultPlannerFactory for batch mode
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val tableEnv = TableEnvironment.create(
  EnvironmentSettings.newInstance()
    .inBatchMode()
    .build()
)

// The planner is internal to the table environment and not part of the public API
val batchPlanner: BatchPlanner = ??? // accessed internally

// Process batch operations
val operations: util.List[ModifyOperation] = ??? // parsed batch operations
val transformations = batchPlanner.translate(operations)

ParserImpl

Default SQL parser implementation using Apache Calcite for parsing SQL statements, identifiers, and expressions.

public class ParserImpl implements Parser {
    
    // Constructor
    public ParserImpl(
        CatalogManager catalogManager,
        Supplier<FlinkPlannerImpl> validatorSupplier,
        Supplier<CalciteParser> calciteParserSupplier,
        RexNodeToExpressionConverter rexNodeToExpressionConverter
    );

    // Core parsing methods  
    public List<Operation> parse(String statement);
    public UnresolvedIdentifier parseIdentifier(String identifier);
    public ResolvedExpression parseSqlExpression(
        String sqlExpression, 
        RowType inputRowType, 
        LogicalType outputType
    );
    
    // Completion and validation
    public String[] getCompletionHints(String statement, int position);
}

Key Methods:

  • parse(String statement): Parses SQL statements into executable operations

    • Handles DDL (CREATE TABLE, ALTER TABLE, etc.)
    • Handles DML (SELECT, INSERT, UPDATE, DELETE)
    • Handles utility statements (SHOW TABLES, DESCRIBE, etc.)
    • Returns list of Operation objects for execution
  • parseIdentifier(String identifier): Parses table/column identifiers

    • Supports qualified names (catalog.database.table)
    • Handles quoted and unquoted identifiers
    • Returns UnresolvedIdentifier for catalog resolution
  • parseSqlExpression(...): Parses SQL expressions against a typed input row

    • Used for computed columns, filters, projections
    • Requires input row type for context
    • Returns resolved expressions with type information

Usage Example:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import java.util.List;

// Parser is typically obtained from PlannerBase
Parser parser = planner.getParser();

// Parse SQL statements
List<Operation> operations = parser.parse("CREATE TABLE my_table (id INT, name STRING)");
List<Operation> queryOps = parser.parse("SELECT id, UPPER(name) FROM my_table WHERE id > 100");

// Parse identifiers
UnresolvedIdentifier tableId = parser.parseIdentifier("catalog.database.table");
UnresolvedIdentifier simpleId = parser.parseIdentifier("my_table");

// Parse expressions against a known input row type
RowType inputType = (RowType) DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.INT()),
        DataTypes.FIELD("name", DataTypes.STRING()))
    .getLogicalType();
LogicalType outputType = DataTypes.STRING().getLogicalType();
ResolvedExpression expr = parser.parseSqlExpression(
    "UPPER(name)",
    inputType,
    outputType
);

// Get completion hints for IDE integration
String[] hints = parser.getCompletionHints("SELECT * FROM my_ta", 18);

Planning Process Flow

The core planning process follows these stages:

  1. Parsing: ParserImpl converts SQL text into Operation objects
  2. Validation: Operations are validated against catalog and type system
  3. Optimization: PlannerBase applies Calcite optimization rules
  4. Translation: Optimized plans are translated to Flink Transformation objects
  5. Execution: Executor converts transformations to executable job graphs

A minimal sketch of this flow (the exact Executor submission API varies across Flink versions):

// Typical planning flow
Parser parser = planner.getParser();

// 1. Parse SQL to operations
List<Operation> operations = parser.parse(sqlStatement);

// 2. Translate to transformations (includes validation & optimization);
//    translate() expects the ModifyOperations among the parsed results
List<ModifyOperation> modifyOperations = operations.stream()
    .filter(ModifyOperation.class::isInstance)
    .map(ModifyOperation.class::cast)
    .collect(Collectors.toList()); // java.util.stream.Collectors
List<Transformation<?>> transformations = planner.translate(modifyOperations);

// 3. Execute transformations (exact Executor API is version-dependent)
JobExecutionResult result = executor.execute(transformations);

Optimization Integration

The planner integrates deeply with Apache Calcite's optimization framework:

  • Rule-Based Optimization: Applies transformation rules to improve query plans
  • Cost-Based Optimization: Uses statistics to choose optimal join orders and algorithms
  • Custom Rules: Flink-specific optimization rules for streaming and batch scenarios
  • Program Chaining: Supports custom optimization programs via CalciteConfig
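
Some of these behaviors are exposed as session configuration. A minimal sketch, assuming a TableEnvironment named tableEnv; the table.optimizer.join-reorder-enabled key corresponds to OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, but check your Flink version for the exact option set:

import org.apache.flink.configuration.Configuration;

// Enable cost-based join reordering for this session
Configuration conf = tableEnv.getConfig().getConfiguration();
conf.setString("table.optimizer.join-reorder-enabled", "true");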

Error Handling

The planning components provide comprehensive error handling:

try {
    List<Operation> operations = parser.parse(sql);
    // select the ModifyOperations as in the flow sketch above
    List<Transformation<?>> transformations = planner.translate(modifyOperations);
} catch (SqlParserException e) {
    // Handle SQL syntax errors (unchecked; wraps Calcite parse failures)
} catch (ValidationException e) {
    // Handle semantic validation errors
} catch (TableException e) {
    // Handle table-specific errors
}

Common error scenarios:

  • Parse Errors: Invalid SQL syntax, unrecognized keywords
  • Validation Errors: Unknown tables/columns, type mismatches, unsupported operations
  • Planning Errors: Optimization failures, unsupported query patterns
  • Resource Errors: Insufficient memory, configuration issues
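
As a rough illustration of the first two scenarios, here is a hedged sketch; the statements are hypothetical examples, and the exact exception surfaced can vary by statement and Flink version:

import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.ValidationException;

// Malformed SQL: typically surfaces as a parse error
try {
    parser.parse("SELEC * FROM my_table"); // typo in SELECT
} catch (SqlParserException e) {
    System.err.println("Parse error: " + e.getMessage());
}

// Unknown table: typically surfaces as a validation error
try {
    parser.parse("SELECT * FROM no_such_table");
} catch (ValidationException e) {
    System.err.println("Validation error: " + e.getMessage());
}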