or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

data-collection.mdindex.mdmetrics-testing.mdsecurity-testing.mdtest-data-providers.mdtest-environments.md
tile.json

security-testing.mddocs/

Security Testing

Utilities for testing Flink applications with Kerberos security enabled, including MiniKDC lifecycle management and secure configuration setup for comprehensive security testing scenarios.

Capabilities

Secure Test Environment

SecureTestEnvironment provides helper utilities for handling MiniKDC lifecycle and secure configurations in Flink tests.

/**
 * Helper for handling MiniKDC lifecycle and secure configurations
 */
public class SecureTestEnvironment {
    /**
     * Default hostname for security testing
     */
    public static final String HOST_NAME = "localhost";

    /**
     * Prepares secure environment with additional principals
     * @param folder Temporary folder for KDC files
     * @param additionalPrincipals Additional principals to create beyond defaults
     */
    public static void prepare(TemporaryFolder folder, String... additionalPrincipals) throws Exception;

    /**
     * Cleans up secure environment and stops KDC
     */
    public static void cleanup() throws Exception;

    /**
     * Populates Flink secure configurations
     * @param configuration Configuration to populate with security settings
     * @return Updated configuration with security settings
     */
    public static Configuration populateFlinkSecureConfigurations(Configuration configuration);

    /**
     * Gets client security configurations
     * @return Map of client security configurations by name
     */
    public static Map<String, ClientSecurityConfiguration> getClientSecurityConfigurationMap();

    /**
     * Gets KDC realm name
     * @return KDC realm string
     */
    public static String getRealm();

    /**
     * Gets test keytab file path
     * @return Path to test keytab file
     */
    public static String getTestKeytab();

    /**
     * Gets Hadoop service principal
     * @return Hadoop service principal string
     */
    public static String getHadoopServicePrincipal();
}

Usage Example:

import org.apache.flink.test.util.SecureTestEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.junit.rules.TemporaryFolder;

public class SecureFlinkTest {
    
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    
    @Test
    public void testSecureFlinkJob() throws Exception {
        try {
            // Prepare secure environment with additional principals
            SecureTestEnvironment.prepare(tempFolder, "testuser/localhost@EXAMPLE.COM");
            
            // Get secure configuration
            Configuration secureConfig = new Configuration();
            secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(secureConfig);
            
            // Verify security settings were applied
            assertTrue(secureConfig.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE));
            assertNotNull(secureConfig.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB));
            assertNotNull(secureConfig.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL));
            
            // Get security information
            String realm = SecureTestEnvironment.getRealm();
            String keytab = SecureTestEnvironment.getTestKeytab();
            String hadoopPrincipal = SecureTestEnvironment.getHadoopServicePrincipal();
            
            assertNotNull("Realm should be set", realm);
            assertNotNull("Keytab should be available", keytab);
            assertNotNull("Hadoop principal should be set", hadoopPrincipal);
            
            // Use secure configuration for your Flink job
            // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, secureConfig);
            // ... run your secure job
            
        } finally {
            // Always cleanup security environment
            SecureTestEnvironment.cleanup();
        }
    }
}

Testing Security Context

TestingSecurityContext provides test security context handling for client and server principals.

/**
 * Test security context for handling client and server principals
 */
public class TestingSecurityContext {
    /**
     * Install security context with configurations
     * @param securityConfiguration Security configuration
     * @param clientSecurityConfigurations Map of client security configurations
     */
    public static void install(
        SecurityConfiguration securityConfiguration, 
        Map<String, ClientSecurityConfiguration> clientSecurityConfigurations
    ) throws Exception;

    /**
     * Client security configuration
     */
    public static class ClientSecurityConfiguration {
        // Implementation details for client security configuration
    }
}

Complete Security Testing Examples

Basic Secure Environment Test

import org.apache.flink.test.util.SecureTestEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.configuration.Configuration;

public class BasicSecurityTest {
    
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    
    @Test
    public void testBasicSecureExecution() throws Exception {
        try {
            // Setup secure environment
            SecureTestEnvironment.prepare(tempFolder);
            
            // Create secure configuration
            Configuration config = SecureTestEnvironment.populateFlinkSecureConfigurations(
                new Configuration()
            );
            
            // Create environment with secure config
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, config);
            
            // Simple streaming job
            DataStream<String> source = env.fromElements("secure1", "secure2", "secure3");
            DataStream<String> processed = source.map(s -> "SECURE-" + s.toUpperCase());
            
            processed.print();
            
            // Execute securely
            env.execute("Secure Test Job");
            
        } finally {
            SecureTestEnvironment.cleanup();
        }
    }
}

Advanced Security Testing with Custom Principals

@Test
public void testCustomPrincipals() throws Exception {
    try {
        // Setup with custom principals
        SecureTestEnvironment.prepare(
            tempFolder,
            "alice/localhost@EXAMPLE.COM",
            "bob/localhost@EXAMPLE.COM",
            "service/localhost@EXAMPLE.COM"
        );
        
        // Get client security configurations
        Map<String, ClientSecurityConfiguration> clientConfigs = 
            SecureTestEnvironment.getClientSecurityConfigurationMap();
        
        assertFalse("Client configurations should be available", clientConfigs.isEmpty());
        
        // Test with security context
        SecurityConfiguration securityConfig = // ... create security config
        TestingSecurityContext.install(securityConfig, clientConfigs);
        
        // Create secure Flink configuration
        Configuration flinkConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(
            new Configuration()
        );
        
        // Verify security settings
        String keytab = SecureTestEnvironment.getTestKeytab();
        String realm = SecureTestEnvironment.getRealm();
        
        assertTrue("Keytab file should exist", new File(keytab).exists());
        assertEquals("EXAMPLE.COM", realm);
        
        // Run secure job with multiple principals
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, flinkConfig);
        
        DataStream<String> data = env.fromElements("principal1", "principal2", "principal3");
        DataStream<String> authenticated = data.map(s -> 
            String.format("Authenticated[%s]@%s", s, realm)
        );
        
        authenticated.print();
        env.execute("Multi-Principal Security Test");
        
    } finally {
        SecureTestEnvironment.cleanup();
    }
}

Integration with MiniCluster Security

import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;

public class SecureMiniClusterTest {
    
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    
    private MiniClusterWithClientResource secureCluster;
    
    @Before
    public void setupSecureCluster() throws Exception {
        // Setup secure environment first
        SecureTestEnvironment.prepare(tempFolder, "testuser/localhost@EXAMPLE.COM");
        
        // Create secure configuration
        Configuration secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(
            new Configuration()
        );
        
        // Create secure mini cluster
        MiniClusterResourceConfiguration clusterConfig = 
            new MiniClusterResourceConfiguration.Builder()
                .setNumberSlotsPerTaskManager(2)
                .setNumberTaskManagers(1)
                .setConfiguration(secureConfig)
                .build();
        
        secureCluster = new MiniClusterWithClientResource(clusterConfig);
        secureCluster.before();
    }
    
    @After
    public void teardownSecureCluster() throws Exception {
        if (secureCluster != null) {
            secureCluster.after();
        }
        SecureTestEnvironment.cleanup();
    }
    
    @Test
    public void testSecureClusterExecution() throws Exception {
        // Get secure test environment
        TestEnvironment env = secureCluster.getTestEnvironment();
        
        // Create secure job
        DataSet<String> input = env.fromElements("secure-data-1", "secure-data-2");
        DataSet<String> processed = input.map(s -> "AUTHENTICATED-" + s);
        
        List<String> results = processed.collect();
        
        // Verify secure execution
        assertEquals(2, results.size());
        assertTrue(results.get(0).startsWith("AUTHENTICATED-"));
        assertTrue(results.get(1).startsWith("AUTHENTICATED-"));
    }
}

Testing Security Configuration Validation

@Test
public void testSecurityConfigurationValidation() throws Exception {
    try {
        SecureTestEnvironment.prepare(tempFolder);
        
        Configuration config = new Configuration();
        Configuration secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(config);
        
        // Validate required security settings
        assertTrue("Kerberos should be enabled", 
            secureConfig.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, false));
        
        String keytab = secureConfig.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        assertNotNull("Keytab path should be set", keytab);
        assertTrue("Keytab file should exist", new File(keytab).exists());
        
        String principal = secureConfig.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
        assertNotNull("Principal should be set", principal);
        assertTrue("Principal should contain realm", principal.contains("@"));
        
        // Validate realm and service principal
        String realm = SecureTestEnvironment.getRealm();
        String hadoopPrincipal = SecureTestEnvironment.getHadoopServicePrincipal();
        
        assertNotNull("Realm should be available", realm);
        assertNotNull("Hadoop principal should be available", hadoopPrincipal);
        assertTrue("Hadoop principal should contain realm", hadoopPrincipal.contains(realm));
        
    } finally {
        SecureTestEnvironment.cleanup();
    }
}

Security Testing Best Practices

Cleanup Pattern

Always ensure proper cleanup of security resources:

public class SecurityTestPattern {
    
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    
    @Before
    public void setupSecurity() throws Exception {
        SecureTestEnvironment.prepare(tempFolder);
    }
    
    @After
    public void cleanupSecurity() throws Exception {
        // Always cleanup, even if test fails
        SecureTestEnvironment.cleanup();
    }
    
    // Or use try-with-resources pattern in individual tests
    @Test
    public void testWithAutoCleanup() throws Exception {
        SecureTestEnvironment.prepare(tempFolder);
        try {
            // Test logic here
        } finally {
            SecureTestEnvironment.cleanup();
        }
    }
}

Error Handling in Security Tests

@Test
public void testSecurityErrorHandling() throws Exception {
    try {
        SecureTestEnvironment.prepare(tempFolder);
        
        Configuration config = SecureTestEnvironment.populateFlinkSecureConfigurations(
            new Configuration()
        );
        
        // Test with invalid principal (should handle gracefully)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, config);
        DataStream<String> data = env.fromElements("test");
        
        try {
            data.print();
            env.execute("Security Error Test");
        } catch (Exception e) {
            // Verify it's a security-related exception
            assertTrue("Should be security-related error", 
                e.getMessage().contains("Kerberos") || 
                e.getMessage().contains("authentication") ||
                e.getMessage().contains("principal"));
        }
        
    } finally {
        SecureTestEnvironment.cleanup();
    }
}