# Security Testing

Security testing utilities provide support for testing Flink applications with security features enabled, including Kerberos authentication via MiniKDC (Mini Key Distribution Center).

## Secure Test Environment

### SecureTestEnvironment

Helper class that manages MiniKDC lifecycle for secure Flink testing scenarios.

```java { .api }
public class SecureTestEnvironment {
    // Initialize secure testing environment
    public static void prepare(TemporaryFolder tempFolder);

    // Clean up secure testing resources
    public static void cleanup();

    // Configure Flink security settings
    public static Configuration populateFlinkSecureConfigurations(@Nullable Configuration flinkConf);

    // Get client security configurations
    public static Map<String, TestingSecurityContext.ClientSecurityConfiguration> getClientSecurityConfigurationMap();

    // Get test keytab file path
    public static String getTestKeytab();

    // Get Hadoop service principal
    public static String getHadoopServicePrincipal();
}
```
**Usage Example:**

```java
public class SecureFlinkTest {
    @ClassRule
    public static final TemporaryFolder tempFolder = new TemporaryFolder();

    @BeforeClass
    public static void setupSecurity() {
        // Initialize MiniKDC and security environment
        SecureTestEnvironment.prepare(tempFolder);
    }

    @AfterClass
    public static void cleanupSecurity() {
        // Cleanup MiniKDC and security resources
        SecureTestEnvironment.cleanup();
    }

    @Test
    public void testSecureJob() throws Exception {
        // Get secure Flink configuration
        Configuration flinkConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(null);

        // Create secure mini cluster
        LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(flinkConfig, true);

        try {
            // Run secure job
            TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);
            testEnv.setAsContext();

            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // ... secure job logic

        } finally {
            TestEnvironment.unsetAsContext();
            TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
        }
    }
}
```
## Security Context Management

### TestingSecurityContext

Provides security context management for testing with both client and server principals in MiniKDC environments.

```java { .api }
@Internal
public class TestingSecurityContext {
    // Install security context with client configurations
    public static void install(SecurityUtils.SecurityConfiguration config,
            Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap) throws Exception;

    // Client security configuration
    public static class ClientSecurityConfiguration {
        public String getPrincipal();
        public String getKeytab();
        public ClientSecurityConfiguration(String principal, String keytab);
    }
}
```
**Usage Example:**

```java
@Test
public void testWithClientSecurity() throws Exception {
    // Prepare security environment
    SecureTestEnvironment.prepare(tempFolder);

    // Get client security configurations
    Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientConfigs =
            SecureTestEnvironment.getClientSecurityConfigurationMap();

    // Configure security
    SecurityUtils.SecurityConfiguration securityConfig = new SecurityUtils.SecurityConfiguration();
    TestingSecurityContext.install(securityConfig, clientConfigs);

    // Now run tests with security context installed
    // ... test logic

    SecureTestEnvironment.cleanup();
}
```
## Kerberos Authentication Testing

### Setting up Kerberos Environment

```java
public class KerberosTest extends AbstractTestBase {
    @ClassRule
    public static final TemporaryFolder tempFolder = new TemporaryFolder();

    @BeforeClass
    public static void setupKerberos() {
        SecureTestEnvironment.prepare(tempFolder);
    }

    @AfterClass
    public static void cleanupKerberos() {
        SecureTestEnvironment.cleanup();
    }

    @Test
    public void testKerberosAuthentication() throws Exception {
        // Get secure configuration
        Configuration secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(new Configuration());

        // Verify Kerberos settings are populated
        assertTrue(secureConfig.contains(SecurityOptions.KERBEROS_LOGIN_KEYTAB));
        assertTrue(secureConfig.contains(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL));

        // Start secure cluster
        LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(secureConfig, true);

        try {
            // Test secure job execution
            TestEnvironment testEnv = new TestEnvironment(cluster, getParallelism(), false);
            testEnv.setAsContext();

            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Job that requires authentication
            DataSet<String> secureData = env.fromElements("secure", "data");
            List<String> result = secureData.collect();

            TestBaseUtils.compareResultAsText(result, "secure\ndata");

        } finally {
            TestEnvironment.unsetAsContext();
            TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
        }
    }
}
```
## HDFS Security Testing

When testing with secure HDFS, additional configuration is required:

```java
@Test
public void testSecureHDFS() throws Exception {
    SecureTestEnvironment.prepare(tempFolder);

    // Get Hadoop service principal for HDFS
    String hadoopPrincipal = SecureTestEnvironment.getHadoopServicePrincipal();
    String keytabPath = SecureTestEnvironment.getTestKeytab();

    Configuration config = new Configuration();
    config.setString("security.kerberos.login.principal", hadoopPrincipal);
    config.setString("security.kerberos.login.keytab", keytabPath);

    // Populate additional HDFS security settings
    Configuration secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(config);

    LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(secureConfig, true);

    try {
        TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);
        testEnv.setAsContext();

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Test reading from secure HDFS
        String hdfsPath = "hdfs://localhost:9000/secure/data.txt";
        DataSet<String> hdfsData = env.readTextFile(hdfsPath);

        // Process and verify results
        List<String> result = hdfsData.collect();
        assertFalse("Should read data from secure HDFS", result.isEmpty());

    } finally {
        TestEnvironment.unsetAsContext();
        TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    }

    SecureTestEnvironment.cleanup();
}
```
## Security Configuration Options

Common security configuration options used in secure testing:

```java
// Kerberos authentication
config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
config.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, principal);

// SSL/TLS configuration
config.setBoolean(SecurityOptions.SSL_ENABLED, true);
config.setString(SecurityOptions.SSL_KEYSTORE, keystorePath);
config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, keystorePassword);

// Legacy SSL flag (deprecated ConfigConstants key; prefer SecurityOptions.SSL_ENABLED)
config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
```
## Integration with Hadoop MiniKDC

The security testing infrastructure integrates with Hadoop's MiniKDC:

```java
// MiniKDC provides:
// - Kerberos realm setup
// - Principal and keytab generation
// - KDC server lifecycle management
// - Test user and service principals

@Test
public void testMiniKDCIntegration() throws Exception {
    SecureTestEnvironment.prepare(tempFolder);

    // MiniKDC is now running and configured
    String testKeytab = SecureTestEnvironment.getTestKeytab();
    assertTrue("Test keytab should exist", new File(testKeytab).exists());

    String hadoopPrincipal = SecureTestEnvironment.getHadoopServicePrincipal();
    assertTrue("Hadoop principal should be configured", hadoopPrincipal != null && !hadoopPrincipal.isEmpty());

    // Client configurations are available
    Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientConfigs =
            SecureTestEnvironment.getClientSecurityConfigurationMap();
    assertFalse("Client configurations should be available", clientConfigs.isEmpty());

    SecureTestEnvironment.cleanup();
}
```
## Common Security Testing Patterns

### Complete Secure Test Setup

```java
public class SecureIntegrationTest extends AbstractTestBase {
    @ClassRule
    public static final TemporaryFolder tempFolder = new TemporaryFolder();

    @BeforeClass
    public static void setupSecureEnvironment() {
        SecureTestEnvironment.prepare(tempFolder);
    }

    @AfterClass
    public static void cleanupSecureEnvironment() {
        SecureTestEnvironment.cleanup();
    }

    private LocalFlinkMiniCluster createSecureCluster() throws Exception {
        Configuration config = SecureTestEnvironment.populateFlinkSecureConfigurations(new Configuration());
        return TestBaseUtils.startCluster(config, true);
    }

    @Test
    public void testSecureJobExecution() throws Exception {
        LocalFlinkMiniCluster cluster = createSecureCluster();

        try {
            runSecureTest(cluster);
        } finally {
            TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
        }
    }

    private void runSecureTest(LocalFlinkMiniCluster cluster) throws Exception {
        TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);
        testEnv.setAsContext();

        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // ... secure job logic
        } finally {
            TestEnvironment.unsetAsContext();
        }
    }
}
```
## Security Testing Dependencies

To use security testing features, ensure your project includes the hadoop-minikdc dependency (marked as optional in the POM):

```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-minikdc</artifactId>
    <version>${minikdc.version}</version>
    <scope>test</scope>
</dependency>
```