or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdrecovery-fault-tolerance.mdscala-testing.mdstate-migration.mdtest-base-classes.mdtest-utilities.md

test-utilities.mddocs/

0

# Test Utilities

1

2

Core utility classes and helper functions for common testing operations including execution handling, result collection, test coordination mechanisms, and data structures for testing scenarios.

3

4

## Capabilities

5

6

### TestUtils

7

8

General test utilities for Flink job execution with specialized exception handling.

9

10

```java { .api }

11

/**

12

* Test utilities for Flink job execution

13

*/

14

public class TestUtils {

15

16

/**

17

* Execute StreamExecutionEnvironment handling SuccessException for controlled test termination

18

* Searches through exception causes to find nested SuccessExceptions and treats them as success

19

* @param see StreamExecutionEnvironment to execute

20

* @param name Name for the execution job

21

* @return JobExecutionResult if job completes normally, null if terminated via SuccessException

22

* @throws Exception if job fails with non-SuccessException

23

*/

24

public static JobExecutionResult tryExecute(StreamExecutionEnvironment see, String name) throws Exception;

25

}

26

```

27

28

**Usage Example:**

29

30

```java

31

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

32

env.fromElements(1, 2, 3)

33

.map(x -> {

34

if (x == 3) throw new SuccessException(); // Signal test completion

35

return x * 2;

36

})

37

.print();

38

39

// This will catch SuccessException and return null, indicating successful test completion

40

JobExecutionResult result = TestUtils.tryExecute(env, "Test Job");

41

```

42

43

### SuccessException

44

45

Exception class used to signal successful test completion in controlled termination scenarios.

46

47

```java { .api }

48

/**

49

* Exception thrown to terminate a program and indicate success

50

* Used in conjunction with TestUtils.tryExecute() for controlled test termination

51

*/

52

public class SuccessException extends RuntimeException {

53

public SuccessException();

54

public SuccessException(String message);

55

}

56

```

57

58

### TestListResultSink

59

60

Thread-safe sink for collecting test results into an on-heap list with result retrieval methods.

61

62

```java { .api }

63

/**

64

* Thread-safe sink for collecting elements into an on-heap list

65

* Uses TestListWrapper for managing result storage across test executions

66

*/

67

public class TestListResultSink<T> extends RichSinkFunction<T> {

68

69

/**

70

* Get collected results as unordered list

71

* @return List of collected elements in collection order

72

*/

73

public List<T> getResult();

74

75

/**

76

* Get collected results sorted using natural ordering

77

* @return Sorted list of collected elements

78

*/

79

public List<T> getSortedResult();

80

81

/**

82

* Collect element into result list (thread-safe)

83

* @param value Element to collect

84

* @param context Sink context

85

*/

86

@Override

87

public void invoke(T value, Context context) throws Exception;

88

}

89

```

90

91

**Usage Example:**

92

93

```java

94

public class MyTest extends StreamFaultToleranceTestBase {

95

96

private TestListResultSink<Integer> resultSink = new TestListResultSink<>();

97

98

@Override

99

public void testProgram(StreamExecutionEnvironment env) {

100

env.fromElements(3, 1, 4, 1, 5)

101

.map(x -> x * 2)

102

.addSink(resultSink);

103

}

104

105

@Override

106

public void postSubmit() throws Exception {

107

List<Integer> results = resultSink.getResult();

108

assertEquals(Arrays.asList(6, 2, 8, 2, 10), results);

109

110

List<Integer> sortedResults = resultSink.getSortedResult();

111

assertEquals(Arrays.asList(2, 2, 6, 8, 10), sortedResults);

112

}

113

}

114

```

115

116

### TestListWrapper

117

118

Singleton catalog for managing test result lists with thread-safe operations for concurrent test access.

119

120

```java { .api }

121

/**

122

* Singleton catalog for lists stored by TestListResultSink

123

* Provides thread-safe operations for managing test result collections

124

*/

125

public class TestListWrapper {

126

127

/**

128

* Create new result list and return unique identifier

129

* @return Unique list ID for retrieval

130

*/

131

public static int createList();

132

133

/**

134

* Retrieve result list by unique identifier

135

* @param listId Unique identifier returned by createList()

136

* @return List of collected results

137

*/

138

public static <T> List<T> getList(int listId);

139

140

/**

141

* Clear all stored lists (for test cleanup)

142

*/

143

public static void clearAllLists();

144

}

145

```

146

147

### Tokenizer

148

149

FlatMap function for splitting strings into word count tuples, commonly used in testing scenarios.

150

151

```java { .api }

152

/**

153

* FlatMap function for splitting strings into word count tuples

154

* Commonly used in testing word count and text processing scenarios

155

*/

156

public class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

157

158

/**

159

* Split input line into words and emit (word, 1) tuples

160

* @param value Input string to tokenize

161

* @param out Collector for emitting word count tuples

162

*/

163

@Override

164

public void flatMap(String value, Collector<Tuple2<String, Integer>> out);

165

}

166

```

167

168

**Usage Example:**

169

170

```java

171

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

172

env.fromElements("hello world", "hello flink", "world of streaming")

173

.flatMap(new Tokenizer())

174

.keyBy(tuple -> tuple.f0)

175

.sum(1)

176

.print();

177

// Output: (hello,1), (world,1), (hello,2), (flink,1), (world,2), (of,1), (streaming,1)

178

```

179

180

## Configuration Classes

181

182

### GeneratorConfiguration

183

184

Configuration class for event generators in windowing tests with lateness and timing controls.

185

186

```java { .api }

187

/**

188

* Configuration for event generators in windowing tests

189

* Controls event timing, lateness behavior, and session gaps

190

*/

191

public class GeneratorConfiguration {

192

193

/** Allowed lateness in milliseconds */

194

public final long allowedLateness;

195

196

/** Number of late events within lateness per session */

197

public final int lateEventsWithinLateness;

198

199

/** Number of late events after lateness per session */

200

public final int lateEventsAfterLateness;

201

202

/** Maximum additional gap between event times */

203

public final long maxAdditionalSessionGap;

204

205

/**

206

* Create generator configuration

207

* @param allowedLateness Allowed lateness in milliseconds

208

* @param lateEventsWithinLateness Late events within lateness per session

209

* @param lateEventsAfterLateness Late events after lateness per session

210

* @param maxAdditionalSessionGap Maximum additional session gap

211

*/

212

public GeneratorConfiguration(long allowedLateness, int lateEventsWithinLateness,

213

int lateEventsAfterLateness, long maxAdditionalSessionGap);

214

}

215

```

216

217

### SessionGeneratorConfiguration

218

219

Specialized configuration for session window testing scenarios.

220

221

```java { .api }

222

/**

223

* Configuration for session window event generation

224

* Extends GeneratorConfiguration with session-specific parameters

225

*/

226

public class SessionGeneratorConfiguration extends GeneratorConfiguration {

227

// Session-specific configuration parameters

228

}

229

```

230

231

### SessionConfiguration

232

233

Configuration class for individual session parameters in windowing tests.

234

235

```java { .api }

236

/**

237

* Configuration for individual session parameters in windowing tests

238

* Defines session boundaries, event counts, and timing characteristics

239

*/

240

public class SessionConfiguration {

241

// Session boundary and timing configuration

242

}

243

```

244

245

## Utility Constants

246

247

### Common Test Constants

248

249

```java { .api }

250

/**

251

* Standard parallelism settings used across test base classes

252

*/

253

public static final int STANDARD_PARALLELISM = 4;

254

public static final int HIGH_PARALLELISM = 12;

255

public static final int NUM_TASK_MANAGERS = 3;

256

public static final int NUM_TASK_SLOTS = 4;

257

258

/**

259

* Timeout and deadline configurations

260

*/

261

public static final int DEFAULT_TIMEOUT_MINUTES = 5;

262

public static final int DEFAULT_DEADLINE_MINUTES = 2;

263

264

/**

265

* File coordination patterns for process-based testing

266

*/

267

public static final String READY_MARKER_FILE_PREFIX = "ready-";

268

public static final String PROCEED_MARKER_FILE = "proceed";

269

public static final String FINISH_MARKER_FILE_PREFIX = "finish-";

270

```

271

272

## Integration Patterns

273

274

### Result Collection Pattern

275

276

The typical pattern for collecting and verifying test results:

277

278

```java

279

// 1. Create result sink

280

TestListResultSink<MyType> resultSink = new TestListResultSink<>();

281

282

// 2. Add to topology

283

env.source()

284

.transform()

285

.addSink(resultSink);

286

287

// 3. Verify results in postSubmit()

288

List<MyType> results = resultSink.getResult();

289

assertThat(results, hasSize(expectedSize));

290

assertThat(results, containsInAnyOrder(expectedElements));

291

```

292

293

### Exception-based Test Control

294

295

Using SuccessException for controlled test termination:

296

297

```java

298

env.addSource(new SourceFunction<Integer>() {

299

@Override

300

public void run(SourceContext<Integer> ctx) throws Exception {

301

for (int i = 0; i < 100; i++) {

302

ctx.collect(i);

303

if (i == 50) throw new SuccessException("Reached target");

304

}

305

}

306

307

@Override

308

public void cancel() {}

309

});

310

311

TestUtils.tryExecute(env, "Controlled Test");

312

```

313

314

### Thread-Safe Result Access

315

316

TestListWrapper enables safe result access across multiple test threads:

317

318

```java

319

// Thread 1: Collect results

320

int listId = TestListWrapper.createList();

321

// Add elements to list via TestListResultSink

322

323

// Thread 2: Access results

324

List<String> results = TestListWrapper.getList(listId);

325

// Process results safely

326

```