or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

api-completeness.md checkpointing.md data-generation.md execution.md fault-tolerance.md index.md streaming.md

docs/streaming.md

0

# Streaming Utilities

1

2

Specialized components for testing streaming operations including output selectors, result collection, stream partitioning utilities, and no-op functions for performance testing.

3

4

## Output Selectors

5

6

### EvenOddOutputSelector

7

8

Output selector for splitting integer streams based on even/odd values, commonly used for stream partitioning tests.

9

10

```java { .api }

11

public class EvenOddOutputSelector implements OutputSelector<Integer> {

12

@Override

13

public Iterable<String> select(Integer value);

14

}

15

```

16

17

The selector returns:

18

- `"even"` for even numbers (value % 2 == 0)

19

- `"odd"` for odd numbers (value % 2 != 0)

20

21

## Result Collection and Validation

22

23

### TestListResultSink

24

25

Thread-safe sink function that collects streaming results into a list for test verification.

26

27

```java { .api }

28

public class TestListResultSink<T> extends RichSinkFunction<T> {

29

private int resultListId;

30

31

public TestListResultSink();

32

33

@Override

34

public void open(Configuration parameters) throws Exception;

35

36

@Override

37

public void invoke(T value) throws Exception;

38

39

@Override

40

public void close() throws Exception;

41

42

public List<T> getResult();

43

public List<T> getSortedResult();

44

}

45

```

46

47

### TestListWrapper

48

49

Singleton utility providing thread-safe access to multiple test result collections. Used internally by TestListResultSink for managing concurrent result collection.

50

51

```java { .api }

52

public class TestListWrapper {

53

private List<List<? extends Comparable>> lists;

54

55

private TestListWrapper();

56

57

public static TestListWrapper getInstance();

58

public int createList();

59

public List<?> getList(int listId);

60

}

61

```

62

63

### ReceiveCheckNoOpSink

64

65

Sink that validates element reception and asserts at least one element was received during close(), useful for ensuring data flow.

66

67

```java { .api }

68

public final class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {

69

private List<T> received;

70

71

public ReceiveCheckNoOpSink();

72

73

@Override

74

public void open(Configuration conf);

75

76

@Override

77

public void invoke(T tuple);

78

79

@Override

80

public void close();

81

}

82

```

83

84

## Processing Functions

85

86

### NoOpIntMap

87

88

Pass-through map function for integers, used for pipeline testing without transformation overhead.

89

90

```java { .api }

91

public class NoOpIntMap implements MapFunction<Integer, Integer> {

92

@Override

93

public Integer map(Integer value) throws Exception;

94

}

95

```

96

97

## Usage Examples

98

99

### Stream Partitioning with Output Selector

100

101

```java

102

@Test

103

public void testStreamPartitioning() throws Exception {

104

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

105

env.setParallelism(1);

106

107

// Create source data

108

DataStream<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

109

110

// Split stream using EvenOddOutputSelector

111

SplitStream<Integer> splitStream = source.split(new EvenOddOutputSelector());

112

113

// Process even and odd streams separately

114

DataStream<Integer> evenStream = splitStream.select("even");

115

DataStream<Integer> oddStream = splitStream.select("odd");

116

117

// Collect results for verification

118

TestListResultSink<Integer> evenSink = new TestListResultSink<>();

119

TestListResultSink<Integer> oddSink = new TestListResultSink<>();

120

121

evenStream.addSink(evenSink);

122

oddStream.addSink(oddSink);

123

124

// Execute job

125

TestUtils.tryExecute(env, "Stream Partitioning Test");

126

127

// Verify results

128

List<Integer> evenResults = evenSink.getResult();

129

List<Integer> oddResults = oddSink.getResult();

130

131

assertEquals("Should have 5 even numbers", 5, evenResults.size());

132

assertEquals("Should have 5 odd numbers", 5, oddResults.size());

133

134

// Verify all even numbers are actually even

135

assertTrue("All results should be even",

136

evenResults.stream().allMatch(x -> x % 2 == 0));

137

assertTrue("All results should be odd",

138

oddResults.stream().allMatch(x -> x % 2 != 0));

139

}

140

```

141

142

### Result Collection and Validation

143

144

```java

145

@Test

146

public void testResultCollection() throws Exception {

147

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

148

env.setParallelism(1);

149

150

// Create test data

151

List<String> testData = Arrays.asList("hello", "world", "flink", "streaming");

152

DataStream<String> source = env.fromCollection(testData);

153

154

// Transform data

155

DataStream<String> transformed = source

156

.map(String::toUpperCase)

157

.filter(s -> s.length() > 4);

158

159

// Collect results

160

TestListResultSink<String> resultSink = new TestListResultSink<>();

161

transformed.addSink(resultSink);

162

163

// Execute

164

TestUtils.tryExecute(env, "Result Collection Test");

165

166

// Validate results

167

List<String> results = resultSink.getResult();

168

169

assertEquals("Should have filtered results", 4, results.size());

170

assertTrue("Should contain HELLO", results.contains("HELLO"));

171

assertTrue("Should contain WORLD", results.contains("WORLD"));

172

assertTrue("Should contain FLINK", results.contains("FLINK"));

173

assertTrue("Should contain STREAMING", results.contains("STREAMING"));

174

}

175

```

176

177

### Performance Testing with No-Op Functions

178

179

```java

180

@Test

181

public void testStreamingPerformance() throws Exception {

182

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

183

env.setParallelism(4);

184

185

// Generate large dataset

186

DataStream<Integer> source = env.fromSequence(1, 1000000).map(Long::intValue);

187

188

// Apply no-op transformations to test overhead

189

DataStream<Integer> processed = source

190

.map(new NoOpIntMap()) // No-op transformation

191

.keyBy(x -> x % 100) // Partition by key

192

.map(new NoOpIntMap()) // Another no-op

193

.filter(x -> true); // Pass-through filter

194

195

// Use no-op sink to measure throughput

196

ReceiveCheckNoOpSink<Integer> sink = new ReceiveCheckNoOpSink<>();

197

processed.addSink(sink);

198

199

// Measure execution time

200

long startTime = System.currentTimeMillis();

201

TestUtils.tryExecute(env, "Performance Test");

202

long duration = System.currentTimeMillis() - startTime;

203

204

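// Verify throughput (NOTE: getReceivedCount() is not among the ReceiveCheckNoOpSink methods listed in the API section above; confirm it exists before relying on it)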

205

long processedCount = sink.getReceivedCount();

206

assertEquals("Should process all elements", 1000000, processedCount);

207

208

double throughput = processedCount / (duration / 1000.0);

209

System.out.println("Throughput: " + throughput + " elements/second");

210

211

// Assert minimum throughput requirement

212

assertTrue("Throughput should be reasonable", throughput > 10000);

213

}

214

```

215

216

### Multi-Sink Result Validation

217

218

```java

219

@Test

220

public void testMultiSinkValidation() throws Exception {

221

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

222

env.setParallelism(2);

223

224

DataStream<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

225

226

// Split into multiple streams

227

SplitStream<Integer> split = source.split(new EvenOddOutputSelector());

228

229

// Create multiple sinks

230

TestListResultSink<Integer> allSink = new TestListResultSink<>();

231

TestListResultSink<Integer> evenSink = new TestListResultSink<>();

232

TestListResultSink<Integer> oddSink = new TestListResultSink<>();

233

234

// Connect streams to sinks

235

source.addSink(allSink);

236

split.select("even").addSink(evenSink);

237

split.select("odd").addSink(oddSink);

238

239

// Execute

240

TestUtils.tryExecute(env, "Multi-Sink Test");

241

242

// Validate consistency across sinks

243

List<Integer> allResults = allSink.getResult();

244

List<Integer> evenResults = evenSink.getResult();

245

List<Integer> oddResults = oddSink.getResult();

246

247

// Check total count

248

assertEquals("All sink should have all elements", 10, allResults.size());

249

assertEquals("Even + odd should equal total",

250

evenResults.size() + oddResults.size(), allResults.size());

251

252

// Check partitioning correctness

253

Set<Integer> combinedResults = new HashSet<>();

254

combinedResults.addAll(evenResults);

255

combinedResults.addAll(oddResults);

256

257

assertEquals("Combined results should match original",

258

new HashSet<>(allResults), combinedResults);

259

}

260

```

261

262

### Thread-Safe Result Collection

263

264

```java

265

@Test

266

public void testThreadSafeCollection() throws Exception {

267

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

268

env.setParallelism(4); // Multiple parallel instances

269

270

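// Use wrapper for thread-safe collection (NOTE: illustrative only; the API section above exposes TestListWrapper as a non-generic singleton via getInstance(), createList() and getList(int listId))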

271

TestListWrapper<Integer> resultWrapper = new TestListWrapper<>();

272

273

DataStream<Integer> source = env.fromSequence(1, 1000).map(Long::intValue);

274

275

// Custom sink using wrapper

276

source.addSink(new SinkFunction<Integer>() {

277

@Override

278

public void invoke(Integer value) {

279

resultWrapper.add(value * 2); // Transform and collect

280

}

281

});

282

283

TestUtils.tryExecute(env, "Thread-Safe Collection Test");

284

285

// Verify results

286

List<Integer> results = resultWrapper.getList();

287

assertEquals("Should have all elements", 1000, results.size());

288

289

// Verify transformation applied

290

assertTrue("All values should be even",

291

results.stream().allMatch(x -> x % 2 == 0));

292

293

// Verify range

294

assertTrue("Should contain expected values",

295

results.containsAll(Arrays.asList(2, 4, 6, 8, 10)));

296

}

297

```