or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

api-completeness.mdcheckpointing.mddata-generation.mdexecution.mdfault-tolerance.mdindex.mdstreaming.md

execution.mddocs/

0

# Execution Utilities

1

2

Core utilities for executing test jobs with proper exception handling and result validation. These utilities handle the complexities of test execution in Flink environments and provide consistent patterns for test success validation.

3

4

## Core Execution Utilities

5

6

### TestUtils

7

8

Primary utility class for executing streaming jobs with proper exception handling and success validation.

9

10

```java { .api }

11

public class TestUtils {

12

public static JobExecutionResult tryExecute(StreamExecutionEnvironment env, String jobName) throws Exception;

13

}

14

```

15

16

The `tryExecute` method:

17

- Executes the streaming job with the given name

18

- Catches `ProgramInvocationException` and `JobExecutionException`

19

- Searches for nested `SuccessException` to determine test success

20

- Fails the test if no `SuccessException` is found in the exception chain

21

- Returns `JobExecutionResult` on successful completion or `null` if terminated by `SuccessException`

22

23

### SuccessException

24

25

Custom exception used to indicate successful test completion and controlled program termination.

26

27

```java { .api }

28

public class SuccessException extends Exception {

29

public SuccessException();

30

}

31

```

32

33

This exception is typically thrown by:

34

- Custom sink functions when expected results are achieved

35

- Map/filter functions when target conditions are met

36

- Source functions when sufficient data has been processed

37

38

## Test Execution Patterns

39

40

### Controlled Termination Pattern

41

42

Use `SuccessException` to terminate streaming jobs when test conditions are met:

43

44

```java

45

public class ValidatingFunction implements MapFunction<Integer, Integer> {

46

private int processedCount = 0;

47

private final int targetCount;

48

49

public ValidatingFunction(int targetCount) {

50

this.targetCount = targetCount;

51

}

52

53

@Override

54

public Integer map(Integer value) throws Exception {

55

processedCount++;

56

57

// Perform validation logic

58

if (value < 0) {

59

throw new RuntimeException("Invalid negative value");

60

}

61

62

// Terminate successfully when target reached

63

if (processedCount >= targetCount) {

64

throw new SuccessException("Processed " + targetCount + " elements successfully");

65

}

66

67

return value * 2;

68

}

69

}

70

```

71

72

### Result Collection with Success Validation

73

74

```java

75

public class CollectingValidatingSink<T> implements SinkFunction<T> {

76

private final List<T> results = new ArrayList<>();

77

private final int expectedCount;

78

79

public CollectingValidatingSink(int expectedCount) {

80

this.expectedCount = expectedCount;

81

}

82

83

@Override

84

public void invoke(T value) throws Exception {

85

synchronized (results) {

86

results.add(value);

87

88

// Validate intermediate results

89

validateElement(value);

90

91

// Terminate when collection is complete

92

if (results.size() >= expectedCount) {

93

validateFinalResults();

94

throw new SuccessException("Successfully collected " + expectedCount + " elements");

95

}

96

}

97

}

98

99

private void validateElement(T value) throws Exception {

100

// Element-level validation logic

101

}

102

103

private void validateFinalResults() throws Exception {

104

// Final result validation logic

105

}

106

}

107

```

108

109

## Usage Examples

110

111

### Basic Streaming Job Execution

112

113

```java

114

@Test

115

public void testBasicStreamingExecution() throws Exception {

116

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

117

env.setParallelism(1);

118

119

env.fromElements(1, 2, 3, 4, 5)

120

.map(x -> x * 2)

121

.addSink(new PrintSinkFunction<>());

122

123

// Execute with proper exception handling

124

JobExecutionResult result = TestUtils.tryExecute(env, "Basic Streaming Test");

125

126

assertNotNull("Job should complete successfully", result);

127

}

128

```

129

130

### Success Exception Pattern

131

132

```java

133

@Test

134

public void testSuccessExceptionPattern() throws Exception {

135

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

136

env.setParallelism(1);

137

138

env.fromSequence(1, Long.MAX_VALUE) // Infinite source

139

.map(new ValidatingFunction(100)) // Terminates after 100 elements

140

.addSink(new DiscardingSink<>());

141

142

// Job will terminate via SuccessException

143

JobExecutionResult result = TestUtils.tryExecute(env, "Success Exception Test");

144

145

// Result will be null when terminated by SuccessException

146

assertNull("Job should terminate via SuccessException", result);

147

}

148

```

149

150

### Comprehensive Result Validation

151

152

```java

153

@Test

154

public void testComprehensiveValidation() throws Exception {

155

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

156

env.setParallelism(2);

157

158

List<Integer> inputData = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

159

160

env.fromCollection(inputData)

161

.keyBy(x -> x % 2) // Partition by even/odd

162

.map(x -> x * x) // Square each number

163

.addSink(new ValidatingResultSink(inputData.size()));

164

165

// Execute with validation

166

TestUtils.tryExecute(env, "Comprehensive Validation Test");

167

}

168

169

class ValidatingResultSink implements SinkFunction<Integer> {

170

private final Set<Integer> receivedValues = new HashSet<>();

171

private final int expectedCount;

172

173

public ValidatingResultSink(int expectedCount) {

174

this.expectedCount = expectedCount;

175

}

176

177

@Override

178

public void invoke(Integer value) throws Exception {

179

synchronized (receivedValues) {

180

// Validate value is a perfect square

181

int sqrt = (int) Math.sqrt(value);

182

if (sqrt * sqrt != value) {

183

throw new RuntimeException("Value " + value + " is not a perfect square");

184

}

185

186

receivedValues.add(value);

187

188

if (receivedValues.size() >= expectedCount) {

189

// Validate we received all expected squares

190

Set<Integer> expectedSquares = Set.of(1, 4, 9, 16, 25, 36, 49, 64, 81, 100);

191

if (!receivedValues.equals(expectedSquares)) {

192

throw new RuntimeException("Received values don't match expected squares");

193

}

194

195

throw new SuccessException("All " + expectedCount + " squares validated successfully");

196

}

197

}

198

}

199

}

200

```

201

202

### Timeout and Error Handling

203

204

```java

205

@Test

206

public void testExecutionWithTimeout() throws Exception {

207

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

208

env.setParallelism(1);

209

210

// Job that should complete within reasonable time

211

env.fromSequence(1, 1000)

212

.map(new SlowProcessingFunction(10)) // 10ms per element

213

.addSink(new TimeoutValidatingSink(5000)); // 5 second timeout

214

215

long startTime = System.currentTimeMillis();

216

217

try {

218

TestUtils.tryExecute(env, "Timeout Test");

219

} catch (Exception e) {

220

long duration = System.currentTimeMillis() - startTime;

221

assertTrue("Test should complete within timeout", duration < 10000);

222

throw e;

223

}

224

}

225

226

class TimeoutValidatingSink implements SinkFunction<Integer> {

227

private final long timeoutMs;

228

private final long startTime;

229

private int elementCount;

230

231

public TimeoutValidatingSink(long timeoutMs) {

232

this.timeoutMs = timeoutMs;

233

this.startTime = System.currentTimeMillis();

234

}

235

236

@Override

237

public void invoke(Integer value) throws Exception {

238

elementCount++;

239

240

long elapsed = System.currentTimeMillis() - startTime;

241

if (elapsed > timeoutMs) {

242

throw new RuntimeException("Test exceeded timeout of " + timeoutMs + "ms");

243

}

244

245

// Terminate after processing reasonable amount

246

if (elementCount >= 100) {

247

throw new SuccessException("Processed " + elementCount + " elements within timeout");

248

}

249

}

250

}

251

```

252

253

### Parallel Execution Validation

254

255

```java

256

@Test

257

public void testParallelExecutionValidation() throws Exception {

258

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

259

env.setParallelism(4);

260

261

env.fromSequence(1, 1000)

262

.map(new ParallelValidatingFunction())

263

.keyBy(x -> x % 4) // Distribute across 4 partitions

264

.addSink(new ParallelResultSink());

265

266

TestUtils.tryExecute(env, "Parallel Execution Test");

267

}

268

269

class ParallelValidatingFunction implements MapFunction<Long, String> {

270

@Override

271

public String map(Long value) throws Exception {

272

// Include subtask information for validation

273

int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();

274

return subtaskIndex + ":" + value;

275

}

276

}

277

278

class ParallelResultSink implements SinkFunction<String> {

279

private static final AtomicInteger totalReceived = new AtomicInteger(0);

280

private static final Set<Integer> activeSubtasks = ConcurrentHashMap.newKeySet();

281

282

@Override

283

public void invoke(String value) throws Exception {

284

// Parse subtask index

285

int subtaskIndex = Integer.parseInt(value.split(":")[0]);

286

activeSubtasks.add(subtaskIndex);

287

288

int received = totalReceived.incrementAndGet();

289

290

// Validate parallel execution

291

if (received >= 1000) {

292

if (activeSubtasks.size() < 4) {

293

throw new RuntimeException("Not all subtasks were active: " + activeSubtasks);

294

}

295

296

throw new SuccessException("Successfully validated parallel execution across " +

297

activeSubtasks.size() + " subtasks");

298

}

299

}

300

}

301

```

302

303

### Error Recovery Testing

304

305

```java

306

@Test

307

public void testErrorRecovery() throws Exception {

308

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

309

env.setParallelism(1);

310

env.enableCheckpointing(1000);

311

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 500));

312

313

env.fromSequence(1, 100)

314

.map(new RecoveringFunction())

315

.addSink(new RecoveryValidatingSink());

316

317

// Should succeed after recovery attempts

318

TestUtils.tryExecute(env, "Error Recovery Test");

319

}

320

321

class RecoveringFunction implements MapFunction<Long, Long> {

322

private static int attemptCount = 0;

323

324

@Override

325

public Long map(Long value) throws Exception {

326

// Fail on first attempt for certain values

327

if (attemptCount == 0 && value == 50) {

328

attemptCount++;

329

throw new RuntimeException("Simulated failure at value " + value);

330

}

331

332

return value;

333

}

334

}

335

```