or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md security-testing.md test-base-classes.md test-data.md test-environments.md test-utilities.md
tile.json

security-testing.mddocs/

Security Testing

Security testing utilities provide support for testing Flink applications with security features enabled, including Kerberos authentication via MiniKDC (Mini Key Distribution Center).

Secure Test Environment

SecureTestEnvironment

Helper class that manages MiniKDC lifecycle for secure Flink testing scenarios.

/**
 * API summary of the helper class that manages the MiniKDC lifecycle for
 * secure Flink testing scenarios. All members are static; signatures only.
 */
public class SecureTestEnvironment {
    // Initialize the secure testing environment (MiniKDC and security setup).
    // The JUnit TemporaryFolder is supplied by the calling test class.
    public static void prepare(TemporaryFolder tempFolder);
    
    // Clean up MiniKDC and the other secure testing resources created by prepare()
    public static void cleanup();
    
    // Configure Flink security settings and return the resulting Configuration.
    // flinkConf may be null (the usage examples pass either null or a Configuration).
    public static Configuration populateFlinkSecureConfigurations(@Nullable Configuration flinkConf);
    
    // Get the client security configurations as a String-keyed map
    // NOTE(review): the meaning of the map keys is not shown here — confirm before relying on them
    public static Map<String, TestingSecurityContext.ClientSecurityConfiguration> getClientSecurityConfigurationMap();
    
    // Get the path of the test keytab file
    public static String getTestKeytab();
    
    // Get the Hadoop service principal
    public static String getHadoopServicePrincipal();
}

Usage Example:

/**
 * Example JUnit test that executes a Flink job on a mini cluster started with
 * the secure configuration produced by {@code SecureTestEnvironment}.
 */
public class SecureFlinkTest {
    @ClassRule
    public static final TemporaryFolder tempFolder = new TemporaryFolder();

    /** Brings up MiniKDC and the security environment once for the whole class. */
    @BeforeClass
    public static void setupSecurity() {
        SecureTestEnvironment.prepare(tempFolder);
    }

    /** Releases MiniKDC and the security resources after all tests have run. */
    @AfterClass
    public static void cleanupSecurity() {
        SecureTestEnvironment.cleanup();
    }

    /**
     * Starts a mini cluster with the secure configuration, runs the job in it,
     * and always stops the cluster afterwards.
     */
    @Test
    public void testSecureJob() throws Exception {
        // No existing configuration is supplied, hence the null argument.
        final Configuration secureConf =
            SecureTestEnvironment.populateFlinkSecureConfigurations(null);

        final LocalFlinkMiniCluster miniCluster = TestBaseUtils.startCluster(secureConf, true);

        try {
            // Register a test environment backed by the secure cluster as the context.
            new TestEnvironment(miniCluster, 4, false).setAsContext();

            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // ... secure job logic

        } finally {
            // Undo the context registration before shutting the cluster down.
            TestEnvironment.unsetAsContext();
            TestBaseUtils.stopCluster(miniCluster, TestBaseUtils.DEFAULT_TIMEOUT);
        }
    }
}

Security Context Management

TestingSecurityContext

Provides security context management for testing with both client and server principals in MiniKDC environments.

/**
 * API summary of the security context used for testing with both client and
 * server principals in MiniKDC environments. Signatures only.
 */
@Internal
public class TestingSecurityContext {
    // Install the security context described by config, together with the
    // supplied client security configurations. Declared to throw Exception.
    public static void install(SecurityUtils.SecurityConfiguration config, 
                             Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap) throws Exception;
    
    // Client security configuration: a principal paired with a keytab
    public static class ClientSecurityConfiguration {
        // Principal passed to the constructor
        public String getPrincipal();
        // Keytab passed to the constructor
        // NOTE(review): presumably a file path, as with getTestKeytab() — confirm
        public String getKeytab();
        public ClientSecurityConfiguration(String principal, String keytab);
    }
}

Usage Example:

/**
 * Installs a testing security context built from the client security
 * configurations of the secure test environment, then runs the test logic.
 *
 * <p>The secure environment is always cleaned up, even when installing the
 * context or the test logic fails.
 *
 * @throws Exception if installing the security context or the test logic fails
 */
@Test
public void testWithClientSecurity() throws Exception {
    // Prepare security environment
    SecureTestEnvironment.prepare(tempFolder);

    try {
        // Get client security configurations
        Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientConfigs =
            SecureTestEnvironment.getClientSecurityConfigurationMap();

        // Configure security
        // NOTE(review): confirm SecurityConfiguration offers a no-arg constructor
        // in the Flink version this example targets.
        SecurityUtils.SecurityConfiguration securityConfig = new SecurityUtils.SecurityConfiguration();
        TestingSecurityContext.install(securityConfig, clientConfigs);

        // Now run tests with security context installed
        // ... test logic
    } finally {
        // Run cleanup on every exit path so a failure above does not leave the
        // prepared security environment behind for later tests.
        SecureTestEnvironment.cleanup();
    }
}

Kerberos Authentication Testing

Setting up Kerberos Environment

/**
 * Example test that checks the Kerberos settings of the secure configuration
 * and runs a small job on a mini cluster started with that configuration.
 */
public class KerberosTest extends AbstractTestBase {
    @ClassRule
    public static final TemporaryFolder tempFolder = new TemporaryFolder();

    /** Prepares the secure test environment once before any test runs. */
    @BeforeClass
    public static void setupKerberos() {
        SecureTestEnvironment.prepare(tempFolder);
    }

    /** Cleans up the secure test environment after the last test. */
    @AfterClass
    public static void cleanupKerberos() {
        SecureTestEnvironment.cleanup();
    }

    /**
     * Verifies that keytab and principal are present in the secure
     * configuration, then executes a two-element job on a secure cluster.
     */
    @Test
    public void testKerberosAuthentication() throws Exception {
        // Start from an empty configuration and let the helper fill in security settings.
        final Configuration kerberosConf =
            SecureTestEnvironment.populateFlinkSecureConfigurations(new Configuration());

        // Both Kerberos login options must have been populated.
        assertTrue(kerberosConf.contains(SecurityOptions.KERBEROS_LOGIN_KEYTAB));
        assertTrue(kerberosConf.contains(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL));

        final LocalFlinkMiniCluster miniCluster = TestBaseUtils.startCluster(kerberosConf, true);

        try {
            // Make the secure cluster the context for getExecutionEnvironment().
            new TestEnvironment(miniCluster, getParallelism(), false).setAsContext();

            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Job that requires authentication
            DataSet<String> input = env.fromElements("secure", "data");
            List<String> collected = input.collect();

            TestBaseUtils.compareResultAsText(collected, "secure\ndata");

        } finally {
            // Restore the context first, then shut the cluster down.
            TestEnvironment.unsetAsContext();
            TestBaseUtils.stopCluster(miniCluster, TestBaseUtils.DEFAULT_TIMEOUT);
        }
    }
}

HDFS Security Testing

When testing with secure HDFS, additional configuration is required:

/**
 * Reads a text file from a secure HDFS path through a mini cluster started
 * with the Hadoop service principal and the test keytab.
 *
 * <p>The secure environment is cleaned up on every exit path, including a
 * failure to start the cluster or a failing assertion.
 *
 * @throws Exception if cluster startup or job execution fails
 */
@Test
public void testSecureHDFS() throws Exception {
    SecureTestEnvironment.prepare(tempFolder);

    try {
        // Get Hadoop service principal for HDFS
        String hadoopPrincipal = SecureTestEnvironment.getHadoopServicePrincipal();
        String keytabPath = SecureTestEnvironment.getTestKeytab();

        Configuration config = new Configuration();
        config.setString("security.kerberos.login.principal", hadoopPrincipal);
        config.setString("security.kerberos.login.keytab", keytabPath);

        // Populate additional HDFS security settings
        Configuration secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(config);

        LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(secureConfig, true);

        try {
            TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);
            testEnv.setAsContext();

            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Test reading from secure HDFS
            String hdfsPath = "hdfs://localhost:9000/secure/data.txt";
            DataSet<String> hdfsData = env.readTextFile(hdfsPath);

            // Process and verify results
            List<String> result = hdfsData.collect();
            assertFalse("Should read data from secure HDFS", result.isEmpty());

        } finally {
            TestEnvironment.unsetAsContext();
            TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
        }
    } finally {
        // Previously only reached on success; a failed cluster start or a failed
        // assertion left the prepared security environment behind.
        SecureTestEnvironment.cleanup();
    }
}

Security Configuration Options

Common security configuration options used in secure testing:

// Kerberos authentication: keytab and principal used for login
config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
config.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, principal);

// SSL/TLS configuration: enable SSL and supply the keystore and its password
config.setBoolean(SecurityOptions.SSL_ENABLED, true);
config.setString(SecurityOptions.SSL_KEYSTORE, keystorePath);
config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, keystorePassword);

// NOTE(review): this line was labelled "SASL authentication", but it sets an
// SSL-enabled flag through ConfigConstants rather than any SASL option —
// confirm which setting is actually intended here.
config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);

Integration with Hadoop MiniKDC

The security testing infrastructure integrates with Hadoop's MiniKDC:

// MiniKDC provides:
// - Kerberos realm setup
// - Principal and keytab generation  
// - KDC server lifecycle management
// - Test user and service principals

/**
 * Checks what the secure test environment exposes after preparation: an
 * existing test keytab file, a non-empty Hadoop service principal, and a
 * non-empty map of client security configurations.
 *
 * <p>Cleanup runs in a {@code finally} block so that a failing assertion does
 * not leave the prepared environment behind.
 *
 * @throws Exception if the secure environment cannot be queried
 */
@Test
public void testMiniKDCIntegration() throws Exception {
    SecureTestEnvironment.prepare(tempFolder);

    try {
        // MiniKDC is now running and configured
        String testKeytab = SecureTestEnvironment.getTestKeytab();
        assertTrue("Test keytab should exist", new File(testKeytab).exists());

        String hadoopPrincipal = SecureTestEnvironment.getHadoopServicePrincipal();
        assertTrue("Hadoop principal should be configured", hadoopPrincipal != null && !hadoopPrincipal.isEmpty());

        // Client configurations are available
        Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientConfigs =
            SecureTestEnvironment.getClientSecurityConfigurationMap();
        assertFalse("Client configurations should be available", clientConfigs.isEmpty());
    } finally {
        // An AssertionError above would otherwise skip the cleanup entirely.
        SecureTestEnvironment.cleanup();
    }
}

Common Security Testing Patterns

Complete Secure Test Setup

/**
 * Template for a complete secure integration test: the secure environment is
 * prepared once per class, and each test starts and stops its own cluster.
 */
public class SecureIntegrationTest extends AbstractTestBase {
    @ClassRule
    public static final TemporaryFolder tempFolder = new TemporaryFolder();

    /** Prepares the secure test environment before the first test. */
    @BeforeClass
    public static void setupSecureEnvironment() {
        SecureTestEnvironment.prepare(tempFolder);
    }

    /** Cleans up the secure test environment after the last test. */
    @AfterClass
    public static void cleanupSecureEnvironment() {
        SecureTestEnvironment.cleanup();
    }

    /** Runs the secure job on a freshly started cluster and always stops that cluster. */
    @Test
    public void testSecureJobExecution() throws Exception {
        final LocalFlinkMiniCluster miniCluster = startSecureCluster();

        try {
            executeSecureJob(miniCluster);
        } finally {
            TestBaseUtils.stopCluster(miniCluster, TestBaseUtils.DEFAULT_TIMEOUT);
        }
    }

    /** Starts a mini cluster using a new configuration populated with security settings. */
    private LocalFlinkMiniCluster startSecureCluster() throws Exception {
        final Configuration secureConf =
            SecureTestEnvironment.populateFlinkSecureConfigurations(new Configuration());
        return TestBaseUtils.startCluster(secureConf, true);
    }

    /** Registers the cluster as execution context, runs the job, and unregisters again. */
    private void executeSecureJob(LocalFlinkMiniCluster miniCluster) throws Exception {
        // The context is set before the try, so a failure here skips the unset.
        new TestEnvironment(miniCluster, 4, false).setAsContext();

        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // ... secure job logic
        } finally {
            TestEnvironment.unsetAsContext();
        }
    }
}

Security Testing Dependencies

To use security testing features, ensure your project includes the hadoop-minikdc dependency (marked as optional in the POM):

<dependency>
    <!-- Hadoop MiniKDC, needed only on the test classpath (scope: test). -->
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-minikdc</artifactId>
    <!-- Assumes a minikdc.version property is defined elsewhere in the POM; confirm. -->
    <version>${minikdc.version}</version>
    <scope>test</scope>
</dependency>