tessl/maven-org-apache-flink--flink-table-planner-blink-2-12

Apache Flink's Blink Table API planner module, providing a SQL and Table API query planning and execution engine with advanced query optimization, code generation, and comprehensive support for both streaming and batch workloads.


Query Planning and Optimization

The query planning system provides comprehensive query optimization capabilities for both streaming and batch workloads. It includes logical plan creation, rule-based optimization, cost-based optimization, and execution graph generation using Apache Calcite as the underlying optimization framework.

Capabilities

PlannerBase - Abstract Planner Foundation

Base class providing common functionality for both streaming and batch planners, including query translation, optimization, and execution plan generation.

/**
 * Abstract base class for Blink planners providing common functionality
 */
abstract class PlannerBase(
  executor: Executor,
  config: TableConfig, 
  functionCatalog: FunctionCatalog,
  catalogManager: CatalogManager,
  isStreamingMode: Boolean
) extends Planner {

  /**
   * Translates modify operation to relational algebra representation
   * @param modifyOperation The operation to translate (INSERT, UPDATE, DELETE)
   * @return RelNode representing the operation in Calcite's algebra
   */
  def translateToRel(modifyOperation: ModifyOperation): RelNode
  
  /**
   * Optimizes a sequence of relational nodes using optimization rules
   * @param relNodes Sequence of RelNode instances to optimize
   * @return Optimized sequence of RelNode instances
   */
  def optimize(relNodes: Seq[RelNode]): Seq[RelNode]
  
  /**
   * Translates execution graph to Flink transformations
   * @param execGraph Execution node graph to translate
   * @return List of Flink transformations for job execution
   */
  def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
  
  /**
   * Generates execution plan explanation for debugging and analysis
   * @param operations List of operations to explain
   * @param extraDetails Additional detail levels for explanation
   * @return String representation of the execution plan
   */
  def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String
  
  /**
   * Returns trait definitions for relational algebra optimization
   * @return Array of trait definitions used by this planner
   */
  protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]]
  
  /**
   * Returns optimizer instance for this planner
   * @return Optimizer instance implementing optimization strategies
   */  
  protected def getOptimizer: Optimizer
  
  /**
   * Returns execution node graph processors for post-optimization processing
   * @return Sequence of processors for execution graph transformation
   */
  protected def getExecNodeGraphProcessors: Seq[ExecNodeGraphProcessor]
}
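
The translateToRel → optimize → translateToPlan contract above can be sketched as a self-contained toy pipeline (plain Java, no Flink dependencies; all names here are illustrative stand-ins, not the real API):

```java
import java.util.List;
import java.util.stream.Collectors;

// Toy analogue of the PlannerBase pipeline: an operation is translated to a
// "relational" form, the plan is optimized, and the optimized plan is turned
// into executable transformations.
public class ToyPlanner {
    // Stand-in for translateToRel: wrap an operation as a relational node.
    static String translateToRel(String operation) {
        return "Rel(" + operation + ")";
    }

    // Stand-in for optimize: apply a trivial rewrite to each node.
    static List<String> optimize(List<String> relNodes) {
        return relNodes.stream().map(n -> "Optimized" + n).collect(Collectors.toList());
    }

    // Stand-in for translateToPlan: produce one transformation per node.
    static List<String> translateToPlan(List<String> optimized) {
        return optimized.stream().map(n -> "Transformation(" + n + ")").collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> rels = List.of(translateToRel("INSERT INTO sink SELECT * FROM source"));
        List<String> plan = translateToPlan(optimize(rels));
        System.out.println(plan.get(0));
    }
}
```

The real planner performs the same three-stage hand-off, but over Calcite RelNodes and an ExecNodeGraph rather than strings.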

StreamPlanner - Streaming Query Planning

Planner implementation optimized for streaming workloads with support for watermarks, windowing, and continuous query processing.

/**
 * Planner implementation for streaming execution mode
 */
class StreamPlanner(
  executor: Executor,
  config: TableConfig,
  functionCatalog: FunctionCatalog, 
  catalogManager: CatalogManager
) extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) {

  /**
   * Returns trait definitions specific to streaming execution
   * Includes distribution, mini-batch interval, modify kind, and update kind traits
   */
  protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]] = Array(
    ConventionTraitDef.INSTANCE,
    FlinkRelDistributionTraitDef.INSTANCE,
    MiniBatchIntervalTraitDef.INSTANCE,
    ModifyKindSetTraitDef.INSTANCE,
    UpdateKindTraitDef.INSTANCE
  )
  
  /**
   * Returns stream-specific optimizer with common sub-graph optimization
   * @return StreamCommonSubGraphBasedOptimizer instance
   */
  protected def getOptimizer: Optimizer = new StreamCommonSubGraphBasedOptimizer(this)
  
  /**
   * Returns empty processor sequence for streaming (no additional processing needed)
   * @return Empty sequence of processors
   */
  protected def getExecNodeGraphProcessors: Seq[ExecNodeGraphProcessor] = Seq()
  
  /**
   * Translates execution graph to stream transformations
   * @param execGraph Execution node graph for streaming
   * @return List of streaming transformations
   */
  protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
}

Usage Example:

import org.apache.flink.table.planner.delegation.StreamPlanner

// StreamPlanner is typically created via BlinkPlannerFactory
// but can be instantiated directly for advanced use cases
val streamPlanner = new StreamPlanner(executor, tableConfig, functionCatalog, catalogManager)

// Translate a modify operation (e.g. an INSERT produced by SQL parsing and
// validation) to relational algebra. Note that translateToRel accepts a
// single ModifyOperation, not a list of operations.
val relNode = streamPlanner.translateToRel(modifyOperation)
val optimized = streamPlanner.optimize(Seq(relNode))
// The optimized RelNodes are converted to an ExecNodeGraph, which
// translateToPlan turns into Flink transformations for job execution.

BatchPlanner - Batch Query Planning

Planner implementation optimized for batch workloads with support for batch-specific optimizations and execution strategies.

/**
 * Planner implementation for batch execution mode
 */
class BatchPlanner(
  executor: Executor,
  config: TableConfig,
  functionCatalog: FunctionCatalog,
  catalogManager: CatalogManager  
) extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = false) {

  /**
   * Returns trait definitions specific to batch execution
   * Focuses on distribution and convention traits without streaming-specific traits
   */
  protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]]
  
  /**
   * Returns batch-specific optimizer with batch optimization strategies
   * @return BatchCommonSubGraphBasedOptimizer instance
   */
  protected def getOptimizer: Optimizer
  
  /**
   * Returns batch-specific processors for execution graph optimization
   * @return Sequence of batch-specific processors
   */
  protected def getExecNodeGraphProcessors: Seq[ExecNodeGraphProcessor]
  
  /**
   * Translates execution graph to batch transformations
   * @param execGraph Execution node graph for batch processing
   * @return List of batch transformations
   */
  protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
}

Optimizer System

The optimization system provides multi-phase query optimization using Apache Calcite's rule-based and cost-based optimization framework.

/**
 * Base optimizer interface for query optimization
 */
trait Optimizer {
  /**
   * Optimizes relational algebra expressions using optimization rules
   * @param roots Root nodes of the query plan to optimize
   * @return Optimized execution node graph
   */
  def optimize(roots: Seq[RelNode]): ExecNodeGraph
}

/**
 * Stream-specific optimizer with common sub-graph based optimization
 */
class StreamCommonSubGraphBasedOptimizer(planner: PlannerBase) extends Optimizer {
  /**
   * Applies streaming-specific optimization rules and strategies
   * Includes watermark propagation, mini-batch optimization, and state optimization
   */
  def optimize(roots: Seq[RelNode]): ExecNodeGraph
}

/**
 * Batch-specific optimizer with batch optimization strategies
 */
class BatchCommonSubGraphBasedOptimizer(planner: PlannerBase) extends Optimizer {
  /**
   * Applies batch-specific optimization rules and strategies  
   * Includes join reordering, aggregation pushdown, and partition pruning
   */
  def optimize(roots: Seq[RelNode]): ExecNodeGraph
}
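
The idea behind common-sub-graph based optimization can be illustrated with a small Flink-free sketch: structurally identical plan fragments are detected via a digest and shared rather than planned twice (all names below are illustrative):

```java
import java.util.*;

// Toy common-sub-graph reuse: two query roots that contain the same
// Filter -> Scan subtree end up sharing a single canonical instance of it.
public class SubGraphReuse {
    static class Node {
        final String op; final List<Node> inputs;
        Node(String op, List<Node> inputs) { this.op = op; this.inputs = inputs; }
        // Structural signature of the subtree rooted at this node.
        String digest() {
            StringBuilder sb = new StringBuilder(op).append("[");
            for (Node in : inputs) sb.append(in.digest()).append(",");
            return sb.append("]").toString();
        }
    }

    static final Map<String, Node> canonical = new HashMap<>();

    // Rebuild the tree bottom-up, replacing each subtree with the canonical
    // instance that shares its digest.
    static Node reuse(Node n) {
        List<Node> ins = new ArrayList<>();
        for (Node in : n.inputs) ins.add(reuse(in));
        Node rebuilt = new Node(n.op, ins);
        return canonical.computeIfAbsent(rebuilt.digest(), d -> rebuilt);
    }

    public static void main(String[] args) {
        // Two query roots that each filter the same scanned source table.
        Node root1 = new Node("Agg", List.of(new Node("Filter", List.of(new Node("Scan(src)", List.of())))));
        Node root2 = new Node("Sort", List.of(new Node("Filter", List.of(new Node("Scan(src)", List.of())))));
        Node r1 = reuse(root1), r2 = reuse(root2);
        System.out.println(r1.inputs.get(0) == r2.inputs.get(0)); // shared subtree
    }
}
```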

Execution Node Graph

Represents the optimized execution plan as a graph of execution nodes, providing the bridge between logical planning and physical execution.

/**
 * Graph representation of execution nodes for query execution
 */
public class ExecNodeGraph {
  /**
   * Returns root nodes of the execution graph
   * @return List of root execution nodes
   */
  public List<ExecNode<?>> getRootNodes();
  
  /**
   * Accepts visitor for traversing the execution graph
   * @param visitor Visitor implementation for graph traversal
   */
  public void accept(ExecNodeVisitor visitor);
  
  /**
   * Returns all nodes in the execution graph  
   * @return Set of all execution nodes in the graph
   */
  public Set<ExecNode<?>> getAllNodes();
}

/**
 * Base interface for execution nodes in the graph
 */
public interface ExecNode<T> {
  /**
   * Translates this execution node to Flink transformation
   * @param planner Planner context for translation
   * @return Flink transformation representing this node
   */
  Transformation<T> translateToPlan(Planner planner);
  
  /**
   * Returns input nodes of this execution node
   * @return List of input execution nodes
   */
  List<ExecNode<?>> getInputNodes();
  
  /**
   * Returns output type of this execution node
   * @return Type information for the output
   */
  TypeInformation<T> getOutputType();
}
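
The accept/getInputNodes shape above can be illustrated with a minimal, Flink-free visitor over a toy node graph; a translator typically visits inputs before the node itself:

```java
import java.util.*;

// Toy exec-node graph with a depth-first visitor, mirroring the
// accept/getInputNodes pattern (names illustrative, no Flink dependency).
public class VisitorDemo {
    interface ExecNodeVisitor { void visit(ExecNode node); }

    static class ExecNode {
        final String name; final List<ExecNode> inputs;
        ExecNode(String name, ExecNode... inputs) { this.name = name; this.inputs = List.of(inputs); }
        // Visit inputs first, then this node, as a translator would.
        void accept(ExecNodeVisitor v) {
            for (ExecNode in : inputs) in.accept(v);
            v.visit(this);
        }
    }

    public static void main(String[] args) {
        ExecNode scan = new ExecNode("Scan");
        ExecNode filter = new ExecNode("Filter", scan);
        ExecNode sink = new ExecNode("Sink", filter);
        List<String> order = new ArrayList<>();
        sink.accept(n -> order.add(n.name));
        System.out.println(order);  // inputs first: [Scan, Filter, Sink]
    }
}
```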

Optimization Phases

Phase 1: Logical Planning

  • SQL parsing and validation using Calcite
  • Logical plan creation with RelNode representation
  • Initial semantic validation and type checking

Phase 2: Rule-Based Optimization

  • Application of transformation rules (projection pushdown, filter pushdown, etc.)
  • Join reordering and optimization
  • Predicate simplification and constant folding
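
As a concrete sketch of one Phase 2 transformation rule, here is constant folding applied bottom-up to a toy expression tree (illustrative only; Flink applies such rules through Calcite's rule engine, not this code):

```java
// Toy constant-folding rule: Add(Lit, Lit) is rewritten to a single Lit,
// applied bottom-up so nested constants collapse fully.
public class ConstantFolding {
    static abstract class Expr {}
    static class Lit extends Expr {
        final int value; Lit(int v) { value = v; }
    }
    static class Add extends Expr {
        final Expr left, right; Add(Expr l, Expr r) { left = l; right = r; }
    }

    static Expr fold(Expr e) {
        if (e instanceof Add) {
            Add a = (Add) e;
            Expr l = fold(a.left), r = fold(a.right);
            if (l instanceof Lit && r instanceof Lit)
                return new Lit(((Lit) l).value + ((Lit) r).value);
            return new Add(l, r);
        }
        return e;
    }

    public static void main(String[] args) {
        // e.g. a predicate constant like 1 + (2 + 3) folds to 6 at plan time
        Expr folded = fold(new Add(new Lit(1), new Add(new Lit(2), new Lit(3))));
        System.out.println(((Lit) folded).value);  // 6
    }
}
```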

Phase 3: Cost-Based Optimization

  • Statistics-based cost estimation
  • Join algorithm selection (hash join, sort-merge join, etc.)
  • Access path selection and index usage
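
A toy cost model makes the Phase 3 idea concrete: estimate a cost per join algorithm from row counts and pick the cheaper plan. The formulas and memory budget below are illustrative only, not Flink's actual Calcite-based cost model:

```java
// Toy cost-based join algorithm selection.
public class JoinCostModel {
    // Hash join: build a table on the smaller side, stream the larger side.
    // Only viable here if the build side fits the (row-count) memory budget.
    static double hashJoinCost(long build, long probe, long memRows) {
        return build <= memRows ? build * 2.0 + probe : Double.POSITIVE_INFINITY;
    }

    // Sort-merge join: both sides pay an n*log(n) sort cost.
    static double sortMergeCost(long left, long right) {
        return left * Math.log(Math.max(left, 2)) + right * Math.log(Math.max(right, 2));
    }

    static String choose(long left, long right, long memRows) {
        long build = Math.min(left, right);
        long probe = Math.max(left, right);
        return hashJoinCost(build, probe, memRows) <= sortMergeCost(left, right)
                ? "HashJoin" : "SortMergeJoin";
    }

    public static void main(String[] args) {
        long memRows = 1_000_000;
        System.out.println(choose(1_000, 10_000_000, memRows));       // small build side
        System.out.println(choose(10_000_000, 10_000_000, memRows));  // build side too large
    }
}
```

The real optimizer draws the row-count estimates from catalog statistics rather than literals.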

Phase 4: Physical Planning

  • Translation to execution nodes (ExecNode graph)
  • Operator selection and configuration
  • Resource allocation and parallelism planning

Phase 5: Code Generation

  • Dynamic code generation for optimized execution
  • Fusion of operators where beneficial
  • Memory management optimization
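
Operator fusion in Phase 5 can be illustrated without any code-generation machinery: composing a map and a filter into one pass removes the intermediate materialization. Flink achieves the same effect by generating Java code at runtime; this sketch only shows the principle:

```java
import java.util.ArrayList;
import java.util.List;

// Toy operator fusion: the fused loop produces the same result as the
// unfused map-then-filter pipeline, but in one pass with no intermediate list.
public class OperatorFusion {
    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5);

        // Unfused: two passes, one intermediate collection.
        List<Integer> doubled = new ArrayList<>();
        for (int x : input) doubled.add(x * 2);
        List<Integer> unfused = new ArrayList<>();
        for (int x : doubled) if (x > 4) unfused.add(x);

        // Fused: one pass, no intermediate collection.
        List<Integer> fused = new ArrayList<>();
        for (int x : input) {
            int mapped = x * 2;
            if (mapped > 4) fused.add(mapped);
        }

        System.out.println(unfused.equals(fused));  // same result
        System.out.println(fused);
    }
}
```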

Configuration and Tuning

Key configuration options for query planning:

// Enable reuse of common sub-plans during optimization
tableConfig.getConfiguration().setString("table.optimizer.reuse-sub-plan-enabled", "true");

// Set join reordering optimization
tableConfig.getConfiguration().setString("table.optimizer.join-reorder-enabled", "true");

// Configure mini-batch settings for streaming
// (enabling mini-batch requires both allow-latency and size to be set)
tableConfig.getConfiguration().setString("table.exec.mini-batch.enabled", "true");
tableConfig.getConfiguration().setString("table.exec.mini-batch.allow-latency", "1s");
tableConfig.getConfiguration().setString("table.exec.mini-batch.size", "1000");
