Utilities for testing Flink applications with Kerberos security enabled, including MiniKDC lifecycle management and secure configuration setup for comprehensive security testing scenarios.
SecureTestEnvironment provides helper utilities for handling MiniKDC lifecycle and secure configurations in Flink tests.
/**
* Helper for handling MiniKDC lifecycle and secure configurations
*/
public class SecureTestEnvironment {
/**
* Default hostname for security testing
*/
public static final String HOST_NAME = "localhost";
/**
* Prepares secure environment with additional principals
* @param folder Temporary folder for KDC files
* @param additionalPrincipals Additional principals to create beyond defaults
*/
public static void prepare(TemporaryFolder folder, String... additionalPrincipals) throws Exception;
/**
* Cleans up secure environment and stops KDC
*/
public static void cleanup() throws Exception;
/**
* Populates Flink secure configurations
* @param configuration Configuration to populate with security settings
* @return Updated configuration with security settings
*/
public static Configuration populateFlinkSecureConfigurations(Configuration configuration);
/**
* Gets client security configurations
* @return Map of client security configurations by name
*/
public static Map<String, ClientSecurityConfiguration> getClientSecurityConfigurationMap();
/**
* Gets KDC realm name
* @return KDC realm string
*/
public static String getRealm();
/**
* Gets test keytab file path
* @return Path to test keytab file
*/
public static String getTestKeytab();
/**
* Gets Hadoop service principal
* @return Hadoop service principal string
*/
public static String getHadoopServicePrincipal();
}

Usage Example:
import org.apache.flink.test.util.SecureTestEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.junit.rules.TemporaryFolder;
public class SecureFlinkTest {
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@Test
public void testSecureFlinkJob() throws Exception {
    try {
        // Bring up the secure environment and register one extra principal
        SecureTestEnvironment.prepare(tempFolder, "testuser/localhost@EXAMPLE.COM");

        // Let the helper fill a fresh configuration with the security settings
        final Configuration flinkConf =
                SecureTestEnvironment.populateFlinkSecureConfigurations(new Configuration());

        // The Kerberos login options are expected to be readable afterwards
        final boolean useTicketCache =
                flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
        assertTrue(useTicketCache);
        final String keytabOption = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        assertNotNull(keytabOption);
        final String principalOption = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
        assertNotNull(principalOption);

        // Query the environment itself for realm, keytab and Hadoop principal
        final String kdcRealm = SecureTestEnvironment.getRealm();
        final String keytabPath = SecureTestEnvironment.getTestKeytab();
        final String hadoopServicePrincipal = SecureTestEnvironment.getHadoopServicePrincipal();

        assertNotNull("Realm should be set", kdcRealm);
        assertNotNull("Keytab should be available", keytabPath);
        assertNotNull("Hadoop principal should be set", hadoopServicePrincipal);

        // From here on, hand flinkConf to your job, e.g.:
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, flinkConf);
        // ... run your secure job
    } finally {
        // Tear the security environment down regardless of the test outcome
        SecureTestEnvironment.cleanup();
    }
}
}

TestingSecurityContext provides test security context handling for client and server principals.
/**
* Test security context for handling client and server principals
*/
public class TestingSecurityContext {
/**
* Install security context with configurations
* @param securityConfiguration Security configuration
* @param clientSecurityConfigurations Map of client security configurations
*/
public static void install(
SecurityConfiguration securityConfiguration,
Map<String, ClientSecurityConfiguration> clientSecurityConfigurations
) throws Exception;
/**
* Client security configuration
*/
public static class ClientSecurityConfiguration {
// Implementation details for client security configuration
}
}

import org.apache.flink.test.util.SecureTestEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.configuration.Configuration;
public class BasicSecurityTest {
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@Test
public void testBasicSecureExecution() throws Exception {
    try {
        // Start the secure environment with the default principals only
        SecureTestEnvironment.prepare(tempFolder);

        // Derive the secured Flink configuration from an empty one
        final Configuration blankConf = new Configuration();
        final Configuration secureConf =
                SecureTestEnvironment.populateFlinkSecureConfigurations(blankConf);

        // Local environment with parallelism 2, running on the secured configuration
        final StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.createLocalEnvironment(2, secureConf);

        // Minimal pipeline: three elements, prefixed and upper-cased, printed
        streamEnv
                .fromElements("secure1", "secure2", "secure3")
                .map(value -> "SECURE-" + value.toUpperCase())
                .print();

        // Run the job
        streamEnv.execute("Secure Test Job");
    } finally {
        SecureTestEnvironment.cleanup();
    }
}
}

@Test
public void testCustomPrincipals() throws Exception {
    try {
        // Setup with custom principals
        SecureTestEnvironment.prepare(
                tempFolder,
                "alice/localhost@EXAMPLE.COM",
                "bob/localhost@EXAMPLE.COM",
                "service/localhost@EXAMPLE.COM");

        // Get client security configurations
        Map<String, ClientSecurityConfiguration> clientConfigs =
                SecureTestEnvironment.getClientSecurityConfigurationMap();
        assertFalse("Client configurations should be available", clientConfigs.isEmpty());

        // Create the secure Flink configuration first: the security configuration below is built from it
        Configuration flinkConfig =
                SecureTestEnvironment.populateFlinkSecureConfigurations(new Configuration());

        // Test with security context.
        // NOTE(review): assumes SecurityConfiguration offers a constructor taking a Flink
        // Configuration -- confirm against the Flink version in use.
        SecurityConfiguration securityConfig = new SecurityConfiguration(flinkConfig);
        TestingSecurityContext.install(securityConfig, clientConfigs);

        // Verify security settings
        String keytab = SecureTestEnvironment.getTestKeytab();
        String realm = SecureTestEnvironment.getRealm();
        assertTrue("Keytab file should exist", new File(keytab).exists());
        assertEquals("EXAMPLE.COM", realm);

        // Run secure job with multiple principals
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, flinkConfig);
        DataStream<String> data = env.fromElements("principal1", "principal2", "principal3");
        DataStream<String> authenticated =
                data.map(s -> String.format("Authenticated[%s]@%s", s, realm));
        authenticated.print();
        env.execute("Multi-Principal Security Test");
    } finally {
        // Always stop the KDC, even when an assertion or the job fails
        SecureTestEnvironment.cleanup();
    }
}

import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
public class SecureMiniClusterTest {
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
private MiniClusterWithClientResource secureCluster;
@Before
public void setupSecureCluster() throws Exception {
    // The secure environment has to be prepared before any cluster configuration is derived
    SecureTestEnvironment.prepare(tempFolder, "testuser/localhost@EXAMPLE.COM");

    // Secured Flink configuration, populated from an empty one
    final Configuration kerberizedConf =
            SecureTestEnvironment.populateFlinkSecureConfigurations(new Configuration());

    // One task manager with two slots, running on the secured configuration
    final MiniClusterResourceConfiguration resourceConf =
            new MiniClusterResourceConfiguration.Builder()
                    .setNumberSlotsPerTaskManager(2)
                    .setNumberTaskManagers(1)
                    .setConfiguration(kerberizedConf)
                    .build();

    // Keep the resource in the field so the @After method can shut it down
    secureCluster = new MiniClusterWithClientResource(resourceConf);
    secureCluster.before();
}
/**
 * Shuts down the mini cluster (if it was created) and then the secure test environment.
 *
 * @throws Exception if stopping the cluster or cleaning up the secure environment fails
 */
@After
public void teardownSecureCluster() throws Exception {
    try {
        // The field stays null when setup failed before the cluster was constructed
        if (secureCluster != null) {
            secureCluster.after();
        }
    } finally {
        // Run the security cleanup even when stopping the cluster throws
        SecureTestEnvironment.cleanup();
    }
}
@Test
public void testSecureClusterExecution() throws Exception {
    // Build and run the batch job on the environment handed out by the secure cluster
    final List<String> collected =
            secureCluster
                    .getTestEnvironment()
                    .fromElements("secure-data-1", "secure-data-2")
                    .map(element -> "AUTHENTICATED-" + element)
                    .collect();

    // Both input elements must come back, each carrying the prefix
    assertEquals(2, collected.size());
    for (String element : collected) {
        assertTrue(element.startsWith("AUTHENTICATED-"));
    }
}
}

@Test
public void testSecurityConfigurationValidation() throws Exception {
    try {
        SecureTestEnvironment.prepare(tempFolder);

        final Configuration blankConf = new Configuration();
        final Configuration populated =
                SecureTestEnvironment.populateFlinkSecureConfigurations(blankConf);

        // Check the individual login options.
        // NOTE(review): the option read here is the ticket-cache flag, while the message talks
        // about Kerberos being enabled; the explicit `false` presumably acts as the fallback when
        // the key is absent -- confirm populateFlinkSecureConfigurations really sets this option.
        final boolean ticketCacheFlag =
                populated.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, false);
        assertTrue("Kerberos should be enabled", ticketCacheFlag);

        final String keytabPath = populated.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        assertNotNull("Keytab path should be set", keytabPath);
        final File keytabFile = new File(keytabPath);
        assertTrue("Keytab file should exist", keytabFile.exists());

        final String loginPrincipal = populated.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
        assertNotNull("Principal should be set", loginPrincipal);
        assertTrue("Principal should contain realm", loginPrincipal.indexOf('@') >= 0);

        // Cross-check the realm against the Hadoop service principal
        final String kdcRealm = SecureTestEnvironment.getRealm();
        final String hadoopServicePrincipal = SecureTestEnvironment.getHadoopServicePrincipal();
        assertNotNull("Realm should be available", kdcRealm);
        assertNotNull("Hadoop principal should be available", hadoopServicePrincipal);
        assertTrue(
                "Hadoop principal should contain realm",
                hadoopServicePrincipal.contains(kdcRealm));
    } finally {
        SecureTestEnvironment.cleanup();
    }
}

Always ensure proper cleanup of security resources:
public class SecurityTestPattern {
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
/**
* Prepares the secure test environment before each test, passing the rule-managed temporary folder.
*/
@Before
public void setupSecurity() throws Exception {
SecureTestEnvironment.prepare(tempFolder);
}
/**
* Cleans up the secure test environment after each test.
*/
@After
public void cleanupSecurity() throws Exception {
// Always cleanup, even if test fails
SecureTestEnvironment.cleanup();
}
// Or use a try/finally block in individual tests
/**
 * Shows the per-test variant of the cleanup pattern: prepare and cleanup live in one
 * try/finally block inside the test itself.
 *
 * NOTE(review): this class also prepares the environment in its @Before method; presumably
 * only one of the two patterns should be used in a real test class -- confirm.
 */
@Test
public void testWithAutoCleanup() throws Exception {
    try {
        // Prepare inside the try, like the other examples, so a failing prepare is still cleaned up
        SecureTestEnvironment.prepare(tempFolder);
        // Test logic here
    } finally {
        SecureTestEnvironment.cleanup();
    }
}
}

@Test
/**
 * Runs a trivial job on the secure configuration and, if it fails, checks that the failure
 * is security-related. A successful run is accepted as well.
 */
public void testSecurityErrorHandling() throws Exception {
    try {
        SecureTestEnvironment.prepare(tempFolder);
        Configuration config =
                SecureTestEnvironment.populateFlinkSecureConfigurations(new Configuration());

        // This snippet does not itself configure an invalid principal; to exercise the failure
        // path, override the principal or keytab in `config` before creating the environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, config);
        DataStream<String> data = env.fromElements("test");
        try {
            data.print();
            env.execute("Security Error Test");
        } catch (Exception e) {
            // The relevant message may sit on a wrapped cause rather than on the top-level
            // exception, and getMessage() may be null, so walk the chain null-safely.
            boolean securityRelated = false;
            for (Throwable t = e; t != null && !securityRelated; t = t.getCause()) {
                String message = t.getMessage();
                securityRelated = message != null
                        && (message.contains("Kerberos")
                                || message.contains("authentication")
                                || message.contains("principal"));
            }
            // Include the exception in the failure message so an unrelated error is visible
            assertTrue("Should be security-related error, but was: " + e, securityRelated);
        }
    } finally {
        SecureTestEnvironment.cleanup();
    }
}