Execution nodes represent the building blocks of Flink's query execution plans. The ExecNode hierarchy provides interfaces and abstractions for translating optimized relational plans into executable Flink transformations, supporting both streaming and batch processing modes.
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.delegation.Planner;
Core interface for all execution nodes in the query execution plan.
public interface ExecNode<T> extends ExecNodeTranslator<T> {
/**
* Gets the ID of this node.
*/
int getId();
/**
* Returns a string which describes this node.
*/
String getDescription();
/**
* Returns the output LogicalType of this node; this type should be consistent with the
* type parameter T.
*/
LogicalType getOutputType();
/**
* Returns a list of this node's input properties.
* If there are no inputs, returns an empty list, not null.
*/
List<InputProperty> getInputProperties();
/**
* Returns a list of this node's input ExecEdges.
* If there are no inputs, returns an empty list, not null.
*/
List<ExecEdge> getInputEdges();
/**
* Sets the input ExecEdges which connect this node to its input nodes.
* If there are no inputs, the given inputEdges should be empty, not null.
*/
void setInputEdges(List<ExecEdge> inputEdges);
/**
* Replaces the index-th input edge.
*
* @param index Position of the child input edge, 0 is the first.
* @param newInputEdge New edge that should be put at position `index`.
*/
void replaceInputEdge(int index, ExecEdge newInputEdge);
/**
* Accepts a visit from an ExecNodeVisitor.
*/
void accept(ExecNodeVisitor visitor);
}
The ExecNode interface serves as the foundation for all execution nodes, providing a stable node ID, a human-readable description, the node's output type, access to its input properties and input edges, visitor support, and (through ExecNodeTranslator) translation into a Flink Transformation.
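For example, accept together with getInputEdges() makes it straightforward to walk an execution plan. The following visitor is a minimal sketch (the class name is illustrative, not part of Flink) that collects the ID and description of every node reachable from a root node:
// Illustrative sketch: collect "<id>: <description>" for every node in the plan.
public class DescriptionCollector implements ExecNodeVisitor {
    private final List<String> descriptions = new ArrayList<>();

    @Override
    public void visit(ExecNode<?> node) {
        descriptions.add(node.getId() + ": " + node.getDescription());
        // Recursively visit this node's inputs through its input edges.
        for (ExecEdge edge : node.getInputEdges()) {
            edge.getSource().accept(this);
        }
    }

    public List<String> getDescriptions() {
        return descriptions;
    }
}
Calling rootNode.accept(new DescriptionCollector()) then visits the whole plan; note that this simple sketch revisits nodes that are reachable through more than one edge.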
Base interface for stream execution nodes. This is a simple marker interface that extends ExecNode.
public interface StreamExecNode<T> extends ExecNode<T> {}
StreamExecNode serves as a base interface for all streaming execution nodes. It inherits all functionality from ExecNode and ExecNodeTranslator, including the translateToPlan(Planner planner) method for converting to Flink transformations.
Base interface for batch execution nodes. This is a simple marker interface that extends ExecNode.
public interface BatchExecNode<T> extends ExecNode<T> {}
BatchExecNode serves as a base interface for all batch execution nodes. Like StreamExecNode, it inherits all functionality from ExecNode and ExecNodeTranslator for plan translation.
Interface responsible for translating ExecNodes to Flink Transformations.
public interface ExecNodeTranslator<T> {
/**
* Translates this node into a Transformation.
*
* NOTE: This method should return the same translation result if called multiple times.
*
* @param planner The Planner of the translated Table.
*/
Transformation<T> translateToPlan(Planner planner);
}
This interface is extended by ExecNode, providing the core translation functionality that converts execution plan nodes into executable Flink transformations.
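The note about repeated calls matters because a node may be translated more than once, for example when it has several downstream consumers. A common way to honor the contract, shown here as a minimal sketch rather than Flink's actual base class (CachingExecNode and translateToPlanInternal are illustrative names), is to cache the transformation on first use:
// Minimal sketch: honor the "same result on repeated calls" contract by caching.
public abstract class CachingExecNode<T> implements ExecNode<T> {
    private Transformation<T> cachedTransformation;

    @Override
    public Transformation<T> translateToPlan(Planner planner) {
        if (cachedTransformation == null) {
            // Translate only once; later calls reuse the cached result.
            cachedTransformation = translateToPlanInternal(planner);
        }
        return cachedTransformation;
    }

    // Subclasses implement the actual translation logic here.
    protected abstract Transformation<T> translateToPlanInternal(Planner planner);
}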
Describes the physical properties required for an execution node's input.
public class InputProperty {
    /**
     * The type of the required input data distribution.
     */
    public enum DistributionType {
        ANY,              // No specific requirement
        HASH,             // Hash-partitioned by specific keys
        BROADCAST,        // Broadcast to all downstream tasks
        SINGLETON,        // A single partition (all records in one task)
        KEEP_INPUT_AS_IS, // Keep the input's current distribution
        UNKNOWN           // The distribution is not yet decided
    }
    /**
     * The required data distribution for the input. Concrete distributions are obtained via
     * static factories such as InputProperty.hashDistribution(int[]).
     */
    public abstract static class RequiredDistribution {
        public DistributionType getType();
    }
    /**
     * The dam behavior, which describes how an input record may trigger output of the operator.
     */
    public enum DamBehavior {
        PIPELINED, // Records can be processed and emitted immediately while the input is read
        END_INPUT, // Output is only produced once the input has been fully read (e.g. sort)
        BLOCKING   // The input is fully consumed and materialized before any output is produced
    }
    // Construction and accessor methods (outline); instances are usually created via the builder
    public static Builder builder();
    public RequiredDistribution getRequiredDistribution();
    public DamBehavior getDamBehavior();
    public int getPriority();
}
Usage Example:
// Define input properties for the two inputs of a join (key indices are illustrative)
InputProperty leftInputProperty = InputProperty.builder()
        // Left input: hash partitioned by the join keys (columns 0 and 1)
        .requiredDistribution(InputProperty.hashDistribution(new int[] {0, 1}))
        .damBehavior(InputProperty.DamBehavior.PIPELINED)
        .build();
InputProperty rightInputProperty = InputProperty.builder()
        // Right input: hash partitioned by the same join keys
        .requiredDistribution(InputProperty.hashDistribution(new int[] {0, 1}))
        .damBehavior(InputProperty.DamBehavior.PIPELINED)
        .build();
List<InputProperty> inputProperties = Arrays.asList(leftInputProperty, rightInputProperty);
Marker interfaces indicate whether an execution node translates into a single Flink transformation or into multiple transformations.
public interface SingleTransformationTranslator<T> extends ExecNodeTranslator<T> {}
SingleTransformationTranslator marks execution nodes whose translation produces exactly one transformation.
public interface MultipleTransformationTranslator<T> extends ExecNodeTranslator<T> {}
MultipleTransformationTranslator marks execution nodes whose translation internally creates several transformations, for example a node that chains additional operators in front of the one it returns. Both are plain marker interfaces: the translation entry point remains translateToPlan(Planner planner), inherited from ExecNodeTranslator.
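As a sketch of how these markers are combined with the node interfaces (the class and helper names below are illustrative, not part of Flink), a custom streaming node can declare that its translation yields a single transformation:
// Illustrative sketch: a streaming node that marks itself as producing a single transformation.
public abstract class MyFilterExecNode
        implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {

    @Override
    public Transformation<RowData> translateToPlan(Planner planner) {
        // Translate the single input node first ...
        Transformation<RowData> input =
                (Transformation<RowData>)
                        getInputEdges().get(0).getSource().translateToPlan(planner);
        // ... then wrap it into exactly one new transformation. createFilterTransformation
        // is a hypothetical helper standing in for the actual operator construction.
        return createFilterTransformation(input);
    }

    protected abstract Transformation<RowData> createFilterTransformation(
            Transformation<RowData> input);
}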
Utilities for serializing and deserializing execution nodes to/from JSON format.
public final class ExecNodeJsonSerdeUtil {
/**
* Serializes an execution node to JSON string.
*/
public static String serializeExecNode(ExecNode<?> execNode);
/**
* Deserializes an execution node from JSON string.
*/
public static ExecNode<?> deserializeExecNode(String json, ClassLoader classLoader);
/**
* Serializes a list of execution nodes to JSON.
*/
public static String serializeExecNodeList(List<ExecNode<?>> execNodes);
/**
* Deserializes a list of execution nodes from JSON.
*/
public static List<ExecNode<?>> deserializeExecNodeList(String json, ClassLoader classLoader);
}
Usage Example:
// Serialize execution plan for storage or transmission
List<ExecNode<?>> execNodes = ...; // the execution plan nodes to serialize
String serializedPlan = ExecNodeJsonSerdeUtil.serializeExecNodeList(execNodes);
// Store or transmit serialized plan
saveExecutionPlan(serializedPlan);
// Later: deserialize and execute
String storedPlan = loadExecutionPlan();
List<ExecNode<?>> deserializedNodes = ExecNodeJsonSerdeUtil.deserializeExecNodeList(
storedPlan,
classLoader
);
// Execute deserialized plan
for (ExecNode<?> node : deserializedNodes) {
if (node instanceof StreamExecNode) {
Transformation<?> transformation =
((StreamExecNode<?>) node).translateToPlan(planner);
// Add to execution environment
}
}
// Example of a composite node that combines multiple operations
public class CompositeExecNode implements StreamExecNode<RowData> {
private final List<ExecNode<?>> subNodes;
@Override
public Transformation<RowData> translateToPlan(Planner planner) {
// Translate sub-nodes and chain them
Transformation<RowData> result = null;
for (ExecNode<?> subNode : subNodes) {
Transformation<RowData> subTransform =
((StreamExecNode<RowData>) subNode).translateToPlan(planner);
if (result == null) {
result = subTransform;
} else {
// Chain transformations
result = chainTransformations(result, subTransform);
}
}
return result;
}
}
// Example of an adaptive node that adjusts behavior at runtime
public class AdaptiveJoinExecNode implements BatchExecNode<RowData> {
@Override
public Transformation<RowData> translateToPlan(Planner planner) {
// Choose the join strategy based on runtime statistics of the inputs
// (estimateRowCount is an illustrative helper, not part of the ExecNode API)
JoinStrategy strategy = chooseJoinStrategy(
estimateRowCount(getInputEdges().get(0).getSource()),
estimateRowCount(getInputEdges().get(1).getSource())
);
switch (strategy) {
case BROADCAST_HASH:
return createBroadcastHashJoin(planner);
case SORT_MERGE:
return createSortMergeJoin(planner);
case NESTED_LOOP:
return createNestedLoopJoin(planner);
default:
throw new UnsupportedOperationException("Unknown join strategy: " + strategy);
}
}
}
Execution nodes should provide robust error handling:
@Override
public Transformation<RowData> translateToPlan(Planner planner) {
try {
// Validate inputs
validateInputEdges();
// Create transformation
return createTransformation(planner);
} catch (Exception e) {
throw new TableException(
String.format("Failed to translate ExecNode %s: %s",
getDescription(), e.getMessage()),
e
);
}
}
private void validateInputEdges() {
if (getInputEdges().isEmpty()) {
throw new IllegalStateException("ExecNode requires at least one input");
}
for (int i = 0; i < getInputEdges().size(); i++) {
ExecNode<?> input = getInputEdges().get(i).getSource();
InputProperty expectedProperty = getInputProperties().get(i);
// Validate input compatibility (isCompatible is an illustrative helper)
if (!isCompatible(input.getOutputType(), expectedProperty)) {
throw new ValidationException(
String.format("Input %d is not compatible with required property %s",
i, expectedProperty)
);
}
}
}