or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.mdconnector-integration.mdcore-planning.mdenums-constants.mdexecution-nodes.mdfactory-classes.mdindex.mdtype-system.md
tile.json

execution-nodes.mddocs/

Execution Nodes

Execution nodes represent the building blocks of Flink's query execution plans. The ExecNode hierarchy provides interfaces and abstractions for translating optimized relational plans into executable Flink transformations, supporting both streaming and batch processing modes.

Package Information

import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode; 
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.delegation.Planner;

Capabilities

ExecNode Base Interface

Core interface for all execution nodes in the query execution plan.

public interface ExecNode<T> extends ExecNodeTranslator<T> {
    
    /**
     * Gets the ID of this node.
     */
    int getId();
    
    /**
     * Returns a string which describes this node.
     */
    String getDescription();
    
    /**
     * Returns the output LogicalType of this node, this type should be consistent with the
     * type parameter T.
     */
    LogicalType getOutputType();
    
    /**
     * Returns a list of this node's input properties.
     * If there are no inputs, returns an empty list, not null.
     */
    List<InputProperty> getInputProperties();
    
    /**
     * Returns a list of this node's input ExecEdges.
     * If there are no inputs, returns an empty list, not null.
     */
    List<ExecEdge> getInputEdges();
    
    /**
     * Sets the input ExecEdges which connect this nodes and its input nodes.
     * If there are no inputs, the given inputEdges should be empty, not null.
     */
    void setInputEdges(List<ExecEdge> inputEdges);
    
    /**
     * Replaces the ordinalInParent-th input edge.
     * 
     * @param index Position of the child input edge, 0 is the first.
     * @param newInputEdge New edge that should be put at position `index`.
     */
    void replaceInputEdge(int index, ExecEdge newInputEdge);
    
    /**
     * Accepts a visit from a ExecNodeVisitor.
     */
    void accept(ExecNodeVisitor visitor);
}

The ExecNode interface serves as the foundation for all execution nodes, providing:

  • Plan Structure: Methods to navigate and modify the execution plan tree via ExecEdges
  • Type Information: Access to logical output types for type checking
  • Translation: Inherits from ExecNodeTranslator to convert nodes to Transformations
  • Properties: Required physical properties for input validation
  • Visitor Pattern: Support for traversing execution plan graphs

StreamExecNode Interface

Base interface for stream execution nodes. This is a simple marker interface that extends ExecNode.

public interface StreamExecNode<T> extends ExecNode<T> {}

StreamExecNode serves as a base interface for all streaming execution nodes. It inherits all functionality from ExecNode and ExecNodeTranslator, including the translateToPlan(Planner planner) method for converting to Flink transformations.

BatchExecNode Interface

Base interface for batch execution nodes. This is a simple marker interface that extends ExecNode.

public interface BatchExecNode<T> extends ExecNode<T> {}

BatchExecNode serves as a base interface for all batch execution nodes. Like StreamExecNode, it inherits all functionality from ExecNode and ExecNodeTranslator for plan translation.

ExecNodeTranslator Interface

Interface responsible for translating ExecNodes to Flink Transformations.

public interface ExecNodeTranslator<T> {
    
    /**
     * Translates this node into a Transformation.
     * 
     * NOTE: This method should return same translate result if called multiple times.
     * 
     * @param planner The Planner of the translated Table.
     */
    Transformation<T> translateToPlan(Planner planner);
}

This interface is extended by ExecNode, providing the core translation functionality that converts execution plan nodes into executable Flink transformations.

InputProperty

Describes the physical properties required for an execution node's input.

public class InputProperty {
    
    /**
     * The required data distribution for the input.
     */
    public enum RequiredDistribution {
        SINGLETON,    // Single partition (all data on one node)
        BROADCAST,    // Broadcast to all nodes
        HASH,         // Hash partition by specific keys
        RANGE,        // Range partition by specific keys  
        UNKNOWN       // No specific requirement
    }
    
    /**
     * The damage properties that indicate how changes propagate.
     */
    public enum DamageProperty {
        NO_DAMAGE,           // Changes don't affect output
        DAMAGE_IF_INSERT,    // Inserts may affect output
        DAMAGE_IF_UPDATE,    // Updates may affect output
        DAMAGE_IF_DELETE     // Deletes may affect output
    }
    
    // Constructor and accessor methods
    public InputProperty(
        RequiredDistribution requiredDistribution,
        DistributionSpec distributionSpec,
        DamageProperty damageProperty
    );
    
    public RequiredDistribution getRequiredDistribution();
    public DistributionSpec getDistributionSpec(); 
    public DamageProperty getDamageProperty();
}

Usage Example:

// Define input properties for a join operation
List<InputProperty> inputProperties = Arrays.asList(
    // Left input: hash partitioned by join keys
    new InputProperty(
        InputProperty.RequiredDistribution.HASH,
        DistributionSpec.hash(Arrays.asList(0, 1)), // Hash on columns 0,1
        InputProperty.DamageProperty.DAMAGE_IF_INSERT
    ),
    // Right input: hash partitioned by same join keys  
    new InputProperty(
        InputProperty.RequiredDistribution.HASH,
        DistributionSpec.hash(Arrays.asList(0, 1)), // Hash on columns 0,1
        InputProperty.DamageProperty.DAMAGE_IF_INSERT
    )
);

Translator Interfaces

ExecNodeTranslator

Base interface for translating execution nodes to Flink transformations.

public interface ExecNodeTranslator<T extends ExecNode<?>> {
    
    /**
     * Translates the given execution node to a Flink Transformation.
     */
    Transformation<?> translate(T node, PlannerBase planner);
    
    /**
     * Returns the class of execution nodes this translator can handle.
     */
    Class<T> getTargetClass();
}

SingleTransformationTranslator

Translator interface for nodes that produce a single transformation.

public interface SingleTransformationTranslator<T extends ExecNode<?>> 
    extends ExecNodeTranslator<T> {
    
    /**
     * Translates the node to a single transformation.
     */
    Transformation<?> translateToSingleTransformation(T node, PlannerBase planner);
    
    @Override
    default Transformation<?> translate(T node, PlannerBase planner) {
        return translateToSingleTransformation(node, planner);
    }
}

MultipleTransformationTranslator

Translator interface for nodes that produce multiple transformations.

public interface MultipleTransformationTranslator<T extends ExecNode<?>> 
    extends ExecNodeTranslator<T> {
    
    /**
     * Translates the node to multiple transformations.
     */
    List<Transformation<?>> translateToMultipleTransformations(T node, PlannerBase planner);
    
    @Override
    default Transformation<?> translate(T node, PlannerBase planner) {
        List<Transformation<?>> transformations = 
            translateToMultipleTransformations(node, planner);
        
        // Return union transformation if multiple outputs
        if (transformations.size() == 1) {
            return transformations.get(0);
        } else {
            return createUnionTransformation(transformations);
        }
    }
}

Execution Plan Serialization

ExecNodeJsonSerdeUtil

Utilities for serializing and deserializing execution nodes to/from JSON format.

public final class ExecNodeJsonSerdeUtil {
    
    /**
     * Serializes an execution node to JSON string.
     */
    public static String serializeExecNode(ExecNode<?> execNode);
    
    /**
     * Deserializes an execution node from JSON string.
     */
    public static ExecNode<?> deserializeExecNode(String json, ClassLoader classLoader);
    
    /**
     * Serializes a list of execution nodes to JSON.
     */
    public static String serializeExecNodeList(List<ExecNode<?>> execNodes);
    
    /**
     * Deserializes a list of execution nodes from JSON.
     */
    public static List<ExecNode<?>> deserializeExecNodeList(String json, ClassLoader classLoader);
}

Usage Example:

// Serialize execution plan for storage or transmission
List<ExecNode<?>> execNodes = // execution plan nodes
String serializedPlan = ExecNodeJsonSerdeUtil.serializeExecNodeList(execNodes);

// Store or transmit serialized plan
saveExecutionPlan(serializedPlan);

// Later: deserialize and execute
String storedPlan = loadExecutionPlan();
List<ExecNode<?>> deserializedNodes = ExecNodeJsonSerdeUtil.deserializeExecNodeList(
    storedPlan, 
    classLoader
);

// Execute deserialized plan
for (ExecNode<?> node : deserializedNodes) {
    if (node instanceof StreamExecNode) {
        Transformation<?> transformation = 
            ((StreamExecNode<?>) node).translateToPlan(planner);
        // Add to execution environment
    }
}

Advanced Execution Node Patterns

Composite Execution Nodes

// Example of a composite node that combines multiple operations
public class CompositeExecNode implements StreamExecNode<RowData> {
    
    private final List<ExecNode<?>> subNodes;
    
    @Override
    public Transformation<RowData> translateToPlan(PlannerBase planner) {
        // Translate sub-nodes and chain them
        Transformation<RowData> result = null;
        
        for (ExecNode<?> subNode : subNodes) {
            Transformation<RowData> subTransform = 
                ((StreamExecNode<RowData>) subNode).translateToPlan(planner);
                
            if (result == null) {
                result = subTransform;
            } else {
                // Chain transformations
                result = chainTransformations(result, subTransform);
            }
        }
        
        return result;
    }
}

Adaptive Execution Nodes

// Example of an adaptive node that adjusts behavior at runtime
public class AdaptiveJoinExecNode implements BatchExecNode<RowData> {
    
    @Override
    public Transformation<RowData> translateToPlan(PlannerBase planner) {
        // Choose join strategy based on runtime statistics
        JoinStrategy strategy = chooseJoinStrategy(
            getInputNodes().get(0).getEstimatedRowCount(),
            getInputNodes().get(1).getEstimatedRowCount()
        );
        
        switch (strategy) {
            case BROADCAST_HASH:
                return createBroadcastHashJoin(planner);
            case SORT_MERGE:
                return createSortMergeJoin(planner);
            case NESTED_LOOP:
                return createNestedLoopJoin(planner);
            default:
                throw new UnsupportedOperationException("Unknown join strategy: " + strategy);
        }
    }
}

Error Handling

Execution nodes should provide robust error handling:

@Override
public Transformation<RowData> translateToPlan(PlannerBase planner) {
    try {
        // Validate inputs
        validateInputNodes();
        
        // Create transformation
        return createTransformation(planner);
        
    } catch (Exception e) {
        throw new TableException(
            String.format("Failed to translate ExecNode %s: %s", 
                         getDescription(), e.getMessage()), 
            e
        );
    }
}

private void validateInputNodes() {
    if (getInputNodes().isEmpty()) {
        throw new IllegalStateException("ExecNode requires at least one input");
    }
    
    for (int i = 0; i < getInputNodes().size(); i++) {
        ExecNode<?> input = getInputNodes().get(i);
        InputProperty expectedProperty = getInputProperties().get(i);
        
        // Validate input compatibility
        if (!isCompatible(input.getOutputType(), expectedProperty)) {
            throw new ValidationException(
                String.format("Input %d is not compatible with required property %s", 
                             i, expectedProperty)
            );
        }
    }
}