or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

buffer-management.md client-operations.md configuration-management.md index.md message-protocol.md security-authentication.md server-operations.md shuffle-database.md transport-context.md

docs/security-authentication.md

0

# Security and Authentication

1

2

The security and authentication API provides comprehensive protection for Apache Spark's network communications through SASL authentication mechanisms and AES encryption protocols. This layer ensures secure data transmission and proper authentication of clients and servers in distributed Spark environments.

3

4

## Capabilities

5

6

### Transport Encryption

7

8

Core interface for transport layer encryption supporting multiple cipher algorithms.

9

10

```java { .api }

11

public interface TransportCipher {

12

/**

13

* Get the key identifier for this cipher

14

* @return String representing the key ID used for encryption

15

* @throws GeneralSecurityException if key retrieval fails

16

*/

17

String getKeyId() throws GeneralSecurityException;

18

19

/**

20

* Add encryption/decryption handlers to the Netty channel pipeline

21

* @param channel - Netty Channel to add cipher handlers to

22

* @throws IOException if I/O operations fail during setup

23

* @throws GeneralSecurityException if cryptographic operations fail

24

*/

25

void addToChannel(Channel channel) throws IOException, GeneralSecurityException;

26

}

27

```

28

29

### AES-CTR Cipher Implementation

30

31

Counter mode AES cipher implementation for transport layer encryption.

32

33

```java { .api }

34

public class CtrTransportCipher implements TransportCipher {

35

/**

36

* Create CTR transport cipher with specified key and initialization vector

37

* @param key - Secret key for AES encryption

38

* @param iv - Initialization vector for CTR mode

39

* @throws GeneralSecurityException if cipher initialization fails

40

*/

41

public CtrTransportCipher(SecretKey key, byte[] iv) throws GeneralSecurityException;

42

43

@Override

44

public String getKeyId() throws GeneralSecurityException;

45

46

@Override

47

public void addToChannel(Channel channel) throws IOException, GeneralSecurityException;

48

}

49

```

50

51

### AES-GCM Cipher Implementation

52

53

Galois/Counter Mode AES cipher implementation providing authenticated encryption.

54

55

```java { .api }

56

public class GcmTransportCipher implements TransportCipher {

57

/**

58

* Create GCM transport cipher with specified key

59

* @param key - Secret key for AES-GCM encryption

60

* @throws GeneralSecurityException if cipher initialization fails

61

*/

62

public GcmTransportCipher(SecretKey key) throws GeneralSecurityException;

63

64

@Override

65

public String getKeyId() throws GeneralSecurityException;

66

67

@Override

68

public void addToChannel(Channel channel) throws IOException, GeneralSecurityException;

69

}

70

```

71

72

## Authentication Protocol

73

74

### Authentication Client Bootstrap

75

76

Client bootstrap for setting up authentication protocol with servers.

77

78

```java { .api }

79

public class AuthClientBootstrap implements TransportClientBootstrap {

80

/**

81

* Create authentication client bootstrap

82

* @param conf - Transport configuration containing auth settings

83

* @param appId - Application identifier for authentication

84

* @param secretKeyHolder - Provider for authentication secrets

85

*/

86

public AuthClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);

87

88

@Override

89

public void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;

90

}

91

```

92

93

### Authentication Server Bootstrap

94

95

Server bootstrap for handling authentication protocol from clients.

96

97

```java { .api }

98

public class AuthServerBootstrap implements TransportServerBootstrap {

99

/**

100

* Create authentication server bootstrap

101

* @param conf - Transport configuration containing auth settings

102

* @param secretKeyHolder - Provider for validating authentication secrets

103

*/

104

public AuthServerBootstrap(TransportConf conf, SecretKeyHolder secretKeyHolder);

105

106

@Override

107

public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);

108

}

109

```

110

111

## SASL Authentication

112

113

### SASL Client Implementation

114

115

SASL client implementation for Spark's authentication needs.

116

117

```java { .api }

118

public class SparkSaslClient implements SaslEncryptionBackend {

119

/**

120

* Create SASL client for authentication

121

* @param secretKeyId - Identifier for the secret key

122

* @param secretKey - Secret key for SASL authentication

123

* @param encryptionEnabled - Whether to enable SASL encryption

124

*/

125

public SparkSaslClient(String secretKeyId, String secretKey, boolean encryptionEnabled);

126

127

/**

128

* Create and configure SASL client

129

* @return SaslClient configured for Spark authentication

130

* @throws IOException if SASL client creation fails

131

*/

132

public SaslClient createSaslClient() throws IOException;

133

134

/**

135

* Check if SASL negotiation is complete

136

* @return boolean indicating if authentication is complete

137

*/

138

public boolean isComplete();

139

140

/**

141

* Process a SASL challenge from the server

142

* @param challenge - byte array containing the server challenge

143

* @return byte array containing the client response

144

* @throws IOException if challenge processing fails

145

*/

146

public byte[] response(byte[] challenge) throws IOException;

147

}

148

```

149

150

### SASL Server Implementation

151

152

SASL server implementation for validating client authentications.

153

154

```java { .api }

155

public class SparkSaslServer implements SaslEncryptionBackend {

156

/**

157

* Create SASL server for authentication

158

* @param secretKeyId - Identifier for the secret key

159

* @param secretKey - Secret key for SASL authentication

160

* @param encryptionEnabled - Whether to enable SASL encryption

161

*/

162

public SparkSaslServer(String secretKeyId, String secretKey, boolean encryptionEnabled);

163

164

/**

165

* Create and configure SASL server

166

* @return SaslServer configured for Spark authentication

167

* @throws IOException if SASL server creation fails

168

*/

169

public SaslServer createSaslServer() throws IOException;

170

171

/**

172

* Check if SASL negotiation is complete

173

* @return boolean indicating if authentication is complete

174

*/

175

public boolean isComplete();

176

177

/**

178

* Process a SASL response from the client

179

* @param response - byte array containing the client response

180

* @return byte array containing the server challenge

181

* @throws IOException if response processing fails

182

*/

183

public byte[] response(byte[] response) throws IOException;

184

}

185

```

186

187

### SASL Bootstrap Components

188

189

Bootstrap implementations for integrating SASL authentication into the transport layer.

190

191

```java { .api }

192

public class SaslClientBootstrap implements TransportClientBootstrap {

193

/**

194

* Create SASL client bootstrap

195

* @param conf - Transport configuration

196

* @param appId - Application identifier

197

* @param secretKeyHolder - Provider for authentication secrets

198

*/

199

public SaslClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);

200

201

@Override

202

public void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;

203

}

204

205

public class SaslServerBootstrap implements TransportServerBootstrap {

206

/**

207

* Create SASL server bootstrap

208

* @param conf - Transport configuration

209

* @param secretKeyHolder - Provider for validating authentication secrets

210

*/

211

public SaslServerBootstrap(TransportConf conf, SecretKeyHolder secretKeyHolder);

212

213

@Override

214

public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);

215

}

216

```

217

218

### SASL RPC Handler

219

220

RPC handler that provides SASL authentication capabilities.

221

222

```java { .api }

223

public class SaslRpcHandler extends AbstractAuthRpcHandler {

224

/**

225

* Create SASL RPC handler

226

* @param conf - Transport configuration

227

* @param channel - Netty channel for communication

228

* @param delegate - Underlying RPC handler to delegate to after authentication

229

* @param secretKeyHolder - Provider for authentication secrets

230

*/

231

public SaslRpcHandler(TransportConf conf, Channel channel, RpcHandler delegate, SecretKeyHolder secretKeyHolder);

232

233

@Override

234

public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);

235

236

@Override

237

public StreamManager getStreamManager();

238

239

@Override

240

public void channelActive(TransportClient client);

241

242

@Override

243

public void channelInactive(TransportClient client);

244

}

245

```

246

247

## Secret Management

248

249

### SecretKeyHolder Interface

250

251

Interface for managing authentication secrets and user mappings.

252

253

```java { .api }

254

public interface SecretKeyHolder {

255

/**

256

* Get the SASL user identifier for the given application

257

* @param appId - Application identifier

258

* @return String representing the SASL user, or null if not found

259

*/

260

String getSaslUser(String appId);

261

262

/**

263

* Get the secret key for the given application

264

* @param appId - Application identifier

265

* @return String representing the secret key, or null if not found

266

*/

267

String getSecretKey(String appId);

268

}

269

```

270

271

## SASL Messages and Encryption

272

273

### SASL Message Protocol

274

275

Message types used in SASL authentication exchange.

276

277

```java { .api }

278

public class SaslMessage implements Encodable {

279

/**

280

* Create a SASL message

281

* @param payload - byte array containing SASL data

282

*/

283

public SaslMessage(byte[] payload);

284

285

/**

286

* Get the SASL payload

287

* @return byte array containing the SASL data

288

*/

289

public byte[] payload();

290

291

@Override

292

public int encodedLength();

293

294

@Override

295

public void encode(ByteBuf buf);

296

}

297

```

298

299

### SASL Encryption Backend

300

301

Backend interface for SASL encryption operations.

302

303

```java { .api }

304

public interface SaslEncryptionBackend {

305

/**

306

* Encrypt data using SASL encryption

307

* @param data - byte array to encrypt

308

* @param offset - starting offset in the data array

309

* @param len - number of bytes to encrypt

310

* @return byte array containing encrypted data

311

* @throws IOException if encryption fails

312

*/

313

byte[] wrap(byte[] data, int offset, int len) throws IOException;

314

315

/**

316

* Decrypt data using SASL encryption

317

* @param data - byte array to decrypt

318

* @param offset - starting offset in the data array

319

* @param len - number of bytes to decrypt

320

* @return byte array containing decrypted data

321

* @throws IOException if decryption fails

322

*/

323

byte[] unwrap(byte[] data, int offset, int len) throws IOException;

324

325

/**

326

* Dispose of the encryption backend and clean up resources

327

* @throws IOException if cleanup fails

328

*/

329

void dispose() throws IOException;

330

}

331

```

332

333

### SASL Encryption Implementation

334

335

Implementation of SASL encryption for secure data transmission.

336

337

```java { .api }

338

public class SaslEncryption implements SaslEncryptionBackend {

339

/**

340

* Create SASL encryption with the specified backend

341

* @param backend - SaslEncryptionBackend for actual encryption operations

342

*/

343

public SaslEncryption(SaslEncryptionBackend backend);

344

345

@Override

346

public byte[] wrap(byte[] data, int offset, int len) throws IOException;

347

348

@Override

349

public byte[] unwrap(byte[] data, int offset, int len) throws IOException;

350

351

@Override

352

public void dispose() throws IOException;

353

}

354

```

355

356

## Exception Classes

357

358

### SaslTimeoutException

359

360

Exception thrown when a SASL authentication operation times out.

361

362

```java { .api }

363

public class SaslTimeoutException extends RuntimeException {

364

/**

365

* Create SASL timeout exception

366

* @param message - Description of the timeout condition

367

*/

368

public SaslTimeoutException(String message);

369

370

/**

371

* Create SASL timeout exception with cause

372

* @param message - Description of the timeout condition

373

* @param cause - Underlying cause of the timeout

374

*/

375

public SaslTimeoutException(String message, Throwable cause);

376

}

377

```

378

379

## Usage Examples

380

381

### Setting Up Transport Encryption

382

383

```java

384

import org.apache.spark.network.crypto.*;

385

import javax.crypto.KeyGenerator;

386

import javax.crypto.SecretKey;

387

import java.security.SecureRandom;

388

389

// Generate AES key for encryption

390

KeyGenerator keyGen = KeyGenerator.getInstance("AES");

391

keyGen.init(256);

392

SecretKey secretKey = keyGen.generateKey();

393

394

// Create CTR cipher

395

byte[] iv = new byte[16];

396

new SecureRandom().nextBytes(iv);

397

CtrTransportCipher ctrCipher = new CtrTransportCipher(secretKey, iv);

398

399

// Create GCM cipher

400

GcmTransportCipher gcmCipher = new GcmTransportCipher(secretKey);

401

402

// Apply cipher to channel

403

Channel channel = getNettyChannel(); // Obtain channel from transport

404

ctrCipher.addToChannel(channel);

405

406

System.out.println("Encryption enabled with key ID: " + ctrCipher.getKeyId());

407

```

408

409

### SASL Authentication Setup

410

411

```java

412

import org.apache.spark.network.sasl.*;

413

414

// Create secret key holder

415

SecretKeyHolder secretKeyHolder = new SecretKeyHolder() {

416

@Override

417

public String getSaslUser(String appId) {

418

return "spark-user-" + appId;

419

}

420

421

@Override

422

public String getSecretKey(String appId) {

423

return "secret-key-for-" + appId;

424

}

425

};

426

427

// Client-side SASL bootstrap

428

SaslClientBootstrap clientBootstrap = new SaslClientBootstrap(conf, "myapp", secretKeyHolder);

429

430

// Server-side SASL bootstrap

431

SaslServerBootstrap serverBootstrap = new SaslServerBootstrap(conf, secretKeyHolder);

432

433

// Create authenticated transport context

434

List<TransportClientBootstrap> clientBootstraps = Arrays.asList(clientBootstrap);

435

List<TransportServerBootstrap> serverBootstraps = Arrays.asList(serverBootstrap);

436

437

TransportContext context = new TransportContext(conf, rpcHandler);

438

TransportClientFactory clientFactory = context.createClientFactory(clientBootstraps);

439

TransportServer server = context.createServer(9999, serverBootstraps);

440

441

System.out.println("SASL-authenticated server started on port: " + server.getPort());

442

```

443

444

### Authentication Protocol Implementation

445

446

```java

447

import org.apache.spark.network.crypto.*;

448

449

// Client-side authentication

450

AuthClientBootstrap authClient = new AuthClientBootstrap(conf, "spark-app", secretKeyHolder);

451

452

// Server-side authentication

453

AuthServerBootstrap authServer = new AuthServerBootstrap(conf, secretKeyHolder);

454

455

// Create transport with authentication

456

TransportContext authContext = new TransportContext(conf, rpcHandler);

457

TransportServer authServerInstance = authContext.createServer(8443, Arrays.asList(authServer));

458

TransportClientFactory authClientFactory = authContext.createClientFactory(Arrays.asList(authClient));

459

460

// Connect authenticated client

461

TransportClient authenticatedClient = authClientFactory.createClient("localhost", 8443);

462

System.out.println("Authenticated client connected: " + authenticatedClient.isActive());

463

```

464

465

### Custom Secret Key Management

466

467

```java

468

import java.util.concurrent.ConcurrentHashMap;

469

import java.util.Map;

470

471

// Custom secret key holder implementation

472

public class DatabaseSecretKeyHolder implements SecretKeyHolder {

473

private final Map<String, String> userCache = new ConcurrentHashMap<>();

474

private final Map<String, String> keyCache = new ConcurrentHashMap<>();

475

476

@Override

477

public String getSaslUser(String appId) {

478

return userCache.computeIfAbsent(appId, id -> {

479

// In real implementation, query database

480

return "user-" + id;

481

});

482

}

483

484

@Override

485

public String getSecretKey(String appId) {

486

return keyCache.computeIfAbsent(appId, id -> {

487

// In real implementation, securely retrieve from key store

488

return generateSecretKey(id);

489

});

490

}

491

492

private String generateSecretKey(String appId) {

493

// Generate or retrieve secure key for application

494

return "generated-key-" + appId.hashCode();

495

}

496

}

497

498

// Usage

499

SecretKeyHolder customKeyHolder = new DatabaseSecretKeyHolder();

500

SaslServerBootstrap customSaslServer = new SaslServerBootstrap(conf, customKeyHolder);

501

```

502

503

### SASL Encryption Usage

504

505

```java

506

import org.apache.spark.network.sasl.*;

507

508

// Create SASL client and server for testing

509

String secretKey = "shared-secret-key";

510

SparkSaslClient saslClient = new SparkSaslClient("app1", secretKey, true);

511

SparkSaslServer saslServer = new SparkSaslServer("app1", secretKey, true);

512

513

try {

514

// Perform SASL handshake

515

SaslClient client = saslClient.createSaslClient();

516

SaslServer server = saslServer.createSaslServer();

517

518

byte[] challenge = null;

519

byte[] response = client.hasInitialResponse() ? client.evaluateChallenge(new byte[0]) : null;

520

521

while (!client.isComplete() || !server.isComplete()) {

522

if (!server.isComplete()) {

523

challenge = server.evaluateResponse(response != null ? response : new byte[0]);

524

}

525

if (!client.isComplete()) {

526

response = client.evaluateChallenge(challenge != null ? challenge : new byte[0]);

527

}

528

}

529

530

System.out.println("SASL handshake completed successfully");

531

532

// Test encryption if enabled

533

if (saslClient.isComplete() && server.getQop() != null) {

534

String testData = "Hello, encrypted world!";

535

byte[] encrypted = saslClient.wrap(testData.getBytes(), 0, testData.getBytes().length);

536

byte[] decrypted = saslServer.unwrap(encrypted, 0, encrypted.length);

537

538

System.out.println("Original: " + testData);

539

System.out.println("Decrypted: " + new String(decrypted));

540

}

541

542

} catch (Exception e) {

543

System.err.println("SASL authentication failed: " + e.getMessage());

544

} finally {

545

saslClient.dispose();

546

saslServer.dispose();

547

}

548

```

549

550

### Integrated Security Setup

551

552

```java

553

// Complete security setup with both authentication and encryption

554

public TransportContext createSecureTransportContext(TransportConf conf, RpcHandler rpcHandler) throws Exception {

555

// Create secret key holder

556

SecretKeyHolder secretKeyHolder = new SecretKeyHolder() {

557

@Override

558

public String getSaslUser(String appId) {

559

return System.getProperty("spark.app.user", "spark-user");

560

}

561

562

@Override

563

public String getSecretKey(String appId) {

564

return System.getProperty("spark.app.secret", "default-secret");

565

}

566

};

567

568

// Create bootstraps for authentication and encryption

569

List<TransportClientBootstrap> clientBootstraps = Arrays.asList(

570

new SaslClientBootstrap(conf, "secure-app", secretKeyHolder),

571

new AuthClientBootstrap(conf, "secure-app", secretKeyHolder)

572

);

573

574

List<TransportServerBootstrap> serverBootstraps = Arrays.asList(

575

new SaslServerBootstrap(conf, secretKeyHolder),

576

new AuthServerBootstrap(conf, secretKeyHolder)

577

);

578

579

// Create secure transport context

580

TransportContext secureContext = new TransportContext(conf, rpcHandler);

581

582

// Test the secure setup

583

TransportServer secureServer = secureContext.createServer(0, serverBootstraps);

584

TransportClientFactory secureClientFactory = secureContext.createClientFactory(clientBootstraps);

585

586

System.out.println("Secure transport context created successfully");

587

System.out.println("Server port: " + secureServer.getPort());

588

589

return secureContext;

590

}

591

```

592

593

### Handling Authentication Failures

594

595

```java

596

// Custom RPC handler with authentication error handling

597

public class SecureRpcHandler extends RpcHandler {

598

private final RpcHandler delegate;

599

private final Set<String> authenticatedClients = ConcurrentHashMap.newKeySet();

600

601

public SecureRpcHandler(RpcHandler delegate) {

602

this.delegate = delegate;

603

}

604

605

@Override

606

public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {

607

String clientId = client.getSocketAddress().toString();

608

609

if (!authenticatedClients.contains(clientId)) {

610

callback.onFailure(new SecurityException("Client not authenticated: " + clientId));

611

return;

612

}

613

614

delegate.receive(client, message, callback);

615

}

616

617

@Override

618

public StreamManager getStreamManager() {

619

return delegate.getStreamManager();

620

}

621

622

@Override

623

public void channelActive(TransportClient client) {

624

// Mark client as authenticated after successful bootstrap

625

String clientId = client.getSocketAddress().toString();

626

authenticatedClients.add(clientId);

627

System.out.println("Client authenticated: " + clientId);

628

delegate.channelActive(client);

629

}

630

631

@Override

632

public void channelInactive(TransportClient client) {

633

String clientId = client.getSocketAddress().toString();

634

authenticatedClients.remove(clientId);

635

System.out.println("Client disconnected: " + clientId);

636

delegate.channelInactive(client);

637

}

638

639

@Override

640

public void exceptionCaught(Throwable cause, TransportClient client) {

641

if (cause instanceof SaslTimeoutException) {

642

System.err.println("SASL authentication timeout for client: " + client.getSocketAddress());

643

} else if (cause instanceof SecurityException) {

644

System.err.println("Security exception for client: " + client.getSocketAddress() + " - " + cause.getMessage());

645

}

646

delegate.exceptionCaught(cause, client);

647

}

648

}

649

```

650

651

## Types

652

653

### Authentication Message Types

654

655

```java { .api }

656

public class AuthMessage implements Encodable {

657

public static class Challenge extends AuthMessage {

658

public Challenge(byte[] challenge);

659

public byte[] challenge();

660

}

661

662

public static class Response extends AuthMessage {

663

public Response(byte[] response);

664

public byte[] response();

665

}

666

667

public static class Success extends AuthMessage {

668

public Success(byte[] payload);

669

public byte[] payload();

670

}

671

}

672

```

673

674

### Abstract Base Classes

675

676

```java { .api }

677

public abstract class AbstractAuthRpcHandler extends RpcHandler {

678

protected final TransportConf conf;

679

protected final Channel channel;

680

protected final RpcHandler delegate;

681

682

protected AbstractAuthRpcHandler(TransportConf conf, Channel channel, RpcHandler delegate);

683

684

protected abstract boolean isAuthenticated();

685

protected abstract void doAuthenticationHandshake(TransportClient client, ByteBuffer message, RpcResponseCallback callback);

686

}

687

```