# Execution Nodes

Execution nodes are the building blocks of Flink's query execution plans. The ExecNode hierarchy provides interfaces and abstractions for translating optimized relational plans into executable Flink transformations, supporting both streaming and batch processing modes.
## Package Information

```java
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.delegation.Planner;
```
## Capabilities

### ExecNode Base Interface

The core interface for all execution nodes in the query execution plan.
```java { .api }
public interface ExecNode<T> extends ExecNodeTranslator<T> {

    /**
     * Gets the ID of this node.
     */
    int getId();

    /**
     * Returns a string that describes this node.
     */
    String getDescription();

    /**
     * Returns the output LogicalType of this node. This type should be consistent with the
     * type parameter T.
     */
    LogicalType getOutputType();

    /**
     * Returns a list of this node's input properties.
     * If there are no inputs, returns an empty list, never null.
     */
    List<InputProperty> getInputProperties();

    /**
     * Returns a list of this node's input ExecEdges.
     * If there are no inputs, returns an empty list, never null.
     */
    List<ExecEdge> getInputEdges();

    /**
     * Sets the input ExecEdges that connect this node to its input nodes.
     * If there are no inputs, the given inputEdges should be empty, never null.
     */
    void setInputEdges(List<ExecEdge> inputEdges);

    /**
     * Replaces the input edge at the given position.
     *
     * @param index Position of the input edge to replace, 0 is the first.
     * @param newInputEdge New edge that should be put at position `index`.
     */
    void replaceInputEdge(int index, ExecEdge newInputEdge);

    /**
     * Accepts a visit from an ExecNodeVisitor.
     */
    void accept(ExecNodeVisitor visitor);
}
```
The `ExecNode` interface serves as the foundation for all execution nodes, providing:

- **Plan Structure**: Methods to navigate and modify the execution plan tree via ExecEdges
- **Type Information**: Access to logical output types for type checking
- **Translation**: Inherits from ExecNodeTranslator to convert nodes to Transformations
- **Properties**: Required physical properties for input validation
- **Visitor Pattern**: Support for traversing execution plan graphs
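The visitor pattern lets a traversal (counting nodes, collecting sources, validating the plan) live outside the node classes. The following self-contained sketch uses simplified stand-in interfaces, not the real Flink classes, to show how `accept` and a visitor cooperate:

```java
import java.util.Arrays;
import java.util.List;

public class VisitorSketch {
    // Simplified stand-ins for ExecNode and ExecNodeVisitor (illustration only)
    interface Node {
        List<Node> getInputs();
        void accept(Visitor v);
    }

    interface Visitor {
        void visit(Node n);
    }

    static class SimpleNode implements Node {
        private final List<Node> inputs;
        SimpleNode(Node... inputs) { this.inputs = Arrays.asList(inputs); }
        public List<Node> getInputs() { return inputs; }
        // The node hands itself to the visitor; the visitor drives recursion.
        public void accept(Visitor v) { v.visit(this); }
    }

    // A visitor that counts every node reachable from the root
    static class CountingVisitor implements Visitor {
        int count = 0;
        public void visit(Node n) {
            count++;
            for (Node input : n.getInputs()) {
                input.accept(this);
            }
        }
    }

    public static void main(String[] args) {
        Node scanA = new SimpleNode();
        Node scanB = new SimpleNode();
        Node join = new SimpleNode(scanA, scanB);
        Node sink = new SimpleNode(join);

        CountingVisitor visitor = new CountingVisitor();
        sink.accept(visitor);
        System.out.println(visitor.count); // 4 (sink, join, and both scans)
    }
}
```

A real visitor would dispatch on concrete node types; the counting traversal above only shows the control flow between `accept` and `visit`.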
### StreamExecNode Interface

Base interface for stream execution nodes. This is a simple marker interface that extends ExecNode.

```java { .api }
public interface StreamExecNode<T> extends ExecNode<T> {}
```

StreamExecNode serves as the base interface for all streaming execution nodes. It inherits all functionality from ExecNode and ExecNodeTranslator, including the `translateToPlan(Planner planner)` method for converting the node to a Flink transformation.
### BatchExecNode Interface

Base interface for batch execution nodes. This is a simple marker interface that extends ExecNode.

```java { .api }
public interface BatchExecNode<T> extends ExecNode<T> {}
```

BatchExecNode serves as the base interface for all batch execution nodes. Like StreamExecNode, it inherits all functionality from ExecNode and ExecNodeTranslator for plan translation.
### ExecNodeTranslator Interface

Interface responsible for translating ExecNodes into Flink Transformations.

```java { .api }
public interface ExecNodeTranslator<T> {

    /**
     * Translates this node into a Transformation.
     *
     * NOTE: This method should return the same translation result if called multiple times.
     *
     * @param planner The Planner of the translated table.
     */
    Transformation<T> translateToPlan(Planner planner);
}
```

This interface is extended by ExecNode, providing the core translation functionality that converts execution plan nodes into executable Flink transformations.
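The "same result on repeated calls" contract is typically satisfied by caching the translated transformation on first use. A minimal sketch with stand-in types (not the real Flink classes):

```java
public class MemoizedTranslationSketch {
    // Stand-ins for Flink's Transformation and Planner (illustration only)
    static class Transformation { }
    static class Planner { }

    static class CachingNode {
        private Transformation cached;
        int translationCount = 0;

        Transformation translateToPlan(Planner planner) {
            if (cached == null) {
                translationCount++;            // the expensive translation runs once
                cached = new Transformation();
            }
            return cached;                     // repeated calls return the same object
        }
    }

    public static void main(String[] args) {
        CachingNode node = new CachingNode();
        Planner planner = new Planner();
        Transformation first = node.translateToPlan(planner);
        Transformation second = node.translateToPlan(planner);
        System.out.println(first == second);       // true
        System.out.println(node.translationCount); // 1
    }
}
```

Caching matters because a node can be reachable from several downstream nodes; re-translating it would duplicate operators in the resulting job graph.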
### InputProperty

Describes the physical properties required of an execution node's input.

```java { .api }
public class InputProperty {

    /**
     * The required data distribution for the input.
     */
    public enum RequiredDistribution {
        SINGLETON, // Single partition (all data on one node)
        BROADCAST, // Broadcast to all nodes
        HASH,      // Hash partition by specific keys
        RANGE,     // Range partition by specific keys
        UNKNOWN    // No specific requirement
    }

    /**
     * The damage properties, which indicate how changes propagate to the output.
     */
    public enum DamageProperty {
        NO_DAMAGE,        // Changes don't affect the output
        DAMAGE_IF_INSERT, // Inserts may affect the output
        DAMAGE_IF_UPDATE, // Updates may affect the output
        DAMAGE_IF_DELETE  // Deletes may affect the output
    }

    // Constructor and accessor methods
    public InputProperty(
        RequiredDistribution requiredDistribution,
        DistributionSpec distributionSpec,
        DamageProperty damageProperty
    );

    public RequiredDistribution getRequiredDistribution();
    public DistributionSpec getDistributionSpec();
    public DamageProperty getDamageProperty();
}
```
**Usage Example:**

```java
// Define input properties for a join operation
List<InputProperty> inputProperties = Arrays.asList(
    // Left input: hash partitioned by the join keys
    new InputProperty(
        InputProperty.RequiredDistribution.HASH,
        DistributionSpec.hash(Arrays.asList(0, 1)), // Hash on columns 0 and 1
        InputProperty.DamageProperty.DAMAGE_IF_INSERT
    ),
    // Right input: hash partitioned by the same join keys
    new InputProperty(
        InputProperty.RequiredDistribution.HASH,
        DistributionSpec.hash(Arrays.asList(0, 1)), // Hash on columns 0 and 1
        InputProperty.DamageProperty.DAMAGE_IF_INSERT
    )
);
```
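Operationally, the `HASH` requirement means that rows with equal values in the key columns must land in the same partition, so both join inputs co-locate matching keys. The following self-contained sketch of that assignment uses a hypothetical helper, not Flink's actual partitioner:

```java
import java.util.Objects;

public class HashDistributionSketch {
    // Assigns a row to a partition based on the values of its key columns.
    // Simplified illustration of what a HASH RequiredDistribution demands;
    // not Flink's actual partitioning code.
    static int assignPartition(Object[] row, int[] keyColumns, int numPartitions) {
        Object[] keys = new Object[keyColumns.length];
        for (int i = 0; i < keyColumns.length; i++) {
            keys[i] = row[keyColumns[i]];
        }
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(Objects.hash(keys), numPartitions);
    }

    public static void main(String[] args) {
        Object[] rowA = {"user-1", 10, "2024-01-01"};
        Object[] rowB = {"user-1", 10, "2024-06-30"};
        int[] keyCols = {0, 1}; // hash on columns 0 and 1

        // Rows with equal key columns always map to the same partition
        System.out.println(assignPartition(rowA, keyCols, 4)
                == assignPartition(rowB, keyCols, 4)); // true
    }
}
```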
## Translator Interfaces

### ExecNodeTranslator

Base interface for translating execution nodes to Flink transformations.

```java { .api }
public interface ExecNodeTranslator<T extends ExecNode<?>> {

    /**
     * Translates the given execution node to a Flink Transformation.
     */
    Transformation<?> translate(T node, PlannerBase planner);

    /**
     * Returns the class of execution nodes this translator can handle.
     */
    Class<T> getTargetClass();
}
```
### SingleTransformationTranslator

Translator interface for nodes that produce a single transformation.

```java { .api }
public interface SingleTransformationTranslator<T extends ExecNode<?>>
        extends ExecNodeTranslator<T> {

    /**
     * Translates the node to a single transformation.
     */
    Transformation<?> translateToSingleTransformation(T node, PlannerBase planner);

    @Override
    default Transformation<?> translate(T node, PlannerBase planner) {
        return translateToSingleTransformation(node, planner);
    }
}
```
### MultipleTransformationTranslator

Translator interface for nodes that produce multiple transformations.

```java { .api }
public interface MultipleTransformationTranslator<T extends ExecNode<?>>
        extends ExecNodeTranslator<T> {

    /**
     * Translates the node to multiple transformations.
     */
    List<Transformation<?>> translateToMultipleTransformations(T node, PlannerBase planner);

    @Override
    default Transformation<?> translate(T node, PlannerBase planner) {
        List<Transformation<?>> transformations =
                translateToMultipleTransformations(node, planner);

        // Pass a single result through; union multiple outputs into one
        // (createUnionTransformation is a helper assumed to be available)
        if (transformations.size() == 1) {
            return transformations.get(0);
        } else {
            return createUnionTransformation(transformations);
        }
    }
}
```
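The default `translate` dispatch can be exercised standalone. This sketch mirrors its single-versus-multiple logic with stand-in types (hypothetical, for illustration only):

```java
import java.util.Arrays;
import java.util.List;

public class TranslateDispatchSketch {
    // Stand-in for Flink's Transformation (illustration only)
    static class Transformation {
        final String name;
        Transformation(String name) { this.name = name; }
    }

    // Mirrors the default translate() logic: a single result is passed
    // through unchanged; several results are combined into one "union".
    static Transformation translate(List<Transformation> results) {
        if (results.size() == 1) {
            return results.get(0);
        }
        StringBuilder combined = new StringBuilder("union(");
        for (int i = 0; i < results.size(); i++) {
            if (i > 0) combined.append(", ");
            combined.append(results.get(i).name);
        }
        return new Transformation(combined.append(')').toString());
    }

    public static void main(String[] args) {
        Transformation single = translate(Arrays.asList(new Transformation("map")));
        System.out.println(single.name); // map

        Transformation union = translate(Arrays.asList(
                new Transformation("map"), new Transformation("filter")));
        System.out.println(union.name); // union(map, filter)
    }
}
```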
## Execution Plan Serialization

### ExecNodeJsonSerdeUtil

Utilities for serializing execution nodes to and deserializing them from JSON.

```java { .api }
public final class ExecNodeJsonSerdeUtil {

    /**
     * Serializes an execution node to a JSON string.
     */
    public static String serializeExecNode(ExecNode<?> execNode);

    /**
     * Deserializes an execution node from a JSON string.
     */
    public static ExecNode<?> deserializeExecNode(String json, ClassLoader classLoader);

    /**
     * Serializes a list of execution nodes to JSON.
     */
    public static String serializeExecNodeList(List<ExecNode<?>> execNodes);

    /**
     * Deserializes a list of execution nodes from JSON.
     */
    public static List<ExecNode<?>> deserializeExecNodeList(String json, ClassLoader classLoader);
}
```
**Usage Example:**

```java
// Serialize the execution plan for storage or transmission
// (getExecutionPlanNodes, saveExecutionPlan, and loadExecutionPlan are
// hypothetical helpers used for illustration)
List<ExecNode<?>> execNodes = getExecutionPlanNodes();
String serializedPlan = ExecNodeJsonSerdeUtil.serializeExecNodeList(execNodes);

// Store or transmit the serialized plan
saveExecutionPlan(serializedPlan);

// Later: deserialize and execute
String storedPlan = loadExecutionPlan();
List<ExecNode<?>> deserializedNodes = ExecNodeJsonSerdeUtil.deserializeExecNodeList(
    storedPlan,
    classLoader
);

// Execute the deserialized plan
for (ExecNode<?> node : deserializedNodes) {
    if (node instanceof StreamExecNode) {
        Transformation<?> transformation =
            ((StreamExecNode<?>) node).translateToPlan(planner);
        // Add to the execution environment
    }
}
```
## Advanced Execution Node Patterns

### Composite Execution Nodes

```java
// Example of a composite node that combines multiple operations
// (sketch: the remaining ExecNode methods and the chainTransformations
// helper are omitted for brevity)
public class CompositeExecNode implements StreamExecNode<RowData> {

    private final List<ExecNode<?>> subNodes;

    public CompositeExecNode(List<ExecNode<?>> subNodes) {
        this.subNodes = subNodes;
    }

    @Override
    public Transformation<RowData> translateToPlan(Planner planner) {
        // Translate the sub-nodes and chain them in order
        Transformation<RowData> result = null;

        for (ExecNode<?> subNode : subNodes) {
            Transformation<RowData> subTransform =
                ((StreamExecNode<RowData>) subNode).translateToPlan(planner);

            if (result == null) {
                result = subTransform;
            } else {
                // Chain the previous result into the next transformation
                result = chainTransformations(result, subTransform);
            }
        }

        return result;
    }
}
```
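The chaining loop above is a left fold over the sub-nodes. Its shape can be reproduced self-containedly with `java.util.function.Function` standing in for a transformation (illustration only):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ChainingSketch {
    // Folds a list of per-node stages into one chained pipeline, mirroring
    // the loop in the composite node example (illustrative stand-in only).
    static Function<Integer, Integer> chainAll(List<Function<Integer, Integer>> stages) {
        Function<Integer, Integer> result = null;
        for (Function<Integer, Integer> stage : stages) {
            // First stage becomes the seed; later stages consume its output
            result = (result == null) ? stage : result.andThen(stage);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Function<Integer, Integer>> stages = Arrays.asList(
                x -> x + 1,  // first stage
                x -> x * 2   // second stage, applied to the first stage's output
        );
        System.out.println(chainAll(stages).apply(3)); // (3 + 1) * 2 = 8
    }
}
```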
### Adaptive Execution Nodes

```java
// Example of an adaptive node that adjusts its behavior based on statistics
// (sketch: getInputNodes, getEstimatedRowCount, and the create*Join helpers
// are assumed to exist and are not part of the ExecNode interface above)
public class AdaptiveJoinExecNode implements BatchExecNode<RowData> {

    @Override
    public Transformation<RowData> translateToPlan(Planner planner) {
        // Choose a join strategy based on estimated input sizes
        JoinStrategy strategy = chooseJoinStrategy(
            getInputNodes().get(0).getEstimatedRowCount(),
            getInputNodes().get(1).getEstimatedRowCount()
        );

        switch (strategy) {
            case BROADCAST_HASH:
                return createBroadcastHashJoin(planner);
            case SORT_MERGE:
                return createSortMergeJoin(planner);
            case NESTED_LOOP:
                return createNestedLoopJoin(planner);
            default:
                throw new UnsupportedOperationException("Unknown join strategy: " + strategy);
        }
    }
}
```
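A minimal, self-contained sketch of the strategy choice follows; the thresholds and the `chooseJoinStrategy` logic are assumptions for illustration, not Flink's actual planner rules:

```java
public class JoinStrategySketch {
    enum JoinStrategy { BROADCAST_HASH, SORT_MERGE, NESTED_LOOP }

    // Chooses a join strategy from estimated row counts.
    // The thresholds are illustrative assumptions, not Flink defaults.
    static JoinStrategy chooseJoinStrategy(long leftRows, long rightRows) {
        long smaller = Math.min(leftRows, rightRows);
        long larger = Math.max(leftRows, rightRows);
        if (larger <= 1_000) {
            // Both inputs are tiny: a simple nested-loop join is cheapest
            return JoinStrategy.NESTED_LOOP;
        } else if (smaller <= 10_000) {
            // One side is small enough to broadcast to every task
            return JoinStrategy.BROADCAST_HASH;
        } else {
            // Both sides are large: shuffle both and sort-merge
            return JoinStrategy.SORT_MERGE;
        }
    }

    public static void main(String[] args) {
        System.out.println(chooseJoinStrategy(500, 800));             // NESTED_LOOP
        System.out.println(chooseJoinStrategy(5_000, 2_000_000));     // BROADCAST_HASH
        System.out.println(chooseJoinStrategy(3_000_000, 2_000_000)); // SORT_MERGE
    }
}
```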
## Error Handling

Execution nodes should provide robust error handling during translation:

```java
@Override
public Transformation<RowData> translateToPlan(Planner planner) {
    try {
        // Validate inputs before translating
        validateInputNodes();

        // Create the transformation
        return createTransformation(planner);

    } catch (Exception e) {
        throw new TableException(
            String.format("Failed to translate ExecNode %s: %s",
                getDescription(), e.getMessage()),
            e
        );
    }
}

private void validateInputNodes() {
    if (getInputEdges().isEmpty()) {
        throw new IllegalStateException("ExecNode requires at least one input");
    }

    for (int i = 0; i < getInputEdges().size(); i++) {
        // Each edge's source is the upstream node feeding this input
        ExecNode<?> input = getInputEdges().get(i).getSource();
        InputProperty expectedProperty = getInputProperties().get(i);

        // isCompatible is an assumed helper that checks the input's output
        // type against the required property
        if (!isCompatible(input.getOutputType(), expectedProperty)) {
            throw new ValidationException(
                String.format("Input %d is not compatible with required property %s",
                    i, expectedProperty)
            );
        }
    }
}
```
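The fail-fast arity check underlying this validation can be exercised standalone. The sketch below uses plain strings as stand-ins for edges and properties (hypothetical, for illustration only):

```java
import java.util.Collections;
import java.util.List;

public class InputValidationSketch {
    // Throws if the number of input edges does not match the number of
    // declared input properties (a simplified slice of plan validation).
    static void validateArity(List<String> inputEdges, List<String> inputProperties) {
        if (inputEdges.size() != inputProperties.size()) {
            throw new IllegalStateException(String.format(
                "Expected %d inputs but found %d",
                inputProperties.size(), inputEdges.size()));
        }
    }

    public static void main(String[] args) {
        // Matching arity passes silently
        validateArity(List.of("edge0", "edge1"), List.of("HASH", "HASH"));

        // Mismatched arity fails fast with a descriptive message
        try {
            validateArity(Collections.emptyList(), List.of("SINGLETON"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Expected 1 inputs but found 0
        }
    }
}
```

Failing during translation, before anything is submitted, keeps plan bugs out of running jobs, which is why the validation runs at the top of `translateToPlan`.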