# Test Environments and Data Sources

Test execution environments, data sources, sinks, and utilities for building controlled testing scenarios in Flink applications. They support testing both streaming and batch programs with predictable test data and isolated execution contexts.

## Capabilities

### Test Environment Management

#### TestStreamEnvironment

A test-specific subclass of `StreamExecutionEnvironment` that applies test-friendly defaults (for example, a default parallelism of 1) so streaming programs run in a controlled, predictable way.

```java { .api }
13
/**
14
* Test-specific stream execution environment
15
* Extends StreamExecutionEnvironment with test-friendly defaults
16
*/
17
class TestStreamEnvironment extends StreamExecutionEnvironment {
18
/** Create a test stream environment with default parallelism of 1 */
19
static TestStreamEnvironment createTestEnvironment();
20
21
/** Create test environment with specific parallelism */
22
static TestStreamEnvironment createTestEnvironment(int parallelism);
23
}
24
```
#### MultipleProgramsTestBase

Base classes for tests that run several Flink programs, each with a clean execution environment. A JUnit 4 variant is also provided.

```java { .api }
31
/**
32
* Base class for tests running multiple programs
33
* Provides clean execution environment for each program
34
*/
35
abstract class MultipleProgramsTestBase extends AbstractTestBase {
36
/** Get execution environment for current test */
37
ExecutionEnvironment getExecutionEnvironment();
38
39
/** Get stream execution environment for current test */
40
StreamExecutionEnvironment getStreamExecutionEnvironment();
41
}
42
43
/**
44
* JUnit 4 version of multiple programs test base
45
*/
46
abstract class MultipleProgramsTestBaseJUnit4 extends AbstractTestBaseJUnit4 {
47
ExecutionEnvironment getExecutionEnvironment();
48
StreamExecutionEnvironment getStreamExecutionEnvironment();
49
}
50
```
**Usage Examples:**

```java
55
import org.apache.flink.streaming.util.TestStreamEnvironment;
56
import org.apache.flink.test.util.MultipleProgramsTestBase;
57
58
// Using TestStreamEnvironment
59
TestStreamEnvironment env = TestStreamEnvironment.createTestEnvironment();
60
DataStream<String> source = env.fromElements("hello", "world");
61
// Configure and execute test pipeline
62
63
// Using MultipleProgramsTestBase
64
public class MyIntegrationTest extends MultipleProgramsTestBase {
65
@Test
66
public void testProgram1() {
67
StreamExecutionEnvironment env = getStreamExecutionEnvironment();
68
// Test first program
69
}
70
71
@Test
72
public void testProgram2() {
73
StreamExecutionEnvironment env = getStreamExecutionEnvironment();
74
// Test second program with fresh environment
75
}
76
}
77
```
### Test Data Sources

#### FiniteTestSource

A finite data source for streaming tests. It emits a predetermined set of elements and also implements `CheckpointListener`, so tests can exercise checkpoint-completion notifications.

```java { .api }
86
/**
87
* Finite test data source for streaming tests
88
* Implements SourceFunction and CheckpointListener for complete lifecycle testing
89
*/
90
class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
91
/** Create source with collection of elements */
92
FiniteTestSource(Collection<T> elements);
93
94
/** Create source with array of elements */
95
FiniteTestSource(T... elements);
96
97
/** Create source with elements and emission delay */
98
FiniteTestSource(Collection<T> elements, long delayBetweenElements);
99
100
void run(SourceContext<T> ctx) throws Exception;
101
void cancel();
102
void notifyCheckpointComplete(long checkpointId);
103
}
104
```
**Usage Examples:**

```java
109
import org.apache.flink.streaming.util.FiniteTestSource;
110
111
// Simple test source
112
FiniteTestSource<Integer> source = new FiniteTestSource<>(1, 2, 3, 4, 5);
113
DataStream<Integer> stream = env.addSource(source);
114
115
// Source with delay between elements
116
List<String> testData = Arrays.asList("a", "b", "c");
117
FiniteTestSource<String> delayedSource =
118
new FiniteTestSource<>(testData, 100); // 100ms between elements
119
DataStream<String> delayedStream = env.addSource(delayedSource);
120
```
### Test Data Sinks

#### TestListResultSink

A sink that collects streaming results into a list, so tests can verify them once the job has finished.

```java { .api }
129
/**
130
* Sink that collects results in a list for test verification
131
*/
132
class TestListResultSink<T> extends RichSinkFunction<T> {
133
/** Create sink with result collection */
134
TestListResultSink(List<T> resultList);
135
136
/** Create sink with thread-safe result collection */
137
static <T> TestListResultSink<T> createThreadSafe();
138
139
void invoke(T value, Context context);
140
141
/** Get collected results (thread-safe) */
142
List<T> getResults();
143
144
/** Clear collected results */
145
void clear();
146
}
147
```
148
149
**Usage Examples:**
150
151
```java
152
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
153
154
// Collect results for verification
155
List<String> results = new ArrayList<>();
156
TestListResultSink<String> sink = new TestListResultSink<>(results);
157
158
DataStream<String> stream = env.fromElements("hello", "world");
159
stream.addSink(sink);
160
env.execute();
161
162
// Verify results
163
assertEquals(Arrays.asList("hello", "world"), results);
164
165
// Thread-safe version
166
TestListResultSink<Integer> threadSafeSink = TestListResultSink.createThreadSafe();
167
stream.addSink(threadSafeSink);
168
env.execute();
169
List<Integer> safeResults = threadSafeSink.getResults();
170
```
### Upsert Testing Framework

A specialized framework for testing upsert operations in streaming applications.

#### UpsertTestSink

A test sink for verifying upsert behavior. It records each received record together with its operation type (insert, update, or delete). Tests can query these results all together or separately by type.

```java { .api }
181
/**
182
* Test sink for upsert operations
183
* Tracks inserts, updates, and deletes separately
184
*/
185
class UpsertTestSink<IN> implements Sink<IN> {
186
/** Get all received records with their operation types */
187
List<UpsertRecord<IN>> getUpsertResults();
188
189
/** Get only insert records */
190
List<IN> getInsertResults();
191
192
/** Get only update records */
193
List<IN> getUpdateResults();
194
195
/** Get only delete records */
196
List<IN> getDeleteResults();
197
198
/** Clear all collected results */
199
void clearResults();
200
}
201
202
/**
203
* Builder for UpsertTestSink
204
*/
205
class UpsertTestSinkBuilder<IN> {
206
/** Set key fields for upsert operations */
207
UpsertTestSinkBuilder<IN> setKeyFields(String... keyFields);
208
209
/** Enable changelog mode tracking */
210
UpsertTestSinkBuilder<IN> enableChangelogMode();
211
212
/** Build the upsert test sink */
213
UpsertTestSink<IN> build();
214
}
215
216
/**
217
* Record in upsert sink with operation type
218
*/
219
class UpsertRecord<T> {
220
T getRecord();
221
UpsertOperation getOperation(); // INSERT, UPDATE, DELETE
222
long getTimestamp();
223
}
224
```
225
226
**Usage Examples:**
227
228
```java
229
import org.apache.flink.connector.upserttest.sink.UpsertTestSink;
230
import org.apache.flink.connector.upserttest.sink.UpsertTestSinkBuilder;
231
232
// Create upsert test sink
233
UpsertTestSink<MyRecord> upsertSink = new UpsertTestSinkBuilder<MyRecord>()
234
.setKeyFields("id")
235
.enableChangelogMode()
236
.build();
237
238
// Use in streaming pipeline
239
DataStream<MyRecord> changelogStream = // ... your upsert stream
240
changelogStream.sinkTo(upsertSink);
241
env.execute();
242
243
// Verify upsert behavior
244
List<MyRecord> inserts = upsertSink.getInsertResults();
245
List<MyRecord> updates = upsertSink.getUpdateResults();
246
List<UpsertRecord<MyRecord>> allChanges = upsertSink.getUpsertResults();
247
248
assertEquals(3, inserts.size());
249
assertEquals(2, updates.size());
250
```
### Test Data Generators

Pre-built datasets for testing common algorithms: connected components, word count, k-means clustering, and PageRank. Each dataset is paired with its expected result.

#### Algorithm Test Data

```java { .api }
259
/**
260
* Test data for connected components algorithm
261
*/
262
class ConnectedComponentsData {
263
/** Get default vertex data for testing */
264
static DataSet<Tuple2<Long, Long>> getDefaultVertexDataSet(ExecutionEnvironment env);
265
266
/** Get default edge data for testing */
267
static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env);
268
269
/** Get expected results for default dataset */
270
static String getExpectedResult();
271
}
272
273
/**
274
* Test data for word count examples
275
*/
276
class WordCountData {
277
/** Get default text data for word count testing */
278
static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env);
279
280
/** Get expected word count results */
281
static String getExpectedResult();
282
}
283
284
/**
285
* Test data for K-means clustering
286
*/
287
class KMeansData {
288
/** Get default point data for clustering */
289
static DataSet<Point> getDefaultPointDataSet(ExecutionEnvironment env);
290
291
/** Get default centroid data */
292
static DataSet<Centroid> getDefaultCentroidDataSet(ExecutionEnvironment env);
293
294
/** Get expected clustering results */
295
static String getExpectedResult();
296
}
297
298
/**
299
* Test data for PageRank algorithm
300
*/
301
class PageRankData {
302
/** Get default vertices for PageRank testing */
303
static DataSet<Tuple2<Long, Double>> getDefaultVerticesDataSet(ExecutionEnvironment env);
304
305
/** Get default edges for PageRank testing */
306
static DataSet<Tuple2<Long, Long>> getDefaultEdgesDataSet(ExecutionEnvironment env);
307
308
/** Get expected PageRank results */
309
static String getExpectedResult();
310
}
311
```
**Usage Examples:**

```java
316
import org.apache.flink.test.testdata.WordCountData;
317
import org.apache.flink.test.testdata.ConnectedComponentsData;
318
319
// Word count test
320
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
321
DataSet<String> text = WordCountData.getDefaultTextLineDataSet(env);
322
// Run word count algorithm
323
String result = // ... algorithm result
324
assertEquals(WordCountData.getExpectedResult(), result);
325
326
// Connected components test
327
DataSet<Tuple2<Long, Long>> vertices =
328
ConnectedComponentsData.getDefaultVertexDataSet(env);
329
DataSet<Tuple2<Long, Long>> edges =
330
ConnectedComponentsData.getDefaultEdgeDataSet(env);
331
// Run connected components algorithm
332
```
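### File and Process Utilities

#### TestBaseUtils and FileUtils

Utilities for tests that need to:
- create and clean up temporary files and directories,
- compare actual results against expected output,
- launch external Java processes (`TestProcessBuilder`).
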
```java { .api }
341
/**
342
* Base utilities for Flink tests
343
*/
344
class TestBaseUtils {
345
/** Create temporary directory for test data */
346
static String createTempDirectory();
347
348
/** Clean up test resources */
349
static void cleanup();
350
351
/** Compare result files with expected output */
352
static void compareResultsByLinesInMemory(String expectedResult, String actualResult);
353
}
354
355
/**
356
* File utilities for testing
357
*/
358
class FileUtils {
359
/** Write string to temporary file */
360
static Path writeToTempFile(String content, String suffix);
361
362
/** Read entire file as string */
363
static String readFileAsString(Path file);
364
365
/** Create temporary directory */
366
static Path createTempDirectory(String prefix);
367
368
/** Delete directory recursively */
369
static void deleteDirectoryRecursively(Path directory);
370
}
371
372
/**
373
* Build and run test processes
374
*/
375
class TestProcessBuilder {
376
/** Create process builder for Java application */
377
static TestProcessBuilder createJavaProcess(Class<?> mainClass);
378
379
/** Add JVM arguments */
380
TestProcessBuilder addJvmArgs(String... args);
381
382
/** Add program arguments */
383
TestProcessBuilder addArgs(String... args);
384
385
/** Start the process and wait for completion */
386
ProcessResult start();
387
}
388
```
**Usage Examples:**

```java
393
import org.apache.flink.test.util.FileUtils;
394
import org.apache.flink.test.util.TestProcessBuilder;
395
396
// File operations
397
Path tempFile = FileUtils.writeToTempFile("test data", ".txt");
398
String content = FileUtils.readFileAsString(tempFile);
399
assertEquals("test data", content);
400
401
// Process testing
402
ProcessResult result = TestProcessBuilder
403
.createJavaProcess(MyFlinkJob.class)
404
.addJvmArgs("-Xmx1g")
405
.addArgs("--input", "test-input.txt")
406
.start();
407
408
assertEquals(0, result.getExitCode());
409
```