0
# Security Testing
1
2
Utilities for testing Flink applications with Kerberos security enabled. They cover MiniKDC lifecycle management and the setup of secure Flink configurations for tests.
3
4
## Capabilities
5
6
### Secure Test Environment
7
8
`SecureTestEnvironment` provides helper utilities for handling MiniKDC lifecycle and secure configurations in Flink tests.
9
10
```java { .api }
11
/**
12
* Helper for handling MiniKDC lifecycle and secure configurations
13
*/
14
public class SecureTestEnvironment {
15
/**
16
* Default hostname for security testing
17
*/
18
public static final String HOST_NAME = "localhost";
19
20
/**
21
* Prepares secure environment with additional principals
22
* @param folder Temporary folder for KDC files
23
* @param additionalPrincipals Additional principals to create beyond defaults
24
*/
25
public static void prepare(TemporaryFolder folder, String... additionalPrincipals) throws Exception;
26
27
/**
28
* Cleans up secure environment and stops KDC
29
*/
30
public static void cleanup() throws Exception;
31
32
/**
33
* Populates Flink secure configurations
34
* @param configuration Configuration to populate with security settings
35
* @return Updated configuration with security settings
36
*/
37
public static Configuration populateFlinkSecureConfigurations(Configuration configuration);
38
39
/**
40
* Gets client security configurations
41
* @return Map of client security configurations by name
42
*/
43
public static Map<String, TestingSecurityContext.ClientSecurityConfiguration> getClientSecurityConfigurationMap();
44
45
/**
46
* Gets KDC realm name
47
* @return KDC realm string
48
*/
49
public static String getRealm();
50
51
/**
52
* Gets test keytab file path
53
* @return Path to test keytab file
54
*/
55
public static String getTestKeytab();
56
57
/**
58
* Gets Hadoop service principal
59
* @return Hadoop service principal string
60
*/
61
public static String getHadoopServicePrincipal();
62
}
63
```
64
65
**Usage Example:**
66
67
```java
68
import org.apache.flink.test.util.SecureTestEnvironment;
69
import org.apache.flink.configuration.Configuration;
70
import org.apache.flink.configuration.SecurityOptions;
71
import org.junit.rules.TemporaryFolder;
72
73
public class SecureFlinkTest {
74
75
@Rule
76
public TemporaryFolder tempFolder = new TemporaryFolder();
77
78
@Test
79
public void testSecureFlinkJob() throws Exception {
80
try {
81
// Prepare secure environment with additional principals
82
SecureTestEnvironment.prepare(tempFolder, "testuser/localhost@EXAMPLE.COM");
83
84
// Get secure configuration
85
Configuration secureConfig = new Configuration();
86
secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(secureConfig);
87
88
// Verify security settings were applied
89
assertTrue(secureConfig.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE));
90
assertNotNull(secureConfig.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB));
91
assertNotNull(secureConfig.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL));
92
93
// Get security information
94
String realm = SecureTestEnvironment.getRealm();
95
String keytab = SecureTestEnvironment.getTestKeytab();
96
String hadoopPrincipal = SecureTestEnvironment.getHadoopServicePrincipal();
97
98
assertNotNull("Realm should be set", realm);
99
assertNotNull("Keytab should be available", keytab);
100
assertNotNull("Hadoop principal should be set", hadoopPrincipal);
101
102
// Use secure configuration for your Flink job
103
// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, secureConfig);
104
// ... run your secure job
105
106
} finally {
107
// Always cleanup security environment
108
SecureTestEnvironment.cleanup();
109
}
110
}
111
}
112
```
113
114
### Testing Security Context
115
116
`TestingSecurityContext` installs a security context for tests that handles both client and server principals.
117
118
```java { .api }
119
/**
120
* Test security context for handling client and server principals
121
*/
122
public class TestingSecurityContext {
123
/**
124
* Install security context with configurations
125
* @param securityConfiguration Security configuration
126
* @param clientSecurityConfigurations Map of client security configurations
127
*/
128
public static void install(
129
SecurityConfiguration securityConfiguration,
130
Map<String, ClientSecurityConfiguration> clientSecurityConfigurations
131
) throws Exception;
132
133
/**
134
* Client security configuration
135
*/
136
public static class ClientSecurityConfiguration {
137
// Implementation details for client security configuration
138
}
139
}
140
```
141
142
## Complete Security Testing Examples
143
144
### Basic Secure Environment Test
145
146
```java
147
import org.apache.flink.test.util.SecureTestEnvironment;
148
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
149
import org.apache.flink.streaming.api.datastream.DataStream;
150
import org.apache.flink.configuration.Configuration;
151
152
public class BasicSecurityTest {
153
154
@Rule
155
public TemporaryFolder tempFolder = new TemporaryFolder();
156
157
@Test
158
public void testBasicSecureExecution() throws Exception {
159
try {
160
// Setup secure environment
161
SecureTestEnvironment.prepare(tempFolder);
162
163
// Create secure configuration
164
Configuration config = SecureTestEnvironment.populateFlinkSecureConfigurations(
165
new Configuration()
166
);
167
168
// Create environment with secure config
169
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, config);
170
171
// Simple streaming job
172
DataStream<String> source = env.fromElements("secure1", "secure2", "secure3");
173
DataStream<String> processed = source.map(s -> "SECURE-" + s.toUpperCase());
174
175
processed.print();
176
177
// Execute securely
178
env.execute("Secure Test Job");
179
180
} finally {
181
SecureTestEnvironment.cleanup();
182
}
183
}
184
}
185
```
186
187
### Advanced Security Testing with Custom Principals
188
189
```java
190
@Test
191
public void testCustomPrincipals() throws Exception {
192
try {
193
// Setup with custom principals
194
SecureTestEnvironment.prepare(
195
tempFolder,
196
"alice/localhost@EXAMPLE.COM",
197
"bob/localhost@EXAMPLE.COM",
198
"service/localhost@EXAMPLE.COM"
199
);
200
201
// Get client security configurations
202
Map<String, ClientSecurityConfiguration> clientConfigs =
203
SecureTestEnvironment.getClientSecurityConfigurationMap();
204
205
assertFalse("Client configurations should be available", clientConfigs.isEmpty());
206
207
// Test with security context
208
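// Build the security configuration from the secure Flink settings
SecurityConfiguration securityConfig = new SecurityConfiguration(
SecureTestEnvironment.populateFlinkSecureConfigurations(new Configuration()));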
209
TestingSecurityContext.install(securityConfig, clientConfigs);
210
211
// Create secure Flink configuration
212
Configuration flinkConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(
213
new Configuration()
214
);
215
216
// Verify security settings
217
String keytab = SecureTestEnvironment.getTestKeytab();
218
String realm = SecureTestEnvironment.getRealm();
219
220
assertTrue("Keytab file should exist", new File(keytab).exists());
221
assertEquals("EXAMPLE.COM", realm);
222
223
// Run secure job with multiple principals
224
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, flinkConfig);
225
226
DataStream<String> data = env.fromElements("principal1", "principal2", "principal3");
227
DataStream<String> authenticated = data.map(s ->
228
String.format("Authenticated[%s]@%s", s, realm)
229
);
230
231
authenticated.print();
232
env.execute("Multi-Principal Security Test");
233
234
} finally {
235
SecureTestEnvironment.cleanup();
236
}
237
}
238
```
239
240
### Integration with MiniCluster Security
241
242
```java
243
import org.apache.flink.test.util.MiniClusterWithClientResource;
244
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
245
246
public class SecureMiniClusterTest {
247
248
@Rule
249
public TemporaryFolder tempFolder = new TemporaryFolder();
250
251
private MiniClusterWithClientResource secureCluster;
252
253
@Before
254
public void setupSecureCluster() throws Exception {
255
// Setup secure environment first
256
SecureTestEnvironment.prepare(tempFolder, "testuser/localhost@EXAMPLE.COM");
257
258
// Create secure configuration
259
Configuration secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(
260
new Configuration()
261
);
262
263
// Create secure mini cluster
264
MiniClusterResourceConfiguration clusterConfig =
265
new MiniClusterResourceConfiguration.Builder()
266
.setNumberSlotsPerTaskManager(2)
267
.setNumberTaskManagers(1)
268
.setConfiguration(secureConfig)
269
.build();
270
271
secureCluster = new MiniClusterWithClientResource(clusterConfig);
272
secureCluster.before();
273
}
274
275
@After
276
public void teardownSecureCluster() throws Exception {
277
if (secureCluster != null) {
278
secureCluster.after();
279
}
280
SecureTestEnvironment.cleanup();
281
}
282
283
@Test
284
public void testSecureClusterExecution() throws Exception {
285
// Get secure test environment
286
TestEnvironment env = secureCluster.getTestEnvironment();
287
288
// Create secure job
289
DataSet<String> input = env.fromElements("secure-data-1", "secure-data-2");
290
DataSet<String> processed = input.map(s -> "AUTHENTICATED-" + s);
291
292
List<String> results = processed.collect();
293
294
// Verify secure execution
295
assertEquals(2, results.size());
296
assertTrue(results.get(0).startsWith("AUTHENTICATED-"));
297
assertTrue(results.get(1).startsWith("AUTHENTICATED-"));
298
}
299
}
300
```
301
302
### Testing Security Configuration Validation
303
304
```java
305
@Test
306
public void testSecurityConfigurationValidation() throws Exception {
307
try {
308
SecureTestEnvironment.prepare(tempFolder);
309
310
Configuration config = new Configuration();
311
Configuration secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(config);
312
313
// Validate required security settings
314
assertTrue("Ticket cache usage should be enabled",
315
secureConfig.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, false));
316
317
String keytab = secureConfig.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
318
assertNotNull("Keytab path should be set", keytab);
319
assertTrue("Keytab file should exist", new File(keytab).exists());
320
321
String principal = secureConfig.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
322
assertNotNull("Principal should be set", principal);
323
assertTrue("Principal should contain realm", principal.contains("@"));
324
325
// Validate realm and service principal
326
String realm = SecureTestEnvironment.getRealm();
327
String hadoopPrincipal = SecureTestEnvironment.getHadoopServicePrincipal();
328
329
assertNotNull("Realm should be available", realm);
330
assertNotNull("Hadoop principal should be available", hadoopPrincipal);
331
assertTrue("Hadoop principal should contain realm", hadoopPrincipal.contains(realm));
332
333
} finally {
334
SecureTestEnvironment.cleanup();
335
}
336
}
337
```
338
339
## Security Testing Best Practices
340
341
### Cleanup Pattern
342
343
Always ensure proper cleanup of security resources:
344
345
```java
346
public class SecurityTestPattern {
347
348
@Rule
349
public TemporaryFolder tempFolder = new TemporaryFolder();
350
351
@Before
352
public void setupSecurity() throws Exception {
353
SecureTestEnvironment.prepare(tempFolder);
354
}
355
356
@After
357
public void cleanupSecurity() throws Exception {
358
// Always cleanup, even if test fails
359
SecureTestEnvironment.cleanup();
360
}
361
362
// Alternatively, instead of @Before/@After, prepare and clean up with try/finally inside an individual test
363
@Test
364
public void testWithAutoCleanup() throws Exception {
365
SecureTestEnvironment.prepare(tempFolder);
366
try {
367
// Test logic here
368
} finally {
369
SecureTestEnvironment.cleanup();
370
}
371
}
372
}
373
```
374
375
### Error Handling in Security Tests
376
377
```java
378
@Test
379
public void testSecurityErrorHandling() throws Exception {
380
try {
381
SecureTestEnvironment.prepare(tempFolder);
382
383
Configuration config = SecureTestEnvironment.populateFlinkSecureConfigurations(
384
new Configuration()
385
);
386
387
// Run a job with the secure configuration; if execution fails, the failure is expected to be security-related
388
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, config);
389
DataStream<String> data = env.fromElements("test");
390
391
try {
392
data.print();
393
env.execute("Security Error Test");
394
} catch (Exception e) {
395
// Verify it's a security-related exception
396
assertTrue("Should be security-related error",
397
e.getMessage().contains("Kerberos") ||
398
e.getMessage().contains("authentication") ||
399
e.getMessage().contains("principal"));
400
}
401
402
} finally {
403
SecureTestEnvironment.cleanup();
404
}
405
}
406
```