0
# Test Environment Setup
1
2
Test environment utilities for setting up and managing Flink execution environments in tests, including both streaming and batch processing environments with MiniCluster integration.
3
4
## Capabilities
5
6
### Test Streaming Environment
7
8
`TestStreamEnvironment` provides a streaming execution environment that runs jobs on a MiniCluster for testing purposes.
9
10
```java { .api }
11
/**
12
* StreamExecutionEnvironment that executes jobs on MiniCluster for testing
13
*/
14
public class TestStreamEnvironment extends StreamExecutionEnvironment {
15
/**
16
* Create test streaming environment with full configuration
17
* @param cluster The MiniCluster to execute on
18
* @param configuration Flink configuration
19
* @param parallelism Default parallelism
20
* @param jarFiles JAR files to include in classpath
21
* @param classPaths Additional classpaths
22
*/
23
public TestStreamEnvironment(
24
MiniCluster cluster,
25
Configuration configuration,
26
int parallelism,
27
Collection<Path> jarFiles,
28
Collection<URL> classPaths
29
);
30
31
/**
32
* Create test streaming environment with simplified configuration
33
* @param cluster The MiniCluster to execute on
34
* @param parallelism Default parallelism
35
*/
36
public TestStreamEnvironment(MiniCluster cluster, int parallelism);
37
38
/**
39
* Set as the context environment with full configuration
40
* @param cluster The MiniCluster to execute on
41
* @param parallelism Default parallelism
42
* @param jarFiles JAR files to include in classpath
43
* @param classPaths Additional classpaths
44
*/
45
public static void setAsContext(
46
MiniCluster cluster,
47
int parallelism,
48
Collection<Path> jarFiles,
49
Collection<URL> classPaths
50
);
51
52
/**
53
* Set as the context environment with simplified configuration
54
* @param cluster The MiniCluster to execute on
55
* @param parallelism Default parallelism
56
*/
57
public static void setAsContext(MiniCluster cluster, int parallelism);
58
59
/**
60
* Reset the context environment
61
*/
62
public static void unsetAsContext();
63
}
64
```
65
66
**Usage Example:**
67
68
```java
69
import org.apache.flink.streaming.util.TestStreamEnvironment;
70
import org.apache.flink.runtime.minicluster.MiniCluster;
71
72
@Test
73
public void testStreamingJob() throws Exception {
74
MiniCluster cluster = // ... create cluster
75
76
// Set up test streaming environment
77
TestStreamEnvironment.setAsContext(cluster, 4);
78
79
try {
80
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
81
82
// Your streaming job logic here
83
DataStream<String> input = env.fromElements("test", "data");
84
DataStream<String> result = input.map(String::toUpperCase);
85
86
result.print();
87
env.execute("Test Streaming Job");
88
89
} finally {
90
TestStreamEnvironment.unsetAsContext();
91
}
92
}
93
```
94
95
### Test Batch Environment
96
97
`TestEnvironment` provides a batch execution environment that runs jobs on a MiniCluster for testing purposes.
98
99
```java { .api }
100
/**
101
* ExecutionEnvironment implementation that executes jobs on MiniCluster
102
*/
103
public class TestEnvironment extends ExecutionEnvironment {
104
/**
105
* Create test environment with full configuration
106
* @param cluster The MiniCluster to execute on
107
* @param parallelism Default parallelism
108
* @param objectReuse Whether to enable object reuse
109
* @param jarFiles JAR files to include in classpath
110
* @param classPaths Additional classpaths
111
*/
112
public TestEnvironment(
113
MiniCluster cluster,
114
int parallelism,
115
boolean objectReuse,
116
Collection<Path> jarFiles,
117
Collection<URL> classPaths
118
);
119
120
/**
121
* Create test environment with simplified configuration
122
* @param cluster The MiniCluster to execute on
123
* @param parallelism Default parallelism
124
* @param objectReuse Whether to enable object reuse
125
*/
126
public TestEnvironment(MiniCluster cluster, int parallelism, boolean objectReuse);
127
128
/**
129
* Get the result of the last job execution
130
* @return JobExecutionResult of the last executed job
131
*/
132
public JobExecutionResult getLastJobExecutionResult();
133
134
/**
135
* Set this environment as the context environment
136
*/
137
public void setAsContext();
138
139
/**
140
* Set as the context environment with full configuration
141
* @param cluster The MiniCluster to execute on
142
* @param parallelism Default parallelism
143
* @param jarFiles JAR files to include in classpath
144
* @param classPaths Additional classpaths
145
*/
146
public static void setAsContext(
147
MiniCluster cluster,
148
int parallelism,
149
Collection<Path> jarFiles,
150
Collection<URL> classPaths
151
);
152
153
/**
154
* Set as the context environment with simplified configuration
155
* @param cluster The MiniCluster to execute on
156
* @param parallelism Default parallelism
157
*/
158
public static void setAsContext(MiniCluster cluster, int parallelism);
159
160
/**
161
* Reset the context environment
162
*/
163
public static void unsetAsContext();
164
}
165
```
166
167
### MiniCluster Resource Management
168
169
`MiniClusterWithClientResource` provides JUnit integration for managing MiniCluster lifecycle in tests.
170
171
```java { .api }
172
/**
173
* Starts a Flink mini cluster and registers the matching execution environments; intended to be used as a JUnit rule
174
*/
175
public class MiniClusterWithClientResource extends MiniClusterResource {
176
/**
177
* Create cluster resource with configuration
178
* @param configuration MiniCluster configuration
179
*/
180
public MiniClusterWithClientResource(MiniClusterResourceConfiguration configuration);
181
182
/**
183
* Get cluster client for job submission
184
* @return ClusterClient for interacting with cluster
185
*/
186
public ClusterClient<?> getClusterClient();
187
188
/**
189
* Get REST cluster client
190
* @return RestClusterClient for REST API access
191
*/
192
public RestClusterClient<?> getRestClusterClient();
193
194
/**
195
* Get test environment configured for this cluster
196
* @return TestEnvironment configured for this cluster
197
*/
198
public TestEnvironment getTestEnvironment();
199
}
200
```
201
202
**Usage Example:**
203
204
```java
205
import org.apache.flink.test.util.MiniClusterWithClientResource;
206
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
207
208
public class MyFlinkTest {
209
210
@ClassRule
211
public static MiniClusterWithClientResource flinkCluster =
212
new MiniClusterWithClientResource(
213
new MiniClusterResourceConfiguration.Builder()
214
.setNumberSlotsPerTaskManager(2)
215
.setNumberTaskManagers(1)
216
.build());
217
218
@Test
219
public void testBatchJob() throws Exception {
220
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
221
222
// Your batch job logic here
223
DataSet<String> input = env.fromElements("hello", "world");
224
DataSet<String> result = input.map(String::toUpperCase);
225
226
List<String> output = result.collect();
227
228
// Validate results
229
assertEquals(Arrays.asList("HELLO", "WORLD"), output);
230
}
231
}
232
```
233
234
### Pipeline Executor Service Loader
235
236
`MiniClusterPipelineExecutorServiceLoader` provides pipeline execution service integration for MiniCluster.
237
238
```java { .api }
239
/**
240
* Pipeline executor service loader for MiniCluster execution
241
*/
242
public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecutorServiceLoader {
243
public static final String NAME = "minicluster";
244
245
/**
246
* Create executor service loader for given MiniCluster
247
* @param miniCluster The MiniCluster to execute on
248
*/
249
public MiniClusterPipelineExecutorServiceLoader(MiniCluster miniCluster);
250
251
/**
252
* Update configuration for MiniCluster execution
253
* @param configuration Configuration to update
254
* @param jarFiles JAR files to include
255
* @param classpaths Additional classpaths
256
* @return Updated configuration
257
*/
258
public static Configuration updateConfigurationForMiniCluster(
259
Configuration configuration,
260
Collection<Path> jarFiles,
261
Collection<URL> classpaths
262
);
263
264
/**
265
* Get executor factory for configuration
266
* @param configuration Flink configuration
267
* @return PipelineExecutorFactory for the configuration
268
*/
269
public PipelineExecutorFactory getExecutorFactory(Configuration configuration);
270
271
/**
272
* Get supported executor names
273
* @return Stream of executor names
274
*/
275
public Stream<String> getExecutorNames();
276
}
277
```
278
279
### Test Base Utils
280
281
`TestBaseUtils` provides a comprehensive collection of utility methods for testing Flink applications, including result comparison, file operations, and configuration management.
282
283
```java { .api }
284
/**
285
* Utility class with various methods for testing purposes
286
*/
287
public class TestBaseUtils {
288
// Configuration constants
289
protected static final int MINIMUM_HEAP_SIZE_MB = 192;
290
protected static final String TASK_MANAGER_MEMORY_SIZE = "80m";
291
protected static final long DEFAULT_AKKA_ASK_TIMEOUT = 1000;
292
protected static final String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
293
public static final FiniteDuration DEFAULT_TIMEOUT;
294
public static final Time DEFAULT_HTTP_TIMEOUT = Time.seconds(10L);
295
296
// Result reading methods
297
/**
298
* Get result readers for result files
299
* @param resultPath Path to result directory
300
* @return Array of BufferedReaders for result files
301
*/
302
public static BufferedReader[] getResultReader(String resultPath) throws IOException;
303
304
/**
305
* Get result readers with exclusion prefixes and ordering
306
* @param resultPath Path to result directory
307
* @param excludePrefixes Prefixes to exclude from results
308
* @param inOrderOfFiles Whether to maintain file order
309
* @return Array of BufferedReaders for result files
310
*/
311
public static BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes, boolean inOrderOfFiles) throws IOException;
312
313
/**
314
* Get result input streams
315
* @param resultPath Path to result directory
316
* @return Array of BufferedInputStreams for result files
317
*/
318
public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException;
319
320
/**
321
* Get result input streams with exclusion prefixes
322
* @param resultPath Path to result directory
323
* @param excludePrefixes Prefixes to exclude from results
324
* @return Array of BufferedInputStreams for result files
325
*/
326
public static BufferedInputStream[] getResultInputStream(String resultPath, String[] excludePrefixes) throws IOException;
327
328
/**
329
* Read all result lines into target list
330
* @param target List to store result lines
331
* @param resultPath Path to result directory
332
*/
333
public static void readAllResultLines(List<String> target, String resultPath) throws IOException;
334
335
/**
336
* Read all result lines with exclusions and ordering
337
* @param target List to store result lines
338
* @param resultPath Path to result directory
339
* @param excludePrefixes Prefixes to exclude from results
340
* @param inOrderOfFiles Whether to maintain file order
341
*/
342
public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes, boolean inOrderOfFiles) throws IOException;
343
344
// Result comparison methods
345
/**
346
* Compare results by lines in memory
347
* @param expectedResultStr Expected result as string
348
* @param resultPath Path to actual results
349
*/
350
public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception;
351
352
/**
353
* Compare results by lines in memory with exclusions
354
* @param expectedResultStr Expected result as string
355
* @param resultPath Path to actual results
356
* @param excludePrefixes Prefixes to exclude from comparison
357
*/
358
public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath, String[] excludePrefixes) throws Exception;
359
360
/**
361
* Compare results by lines with strict ordering
362
* @param expectedResultStr Expected result as string
363
* @param resultPath Path to actual results
364
*/
365
public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception;
366
367
/**
368
* Compare results by lines with strict ordering and exclusions
369
* @param expectedResultStr Expected result as string
370
* @param resultPath Path to actual results
371
* @param excludePrefixes Prefixes to exclude from comparison
372
*/
373
public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath, String[] excludePrefixes) throws Exception;
374
375
/**
376
* Check lines against regular expression pattern
377
* @param resultPath Path to result files
378
* @param regexp Regular expression pattern to match
379
*/
380
public static void checkLinesAgainstRegexp(String resultPath, String regexp);
381
382
/**
383
* Compare key-value pairs with delta tolerance
384
* @param expectedLines Expected key-value pairs
385
* @param resultPath Path to actual results
386
* @param delimiter Key-value delimiter
387
* @param maxDelta Maximum allowed delta for numeric comparisons
388
*/
389
public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String delimiter, double maxDelta) throws Exception;
390
391
/**
392
* Compare key-value pairs with delta tolerance and exclusions
393
* @param expectedLines Expected key-value pairs
394
* @param resultPath Path to actual results
395
* @param excludePrefixes Prefixes to exclude from comparison
396
* @param delimiter Key-value delimiter
397
* @param maxDelta Maximum allowed delta for numeric comparisons
398
*/
399
public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String[] excludePrefixes, String delimiter, double maxDelta) throws Exception;
400
401
/**
402
* Compare result collections with custom comparator
403
* @param expected Expected results list
404
* @param actual Actual results list
405
* @param comparator Comparator for element comparison
406
*/
407
public static <X> void compareResultCollections(List<X> expected, List<X> actual, Comparator<X> comparator);
408
409
// Collection comparison methods
410
/**
411
* Compare result as tuples
412
* @param result Actual result list
413
* @param expected Expected result as string
414
*/
415
public static <T> void compareResultAsTuples(List<T> result, String expected);
416
417
/**
418
* Compare result as text
419
* @param result Actual result list
420
* @param expected Expected result as string
421
*/
422
public static <T> void compareResultAsText(List<T> result, String expected);
423
424
/**
425
* Compare ordered result as text
426
* @param result Actual result list
427
* @param expected Expected result as string
428
*/
429
public static <T> void compareOrderedResultAsText(List<T> result, String expected);
430
431
/**
432
* Compare ordered result as text with tuple option
433
* @param result Actual result list
434
* @param expected Expected result as string
435
* @param asTuples Whether to treat as tuples
436
*/
437
public static <T> void compareOrderedResultAsText(List<T> result, String expected, boolean asTuples);
438
439
/**
440
* Check if result contains expected content
441
* @param result Actual result list
442
* @param expected Expected content as string
443
*/
444
public static <T> void containsResultAsText(List<T> result, String expected);
445
446
// Utility methods
447
/**
448
* Set environment variables
449
* @param newenv Map of environment variables to set
450
*/
451
public static void setEnv(Map<String, String> newenv);
452
453
/**
454
* Construct test path for class
455
* @param forClass Class to construct path for
456
* @param folder Folder name
457
* @return Constructed test path
458
*/
459
public static String constructTestPath(Class<?> forClass, String folder);
460
461
/**
462
* Construct test URI for class
463
* @param forClass Class to construct URI for
464
* @param folder Folder name
465
* @return Constructed test URI
466
*/
467
public static String constructTestURI(Class<?> forClass, String folder);
468
469
/**
470
* Get content from HTTP URL
471
* @param url URL to fetch from
472
* @return HTTP response content
473
*/
474
public static String getFromHTTP(String url) throws Exception;
475
476
/**
477
* Get content from HTTP URL with timeout
478
* @param url URL to fetch from
479
* @param timeout Request timeout
480
* @return HTTP response content
481
*/
482
public static String getFromHTTP(String url, Time timeout) throws Exception;
483
484
// Configuration methods
485
/**
486
* Convert configurations to parameter list
487
* @param testConfigs Configuration objects
488
* @return Parameter list for parameterized tests
489
*/
490
protected static Collection<Object[]> toParameterList(Configuration... testConfigs);
491
492
/**
493
* Convert configuration list to parameter list
494
* @param testConfigs List of configuration objects
495
* @return Parameter list for parameterized tests
496
*/
497
protected static Collection<Object[]> toParameterList(List<Configuration> testConfigs);
498
499
/**
500
* Tuple comparator for comparing Tuple objects
501
*/
502
public static class TupleComparator<T extends Tuple> implements Comparator<T> {
503
public int compare(T o1, T o2);
504
}
505
}
506
```
507
508
**Usage Example:**
509
510
```java
511
import org.apache.flink.test.util.TestBaseUtils;
512
import org.apache.flink.api.java.tuple.Tuple2;
513
514
@Test
515
public void testResultComparison() throws Exception {
516
// Test result comparison
517
String expected = "hello\nworld\nflink";
518
String resultPath = "/path/to/results";
519
520
TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath);
521
522
// Test with exclusions
523
String[] excludePrefixes = {"debug:", "info:"};
524
TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath, excludePrefixes);
525
526
// Test ordered comparison
527
TestBaseUtils.compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
528
529
// Test collection comparison
530
List<String> actualResults = Arrays.asList("HELLO", "WORLD", "FLINK");
531
String expectedResults = "HELLO\nWORLD\nFLINK";
532
TestBaseUtils.compareResultAsText(actualResults, expectedResults);
533
534
// Test tuple comparison
535
List<Tuple2<String, Integer>> tupleResults = Arrays.asList(
536
new Tuple2<>("hello", 1),
537
new Tuple2<>("world", 2)
538
);
539
String expectedTuples = "(hello,1)\n(world,2)";
540
TestBaseUtils.compareResultAsTuples(tupleResults, expectedTuples);
541
}
542
543
@Test
544
public void testKeyValueComparison() throws Exception {
545
// Test with numeric delta comparison
546
String expected = "a,1.0\nb,2.5\nc,3.7";
547
String resultPath = "/path/to/numeric/results";
548
double maxDelta = 0.1;
549
550
TestBaseUtils.compareKeyValuePairsWithDelta(expected, resultPath, ",", maxDelta);
551
}
552
553
@Test
554
public void testHTTPUtilities() throws Exception {
555
// Test HTTP fetching
556
String content = TestBaseUtils.getFromHTTP("http://example.com/test");
557
assertNotNull(content);
558
559
// Test with timeout
560
Time timeout = Time.seconds(30);
561
String contentWithTimeout = TestBaseUtils.getFromHTTP("http://example.com/test", timeout);
562
assertNotNull(contentWithTimeout);
563
}
564
565
@Test
566
public void testPathConstruction() {
567
String testPath = TestBaseUtils.constructTestPath(MyTest.class, "testdata");
568
String testURI = TestBaseUtils.constructTestURI(MyTest.class, "testdata");
569
570
assertNotNull(testPath);
571
assertNotNull(testURI);
572
}
573
```
574
575
## Test Base Classes
576
577
### Abstract Test Base
578
579
Base class for unit tests that reuse the same Flink cluster across multiple test methods.
580
581
```java { .api }
582
/**
583
* Base class for unit tests that reuse the same Flink cluster
584
*/
585
public abstract class AbstractTestBase extends TestBaseUtils {
586
/**
587
* Static class rule for managing cluster lifecycle
588
*/
589
public static MiniClusterWithClientResource miniClusterResource;
590
591
/**
592
* Static temporary folder rule
593
*/
594
public static TemporaryFolder TEMPORARY_FOLDER;
595
596
/**
597
* Get temporary directory path
598
* @param dirName Directory name
599
* @return Path to temporary directory
600
*/
601
public String getTempDirPath(String dirName);
602
603
/**
604
* Get temporary file path
605
* @param fileName File name
606
* @return Path to temporary file
607
*/
608
public String getTempFilePath(String fileName);
609
610
/**
611
* Create temporary file with contents
612
* @param fileName File name
613
* @param contents File contents
614
* @return Path to created file
615
*/
616
public String createTempFile(String fileName, String contents);
617
618
/**
619
* Create and register temporary file for cleanup
620
* @param fileName File name
621
* @return Created File object
622
*/
623
public File createAndRegisterTempFile(String fileName);
624
}
625
```
626
627
### Java Program Test Base
628
629
Base class for tests that run a single test program with object reuse enabled/disabled.
630
631
```java { .api }
632
/**
633
* Base for tests that run a single test program with object reuse enabled/disabled
634
*/
635
public abstract class JavaProgramTestBase extends AbstractTestBase {
636
/**
637
* Set number of test repetitions
638
* @param numberOfTestRepetitions Number of times to repeat test
639
*/
640
public void setNumberOfTestRepetitions(int numberOfTestRepetitions);
641
642
/**
643
* Get parallelism level
644
* @return Current parallelism setting
645
*/
646
public int getParallelism();
647
648
/**
649
* Get latest execution result
650
* @return JobExecutionResult of latest execution
651
*/
652
public JobExecutionResult getLatestExecutionResult();
653
654
/**
655
* Check if using collection execution
656
* @return true if collection execution mode
657
*/
658
public boolean isCollectionExecution();
659
660
/**
661
* Test program implementation - must be implemented by subclasses
662
*/
663
protected abstract void testProgram() throws Exception;
664
665
/**
666
* Pre-submission work - override if needed
667
*/
668
protected void preSubmit() throws Exception;
669
670
/**
671
* Post-submission work - override if needed
672
*/
673
protected void postSubmit() throws Exception;
674
675
/**
676
* Whether to skip collection execution - override if needed
677
* @return true to skip collection execution
678
*/
679
protected boolean skipCollectionExecution();
680
}
681
```
682
683
### Multiple Programs Test Base
684
685
Base class for parameterized tests that run in different execution modes (cluster, cluster with object reuse, and collection).
686
687
```java { .api }
688
/**
689
* Base for parameterized tests that run in different execution modes
690
*/
691
public class MultipleProgramsTestBase extends AbstractTestBase {
692
/**
693
* Test execution modes
694
*/
695
public enum TestExecutionMode {
696
CLUSTER,
697
CLUSTER_OBJECT_REUSE,
698
COLLECTION
699
}
700
701
/**
702
* Create test base with execution mode
703
* @param mode Test execution mode
704
*/
705
public MultipleProgramsTestBase(TestExecutionMode mode);
706
707
/**
708
* Provides parameterized execution modes for JUnit parameterized tests
709
* @return Collection of execution mode parameters
710
*/
711
public static Collection<Object[]> executionModes();
712
}
713
```
714
715
**Usage Example:**
716
717
```java
718
import org.apache.flink.test.util.MultipleProgramsTestBase;
719
720
@RunWith(Parameterized.class)
721
public class MyParameterizedTest extends MultipleProgramsTestBase {
722
723
public MyParameterizedTest(TestExecutionMode mode) {
724
super(mode);
725
}
726
727
@Parameterized.Parameters
728
public static Collection<Object[]> executionModes() {
729
return MultipleProgramsTestBase.executionModes();
730
}
731
732
@Test
733
public void testInAllModes() throws Exception {
734
// Test will run in all execution modes
735
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
736
// ... test logic
737
}
738
}
739
```
740
741
## Utility Classes
742
743
### Test Process Builder
744
745
`TestProcessBuilder` provides utilities for creating and managing external test processes with JVM configuration.
746
747
```java { .api }
748
/**
749
* Utility wrapping ProcessBuilder with common testing options
750
*/
751
public class TestProcessBuilder {
752
/**
753
* Create process builder for main class
754
* @param mainClass Main class name to execute
755
*/
756
public TestProcessBuilder(String mainClass) throws IOException;
757
758
/**
759
* Start the configured process
760
* @return TestProcess wrapper for managing the process
761
*/
762
public TestProcess start() throws IOException;
763
764
/**
765
* Set JVM memory configuration
766
* @param jvmMemory Memory size for JVM
767
* @return This builder for method chaining
768
*/
769
public TestProcessBuilder setJvmMemory(MemorySize jvmMemory);
770
771
/**
772
* Add JVM argument
773
* @param arg JVM argument to add
774
* @return This builder for method chaining
775
*/
776
public TestProcessBuilder addJvmArg(String arg);
777
778
/**
779
* Add main class argument
780
* @param arg Argument for main class
781
* @return This builder for method chaining
782
*/
783
public TestProcessBuilder addMainClassArg(String arg);
784
785
/**
786
* Add Flink configuration as main class arguments
787
* @param config Flink configuration to add
788
* @return This builder for method chaining
789
*/
790
public TestProcessBuilder addConfigAsMainClassArgs(Configuration config);
791
792
/**
793
* Use clean environment for process
794
* @return This builder for method chaining
795
*/
796
public TestProcessBuilder withCleanEnvironment();
797
798
/**
799
* Wrapper for managing test process execution
800
*/
801
public static class TestProcess {
802
/**
803
* Get underlying Process object
804
* @return Process instance
805
*/
806
public Process getProcess();
807
808
/**
809
* Get process output writer
810
* @return StringWriter with process output
811
*/
812
public StringWriter getProcessOutput();
813
814
/**
815
* Get process error output writer
816
* @return StringWriter with error output
817
*/
818
public StringWriter getErrorOutput();
819
820
/**
821
* Destroy the process
822
*/
823
public void destroy();
824
}
825
}
826
```
827
828
### Shell Script Builder
829
830
`ShellScript` provides utilities for creating cross-platform shell scripts for testing.
831
832
```java { .api }
833
/**
834
* Utility for creating shell scripts on Linux and Windows
835
*/
836
public class ShellScript {
837
/**
838
* Create shell script builder
839
* @return Platform-appropriate shell script builder
840
*/
841
public static ShellScriptBuilder createShellScriptBuilder();
842
843
/**
844
* Get script file extension for current platform
845
* @return Script extension (.sh on Unix-like systems, .cmd on Windows)
846
*/
847
public static String getScriptExtension();
848
849
/**
850
* Abstract builder for creating shell scripts
851
*/
852
public abstract static class ShellScriptBuilder {
853
/**
854
* Write script to file
855
* @param file Target file for script
856
*/
857
public void write(File file) throws IOException;
858
859
/**
860
* Add command to script
861
* @param command Command components to execute
862
*/
863
public abstract void command(List<String> command);
864
865
/**
866
* Set environment variable in script
867
* @param key Environment variable name
868
* @param value Environment variable value
869
*/
870
public abstract void env(String key, String value);
871
}
872
}
873
```
874
875
### Collection Test Environment
876
877
`CollectionTestEnvironment` provides collection-based execution environment for testing without cluster setup.
878
879
```java { .api }
880
/**
881
* Collection execution environment for testing
882
*/
883
public class CollectionTestEnvironment extends ExecutionEnvironment {
884
/**
885
* Get result of last job execution
886
* @return JobExecutionResult of last execution
887
*/
888
public JobExecutionResult getLastJobExecutionResult();
889
890
/**
891
* Execute job with given name
892
* @param jobName Name for the job execution
893
* @return JobExecutionResult after execution
894
*/
895
public JobExecutionResult execute(String jobName) throws Exception;
896
897
/**
898
* Set this environment as context environment
899
*/
900
protected void setAsContext();
901
902
/**
903
* Unset this environment from context
904
*/
905
protected static void unsetAsContext();
906
}
907
```
908
909
### Testing Security Context
910
911
`TestingSecurityContext` provides security context management for Kerberos-enabled testing scenarios.
912
913
```java { .api }
914
/**
915
* Security context for handling client and server principals in MiniKDC testing
916
*/
917
public class TestingSecurityContext {
918
/**
919
* Install security context with configurations
920
* @param config Security configuration
921
* @param clientSecurityConfigurationMap Client security configurations
922
*/
923
public static void install(
924
SecurityConfiguration config,
925
Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap
926
) throws Exception;
927
928
/**
929
* Client security configuration for testing
930
*/
931
static class ClientSecurityConfiguration {
932
/**
933
* Create client security configuration
934
* @param principal Kerberos principal
935
* @param keytab Path to keytab file
936
*/
937
public ClientSecurityConfiguration(String principal, String keytab);
938
939
/**
940
* Get Kerberos principal
941
* @return Principal string
942
*/
943
public String getPrincipal();
944
945
/**
946
* Get keytab file path
947
* @return Keytab file path
948
*/
949
public String getKeytab();
950
}
951
}
952
```