or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-collection.md index.md metrics-testing.md security-testing.md test-data-providers.md test-environments.md

docs/security-testing.md

0

# Security Testing

1

2

Utilities for testing Flink applications with Kerberos security enabled, including MiniKDC lifecycle management and secure configuration setup for comprehensive security testing scenarios.

3

4

## Capabilities

5

6

### Secure Test Environment

7

8

`SecureTestEnvironment` provides helper utilities for handling MiniKDC lifecycle and secure configurations in Flink tests.

9

10

```java { .api }

11

/**

12

* Helper for handling MiniKDC lifecycle and secure configurations

13

*/

14

public class SecureTestEnvironment {

15

/**

16

* Default hostname for security testing

17

*/

18

public static final String HOST_NAME = "localhost";

19

20

/**

21

* Prepares secure environment with additional principals

22

* @param folder Temporary folder for KDC files

23

* @param additionalPrincipals Additional principals to create beyond defaults

24

*/

25

public static void prepare(TemporaryFolder folder, String... additionalPrincipals) throws Exception;

26

27

/**

28

* Cleans up secure environment and stops KDC

29

*/

30

public static void cleanup() throws Exception;

31

32

/**

33

* Populates Flink secure configurations

34

* @param configuration Configuration to populate with security settings

35

* @return Updated configuration with security settings

36

*/

37

public static Configuration populateFlinkSecureConfigurations(Configuration configuration);

38

39

/**

40

* Gets client security configurations

41

* @return Map of client security configurations by name

42

*/

43

public static Map<String, ClientSecurityConfiguration> getClientSecurityConfigurationMap();

44

45

/**

46

* Gets KDC realm name

47

* @return KDC realm string

48

*/

49

public static String getRealm();

50

51

/**

52

* Gets test keytab file path

53

* @return Path to test keytab file

54

*/

55

public static String getTestKeytab();

56

57

/**

58

* Gets Hadoop service principal

59

* @return Hadoop service principal string

60

*/

61

public static String getHadoopServicePrincipal();

62

}

63

```

64

65

**Usage Example:**

66

67

```java

68

import org.apache.flink.test.util.SecureTestEnvironment;

69

import org.apache.flink.configuration.Configuration;

70

import org.apache.flink.configuration.SecurityOptions;

71

import org.junit.rules.TemporaryFolder;

72

73

public class SecureFlinkTest {

74

75

@Rule

76

public TemporaryFolder tempFolder = new TemporaryFolder();

77

78

@Test

79

public void testSecureFlinkJob() throws Exception {

80

try {

81

// Prepare secure environment with additional principals

82

SecureTestEnvironment.prepare(tempFolder, "testuser/localhost@EXAMPLE.COM");

83

84

// Get secure configuration

85

Configuration secureConfig = new Configuration();

86

secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(secureConfig);

87

88

// Verify security settings were applied

89

assertTrue(secureConfig.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE));

90

assertNotNull(secureConfig.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB));

91

assertNotNull(secureConfig.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL));

92

93

// Get security information

94

String realm = SecureTestEnvironment.getRealm();

95

String keytab = SecureTestEnvironment.getTestKeytab();

96

String hadoopPrincipal = SecureTestEnvironment.getHadoopServicePrincipal();

97

98

assertNotNull("Realm should be set", realm);

99

assertNotNull("Keytab should be available", keytab);

100

assertNotNull("Hadoop principal should be set", hadoopPrincipal);

101

102

// Use secure configuration for your Flink job

103

// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, secureConfig);

104

// ... run your secure job

105

106

} finally {

107

// Always cleanup security environment

108

SecureTestEnvironment.cleanup();

109

}

110

}

111

}

112

```

113

114

### Testing Security Context

115

116

`TestingSecurityContext` installs a security context for tests from a security configuration and a map of named client security configurations.

117

118

```java { .api }

119

/**

120

* Test security context for handling client and server principals

121

*/

122

public class TestingSecurityContext {

123

/**

124

* Install security context with configurations

125

* @param securityConfiguration Security configuration

126

* @param clientSecurityConfigurations Map of client security configurations

127

*/

128

public static void install(

129

SecurityConfiguration securityConfiguration,

130

Map<String, ClientSecurityConfiguration> clientSecurityConfigurations

131

) throws Exception;

132

133

/**

134

* Client security configuration

135

*/

136

public static class ClientSecurityConfiguration {

137

// Implementation details for client security configuration

138

}

139

}

140

```

141

142

## Complete Security Testing Examples

143

144

### Basic Secure Environment Test

145

146

```java

147

import org.apache.flink.test.util.SecureTestEnvironment;

148

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

149

import org.apache.flink.streaming.api.datastream.DataStream;

150

import org.apache.flink.configuration.Configuration;

151

152

public class BasicSecurityTest {

153

154

@Rule

155

public TemporaryFolder tempFolder = new TemporaryFolder();

156

157

@Test

158

public void testBasicSecureExecution() throws Exception {

159

try {

160

// Setup secure environment

161

SecureTestEnvironment.prepare(tempFolder);

162

163

// Create secure configuration

164

Configuration config = SecureTestEnvironment.populateFlinkSecureConfigurations(

165

new Configuration()

166

);

167

168

// Create environment with secure config

169

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, config);

170

171

// Simple streaming job

172

DataStream<String> source = env.fromElements("secure1", "secure2", "secure3");

173

DataStream<String> processed = source.map(s -> "SECURE-" + s.toUpperCase());

174

175

processed.print();

176

177

// Execute securely

178

env.execute("Secure Test Job");

179

180

} finally {

181

SecureTestEnvironment.cleanup();

182

}

183

}

184

}

185

```

186

187

### Advanced Security Testing with Custom Principals

188

189

```java

190

@Test

191

public void testCustomPrincipals() throws Exception {

192

try {

193

// Setup with custom principals

194

SecureTestEnvironment.prepare(

195

tempFolder,

196

"alice/localhost@EXAMPLE.COM",

197

"bob/localhost@EXAMPLE.COM",

198

"service/localhost@EXAMPLE.COM"

199

);

200

201

// Get client security configurations

202

Map<String, ClientSecurityConfiguration> clientConfigs =

203

SecureTestEnvironment.getClientSecurityConfigurationMap();

204

205

assertFalse("Client configurations should be available", clientConfigs.isEmpty());

206

207

// Test with security context

208

SecurityConfiguration securityConfig = // ... create security config

209

TestingSecurityContext.install(securityConfig, clientConfigs);

210

211

// Create secure Flink configuration

212

Configuration flinkConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(

213

new Configuration()

214

);

215

216

// Verify security settings

217

String keytab = SecureTestEnvironment.getTestKeytab();

218

String realm = SecureTestEnvironment.getRealm();

219

220

assertTrue("Keytab file should exist", new File(keytab).exists());

221

assertEquals("EXAMPLE.COM", realm);

222

223

// Run secure job with multiple principals

224

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, flinkConfig);

225

226

DataStream<String> data = env.fromElements("principal1", "principal2", "principal3");

227

DataStream<String> authenticated = data.map(s ->

228

String.format("Authenticated[%s]@%s", s, realm)

229

);

230

231

authenticated.print();

232

env.execute("Multi-Principal Security Test");

233

234

} finally {

235

SecureTestEnvironment.cleanup();

236

}

237

}

238

```

239

240

### Integration with MiniCluster Security

241

242

```java

243

import org.apache.flink.test.util.MiniClusterWithClientResource;

244

import org.apache.flink.test.util.MiniClusterResourceConfiguration;

245

246

public class SecureMiniClusterTest {

247

248

@Rule

249

public TemporaryFolder tempFolder = new TemporaryFolder();

250

251

private MiniClusterWithClientResource secureCluster;

252

253

@Before

254

public void setupSecureCluster() throws Exception {

255

// Setup secure environment first

256

SecureTestEnvironment.prepare(tempFolder, "testuser/localhost@EXAMPLE.COM");

257

258

// Create secure configuration

259

Configuration secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(

260

new Configuration()

261

);

262

263

// Create secure mini cluster

264

MiniClusterResourceConfiguration clusterConfig =

265

new MiniClusterResourceConfiguration.Builder()

266

.setNumberSlotsPerTaskManager(2)

267

.setNumberTaskManagers(1)

268

.setConfiguration(secureConfig)

269

.build();

270

271

secureCluster = new MiniClusterWithClientResource(clusterConfig);

272

secureCluster.before();

273

}

274

275

@After

276

public void teardownSecureCluster() throws Exception {

277

if (secureCluster != null) {

278

secureCluster.after();

279

}

280

SecureTestEnvironment.cleanup();

281

}

282

283

@Test

284

public void testSecureClusterExecution() throws Exception {

285

// Get secure test environment

286

TestEnvironment env = secureCluster.getTestEnvironment();

287

288

// Create secure job

289

DataSet<String> input = env.fromElements("secure-data-1", "secure-data-2");

290

DataSet<String> processed = input.map(s -> "AUTHENTICATED-" + s);

291

292

List<String> results = processed.collect();

293

294

// Verify secure execution

295

assertEquals(2, results.size());

296

assertTrue(results.get(0).startsWith("AUTHENTICATED-"));

297

assertTrue(results.get(1).startsWith("AUTHENTICATED-"));

298

}

299

}

300

```

301

302

### Testing Security Configuration Validation

303

304

```java

305

@Test

306

public void testSecurityConfigurationValidation() throws Exception {

307

try {

308

SecureTestEnvironment.prepare(tempFolder);

309

310

Configuration config = new Configuration();

311

Configuration secureConfig = SecureTestEnvironment.populateFlinkSecureConfigurations(config);

312

313

// Validate required security settings

314

assertTrue("Kerberos should be enabled",

315

secureConfig.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, false));

316

317

String keytab = secureConfig.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);

318

assertNotNull("Keytab path should be set", keytab);

319

assertTrue("Keytab file should exist", new File(keytab).exists());

320

321

String principal = secureConfig.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);

322

assertNotNull("Principal should be set", principal);

323

assertTrue("Principal should contain realm", principal.contains("@"));

324

325

// Validate realm and service principal

326

String realm = SecureTestEnvironment.getRealm();

327

String hadoopPrincipal = SecureTestEnvironment.getHadoopServicePrincipal();

328

329

assertNotNull("Realm should be available", realm);

330

assertNotNull("Hadoop principal should be available", hadoopPrincipal);

331

assertTrue("Hadoop principal should contain realm", hadoopPrincipal.contains(realm));

332

333

} finally {

334

SecureTestEnvironment.cleanup();

335

}

336

}

337

```

338

339

## Security Testing Best Practices

340

341

### Cleanup Pattern

342

343

Always ensure proper cleanup of security resources:

344

345

```java

346

public class SecurityTestPattern {

347

348

@Rule

349

public TemporaryFolder tempFolder = new TemporaryFolder();

350

351

@Before

352

public void setupSecurity() throws Exception {

353

SecureTestEnvironment.prepare(tempFolder);

354

}

355

356

@After

357

public void cleanupSecurity() throws Exception {

358

// Always cleanup, even if test fails

359

SecureTestEnvironment.cleanup();

360

}

361

362

// Or use a try/finally block in individual tests

363

@Test

364

public void testWithAutoCleanup() throws Exception {

365

SecureTestEnvironment.prepare(tempFolder);

366

try {

367

// Test logic here

368

} finally {

369

SecureTestEnvironment.cleanup();

370

}

371

}

372

}

373

```

374

375

### Error Handling in Security Tests

376

377

```java

378

@Test

379

public void testSecurityErrorHandling() throws Exception {

380

try {

381

SecureTestEnvironment.prepare(tempFolder);

382

383

Configuration config = SecureTestEnvironment.populateFlinkSecureConfigurations(

384

new Configuration()

385

);

386

387

// Run a job with the secure configuration; if execution fails, the error should be security-related

388

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, config);

389

DataStream<String> data = env.fromElements("test");

390

391

try {

392

data.print();

393

env.execute("Security Error Test");

394

} catch (Exception e) {

395

// Verify it's a security-related exception

396

assertTrue("Should be security-related error",

397

e.getMessage().contains("Kerberos") ||

398

e.getMessage().contains("authentication") ||

399

e.getMessage().contains("principal"));

400

}

401

402

} finally {

403

SecureTestEnvironment.cleanup();

404

}

405

}

406

```