Comprehensive testing utilities for Apache Flink stream and batch processing applications
—
Specialized execution environments for streaming and batch testing with multiple execution modes, object reuse configuration, and temporary file management. These environments provide isolated testing contexts for Flink applications.
StreamExecutionEnvironment specifically designed for testing on MiniCluster with job execution tracking and context management.
public class TestStreamEnvironment extends StreamExecutionEnvironment {
public TestStreamEnvironment(MiniCluster miniCluster, int parallelism);
public TestStreamEnvironment(
MiniCluster miniCluster,
Configuration config,
int parallelism,
Collection<Path> jarFiles,
Collection<URL> classPaths);
public static void setAsContext(MiniCluster miniCluster, int parallelism);
public static void setAsContext(
MiniCluster miniCluster,
int parallelism,
Collection<Path> jarFiles,
Collection<URL> classPaths);
public static void unsetAsContext();
public void setAsContext();
public JobExecutionResult getLastJobExecutionResult();
}import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.runtime.minicluster.MiniCluster;
// Create test environment with specific parallelism
MiniCluster miniCluster = getMiniCluster();
TestStreamEnvironment env = new TestStreamEnvironment(miniCluster, 4);
// Set as global context for factory methods
TestStreamEnvironment.setAsContext(miniCluster, 4);
StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
// Execute job and get result
env.fromElements(1, 2, 3, 4, 5)
.map(x -> x * 2)
.print();
JobExecutionResult result = env.execute("Test Job");
JobExecutionResult lastResult = env.getLastJobExecutionResult();Interface and implementations for different SQL job submission methods including embedded SQL clients, gateway clients, and REST clients.
public interface SQLJobClientMode {
EmbeddedSqlClient getEmbeddedSqlClient();
GatewaySqlClient getGatewaySqlClient(String host, int port);
HiveJDBC getHiveJDBC(String host, int port);
RestClient getRestClient(String host, int port, String version);
}public static class EmbeddedSqlClient {
// Embedded SQL client implementation
}
public static class GatewaySqlClient {
// Gateway SQL client implementation
}
public static class HiveJDBC {
// Hive JDBC client implementation
}
public static class RestClient {
// REST client implementation
}Utilities for submitting and managing Flink jobs in test environments with support for both regular jobs and SQL jobs.
public class JobSubmission {
// Job submission functionality
}
public class SQLJobSubmission {
// SQL job submission functionality
}Test environment with Kerberos security support for testing secure Flink deployments.
public class SecureTestEnvironment {
// Security context management for tests
}Security context management utilities for handling authentication and authorization in test environments.
public class TestingSecurityContext {
// Security context utilities
}Utilities for executing external processes and shell scripts within test environments.
public class TestProcessBuilder {
// Process builder for test execution
}
public class ShellScript {
// Shell script execution utilities
}Service loader for MiniCluster pipeline executor, enabling custom execution strategies for test environments.
public class MiniClusterPipelineExecutorServiceLoader {
// Service loader for pipeline executors
}Enumeration of available execution modes for parameterized testing with different cluster configurations.
public enum TestExecutionMode {
CLUSTER, // Standard cluster execution
CLUSTER_OBJECT_REUSE // Cluster execution with object reuse optimization
}Support for collection-based execution mode that runs jobs in-memory without cluster deployment, useful for unit testing individual operations.
public boolean isCollectionExecution();General utility functions for test environment setup and management.
public class TestUtils {
// Miscellaneous test utility functions
}File manipulation utilities specifically designed for test environments including temporary file management and path resolution.
public class FileUtils {
// File manipulation utilities for tests
}Special exception type indicating successful test completion, used in specific testing scenarios where exceptions signal success rather than failure.
public class SuccessException extends Exception {
// Exception indicating test success
}@Test
void testStreamingApplication() throws Exception {
TestStreamEnvironment env = new TestStreamEnvironment(miniCluster, 2);
DataStream<String> input = env.fromElements("a", "b", "c");
DataStream<String> result = input.map(String::toUpperCase);
result.print();
JobExecutionResult jobResult = env.execute("Upper Case Job");
// Verify execution was successful
assertNotNull(jobResult);
assertTrue(jobResult.getNetRuntime() > 0);
}@ParameterizedTest
@EnumSource(TestExecutionMode.class)
void testWithDifferentModes(TestExecutionMode mode) throws Exception {
// Configure environment based on execution mode
switch (mode) {
case CLUSTER:
// Standard cluster configuration
break;
case CLUSTER_OBJECT_REUSE:
// Object reuse optimization configuration
break;
}
// Run test with specified mode
}@BeforeEach
void setupTestContext() {
TestStreamEnvironment.setAsContext(miniCluster, 4);
}
@AfterEach
void cleanupTestContext() {
TestStreamEnvironment.unsetAsContext();
}
@Test
void testWithGlobalContext() {
// Use factory method - will return TestStreamEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Test implementation
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-test-utils