or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdsecurity-testing.mdtest-base-classes.mdtest-data.mdtest-environments.mdtest-utilities.md

security-testing.mddocs/

0

# Security Testing

1

2

Security testing utilities provide support for testing Flink applications with security features enabled, including Kerberos authentication via MiniKDC (Mini Key Distribution Center).

3

4

## Secure Test Environment

5

6

### SecureTestEnvironment

7

8

Helper class that manages MiniKDC lifecycle for secure Flink testing scenarios.

9

10

```java { .api }

11

public class SecureTestEnvironment {

12

// Initialize secure testing environment

13

public static void prepare(TemporaryFolder tempFolder);

14

15

// Clean up secure testing resources

16

public static void cleanup();

17

18

// Configure Flink security settings

19

public static Configuration populateFlinkSecureConfigurations(@Nullable Configuration flinkConf);

20

21

// Get client security configurations

22

public static Map<String, TestingSecurityContext.ClientSecurityConfiguration> getClientSecurityConfigurationMap();

23

24

// Get test keytab file path

25

public static String getTestKeytab();

26

27

// Get Hadoop service principal

28

public static String getHadoopServicePrincipal();

29

}

30

```

31

32

**Usage Example:**

33

34

```java

35

public class SecureFlinkTest {

36

@ClassRule

37

public static final TemporaryFolder tempFolder = new TemporaryFolder();

38

39

@BeforeClass

40

public static void setupSecurity() {

41

// Initialize MiniKDC and security environment

42

SecureTestEnvironment.prepare(tempFolder);

43

}

44

45

@AfterClass

46

public static void cleanupSecurity() {

47

// Cleanup MiniKDC and security resources

48

SecureTestEnvironment.cleanup();

49

}

50

51

@Test

52

public void testSecureJob() throws Exception {

53

// Get secure Flink configuration

54

Configuration flinkConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(null);

55

56

// Create secure mini cluster

57

LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(flinkConfig, true);

58

59

try {

60

// Run secure job

61

TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);

62

testEnv.setAsContext();

63

64

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

65

// ... secure job logic

66

67

} finally {

68

TestEnvironment.unsetAsContext();

69

TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);

70

}

71

}

72

}

73

```

74

75

## Security Context Management

76

77

### TestingSecurityContext

78

79

Provides security context management for testing with both client and server principals in MiniKDC environments.

80

81

```java { .api }

82

@Internal

83

public class TestingSecurityContext {

84

// Install security context with client configurations

85

public static void install(SecurityUtils.SecurityConfiguration config,

86

Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap) throws Exception;

87

88

// Client security configuration

89

public static class ClientSecurityConfiguration {

90

public String getPrincipal();

91

public String getKeytab();

92

public ClientSecurityConfiguration(String principal, String keytab);

93

}

94

}

95

```

96

97

**Usage Example:**

98

99

```java

100

@Test

101

public void testWithClientSecurity() throws Exception {

102

// Prepare security environment

103

SecureTestEnvironment.prepare(tempFolder);

104

105

// Get client security configurations

106

Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientConfigs =

107

SecureTestEnvironment.getClientSecurityConfigurationMap();

108

109

// Configure security

110

SecurityUtils.SecurityConfiguration securityConfig = new SecurityUtils.SecurityConfiguration();

111

TestingSecurityContext.install(securityConfig, clientConfigs);

112

113

// Now run tests with security context installed

114

// ... test logic

115

116

SecureTestEnvironment.cleanup();

117

}

118

```

119

120

## Kerberos Authentication Testing

121

122

### Setting up Kerberos Environment

123

124

```java

125

public class KerberosTest extends AbstractTestBase {

126

@ClassRule

127

public static final TemporaryFolder tempFolder = new TemporaryFolder();

128

129

@BeforeClass

130

public static void setupKerberos() {

131

SecureTestEnvironment.prepare(tempFolder);

132

}

133

134

@AfterClass

135

public static void cleanupKerberos() {

136

SecureTestEnvironment.cleanup();

137

}

138

139

@Test

140

public void testKerberosAuthentication() throws Exception {

141

// Get secure configuration

142

Configuration secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(new Configuration());

143

144

// Verify Kerberos settings are populated

145

assertTrue(secureConfig.contains(SecurityOptions.KERBEROS_LOGIN_KEYTAB));

146

assertTrue(secureConfig.contains(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL));

147

148

// Start secure cluster

149

LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(secureConfig, true);

150

151

try {

152

// Test secure job execution

153

TestEnvironment testEnv = new TestEnvironment(cluster, getParallelism(), false);

154

testEnv.setAsContext();

155

156

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

157

158

// Job that requires authentication

159

DataSet<String> secureData = env.fromElements("secure", "data");

160

List<String> result = secureData.collect();

161

162

TestBaseUtils.compareResultAsText(result, "secure\ndata");

163

164

} finally {

165

TestEnvironment.unsetAsContext();

166

TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);

167

}

168

}

169

}

170

```

171

172

## HDFS Security Testing

173

174

When testing with secure HDFS, additional configuration is required:

175

176

```java

177

@Test

178

public void testSecureHDFS() throws Exception {

179

SecureTestEnvironment.prepare(tempFolder);

180

181

// Get Hadoop service principal for HDFS

182

String hadoopPrincipal = SecureTestEnvironment.getHadoopServicePrincipal();

183

String keytabPath = SecureTestEnvironment.getTestKeytab();

184

185

Configuration config = new Configuration();

186

config.setString("security.kerberos.login.principal", hadoopPrincipal);

187

config.setString("security.kerberos.login.keytab", keytabPath);

188

189

// Populate additional HDFS security settings

190

Configuration secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(config);

191

192

LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(secureConfig, true);

193

194

try {

195

TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);

196

testEnv.setAsContext();

197

198

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

199

200

// Test reading from secure HDFS

201

String hdfsPath = "hdfs://localhost:9000/secure/data.txt";

202

DataSet<String> hdfsData = env.readTextFile(hdfsPath);

203

204

// Process and verify results

205

List<String> result = hdfsData.collect();

206

assertFalse("Should read data from secure HDFS", result.isEmpty());

207

208

} finally {

209

TestEnvironment.unsetAsContext();

210

TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);

211

}

212

213

SecureTestEnvironment.cleanup();

214

}

215

```

216

217

## Security Configuration Options

218

219

Common security configuration options used in secure testing:

220

221

```java

222

// Kerberos authentication

223

config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);

224

config.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, principal);

225

226

// SSL/TLS configuration

227

config.setBoolean(SecurityOptions.SSL_ENABLED, true);

228

config.setString(SecurityOptions.SSL_KEYSTORE, keystorePath);

229

config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, keystorePassword);

230

231

// SASL authentication

232

config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);

233

```

234

235

## Integration with Hadoop MiniKDC

236

237

The security testing infrastructure integrates with Hadoop's MiniKDC:

238

239

```java

240

// MiniKDC provides:

241

// - Kerberos realm setup

242

// - Principal and keytab generation

243

// - KDC server lifecycle management

244

// - Test user and service principals

245

246

@Test

247

public void testMiniKDCIntegration() throws Exception {

248

SecureTestEnvironment.prepare(tempFolder);

249

250

// MiniKDC is now running and configured

251

String testKeytab = SecureTestEnvironment.getTestKeytab();

252

assertTrue("Test keytab should exist", new File(testKeytab).exists());

253

254

String hadoopPrincipal = SecureTestEnvironment.getHadoopServicePrincipal();

255

assertTrue("Hadoop principal should be configured", hadoopPrincipal != null && !hadoopPrincipal.isEmpty());

256

257

// Client configurations are available

258

Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientConfigs =

259

SecureTestEnvironment.getClientSecurityConfigurationMap();

260

assertFalse("Client configurations should be available", clientConfigs.isEmpty());

261

262

SecureTestEnvironment.cleanup();

263

}

264

```

265

266

## Common Security Testing Patterns

267

268

### Complete Secure Test Setup

269

270

```java

271

public class SecureIntegrationTest extends AbstractTestBase {

272

@ClassRule

273

public static final TemporaryFolder tempFolder = new TemporaryFolder();

274

275

@BeforeClass

276

public static void setupSecureEnvironment() {

277

SecureTestEnvironment.prepare(tempFolder);

278

}

279

280

@AfterClass

281

public static void cleanupSecureEnvironment() {

282

SecureTestEnvironment.cleanup();

283

}

284

285

private LocalFlinkMiniCluster createSecureCluster() throws Exception {

286

Configuration config = SecureTestEnvironment.populateFlinkSecureConfigurations(new Configuration());

287

return TestBaseUtils.startCluster(config, true);

288

}

289

290

@Test

291

public void testSecureJobExecution() throws Exception {

292

LocalFlinkMiniCluster cluster = createSecureCluster();

293

294

try {

295

runSecureTest(cluster);

296

} finally {

297

TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);

298

}

299

}

300

301

private void runSecureTest(LocalFlinkMiniCluster cluster) throws Exception {

302

TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);

303

testEnv.setAsContext();

304

305

try {

306

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

307

// ... secure job logic

308

} finally {

309

TestEnvironment.unsetAsContext();

310

}

311

}

312

}

313

```

314

315

## Security Testing Dependencies

316

317

To use security testing features, ensure your project includes the hadoop-minikdc dependency (marked as optional in the POM):

318

319

```xml

320

<dependency>

321

<groupId>org.apache.hadoop</groupId>

322

<artifactId>hadoop-minikdc</artifactId>

323

<version>${minikdc.version}</version>

324

<scope>test</scope>

325

</dependency>

326

```