
# Execution Nodes

Execution nodes represent the building blocks of Flink's query execution plans. The ExecNode hierarchy provides interfaces and abstractions for translating optimized relational plans into executable Flink transformations, supporting both streaming and batch processing modes.

## Package Information

```java
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.delegation.Planner;
```

## Capabilities

### ExecNode Base Interface

Core interface for all execution nodes in the query execution plan.

```java { .api }
public interface ExecNode<T> extends ExecNodeTranslator<T> {

    /**
     * Gets the ID of this node.
     */
    int getId();

    /**
     * Returns a string which describes this node.
     */
    String getDescription();

    /**
     * Returns the output LogicalType of this node. This type should be consistent with the
     * type parameter T.
     */
    LogicalType getOutputType();

    /**
     * Returns a list of this node's input properties.
     * If there are no inputs, returns an empty list, not null.
     */
    List<InputProperty> getInputProperties();

    /**
     * Returns a list of this node's input ExecEdges.
     * If there are no inputs, returns an empty list, not null.
     */
    List<ExecEdge> getInputEdges();

    /**
     * Sets the input ExecEdges which connect this node and its input nodes.
     * If there are no inputs, the given inputEdges should be empty, not null.
     */
    void setInputEdges(List<ExecEdge> inputEdges);

    /**
     * Replaces the input edge at position `index`.
     *
     * @param index Position of the child input edge, 0 is the first.
     * @param newInputEdge New edge that should be put at position `index`.
     */
    void replaceInputEdge(int index, ExecEdge newInputEdge);

    /**
     * Accepts a visit from an ExecNodeVisitor.
     */
    void accept(ExecNodeVisitor visitor);
}
```

The `ExecNode` interface serves as the foundation for all execution nodes, providing:

- **Plan Structure**: Methods to navigate and modify the execution plan tree via ExecEdges
- **Type Information**: Access to logical output types for type checking
- **Translation**: Inherits from ExecNodeTranslator to convert nodes to Transformations
- **Properties**: Required physical properties for input validation
- **Visitor Pattern**: Support for traversing execution plan graphs
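
The plan-structure and visitor points above can be illustrated with a small, self-contained sketch. The types below (`SketchNode`, `SketchEdge`, `SketchVisitor`, `CollectingVisitor`) are simplified stand-ins invented for this example, not Flink's `ExecNode`, `ExecEdge`, or `ExecNodeVisitor`; they only mirror the shape of `getInputEdges()` and `accept(...)`. The traversal order (inputs before consumers) is also an assumption chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-ins for illustration only; these are NOT Flink classes.
interface SketchVisitor {
    void visit(SketchNode node);
}

final class SketchEdge {
    private final SketchNode source;

    SketchEdge(SketchNode source) {
        this.source = source;
    }

    SketchNode getSource() {
        return source;
    }
}

final class SketchNode {
    private final String description;
    private final List<SketchEdge> inputEdges;

    SketchNode(String description, SketchNode... inputs) {
        this.description = description;
        List<SketchEdge> edges = new ArrayList<>();
        for (SketchNode input : inputs) {
            edges.add(new SketchEdge(input));
        }
        // Never null: a node without inputs simply has an empty edge list.
        this.inputEdges = Collections.unmodifiableList(edges);
    }

    String getDescription() {
        return description;
    }

    List<SketchEdge> getInputEdges() {
        return inputEdges;
    }

    void accept(SketchVisitor visitor) {
        visitor.visit(this);
    }
}

// Visits all inputs before the node itself and records each description.
final class CollectingVisitor implements SketchVisitor {
    final List<String> visited = new ArrayList<>();

    @Override
    public void visit(SketchNode node) {
        for (SketchEdge edge : node.getInputEdges()) {
            edge.getSource().accept(this);
        }
        visited.add(node.getDescription());
    }
}

final class PlanWalkDemo {
    static List<String> walk() {
        SketchNode scan = new SketchNode("scan");
        SketchNode filter = new SketchNode("filter", scan);
        SketchNode sink = new SketchNode("sink", filter);
        CollectingVisitor visitor = new CollectingVisitor();
        sink.accept(visitor);
        return visitor.visited;
    }

    public static void main(String[] args) {
        System.out.println(walk()); // prints [scan, filter, sink]
    }
}
```

Keeping the traversal logic in the visitor rather than in the nodes means new plan analyses can be added without touching the node classes.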

### StreamExecNode Interface

Base interface for stream execution nodes. This is a simple marker interface that extends ExecNode.

```java { .api }
public interface StreamExecNode<T> extends ExecNode<T> {}
```

StreamExecNode serves as a base interface for all streaming execution nodes. It inherits all functionality from ExecNode and ExecNodeTranslator, including the `translateToPlan(Planner planner)` method for converting to Flink transformations.

### BatchExecNode Interface

Base interface for batch execution nodes. This is a simple marker interface that extends ExecNode.

```java { .api }
public interface BatchExecNode<T> extends ExecNode<T> {}
```

BatchExecNode serves as a base interface for all batch execution nodes. Like StreamExecNode, it inherits all functionality from ExecNode and ExecNodeTranslator for plan translation.
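
Because both interfaces are markers, their main use is letting calling code tell the two execution modes apart by type. The following is a minimal sketch of that idea using stand-in interfaces (`SketchExecNode`, `SketchStreamNode`, `SketchBatchNode`) that are assumptions made for this example, not the Flink types themselves.

```java
// Simplified stand-ins for illustration only; these are NOT Flink interfaces.
interface SketchExecNode {
    String getDescription();
}

// Marker interfaces: they add no methods, only a type to test against.
interface SketchStreamNode extends SketchExecNode {}

interface SketchBatchNode extends SketchExecNode {}

final class StreamCalc implements SketchStreamNode {
    @Override
    public String getDescription() {
        return "StreamCalc";
    }
}

final class BatchSort implements SketchBatchNode {
    @Override
    public String getDescription() {
        return "BatchSort";
    }
}

final class ModeCheck {
    // Classifies a node purely by which marker interface it implements.
    static String modeOf(SketchExecNode node) {
        if (node instanceof SketchStreamNode) {
            return "streaming";
        }
        if (node instanceof SketchBatchNode) {
            return "batch";
        }
        return "unknown";
    }

    public static void main(String[] args) {
        System.out.println(modeOf(new StreamCalc())); // prints streaming
        System.out.println(modeOf(new BatchSort()));  // prints batch
    }
}
```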

### ExecNodeTranslator Interface

Interface responsible for translating ExecNodes to Flink Transformations.

```java { .api }
public interface ExecNodeTranslator<T> {

    /**
     * Translates this node into a Transformation.
     *
     * NOTE: This method should return the same translation result if called multiple times.
     *
     * @param planner The Planner of the translated Table.
     */
    Transformation<T> translateToPlan(Planner planner);
}
```

This interface is extended by ExecNode, providing the core translation functionality that converts execution plan nodes into executable Flink transformations.
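
The note that `translateToPlan` should return the same result on repeated calls can be satisfied in several ways. One straightforward option is to cache the first result, sketched below with stand-in types. `SketchTransformation`, `CachingTranslator`, and `CountingTranslator` are hypothetical names for this example and are not part of the API described here.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for illustration only; NOT Flink's Transformation.
final class SketchTransformation {
    final String name;

    SketchTransformation(String name) {
        this.name = name;
    }
}

abstract class CachingTranslator {
    private SketchTransformation cached;

    // Translates at most once and returns the same instance on every call.
    public final SketchTransformation translateToPlan() {
        if (cached == null) {
            cached = translateInternal();
        }
        return cached;
    }

    // Subclasses perform the actual (possibly expensive) translation here.
    protected abstract SketchTransformation translateInternal();
}

final class CountingTranslator extends CachingTranslator {
    final AtomicInteger translations = new AtomicInteger();

    @Override
    protected SketchTransformation translateInternal() {
        translations.incrementAndGet();
        return new SketchTransformation("calc");
    }
}

final class CachingDemo {
    public static void main(String[] args) {
        CountingTranslator node = new CountingTranslator();
        SketchTransformation first = node.translateToPlan();
        SketchTransformation second = node.translateToPlan();
        System.out.println(first == second);         // prints true
        System.out.println(node.translations.get()); // prints 1
    }
}
```

Caching matters when one node feeds several downstream nodes: each consumer asks for the translation, and all of them should end up wired to the same upstream object. The sketch is not thread-safe, which is an assumption that translation runs on a single thread.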

### InputProperty

Describes the physical properties required for an execution node's input.

```java { .api }
public class InputProperty {

    /**
     * The required data distribution for the input.
     */
    public enum RequiredDistribution {
        SINGLETON,  // Single partition (all data on one node)
        BROADCAST,  // Broadcast to all nodes
        HASH,       // Hash partition by specific keys
        RANGE,      // Range partition by specific keys
        UNKNOWN     // No specific requirement
    }

    /**
     * The damage properties that indicate how changes propagate.
     */
    public enum DamageProperty {
        NO_DAMAGE,         // Changes don't affect output
        DAMAGE_IF_INSERT,  // Inserts may affect output
        DAMAGE_IF_UPDATE,  // Updates may affect output
        DAMAGE_IF_DELETE   // Deletes may affect output
    }

    // Constructor and accessor methods
    public InputProperty(
            RequiredDistribution requiredDistribution,
            DistributionSpec distributionSpec,
            DamageProperty damageProperty
    );

    public RequiredDistribution getRequiredDistribution();
    public DistributionSpec getDistributionSpec();
    public DamageProperty getDamageProperty();
}
```

**Usage Example:**

```java
// Define input properties for a join operation
List<InputProperty> inputProperties = Arrays.asList(
    // Left input: hash partitioned by join keys
    new InputProperty(
        InputProperty.RequiredDistribution.HASH,
        DistributionSpec.hash(Arrays.asList(0, 1)), // Hash on columns 0,1
        InputProperty.DamageProperty.DAMAGE_IF_INSERT
    ),
    // Right input: hash partitioned by same join keys
    new InputProperty(
        InputProperty.RequiredDistribution.HASH,
        DistributionSpec.hash(Arrays.asList(0, 1)), // Hash on columns 0,1
        InputProperty.DamageProperty.DAMAGE_IF_INSERT
    )
);
```
```
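
To make the `HASH` requirement in the example above more concrete, here is a minimal sketch of why both join inputs are hashed on the same key columns: rows with equal key values map to the same partition, so matching rows meet on the same task. `HashPartitionSketch` and its `partitionFor` method are hypothetical, and the hashing scheme is a simplification chosen for the example rather than the one Flink uses.

```java
import java.util.Arrays;

final class HashPartitionSketch {

    // Picks a partition using only the values found in the key columns.
    static int partitionFor(Object[] row, int[] keyColumns, int numPartitions) {
        Object[] keyValues = new Object[keyColumns.length];
        for (int i = 0; i < keyColumns.length; i++) {
            keyValues[i] = row[keyColumns[i]];
        }
        // floorMod keeps the result in [0, numPartitions) even for negative hashes.
        return Math.floorMod(Arrays.hashCode(keyValues), numPartitions);
    }

    public static void main(String[] args) {
        int[] joinKeys = {0, 1}; // hash on columns 0 and 1
        Object[] leftRow = {1, "a", "left-payload"};
        Object[] rightRow = {1, "a", "right-payload"};

        int leftPartition = partitionFor(leftRow, joinKeys, 4);
        int rightPartition = partitionFor(rightRow, joinKeys, 4);

        // Equal key values give equal partitions, whatever the other columns hold.
        System.out.println(leftPartition == rightPartition); // prints true
    }
}
```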

## Translator Interfaces

### ExecNodeTranslator

Base interface for translating execution nodes to Flink transformations.

```java { .api }
public interface ExecNodeTranslator<T extends ExecNode<?>> {

    /**
     * Translates the given execution node to a Flink Transformation.
     */
    Transformation<?> translate(T node, PlannerBase planner);

    /**
     * Returns the class of execution nodes this translator can handle.
     */
    Class<T> getTargetClass();
}
```

### SingleTransformationTranslator

Translator interface for nodes that produce a single transformation.

```java { .api }
public interface SingleTransformationTranslator<T extends ExecNode<?>>
        extends ExecNodeTranslator<T> {

    /**
     * Translates the node to a single transformation.
     */
    Transformation<?> translateToSingleTransformation(T node, PlannerBase planner);

    @Override
    default Transformation<?> translate(T node, PlannerBase planner) {
        return translateToSingleTransformation(node, planner);
    }
}
```

### MultipleTransformationTranslator

Translator interface for nodes that produce multiple transformations.

```java { .api }
public interface MultipleTransformationTranslator<T extends ExecNode<?>>
        extends ExecNodeTranslator<T> {

    /**
     * Translates the node to multiple transformations.
     */
    List<Transformation<?>> translateToMultipleTransformations(T node, PlannerBase planner);

    @Override
    default Transformation<?> translate(T node, PlannerBase planner) {
        List<Transformation<?>> transformations =
                translateToMultipleTransformations(node, planner);

        // Return the only transformation directly; otherwise union the outputs
        if (transformations.size() == 1) {
            return transformations.get(0);
        } else {
            return createUnionTransformation(transformations);
        }
    }
}
```

## Execution Plan Serialization

### ExecNodeJsonSerdeUtil

Utilities for serializing and deserializing execution nodes to/from JSON format.

```java { .api }
public final class ExecNodeJsonSerdeUtil {

    /**
     * Serializes an execution node to JSON string.
     */
    public static String serializeExecNode(ExecNode<?> execNode);

    /**
     * Deserializes an execution node from JSON string.
     */
    public static ExecNode<?> deserializeExecNode(String json, ClassLoader classLoader);

    /**
     * Serializes a list of execution nodes to JSON.
     */
    public static String serializeExecNodeList(List<ExecNode<?>> execNodes);

    /**
     * Deserializes a list of execution nodes from JSON.
     */
    public static List<ExecNode<?>> deserializeExecNodeList(String json, ClassLoader classLoader);
}
```

**Usage Example:**

```java
// buildExecNodes, saveExecutionPlan, loadExecutionPlan, classLoader, and planner
// are placeholders that the surrounding application is assumed to provide.

// Serialize execution plan for storage or transmission
List<ExecNode<?>> execNodes = buildExecNodes(); // execution plan nodes
String serializedPlan = ExecNodeJsonSerdeUtil.serializeExecNodeList(execNodes);

// Store or transmit serialized plan
saveExecutionPlan(serializedPlan);

// Later: deserialize and execute
String storedPlan = loadExecutionPlan();
List<ExecNode<?>> deserializedNodes = ExecNodeJsonSerdeUtil.deserializeExecNodeList(
    storedPlan,
    classLoader
);

// Execute deserialized plan
for (ExecNode<?> node : deserializedNodes) {
    if (node instanceof StreamExecNode) {
        Transformation<?> transformation =
            ((StreamExecNode<?>) node).translateToPlan(planner);
        // Add to execution environment
    }
}
```

## Advanced Execution Node Patterns

### Composite Execution Nodes

```java
// Example of a composite node that combines multiple operations.
// Only translateToPlan is shown; the remaining ExecNode methods are omitted.
public class CompositeExecNode implements StreamExecNode<RowData> {

    private final List<ExecNode<?>> subNodes;

    public CompositeExecNode(List<ExecNode<?>> subNodes) {
        this.subNodes = subNodes;
    }

    @Override
    @SuppressWarnings("unchecked")
    public Transformation<RowData> translateToPlan(Planner planner) {
        // Translate sub-nodes and chain them
        Transformation<RowData> result = null;

        for (ExecNode<?> subNode : subNodes) {
            Transformation<RowData> subTransform =
                    ((StreamExecNode<RowData>) subNode).translateToPlan(planner);

            if (result == null) {
                result = subTransform;
            } else {
                // Chain transformations (chainTransformations is a placeholder helper)
                result = chainTransformations(result, subTransform);
            }
        }

        return result;
    }
}
```

### Adaptive Execution Nodes

```java
// Example of an adaptive node that picks its join strategy during translation.
// getInputNodes, getEstimatedRowCount, chooseJoinStrategy, and the create* methods
// are placeholder helpers for this example; they are not part of ExecNode.
public class AdaptiveJoinExecNode implements BatchExecNode<RowData> {

    @Override
    public Transformation<RowData> translateToPlan(Planner planner) {
        // Choose join strategy based on the estimated size of each input
        JoinStrategy strategy = chooseJoinStrategy(
            getInputNodes().get(0).getEstimatedRowCount(),
            getInputNodes().get(1).getEstimatedRowCount()
        );

        switch (strategy) {
            case BROADCAST_HASH:
                return createBroadcastHashJoin(planner);
            case SORT_MERGE:
                return createSortMergeJoin(planner);
            case NESTED_LOOP:
                return createNestedLoopJoin(planner);
            default:
                throw new UnsupportedOperationException("Unknown join strategy: " + strategy);
        }
    }
}
```

## Error Handling

Translation failures are easier to diagnose when a node validates its inputs first and wraps any exception with its own description:

```java
@Override
public Transformation<RowData> translateToPlan(Planner planner) {
    try {
        // Validate inputs
        validateInputNodes();

        // Create transformation (createTransformation is a placeholder helper)
        return createTransformation(planner);

    } catch (Exception e) {
        // Keep the original exception as the cause so the stack trace is preserved
        throw new TableException(
            String.format("Failed to translate ExecNode %s: %s",
                getDescription(), e.getMessage()),
            e
        );
    }
}

private void validateInputNodes() {
    List<ExecEdge> inputEdges = getInputEdges();
    if (inputEdges.isEmpty()) {
        throw new IllegalStateException("ExecNode requires at least one input");
    }

    for (int i = 0; i < inputEdges.size(); i++) {
        ExecNode<?> input = inputEdges.get(i).getSource();
        InputProperty expectedProperty = getInputProperties().get(i);

        // Validate input compatibility (isCompatible is a placeholder helper)
        if (!isCompatible(input.getOutputType(), expectedProperty)) {
            throw new ValidationException(
                String.format("Input %d is not compatible with required property %s",
                    i, expectedProperty)
            );
        }
    }
}
```
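
The validate-then-wrap pattern above can be tried in isolation with plain Java. The sketch below uses stand-in types (`SketchTranslationException`, `ValidatingNode`) that are assumptions made for this example; it only demonstrates that the wrapped exception carries both the node description and the original cause.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical exception type standing in for a planner-level exception.
final class SketchTranslationException extends RuntimeException {
    SketchTranslationException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Simplified stand-in for an execution node; NOT a Flink class.
final class ValidatingNode {
    private final String description;
    private final List<String> inputs;

    ValidatingNode(String description, List<String> inputs) {
        this.description = description;
        this.inputs = inputs;
    }

    String translate() {
        try {
            validateInputs();
            return "transformation(" + description + ")";
        } catch (RuntimeException e) {
            // Add node context to the message and keep the original cause.
            throw new SketchTranslationException(
                    String.format("Failed to translate %s: %s", description, e.getMessage()),
                    e);
        }
    }

    private void validateInputs() {
        if (inputs.isEmpty()) {
            throw new IllegalStateException("node requires at least one input");
        }
    }
}

final class ErrorHandlingDemo {
    public static void main(String[] args) {
        ValidatingNode ok = new ValidatingNode("Calc", Collections.singletonList("Scan"));
        System.out.println(ok.translate()); // prints transformation(Calc)

        ValidatingNode broken = new ValidatingNode("Join", Collections.<String>emptyList());
        try {
            broken.translate();
        } catch (SketchTranslationException e) {
            // prints Failed to translate Join: node requires at least one input
            System.out.println(e.getMessage());
        }
    }
}
```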