Test base classes provide standardized testing patterns, cluster lifecycle management, and support for parameterized testing across multiple execution modes. They handle the common setup and teardown required for Flink testing.
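Base class for tests that run test programs in a Flink mini cluster. It exposes startCluster()/stopCluster() for managing the cluster lifecycle (subclasses call them explicitly, as in the example below) plus helpers for creating temporary files and directories.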
/**
 * Base class for tests that run programs on a Flink mini cluster.
 * Subclasses pass a Configuration to the constructor, may size the cluster with the
 * setters below, and call startCluster()/stopCluster() around their test logic.
 */
public abstract class AbstractTestBase extends TestBaseUtils {
// Static JUnit class rule: one temporary folder for the whole test class.
// Presumably backs the temp-file helpers below (TODO confirm).
@ClassRule
public static final TemporaryFolder temporaryFolder;
/** Creates the test base with the given Flink configuration (e.g. {@code new Configuration()}). */
public AbstractTestBase(Configuration config);
/** Starts the mini cluster; call before running test programs. */
public void startCluster() throws Exception;
/** Stops the mini cluster started by {@link #startCluster()}. */
public void stopCluster() throws Exception;
/** Number of task slots per TaskManager. */
public int getTaskManagerNumSlots();
/** Sets the number of task slots per TaskManager (presumably must be set before startCluster() — TODO confirm). */
public void setTaskManagerNumSlots(int taskManagerNumSlots);
/** Number of TaskManagers in the mini cluster. */
public int getNumTaskManagers();
/** Sets the number of TaskManagers (presumably must be set before startCluster() — TODO confirm). */
public void setNumTaskManagers(int numTaskManagers);
/** Returns the path of a temporary directory named {@code dirName}. */
public String getTempDirPath(String dirName) throws IOException;
/** Returns the path of a temporary file named {@code fileName}. */
public String getTempFilePath(String fileName) throws IOException;
/** Creates a temporary file with the given contents and returns its path (usable as job input). */
public String createTempFile(String fileName, String contents) throws IOException;
/** Creates a temporary file and returns it as a {@link File}; "register" presumably means tracked for cleanup — TODO confirm. */
public File createAndRegisterTempFile(String fileName) throws IOException;
}Usage Example:
/**
 * Example: a test that manages the mini cluster explicitly.
 *
 * <p>The cluster size is configured in the constructor, before the test method starts
 * the cluster.
 */
public class MyCustomTest extends AbstractTestBase {
    public MyCustomTest() {
        super(new Configuration());
        setNumTaskManagers(2);
        setTaskManagerNumSlots(4);
    }

    @Test
    public void testMyApplication() throws Exception {
        startCluster();
        // Stop the cluster in a finally block so that a failing assertion or an exception
        // in the test logic does not leave the mini cluster running.
        try {
            // Create temp input file
            String inputPath = createTempFile("input.txt", "line1\nline2\nline3");
            // Your test logic here
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> input = env.readTextFile(inputPath);
            // ... test logic
        } finally {
            stopCluster();
        }
    }
}
Base class for Java API program tests supporting multiple execution modes (cluster, collection, object reuse).
/**
 * Base class for DataSet (Java API) program tests. Each @Test method below runs
 * {@link #testProgram()} in one execution mode: cluster with object reuse, cluster
 * without object reuse, or collection execution.
 */
public abstract class JavaProgramTestBase extends AbstractTestBase {
/** Creates the test base with a default configuration (presumably {@code new Configuration()} — TODO confirm). */
public JavaProgramTestBase();
/** Creates the test base with the given Flink configuration. */
public JavaProgramTestBase(Configuration config);
/** Sets the parallelism used for the test program. */
public void setParallelism(int parallelism);
/** Sets how often the test program is repeated (presumably per execution mode — TODO confirm). */
public void setNumberOfTestRepetitions(int numberOfTestRepetitions);
/** Returns the configured parallelism. */
public int getParallelism();
/** Returns the result of the most recent job execution. */
public JobExecutionResult getLatestExecutionResult();
/** Whether the current run is the collection-execution run (presumably only inside testJobCollectionExecution — TODO confirm). */
public boolean isCollectionExecution();
/** The program under test; implemented by subclasses. */
protected abstract void testProgram() throws Exception;
/** Hook for setup before the job is executed. */
protected void preSubmit() throws Exception;
/** Hook for validation after the job has executed. */
protected void postSubmit() throws Exception;
/** Return {@code true} to skip the collection-execution run. */
protected boolean skipCollectionExecution();
/** Runs the test program on the cluster with object reuse enabled. */
@Test
public void testJobWithObjectReuse() throws Exception;
/** Runs the test program on the cluster with object reuse disabled. */
@Test
public void testJobWithoutObjectReuse() throws Exception;
/** Runs the test program with collection execution (see {@link #skipCollectionExecution()}). */
@Test
public void testJobCollectionExecution() throws Exception;
}Usage Example:
/**
 * Example: a DataSet word count whose collected result is checked with
 * {@code compareResultAsTuples}.
 *
 * <p>{@code Tokenizer} is assumed to be a
 * {@code FlatMapFunction<String, Tuple2<String, Integer>>} defined elsewhere.
 */
public class WordCountTest extends JavaProgramTestBase {
    @Override
    protected void testProgram() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.fromElements(
                "hello world",
                "hello flink",
                "world flink"
        );
        DataSet<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                .groupBy(0)
                .sum(1);
        List<Tuple2<String, Integer>> result = counts.collect();
        String expected = "flink,2\nhello,2\nworld,2";
        compareResultAsTuples(result, expected);
    }

    @Override
    protected void preSubmit() throws Exception {
        // Setup before job execution
        setParallelism(4);
    }

    @Override
    protected boolean skipCollectionExecution() {
        // Skip collection execution for this test. Returning true disables the
        // collection-execution run; returning false would skip nothing.
        return true;
    }
}
Base class for parameterized unit tests that run multiple tests and reuse the same Flink cluster across test methods.
/**
 * Base class for parameterized tests: every test method runs once per
 * {@link TestExecutionMode}, and all methods share one Flink cluster.
 */
public class MultipleProgramsTestBase extends TestBaseUtils {
// Default parallelism for the test programs.
protected static final int DEFAULT_PARALLELISM = 4;
// Whether the shared cluster should start its web server; off by default.
protected static boolean startWebServer = false;
// Cluster shared by all test methods of the class; null until setup() runs (presumably — TODO confirm).
protected static LocalFlinkMiniCluster cluster = null;
/** @param mode execution mode for this instance, supplied by the Parameterized runner from {@link #executionModes()}. */
public MultipleProgramsTestBase(TestExecutionMode mode);
/** Runs before each test; presumably installs the execution environment for the mode (TODO confirm). */
@Before
public void setupEnvironment();
/** Runs after each test; presumably undoes {@link #setupEnvironment()} (TODO confirm). */
@After
public void teardownEnvironment();
/** Runs once per test class; presumably starts the shared cluster (TODO confirm). */
@BeforeClass
public static void setup() throws Exception;
/** Runs once per test class; presumably stops the shared cluster (TODO confirm). */
@AfterClass
public static void teardown() throws Exception;
/** Parameter source for the Parameterized runner; each run is labelled "Execution mode = &lt;mode&gt;". */
@Parameterized.Parameters(name = "Execution mode = {0}")
public static Collection<Object[]> executionModes();
}
/** Execution modes a parameterized test runs in. */
public enum TestExecutionMode {
// Run on the cluster.
CLUSTER,
// Run on the cluster with object reuse enabled.
CLUSTER_OBJECT_REUSE,
// Run with collection execution.
COLLECTION
}Usage Example:
/**
 * Example: a parameterized test. The Parameterized runner creates one instance per
 * execution mode, and every test method runs in that mode.
 */
@RunWith(Parameterized.class)
public class ParameterizedTest extends MultipleProgramsTestBase {
    public ParameterizedTest(TestExecutionMode mode) {
        super(mode);
    }

    @Test
    public void testWordCount() throws Exception {
        // The environment reflects the mode passed to the constructor.
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> words = environment.fromElements("hello", "world", "hello");
        DataSet<String> uniqueWords = words.distinct();
        List<String> collected = uniqueWords.collect();
        compareResultAsText(collected, "hello\nworld");
    }

    @Test
    public void testFilter() throws Exception {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> values = environment.fromElements(1, 2, 3, 4, 5);
        DataSet<Integer> evens = values.filter(value -> value % 2 == 0);
        List<Integer> collected = evens.collect();
        compareResultAsText(collected, "2\n4");
    }
}
Abstract base class for streaming program tests.
/**
 * Base class for streaming program tests: {@link #testJob()} runs
 * {@link #testProgram()} framed by the preSubmit()/postSubmit() hooks.
 */
public abstract class StreamingProgramTestBase extends AbstractTestBase {
// Default parallelism for the test program.
protected static final int DEFAULT_PARALLELISM = 4;
/** Creates the test base (configuration chosen internally — not visible here). */
public StreamingProgramTestBase();
/** Sets the parallelism for the test program. */
public void setParallelism(int parallelism);
/** Returns the configured parallelism. */
public int getParallelism();
/** The streaming program under test; implemented by subclasses. */
protected abstract void testProgram() throws Exception;
/** Hook for setup before the job is executed. */
protected void preSubmit() throws Exception;
/** Hook for validation after the job has executed. */
protected void postSubmit() throws Exception;
/** The single JUnit test that runs the streaming program. */
@Test
public void testJob() throws Exception;
}Usage Example:
/**
 * Example: a streaming word count run through StreamingProgramTestBase.
 *
 * <p>{@code Tokenizer} is assumed to be a
 * {@code FlatMapFunction<String, Tuple2<String, Integer>>} defined elsewhere.
 */
public class StreamingWordCountTest extends StreamingProgramTestBase {
    @Override
    protected void preSubmit() throws Exception {
        setParallelism(2);
    }

    @Override
    protected void testProgram() throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the parallelism configured through setParallelism().
        streamEnv.setParallelism(getParallelism());
        DataStream<String> lines = streamEnv.fromElements("hello world", "hello flink");
        DataStream<Tuple2<String, Integer>> wordCounts = lines
                .flatMap(new Tokenizer())
                .keyBy(0)
                .sum(1);
        wordCounts.print();
        streamEnv.execute("Streaming WordCount");
    }
}
Base class for streaming unit tests that run multiple tests and reuse the same Flink cluster.
/** Base class for streaming tests whose test methods share one Flink cluster. */
public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
// Default parallelism for the test programs.
protected static final int DEFAULT_PARALLELISM = 4;
// Cluster shared by the test methods of the class.
protected static LocalFlinkMiniCluster cluster;
// Logger for the test base.
protected static final Logger LOG;
/** Creates the test base (configuration chosen internally — not visible here). */
public StreamingMultipleProgramsTestBase();
/** Runs once per test class; presumably starts the shared cluster (TODO confirm). */
@BeforeClass
public static void setup() throws Exception;
/** Runs once per test class; presumably stops the shared cluster (TODO confirm). */
@AfterClass
public static void teardown() throws Exception;
}Usage Example:
/**
 * Example: several streaming tests that share one cluster through
 * StreamingMultipleProgramsTestBase.
 */
public class StreamingIntegrationTest extends StreamingMultipleProgramsTestBase {
    @Test
    public void testStreamingMap() throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> letters = streamEnv.fromElements("a", "b", "c");
        DataStream<String> upperCased = letters.map(String::toUpperCase);
        upperCased.print();
        streamEnv.execute("Map Test");
    }

    @Test
    public void testStreamingFilter() throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> values = streamEnv.fromElements(1, 2, 3, 4, 5);
        DataStream<Integer> large = values.filter(value -> value > 3);
        large.print();
        streamEnv.execute("Filter Test");
    }
}
Base classes that accept a Configuration in their constructor (AbstractTestBase and JavaProgramTestBase) support configuration of cluster parameters:
// Configure the cluster size in the constructor, before any test method starts the cluster.
public MyTest() {
super(new Configuration());
// Two TaskManagers with eight task slots each.
setNumTaskManagers(2);
setTaskManagerNumSlots(8);
}Use the provided temporary file utilities for test data:
// Write the test input to a temporary file; createTempFile returns the file's path.
String inputPath = createTempFile("test-input.txt", "test data content");
// Path of a temporary directory for job output.
String outputPath = getTempDirPath("test-output");
Override lifecycle methods for custom setup/teardown:
// Hook for test setup; runs before the job is executed.
@Override
protected void preSubmit() throws Exception {
// Setup before job execution
}
// Hook for result validation; runs after the job has executed.
@Override
protected void postSubmit() throws Exception {
// Validation after job execution
}