0
# Runtime Utilities
1
2
Comprehensive collection of runtime utilities for test execution, process management, and common testing operations. These utilities provide essential infrastructure for running tests in controlled environments and managing Flink job execution.
3
4
## Capabilities
5
6
### Job Graph Execution Utilities
7
8
Utilities for executing JobGraphs on MiniCluster instances with comprehensive control and monitoring capabilities.
9
10
```java { .api }
11
/**
12
* Utility for running JobGraphs on MiniCluster for testing
13
*/
14
public class JobGraphRunningUtil {
15
16
/**
17
* Execute JobGraph on MiniCluster and wait for completion
18
* @param jobGraph JobGraph to execute
19
* @param miniCluster MiniCluster instance for execution
20
* @throws Exception if job execution fails
21
*/
22
public static void execute(JobGraph jobGraph, MiniCluster miniCluster) throws Exception;
23
24
/**
25
* Execute JobGraph with timeout
26
* @param jobGraph JobGraph to execute
27
* @param miniCluster MiniCluster instance
28
* @param timeoutMs timeout in milliseconds
29
* @return JobExecutionResult containing execution results
30
* @throws Exception if execution fails or times out
31
*/
32
public static JobExecutionResult executeWithTimeout(
33
JobGraph jobGraph,
34
MiniCluster miniCluster,
35
long timeoutMs) throws Exception;
36
37
/**
38
* Execute JobGraph and return execution result
39
* @param jobGraph JobGraph to execute
40
* @param miniCluster MiniCluster instance
41
* @return JobExecutionResult with job execution details
42
* @throws Exception if execution fails
43
*/
44
public static JobExecutionResult executeAndGetResult(
45
JobGraph jobGraph,
46
MiniCluster miniCluster) throws Exception;
47
48
/**
49
* Execute multiple JobGraphs sequentially
50
* @param jobGraphs list of JobGraphs to execute
51
* @param miniCluster MiniCluster instance
52
* @return List of JobExecutionResults
53
* @throws Exception if any job execution fails
54
*/
55
public static List<JobExecutionResult> executeSequentially(
56
List<JobGraph> jobGraphs,
57
MiniCluster miniCluster) throws Exception;
58
59
/**
60
* Submit JobGraph asynchronously and return CompletableFuture
61
* @param jobGraph JobGraph to submit
62
* @param miniCluster MiniCluster instance
63
* @return CompletableFuture containing JobExecutionResult
64
*/
65
public static CompletableFuture<JobExecutionResult> submitAsync(
66
JobGraph jobGraph,
67
MiniCluster miniCluster);
68
}
69
```
70
71
### Process Management Utilities
72
73
Entry points and utilities for managing external processes during testing scenarios.
74
75
```java { .api }
76
/**
77
* Entry point for task executor process testing
78
*/
79
public class TaskExecutorProcessEntryPoint {
80
81
/**
82
* Main entry point for standalone task executor process
83
* @param args command line arguments for task executor configuration
84
*/
85
public static void main(String[] args);
86
87
/**
88
* Start task executor with specific configuration
89
* @param config Configuration for task executor setup
90
* @throws Exception if task executor startup fails
91
*/
92
public static void startTaskExecutor(Configuration config) throws Exception;
93
94
/**
95
* Create default configuration for task executor testing
96
* @return Configuration with testing defaults
97
*/
98
public static Configuration createDefaultTestConfiguration();
99
}
100
```
101
102
### Test Function Utilities
103
104
Common test functions and utilities for data processing and validation scenarios.
105
106
```java { .api }
107
/**
108
* Tokenizer function for string processing tests
109
*/
110
public class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
111
112
/**
113
* Constructor for tokenizer with default configuration
114
*/
115
public Tokenizer();
116
117
/**
118
* Constructor for tokenizer with custom delimiter
119
* @param delimiter delimiter pattern for tokenization
120
*/
121
public Tokenizer(String delimiter);
122
123
@Override
124
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception;
125
}
126
127
/**
128
* Identity mapper for testing data flow without transformation
129
*/
130
public class NoOpIntMap implements MapFunction<Integer, Integer> {
131
132
/**
133
* Constructor for no-operation integer mapper
134
*/
135
public NoOpIntMap();
136
137
@Override
138
public Integer map(Integer value) throws Exception;
139
}
140
141
/**
142
* No-operation sink for testing data flow completion
143
*/
144
public class ReceiveCheckNoOpSink<T> implements SinkFunction<T> {
145
146
/**
147
* Constructor for no-op sink with receive tracking
148
* @param expectedCount expected number of elements to receive
149
*/
150
public ReceiveCheckNoOpSink(int expectedCount);
151
152
@Override
153
public void invoke(T value, Context context) throws Exception;
154
155
/**
156
* Check if expected number of elements were received
157
* @return boolean indicating if expected count was reached
158
*/
159
public boolean receivedExpectedCount();
160
161
/**
162
* Get actual count of received elements
163
* @return int representing actual received count
164
*/
165
public int getReceivedCount();
166
167
/**
168
* Reset the counter for reuse in multiple tests
169
*/
170
public void reset();
171
}
172
```
173
174
### Recovery Testing Utilities
175
176
Utilities specifically designed for testing recovery scenarios and restart strategies.
177
178
```java { .api }
179
/**
180
* Utility class for recovery testing operations
181
*/
182
public class RecoveryTestUtils {
183
184
/**
185
* Validate recovery behavior from job execution result
186
* @param result JobExecutionResult to analyze
187
* @param expectedRestartCount expected number of restarts
188
* @return boolean indicating if recovery behavior is valid
189
*/
190
public static boolean validateRecoveryBehavior(
191
JobExecutionResult result,
192
int expectedRestartCount);
193
194
/**
195
* Create configuration for failure injection testing
196
* @param restartStrategy restart strategy to use
197
* @param maxFailures maximum number of failures to inject
198
* @return Configuration with failure injection settings
199
*/
200
public static Configuration createFailureInjectionConfig(
201
String restartStrategy,
202
int maxFailures);
203
204
/**
205
* Wait for job to reach specific state with timeout
206
* @param jobId JobID to monitor
207
* @param targetState target JobStatus to wait for
208
* @param timeoutMs timeout in milliseconds
209
* @param restGateway RestClient for job monitoring
210
* @return boolean indicating if state was reached
211
* @throws Exception if monitoring fails
212
*/
213
public static boolean waitForJobState(
214
JobID jobId,
215
JobStatus targetState,
216
long timeoutMs,
217
RestClusterClient<?> restGateway) throws Exception;
218
}
219
```
220
221
### Test Environment Utilities
222
223
Utilities for setting up and managing test environments and configurations.
224
225
```java { .api }
226
/**
227
* Utility for creating and managing test environments
228
*/
229
public class TestEnvironmentUtil {
230
231
/**
232
* Create MiniCluster configuration for testing
233
* @param parallelism desired parallelism
234
* @param numTaskManagers number of task managers
235
* @return Configuration for MiniCluster setup
236
*/
237
public static Configuration createTestClusterConfig(
238
int parallelism,
239
int numTaskManagers);
240
241
/**
242
* Create streaming environment for testing
243
* @param parallelism parallelism for the environment
244
* @param checkpointingEnabled whether to enable checkpointing
245
* @return StreamExecutionEnvironment configured for testing
246
*/
247
public static StreamExecutionEnvironment createTestStreamEnv(
248
int parallelism,
249
boolean checkpointingEnabled);
250
251
/**
252
* Create batch environment for testing
253
* @param parallelism parallelism for the environment
254
* @return ExecutionEnvironment configured for testing
255
*/
256
public static ExecutionEnvironment createTestBatchEnv(int parallelism);
257
258
/**
259
* Set up test-specific logging configuration
260
* @param logLevel logging level for tests
261
* @param logToConsole whether to log to console
262
*/
263
public static void setupTestLogging(Level logLevel, boolean logToConsole);
264
265
/**
266
* Clean up test environment resources
267
* @param environment execution environment to clean up
268
* @param miniCluster mini cluster to shut down
269
* @throws Exception if cleanup fails
270
*/
271
public static void cleanupTestEnvironment(
272
StreamExecutionEnvironment environment,
273
MiniCluster miniCluster) throws Exception;
274
}
275
```
276
277
### Metrics and Monitoring Utilities
278
279
Utilities for collecting and validating metrics during test execution.
280
281
```java { .api }
282
/**
283
* Utility for metrics collection and validation in tests
284
*/
285
public class TestMetricsUtil {
286
287
/**
288
* Collect all metrics from MiniCluster
289
* @param miniCluster cluster to collect metrics from
290
* @return Map of metric names to values
291
* @throws Exception if metrics collection fails
292
*/
293
public static Map<String, Object> collectAllMetrics(MiniCluster miniCluster) throws Exception;
294
295
/**
296
* Wait for specific metric to reach expected value
297
* @param miniCluster cluster to monitor
298
* @param metricName name of metric to monitor
299
* @param expectedValue expected metric value
300
* @param timeoutMs timeout in milliseconds
301
* @return boolean indicating if metric reached expected value
302
* @throws Exception if monitoring fails
303
*/
304
public static boolean waitForMetricValue(
305
MiniCluster miniCluster,
306
String metricName,
307
Object expectedValue,
308
long timeoutMs) throws Exception;
309
310
/**
311
* Validate job metrics against expected values
312
* @param jobId JobID to validate metrics for
313
* @param expectedMetrics map of expected metric values
314
* @param miniCluster cluster to collect metrics from
315
* @return boolean indicating if all metrics match expectations
316
* @throws Exception if validation fails
317
*/
318
public static boolean validateJobMetrics(
319
JobID jobId,
320
Map<String, Object> expectedMetrics,
321
MiniCluster miniCluster) throws Exception;
322
}
323
```
324
325
**Usage Examples:**
326
327
```java
328
import org.apache.flink.test.runtime.*;
329
import org.apache.flink.test.util.*;
330
331
// Example: Executing jobs with runtime utilities
332
public class JobExecutionTest {
333
334
@Test
335
public void testJobExecution() throws Exception {
336
// Create test job graph
337
JobGraph jobGraph = createTestJobGraph();
338
339
// Set up mini cluster for testing
340
Configuration config = TestEnvironmentUtil.createTestClusterConfig(4, 2);
341
MiniCluster miniCluster = new MiniCluster(config);
342
miniCluster.start();
343
344
try {
345
// Execute job with timeout
346
JobExecutionResult result = JobGraphRunningUtil.executeWithTimeout(
347
jobGraph, miniCluster, 30000L);
348
349
// Validate execution results
350
assertTrue(result.isSuccess());
351
assertFalse(result.getAllAccumulatorResults().isEmpty());
352
353
} finally {
354
miniCluster.close();
355
}
356
}
357
358
@Test
359
public void testSequentialJobExecution() throws Exception {
360
List<JobGraph> jobs = Arrays.asList(
361
createTestJobGraph("job1"),
362
createTestJobGraph("job2"),
363
createTestJobGraph("job3")
364
);
365
366
Configuration config = TestEnvironmentUtil.createTestClusterConfig(2, 1);
367
MiniCluster miniCluster = new MiniCluster(config);
368
miniCluster.start();
369
370
try {
371
// Execute jobs sequentially
372
List<JobExecutionResult> results = JobGraphRunningUtil.executeSequentially(
373
jobs, miniCluster);
374
375
// Validate all jobs completed successfully
376
assertEquals(3, results.size());
377
results.forEach(result -> assertTrue(result.isSuccess()));
378
379
} finally {
380
miniCluster.close();
381
}
382
}
383
}
384
385
// Example: Recovery testing with utilities
386
public class RecoveryUtilityTest {
387
388
@Test
389
public void testRecoveryBehavior() throws Exception {
390
// Create job with failure injection
391
JobGraph faultTolerantJob = createJobWithFailures();
392
393
// Configure recovery settings
394
Configuration config = RecoveryTestUtils.createFailureInjectionConfig(
395
"fixed-delay", 3);
396
config.setString("restart-strategy.fixed-delay.delay", "1s");
397
398
MiniCluster miniCluster = new MiniCluster(config);
399
miniCluster.start();
400
401
try {
402
// Execute job and wait for completion
403
JobExecutionResult result = JobGraphRunningUtil.executeAndGetResult(
404
faultTolerantJob, miniCluster);
405
406
// Validate recovery behavior
407
boolean recoveryValid = RecoveryTestUtils.validateRecoveryBehavior(
408
result, 2);
409
assertTrue(recoveryValid);
410
411
} finally {
412
miniCluster.close();
413
}
414
}
415
}
416
417
// Example: Using test functions
418
public class TestFunctionUsage {
419
420
@Test
421
public void testTokenizerFunction() throws Exception {
422
StreamExecutionEnvironment env = TestEnvironmentUtil.createTestStreamEnv(1, false);
423
424
// Create test data
425
DataStreamSource<String> textStream = env.fromElements(
426
"hello world", "test data", "apache flink"
427
);
428
429
// Apply tokenizer
430
DataStream<Tuple2<String, Integer>> tokens = textStream
431
.flatMap(new Tokenizer())
432
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}));
433
434
// Use no-op sink to count results
435
ReceiveCheckNoOpSink<Tuple2<String, Integer>> countingSink =
436
new ReceiveCheckNoOpSink<>(6); // Expecting 6 tokens
437
438
tokens.addSink(countingSink);
439
440
// Execute and validate
441
env.execute("Tokenizer Test");
442
assertTrue(countingSink.receivedExpectedCount());
443
}
444
}
445
446
// Example: Environment setup and cleanup
447
public class EnvironmentManagementTest {
448
449
private StreamExecutionEnvironment env;
450
private MiniCluster miniCluster;
451
452
@Before
453
public void setup() throws Exception {
454
// Set up test logging
455
TestEnvironmentUtil.setupTestLogging(Level.INFO, true);
456
457
// Create test environment
458
env = TestEnvironmentUtil.createTestStreamEnv(2, true);
459
460
// Start mini cluster
461
Configuration config = TestEnvironmentUtil.createTestClusterConfig(2, 1);
462
miniCluster = new MiniCluster(config);
463
miniCluster.start();
464
}
465
466
@After
467
public void cleanup() throws Exception {
468
// Clean up test resources
469
TestEnvironmentUtil.cleanupTestEnvironment(env, miniCluster);
470
}
471
472
@Test
473
public void testWithManagedEnvironment() throws Exception {
474
// Test implementation using managed environment
475
DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5);
476
477
DataStream<Integer> processed = source
478
.map(new NoOpIntMap())
479
.filter(x -> x > 2);
480
481
ReceiveCheckNoOpSink<Integer> sink = new ReceiveCheckNoOpSink<>(3);
482
processed.addSink(sink);
483
484
env.execute("Managed Environment Test");
485
assertTrue(sink.receivedExpectedCount());
486
}
487
}
488
```