0
# Streaming Utilities
1
2
Specialized components for testing streaming operations including output selectors, result collection, stream partitioning utilities, and no-op functions for performance testing.
3
4
## Output Selectors
5
6
### EvenOddOutputSelector
7
8
Output selector for splitting integer streams based on even/odd values, commonly used for stream partitioning tests.
9
10
```java { .api }
11
public class EvenOddOutputSelector implements OutputSelector<Integer> {
12
@Override
13
public Iterable<String> select(Integer value);
14
}
15
```
16
17
The selector returns:
18
- `"even"` for even numbers (value % 2 == 0)
19
- `"odd"` for odd numbers (value % 2 != 0)
20
21
## Result Collection and Validation
22
23
### TestListResultSink
24
25
Thread-safe sink function that collects streaming results into a list for test verification.
26
27
```java { .api }
28
public class TestListResultSink<T> extends RichSinkFunction<T> {
29
private int resultListId;
30
31
public TestListResultSink();
32
33
@Override
34
public void open(Configuration parameters) throws Exception;
35
36
@Override
37
public void invoke(T value) throws Exception;
38
39
@Override
40
public void close() throws Exception;
41
42
public List<T> getResult();
43
public List<T> getSortedResult();
44
}
45
```
46
47
### TestListWrapper
48
49
Singleton utility providing thread-safe access to multiple test result collections. Used internally by TestListResultSink for managing concurrent result collection.
50
51
```java { .api }
52
public class TestListWrapper {
53
private List<List<? extends Comparable>> lists;
54
55
private TestListWrapper();
56
57
public static TestListWrapper getInstance();
58
public int createList();
59
public List<?> getList(int listId);
60
}
61
```
62
63
### ReceiveCheckNoOpSink
64
65
Sink that validates element reception: in `close()` it asserts that at least one element was received. Useful for ensuring that data actually flows through a pipeline.
66
67
```java { .api }
68
public final class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {
69
private List<T> received;
70
71
public ReceiveCheckNoOpSink();
72
73
@Override
74
public void open(Configuration conf);
75
76
@Override
77
public void invoke(T tuple);
78
79
@Override
80
public void close();
81
}
82
```
83
84
## Processing Functions
85
86
### NoOpIntMap
87
88
Pass-through map function for integers, used for pipeline testing without transformation overhead.
89
90
```java { .api }
91
public class NoOpIntMap implements MapFunction<Integer, Integer> {
92
@Override
93
public Integer map(Integer value) throws Exception;
94
}
95
```
96
97
## Usage Examples
98
99
### Stream Partitioning with Output Selector
100
101
```java
102
@Test
103
public void testStreamPartitioning() throws Exception {
104
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
105
env.setParallelism(1);
106
107
// Create source data
108
DataStream<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
109
110
// Split stream using EvenOddOutputSelector
111
SplitStream<Integer> splitStream = source.split(new EvenOddOutputSelector());
112
113
// Process even and odd streams separately
114
DataStream<Integer> evenStream = splitStream.select("even");
115
DataStream<Integer> oddStream = splitStream.select("odd");
116
117
// Collect results for verification
118
TestListResultSink<Integer> evenSink = new TestListResultSink<>();
119
TestListResultSink<Integer> oddSink = new TestListResultSink<>();
120
121
evenStream.addSink(evenSink);
122
oddStream.addSink(oddSink);
123
124
// Execute job
125
TestUtils.tryExecute(env, "Stream Partitioning Test");
126
127
// Verify results
128
List<Integer> evenResults = evenSink.getResult();
129
List<Integer> oddResults = oddSink.getResult();
130
131
assertEquals("Should have 5 even numbers", 5, evenResults.size());
132
assertEquals("Should have 5 odd numbers", 5, oddResults.size());
133
134
// Verify all even numbers are actually even
135
assertTrue("All results should be even",
136
evenResults.stream().allMatch(x -> x % 2 == 0));
137
assertTrue("All results should be odd",
138
oddResults.stream().allMatch(x -> x % 2 == 1));
139
}
140
```
141
142
### Result Collection and Validation
143
144
```java
145
@Test
146
public void testResultCollection() throws Exception {
147
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
148
env.setParallelism(1);
149
150
// Create test data
151
List<String> testData = Arrays.asList("hello", "world", "flink", "streaming");
152
DataStream<String> source = env.fromCollection(testData);
153
154
// Transform data
155
DataStream<String> transformed = source
156
.map(String::toUpperCase)
157
.filter(s -> s.length() > 4);
158
159
// Collect results
160
TestListResultSink<String> resultSink = new TestListResultSink<>();
161
transformed.addSink(resultSink);
162
163
// Execute
164
TestUtils.tryExecute(env, "Result Collection Test");
165
166
// Validate results
167
List<String> results = resultSink.getResult();
168
169
assertEquals("Should have filtered results", 2, results.size());
170
assertTrue("Should contain HELLO", results.contains("HELLO"));
171
assertTrue("Should contain WORLD", results.contains("WORLD"));
172
assertTrue("Should contain FLINK", results.contains("FLINK"));
173
assertTrue("Should contain STREAMING", results.contains("STREAMING"));
174
}
175
```
176
177
### Performance Testing with No-Op Functions
178
179
```java
180
@Test
181
public void testStreamingPerformance() throws Exception {
182
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
183
env.setParallelism(4);
184
185
// Generate large dataset
186
DataStream<Integer> source = env.fromSequence(1, 1000000);
187
188
// Apply no-op transformations to test overhead
189
DataStream<Integer> processed = source
190
.map(new NoOpIntMap()) // No-op transformation
191
.keyBy(x -> x % 100) // Partition by key
192
.map(new NoOpIntMap()) // Another no-op
193
.filter(x -> true); // Pass-through filter
194
195
// Use no-op sink to measure throughput
196
ReceiveCheckNoOpSink<Integer> sink = new ReceiveCheckNoOpSink<>();
197
processed.addSink(sink);
198
199
// Measure execution time
200
long startTime = System.currentTimeMillis();
201
TestUtils.tryExecute(env, "Performance Test");
202
long duration = System.currentTimeMillis() - startTime;
203
204
// Verify throughput
205
long processedCount = sink.getReceivedCount();
206
assertEquals("Should process all elements", 1000000, processedCount);
207
208
double throughput = processedCount / (duration / 1000.0);
209
System.out.println("Throughput: " + throughput + " elements/second");
210
211
// Assert minimum throughput requirement
212
assertTrue("Throughput should be reasonable", throughput > 10000);
213
}
214
```
215
216
### Multi-Sink Result Validation
217
218
```java
219
@Test
220
public void testMultiSinkValidation() throws Exception {
221
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
222
env.setParallelism(2);
223
224
DataStream<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
225
226
// Split into multiple streams
227
SplitStream<Integer> split = source.split(new EvenOddOutputSelector());
228
229
// Create multiple sinks
230
TestListResultSink<Integer> allSink = new TestListResultSink<>();
231
TestListResultSink<Integer> evenSink = new TestListResultSink<>();
232
TestListResultSink<Integer> oddSink = new TestListResultSink<>();
233
234
// Connect streams to sinks
235
source.addSink(allSink);
236
split.select("even").addSink(evenSink);
237
split.select("odd").addSink(oddSink);
238
239
// Execute
240
TestUtils.tryExecute(env, "Multi-Sink Test");
241
242
// Validate consistency across sinks
243
List<Integer> allResults = allSink.getResult();
244
List<Integer> evenResults = evenSink.getResult();
245
List<Integer> oddResults = oddSink.getResult();
246
247
// Check total count
248
assertEquals("All sink should have all elements", 10, allResults.size());
249
assertEquals("Even + odd should equal total",
250
evenResults.size() + oddResults.size(), allResults.size());
251
252
// Check partitioning correctness
253
Set<Integer> combinedResults = new HashSet<>();
254
combinedResults.addAll(evenResults);
255
combinedResults.addAll(oddResults);
256
257
assertEquals("Combined results should match original",
258
new HashSet<>(allResults), combinedResults);
259
}
260
```
261
262
### Thread-Safe Result Collection
263
264
```java
265
@Test
266
public void testThreadSafeCollection() throws Exception {
267
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
268
env.setParallelism(4); // Multiple parallel instances
269
270
// Use wrapper for thread-safe collection
271
TestListWrapper<Integer> resultWrapper = new TestListWrapper<>();
272
273
DataStream<Integer> source = env.fromSequence(1, 1000);
274
275
// Custom sink using wrapper
276
source.addSink(new SinkFunction<Integer>() {
277
@Override
278
public void invoke(Integer value) {
279
resultWrapper.add(value * 2); // Transform and collect
280
}
281
});
282
283
TestUtils.tryExecute(env, "Thread-Safe Collection Test");
284
285
// Verify results
286
List<Integer> results = resultWrapper.getList();
287
assertEquals("Should have all elements", 1000, results.size());
288
289
// Verify transformation applied
290
assertTrue("All values should be even",
291
results.stream().allMatch(x -> x % 2 == 0));
292
293
// Verify range
294
assertTrue("Should contain expected values",
295
results.containsAll(Arrays.asList(2, 4, 6, 8, 10)));
296
}
297
```