The security testing utilities support testing Flink applications with security features enabled, including Kerberos authentication backed by Hadoop's MiniKDC (Mini Key Distribution Center).
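At a glance, a secure test drives these utilities in the order sketched below. This is a condensed, illustrative outline of the patterns documented in the rest of this section; it assumes a JUnit TemporaryFolder rule named tempFolder, and the client-context installation step is only needed when a test uses TestingSecurityContext.
// Condensed lifecycle sketch; see the full examples below for complete tests.
SecureTestEnvironment.prepare(tempFolder);        // start MiniKDC, generate principals and keytab
Configuration conf = SecureTestEnvironment.populateFlinkSecureConfigurations(null);  // Kerberos-enabled Flink config
// Optional: install client security contexts (see TestingSecurityContext below)
TestingSecurityContext.install(new SecurityUtils.SecurityConfiguration(conf),
        SecureTestEnvironment.getClientSecurityConfigurationMap());
// ... run the secured job and assertions ...
SecureTestEnvironment.cleanup();                  // stop MiniKDC and release security resources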
SecureTestEnvironment is a helper class that manages the MiniKDC lifecycle for secure Flink testing scenarios.
public class SecureTestEnvironment {
// Initialize secure testing environment
public static void prepare(TemporaryFolder tempFolder);
// Clean up secure testing resources
public static void cleanup();
// Configure Flink security settings
public static Configuration populateFlinkSecureConfigurations(@Nullable Configuration flinkConf);
// Get client security configurations
public static Map<String, TestingSecurityContext.ClientSecurityConfiguration> getClientSecurityConfigurationMap();
// Get test keytab file path
public static String getTestKeytab();
// Get Hadoop service principal
public static String getHadoopServicePrincipal();
}

Usage Example:
public class SecureFlinkTest {
@ClassRule
public static final TemporaryFolder tempFolder = new TemporaryFolder();
@BeforeClass
public static void setupSecurity() {
// Initialize MiniKDC and security environment
SecureTestEnvironment.prepare(tempFolder);
}
@AfterClass
public static void cleanupSecurity() {
// Cleanup MiniKDC and security resources
SecureTestEnvironment.cleanup();
}
@Test
public void testSecureJob() throws Exception {
// Get secure Flink configuration
Configuration flinkConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(null);
// Create secure mini cluster
LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(flinkConfig, true);
try {
// Run secure job
TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);
testEnv.setAsContext();
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ... secure job logic
} finally {
TestEnvironment.unsetAsContext();
TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
}
}
}

Provides security context management for testing with both client and server principals in MiniKDC environments.
@Internal
public class TestingSecurityContext {
// Install security context with client configurations
public static void install(SecurityUtils.SecurityConfiguration config,
Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap) throws Exception;
// Client security configuration
public static class ClientSecurityConfiguration {
public String getPrincipal();
public String getKeytab();
public ClientSecurityConfiguration(String principal, String keytab);
}
}

Usage Example:
@Test
public void testWithClientSecurity() throws Exception {
// Prepare the security environment (tempFolder is a @ClassRule TemporaryFolder, as in the previous example)
SecureTestEnvironment.prepare(tempFolder);
// Get client security configurations
Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientConfigs =
SecureTestEnvironment.getClientSecurityConfigurationMap();
// Build the security configuration from the populated Flink configuration
Configuration flinkConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(new Configuration());
SecurityUtils.SecurityConfiguration securityConfig = new SecurityUtils.SecurityConfiguration(flinkConfig);
TestingSecurityContext.install(securityConfig, clientConfigs);
// Now run tests with security context installed
// ... test logic
SecureTestEnvironment.cleanup();
}

A complete Kerberos-enabled test combines these pieces, first checking the populated configuration and then running a job on a secure cluster:

public class KerberosTest extends AbstractTestBase {
@ClassRule
public static final TemporaryFolder tempFolder = new TemporaryFolder();
@BeforeClass
public static void setupKerberos() {
SecureTestEnvironment.prepare(tempFolder);
}
@AfterClass
public static void cleanupKerberos() {
SecureTestEnvironment.cleanup();
}
@Test
public void testKerberosAuthentication() throws Exception {
// Get secure configuration
Configuration secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(new Configuration());
// Verify Kerberos settings are populated
assertTrue(secureConfig.contains(SecurityOptions.KERBEROS_LOGIN_KEYTAB));
assertTrue(secureConfig.contains(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL));
// Start secure cluster
LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(secureConfig, true);
try {
// Test secure job execution
TestEnvironment testEnv = new TestEnvironment(cluster, getParallelism(), false);
testEnv.setAsContext();
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Job that requires authentication
DataSet<String> secureData = env.fromElements("secure", "data");
List<String> result = secureData.collect();
TestBaseUtils.compareResultAsText(result, "secure\ndata");
} finally {
TestEnvironment.unsetAsContext();
TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
}
}
}

When testing with secure HDFS, additional configuration is required:
@Test
public void testSecureHDFS() throws Exception {
SecureTestEnvironment.prepare(tempFolder);
// Get Hadoop service principal for HDFS
String hadoopPrincipal = SecureTestEnvironment.getHadoopServicePrincipal();
String keytabPath = SecureTestEnvironment.getTestKeytab();
Configuration config = new Configuration();
config.setString("security.kerberos.login.principal", hadoopPrincipal);
config.setString("security.kerberos.login.keytab", keytabPath);
// Populate the remaining Flink security settings on top of the HDFS principal/keytab
Configuration secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(config);
LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(secureConfig, true);
try {
TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);
testEnv.setAsContext();
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Test reading from secure HDFS
String hdfsPath = "hdfs://localhost:9000/secure/data.txt";
DataSet<String> hdfsData = env.readTextFile(hdfsPath);
// Process and verify results
List<String> result = hdfsData.collect();
assertFalse("Should read data from secure HDFS", result.isEmpty());
} finally {
TestEnvironment.unsetAsContext();
TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
}
SecureTestEnvironment.cleanup();
}

Common security configuration options used in secure testing:
// Kerberos authentication
config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
config.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, principal);
// SSL/TLS configuration
config.setBoolean(SecurityOptions.SSL_ENABLED, true);
config.setString(SecurityOptions.SSL_KEYSTORE, keystorePath);
config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, keystorePassword);
// SASL-based services (for example ZooKeeper and Kafka) reuse the Kerberos credentials above;
// the login contexts that receive them are listed explicitly
config.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Client,KafkaClient");

The security testing infrastructure integrates with Hadoop's MiniKDC:
// MiniKDC provides:
// - Kerberos realm setup
// - Principal and keytab generation
// - KDC server lifecycle management
// - Test user and service principals
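Conceptually, SecureTestEnvironment.prepare() drives Hadoop's org.apache.hadoop.minikdc.MiniKdc roughly as follows. This is a simplified sketch rather than the actual implementation: it assumes a TemporaryFolder named tempFolder, and the principal and file names are placeholders.
// Simplified sketch of the MiniKDC lifecycle behind prepare()/cleanup() (illustrative only).
File kdcWorkDir = tempFolder.newFolder();
Properties kdcConf = MiniKdc.createConf();        // default realm, host and port settings
MiniKdc kdc = new MiniKdc(kdcConf, kdcWorkDir);
kdc.start();                                      // boot the embedded KDC

File keytab = new File(kdcWorkDir, "test.keytab");
// register client and service principals and write their keys into a single keytab
kdc.createPrincipal(keytab, "client", "hadoop/localhost");
String servicePrincipal = "hadoop/localhost@" + kdc.getRealm();
// prepare() exposes such values via getTestKeytab() / getHadoopServicePrincipal();
// cleanup() later stops the KDC via kdc.stop().
The test below checks that the keytab, the Hadoop service principal, and the client security configurations are all available once prepare() has run: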
@Test
public void testMiniKDCIntegration() throws Exception {
SecureTestEnvironment.prepare(tempFolder);
// MiniKDC is now running and configured
String testKeytab = SecureTestEnvironment.getTestKeytab();
assertTrue("Test keytab should exist", new File(testKeytab).exists());
String hadoopPrincipal = SecureTestEnvironment.getHadoopServicePrincipal();
assertTrue("Hadoop principal should be configured", hadoopPrincipal != null && !hadoopPrincipal.isEmpty());
// Client configurations are available
Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientConfigs =
SecureTestEnvironment.getClientSecurityConfigurationMap();
assertFalse("Client configurations should be available", clientConfigs.isEmpty());
SecureTestEnvironment.cleanup();
}

The same setup can be packaged as a self-contained secure integration test:

public class SecureIntegrationTest extends AbstractTestBase {
@ClassRule
public static final TemporaryFolder tempFolder = new TemporaryFolder();
@BeforeClass
public static void setupSecureEnvironment() {
SecureTestEnvironment.prepare(tempFolder);
}
@AfterClass
public static void cleanupSecureEnvironment() {
SecureTestEnvironment.cleanup();
}
private LocalFlinkMiniCluster createSecureCluster() throws Exception {
Configuration config = SecureTestEnvironment.populateFlinkSecureConfigurations(new Configuration());
return TestBaseUtils.startCluster(config, true);
}
@Test
public void testSecureJobExecution() throws Exception {
LocalFlinkMiniCluster cluster = createSecureCluster();
try {
runSecureTest(cluster);
} finally {
TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
}
}
private void runSecureTest(LocalFlinkMiniCluster cluster) throws Exception {
TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);
testEnv.setAsContext();
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ... secure job logic
} finally {
TestEnvironment.unsetAsContext();
}
}
}

To use security testing features, ensure your project includes the hadoop-minikdc dependency (marked as optional in the POM):
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<version>${minikdc.version}</version>
<scope>test</scope>
</dependency>