0
# Security and Authentication
1
2
The security and authentication API provides comprehensive protection for Apache Spark's network communications through SASL authentication mechanisms and AES encryption protocols. This layer ensures secure data transmission and proper authentication of clients and servers in distributed Spark environments.
3
4
## Capabilities
5
6
### Transport Encryption
7
8
Core interface for transport layer encryption supporting multiple cipher algorithms.
9
10
```java { .api }
11
public interface TransportCipher {
12
/**
13
* Get the key identifier for this cipher
14
* @return String representing the key ID used for encryption
15
* @throws GeneralSecurityException if key retrieval fails
16
*/
17
String getKeyId() throws GeneralSecurityException;
18
19
/**
20
* Add encryption/decryption handlers to the Netty channel pipeline
21
* @param channel - Netty Channel to add cipher handlers to
22
* @throws IOException if I/O operations fail during setup
23
* @throws GeneralSecurityException if cryptographic operations fail
24
*/
25
void addToChannel(Channel channel) throws IOException, GeneralSecurityException;
26
}
27
```
28
29
### AES-CTR Cipher Implementation
30
31
Counter mode AES cipher implementation for transport layer encryption.
32
33
```java { .api }
34
public class CtrTransportCipher implements TransportCipher {
35
/**
36
* Create CTR transport cipher with specified key and initialization vector
37
* @param key - Secret key for AES encryption
38
* @param iv - Initialization vector for CTR mode
39
* @throws GeneralSecurityException if cipher initialization fails
40
*/
41
public CtrTransportCipher(SecretKey key, byte[] iv) throws GeneralSecurityException;
42
43
@Override
44
public String getKeyId() throws GeneralSecurityException;
45
46
@Override
47
public void addToChannel(Channel channel) throws IOException, GeneralSecurityException;
48
}
49
```
50
51
### AES-GCM Cipher Implementation
52
53
Galois/Counter Mode AES cipher implementation providing authenticated encryption.
54
55
```java { .api }
56
public class GcmTransportCipher implements TransportCipher {
57
/**
58
* Create GCM transport cipher with specified key
59
* @param key - Secret key for AES-GCM encryption
60
* @throws GeneralSecurityException if cipher initialization fails
61
*/
62
public GcmTransportCipher(SecretKey key) throws GeneralSecurityException;
63
64
@Override
65
public String getKeyId() throws GeneralSecurityException;
66
67
@Override
68
public void addToChannel(Channel channel) throws IOException, GeneralSecurityException;
69
}
70
```
71
72
## Authentication Protocol
73
74
### Authentication Client Bootstrap
75
76
Client bootstrap for setting up authentication protocol with servers.
77
78
```java { .api }
79
public class AuthClientBootstrap implements TransportClientBootstrap {
80
/**
81
* Create authentication client bootstrap
82
* @param conf - Transport configuration containing auth settings
83
* @param appId - Application identifier for authentication
84
* @param secretKeyHolder - Provider for authentication secrets
85
*/
86
public AuthClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);
87
88
@Override
89
public void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
90
}
91
```
92
93
### Authentication Server Bootstrap
94
95
Server bootstrap for handling authentication protocol from clients.
96
97
```java { .api }
98
public class AuthServerBootstrap implements TransportServerBootstrap {
99
/**
100
* Create authentication server bootstrap
101
* @param conf - Transport configuration containing auth settings
102
* @param secretKeyHolder - Provider for validating authentication secrets
103
*/
104
public AuthServerBootstrap(TransportConf conf, SecretKeyHolder secretKeyHolder);
105
106
@Override
107
public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
108
}
109
```
110
111
## SASL Authentication
112
113
### SASL Client Implementation
114
115
SASL client implementation for Spark's authentication needs.
116
117
```java { .api }
118
public class SparkSaslClient implements SaslEncryptionBackend {
119
/**
120
* Create SASL client for authentication
121
* @param secretKeyId - Identifier for the secret key
122
* @param secretKey - Secret key for SASL authentication
123
* @param encryptionEnabled - Whether to enable SASL encryption
124
*/
125
public SparkSaslClient(String secretKeyId, String secretKey, boolean encryptionEnabled);
126
127
/**
128
* Create and configure SASL client
129
* @return SaslClient configured for Spark authentication
130
* @throws IOException if SASL client creation fails
131
*/
132
public SaslClient createSaslClient() throws IOException;
133
134
/**
135
* Check if SASL negotiation is complete
136
* @return boolean indicating if authentication is complete
137
*/
138
public boolean isComplete();
139
140
/**
141
* Process a SASL challenge from the server
142
* @param challenge - byte array containing the server challenge
143
* @return byte array containing the client response
144
* @throws IOException if challenge processing fails
145
*/
146
public byte[] response(byte[] challenge) throws IOException;
147
}
148
```
149
150
### SASL Server Implementation
151
152
SASL server implementation for validating client authentications.
153
154
```java { .api }
155
public class SparkSaslServer implements SaslEncryptionBackend {
156
/**
157
* Create SASL server for authentication
158
* @param secretKeyId - Identifier for the secret key
159
* @param secretKey - Secret key for SASL authentication
160
* @param encryptionEnabled - Whether to enable SASL encryption
161
*/
162
public SparkSaslServer(String secretKeyId, String secretKey, boolean encryptionEnabled);
163
164
/**
165
* Create and configure SASL server
166
* @return SaslServer configured for Spark authentication
167
* @throws IOException if SASL server creation fails
168
*/
169
public SaslServer createSaslServer() throws IOException;
170
171
/**
172
* Check if SASL negotiation is complete
173
* @return boolean indicating if authentication is complete
174
*/
175
public boolean isComplete();
176
177
/**
178
* Process a SASL response from the client
179
* @param response - byte array containing the client response
180
* @return byte array containing the server challenge
181
* @throws IOException if response processing fails
182
*/
183
public byte[] response(byte[] response) throws IOException;
184
}
185
```
186
187
### SASL Bootstrap Components
188
189
Bootstrap implementations for integrating SASL authentication into the transport layer.
190
191
```java { .api }
192
public class SaslClientBootstrap implements TransportClientBootstrap {
193
/**
194
* Create SASL client bootstrap
195
* @param conf - Transport configuration
196
* @param appId - Application identifier
197
* @param secretKeyHolder - Provider for authentication secrets
198
*/
199
public SaslClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);
200
201
@Override
202
public void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
203
}
204
205
public class SaslServerBootstrap implements TransportServerBootstrap {
206
/**
207
* Create SASL server bootstrap
208
* @param conf - Transport configuration
209
* @param secretKeyHolder - Provider for validating authentication secrets
210
*/
211
public SaslServerBootstrap(TransportConf conf, SecretKeyHolder secretKeyHolder);
212
213
@Override
214
public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
215
}
216
```
217
218
### SASL RPC Handler
219
220
RPC handler that provides SASL authentication capabilities.
221
222
```java { .api }
223
public class SaslRpcHandler extends AbstractAuthRpcHandler {
224
/**
225
* Create SASL RPC handler
226
* @param conf - Transport configuration
227
* @param channel - Netty channel for communication
228
* @param delegate - Underlying RPC handler to delegate to after authentication
229
* @param secretKeyHolder - Provider for authentication secrets
230
*/
231
public SaslRpcHandler(TransportConf conf, Channel channel, RpcHandler delegate, SecretKeyHolder secretKeyHolder);
232
233
@Override
234
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
235
236
@Override
237
public StreamManager getStreamManager();
238
239
@Override
240
public void channelActive(TransportClient client);
241
242
@Override
243
public void channelInactive(TransportClient client);
244
}
245
```
246
247
## Secret Management
248
249
### SecretKeyHolder Interface
250
251
Interface for managing authentication secrets and user mappings.
252
253
```java { .api }
254
public interface SecretKeyHolder {
255
/**
256
* Get the SASL user identifier for the given application
257
* @param appId - Application identifier
258
* @return String representing the SASL user, or null if not found
259
*/
260
String getSaslUser(String appId);
261
262
/**
263
* Get the secret key for the given application
264
* @param appId - Application identifier
265
* @return String representing the secret key, or null if not found
266
*/
267
String getSecretKey(String appId);
268
}
269
```
270
271
## SASL Messages and Encryption
272
273
### SASL Message Protocol
274
275
Message types used in SASL authentication exchange.
276
277
```java { .api }
278
public class SaslMessage implements Encodable {
279
/**
280
* Create a SASL message
281
* @param payload - byte array containing SASL data
282
*/
283
public SaslMessage(byte[] payload);
284
285
/**
286
* Get the SASL payload
287
* @return byte array containing the SASL data
288
*/
289
public byte[] payload();
290
291
@Override
292
public int encodedLength();
293
294
@Override
295
public void encode(ByteBuf buf);
296
}
297
```
298
299
### SASL Encryption Backend
300
301
Backend interface for SASL encryption operations.
302
303
```java { .api }
304
public interface SaslEncryptionBackend {
305
/**
306
* Encrypt data using SASL encryption
307
* @param data - byte array to encrypt
308
* @param offset - starting offset in the data array
309
* @param len - number of bytes to encrypt
310
* @return byte array containing encrypted data
311
* @throws IOException if encryption fails
312
*/
313
byte[] wrap(byte[] data, int offset, int len) throws IOException;
314
315
/**
316
* Decrypt data using SASL encryption
317
* @param data - byte array to decrypt
318
* @param offset - starting offset in the data array
319
* @param len - number of bytes to decrypt
320
* @return byte array containing decrypted data
321
* @throws IOException if decryption fails
322
*/
323
byte[] unwrap(byte[] data, int offset, int len) throws IOException;
324
325
/**
326
* Dispose of the encryption backend and clean up resources
327
* @throws IOException if cleanup fails
328
*/
329
void dispose() throws IOException;
330
}
331
```
332
333
### SASL Encryption Implementation
334
335
Implementation of SASL encryption for secure data transmission.
336
337
```java { .api }
338
public class SaslEncryption implements SaslEncryptionBackend {
339
/**
340
* Create SASL encryption with the specified backend
341
* @param backend - SaslEncryptionBackend for actual encryption operations
342
*/
343
public SaslEncryption(SaslEncryptionBackend backend);
344
345
@Override
346
public byte[] wrap(byte[] data, int offset, int len) throws IOException;
347
348
@Override
349
public byte[] unwrap(byte[] data, int offset, int len) throws IOException;
350
351
@Override
352
public void dispose() throws IOException;
353
}
354
```
355
356
## Exception Classes
357
358
### SaslTimeoutException
359
360
Exception thrown when SASL authentication operations timeout.
361
362
```java { .api }
363
public class SaslTimeoutException extends RuntimeException {
364
/**
365
* Create SASL timeout exception
366
* @param message - Description of the timeout condition
367
*/
368
public SaslTimeoutException(String message);
369
370
/**
371
* Create SASL timeout exception with cause
372
* @param message - Description of the timeout condition
373
* @param cause - Underlying cause of the timeout
374
*/
375
public SaslTimeoutException(String message, Throwable cause);
376
}
377
```
378
379
## Usage Examples
380
381
### Setting Up Transport Encryption
382
383
```java
384
import org.apache.spark.network.crypto.*;
385
import javax.crypto.KeyGenerator;
386
import javax.crypto.SecretKey;
387
import java.security.SecureRandom;
388
389
// Generate AES key for encryption
390
KeyGenerator keyGen = KeyGenerator.getInstance("AES");
391
keyGen.init(256);
392
SecretKey secretKey = keyGen.generateKey();
393
394
// Create CTR cipher
395
byte[] iv = new byte[16];
396
new SecureRandom().nextBytes(iv);
397
CtrTransportCipher ctrCipher = new CtrTransportCipher(secretKey, iv);
398
399
// Create GCM cipher
400
GcmTransportCipher gcmCipher = new GcmTransportCipher(secretKey);
401
402
// Apply cipher to channel
403
Channel channel = getNettyChannel(); // Obtain channel from transport
404
ctrCipher.addToChannel(channel);
405
406
System.out.println("Encryption enabled with key ID: " + ctrCipher.getKeyId());
407
```
408
409
### SASL Authentication Setup
410
411
```java
412
import org.apache.spark.network.sasl.*;
413
414
// Create secret key holder
415
SecretKeyHolder secretKeyHolder = new SecretKeyHolder() {
416
@Override
417
public String getSaslUser(String appId) {
418
return "spark-user-" + appId;
419
}
420
421
@Override
422
public String getSecretKey(String appId) {
423
return "secret-key-for-" + appId;
424
}
425
};
426
427
// Client-side SASL bootstrap
428
SaslClientBootstrap clientBootstrap = new SaslClientBootstrap(conf, "myapp", secretKeyHolder);
429
430
// Server-side SASL bootstrap
431
SaslServerBootstrap serverBootstrap = new SaslServerBootstrap(conf, secretKeyHolder);
432
433
// Create authenticated transport context
434
List<TransportClientBootstrap> clientBootstraps = Arrays.asList(clientBootstrap);
435
List<TransportServerBootstrap> serverBootstraps = Arrays.asList(serverBootstrap);
436
437
TransportContext context = new TransportContext(conf, rpcHandler);
438
TransportClientFactory clientFactory = context.createClientFactory(clientBootstraps);
439
TransportServer server = context.createServer(9999, serverBootstraps);
440
441
System.out.println("SASL-authenticated server started on port: " + server.getPort());
442
```
443
444
### Authentication Protocol Implementation
445
446
```java
447
import org.apache.spark.network.crypto.*;
448
449
// Client-side authentication
450
AuthClientBootstrap authClient = new AuthClientBootstrap(conf, "spark-app", secretKeyHolder);
451
452
// Server-side authentication
453
AuthServerBootstrap authServer = new AuthServerBootstrap(conf, secretKeyHolder);
454
455
// Create transport with authentication
456
TransportContext authContext = new TransportContext(conf, rpcHandler);
457
TransportServer authServerInstance = authContext.createServer(8443, Arrays.asList(authServer));
458
TransportClientFactory authClientFactory = authContext.createClientFactory(Arrays.asList(authClient));
459
460
// Connect authenticated client
461
TransportClient authenticatedClient = authClientFactory.createClient("localhost", 8443);
462
System.out.println("Authenticated client connected: " + authenticatedClient.isActive());
463
```
464
465
### Custom Secret Key Management
466
467
```java
468
import java.util.concurrent.ConcurrentHashMap;
469
import java.util.Map;
470
471
// Custom secret key holder implementation
472
public class DatabaseSecretKeyHolder implements SecretKeyHolder {
473
private final Map<String, String> userCache = new ConcurrentHashMap<>();
474
private final Map<String, String> keyCache = new ConcurrentHashMap<>();
475
476
@Override
477
public String getSaslUser(String appId) {
478
return userCache.computeIfAbsent(appId, id -> {
479
// In real implementation, query database
480
return "user-" + id;
481
});
482
}
483
484
@Override
485
public String getSecretKey(String appId) {
486
return keyCache.computeIfAbsent(appId, id -> {
487
// In real implementation, securely retrieve from key store
488
return generateSecretKey(id);
489
});
490
}
491
492
private String generateSecretKey(String appId) {
493
// Generate or retrieve secure key for application
494
return "generated-key-" + appId.hashCode();
495
}
496
}
497
498
// Usage
499
SecretKeyHolder customKeyHolder = new DatabaseSecretKeyHolder();
500
SaslServerBootstrap customSaslServer = new SaslServerBootstrap(conf, customKeyHolder);
501
```
502
503
### SASL Encryption Usage
504
505
```java
506
import org.apache.spark.network.sasl.*;
507
508
// Create SASL client and server for testing
509
String secretKey = "shared-secret-key";
510
SparkSaslClient saslClient = new SparkSaslClient("app1", secretKey, true);
511
SparkSaslServer saslServer = new SparkSaslServer("app1", secretKey, true);
512
513
try {
514
// Perform SASL handshake
515
SaslClient client = saslClient.createSaslClient();
516
SaslServer server = saslServer.createSaslServer();
517
518
byte[] challenge = null;
519
byte[] response = client.hasInitialResponse() ? client.evaluateChallenge(new byte[0]) : null;
520
521
while (!client.isComplete() || !server.isComplete()) {
522
if (!server.isComplete()) {
523
challenge = server.evaluateResponse(response != null ? response : new byte[0]);
524
}
525
if (!client.isComplete()) {
526
response = client.evaluateChallenge(challenge != null ? challenge : new byte[0]);
527
}
528
}
529
530
System.out.println("SASL handshake completed successfully");
531
532
// Test encryption if enabled
533
if (saslClient.isComplete() && server.getQop() != null) {
534
String testData = "Hello, encrypted world!";
535
byte[] encrypted = saslClient.wrap(testData.getBytes(), 0, testData.getBytes().length);
536
byte[] decrypted = saslServer.unwrap(encrypted, 0, encrypted.length);
537
538
System.out.println("Original: " + testData);
539
System.out.println("Decrypted: " + new String(decrypted));
540
}
541
542
} catch (Exception e) {
543
System.err.println("SASL authentication failed: " + e.getMessage());
544
} finally {
545
saslClient.dispose();
546
saslServer.dispose();
547
}
548
```
549
550
### Integrated Security Setup
551
552
```java
553
// Complete security setup with both authentication and encryption
554
public TransportContext createSecureTransportContext(TransportConf conf, RpcHandler rpcHandler) throws Exception {
555
// Create secret key holder
556
SecretKeyHolder secretKeyHolder = new SecretKeyHolder() {
557
@Override
558
public String getSaslUser(String appId) {
559
return System.getProperty("spark.app.user", "spark-user");
560
}
561
562
@Override
563
public String getSecretKey(String appId) {
564
return System.getProperty("spark.app.secret", "default-secret");
565
}
566
};
567
568
// Create bootstraps for authentication and encryption
569
List<TransportClientBootstrap> clientBootstraps = Arrays.asList(
570
new SaslClientBootstrap(conf, "secure-app", secretKeyHolder),
571
new AuthClientBootstrap(conf, "secure-app", secretKeyHolder)
572
);
573
574
List<TransportServerBootstrap> serverBootstraps = Arrays.asList(
575
new SaslServerBootstrap(conf, secretKeyHolder),
576
new AuthServerBootstrap(conf, secretKeyHolder)
577
);
578
579
// Create secure transport context
580
TransportContext secureContext = new TransportContext(conf, rpcHandler);
581
582
// Test the secure setup
583
TransportServer secureServer = secureContext.createServer(0, serverBootstraps);
584
TransportClientFactory secureClientFactory = secureContext.createClientFactory(clientBootstraps);
585
586
System.out.println("Secure transport context created successfully");
587
System.out.println("Server port: " + secureServer.getPort());
588
589
return secureContext;
590
}
591
```
592
593
### Handling Authentication Failures
594
595
```java
596
// Custom RPC handler with authentication error handling
597
public class SecureRpcHandler extends RpcHandler {
598
private final RpcHandler delegate;
599
private final Set<String> authenticatedClients = ConcurrentHashMap.newKeySet();
600
601
public SecureRpcHandler(RpcHandler delegate) {
602
this.delegate = delegate;
603
}
604
605
@Override
606
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
607
String clientId = client.getSocketAddress().toString();
608
609
if (!authenticatedClients.contains(clientId)) {
610
callback.onFailure(new SecurityException("Client not authenticated: " + clientId));
611
return;
612
}
613
614
delegate.receive(client, message, callback);
615
}
616
617
@Override
618
public StreamManager getStreamManager() {
619
return delegate.getStreamManager();
620
}
621
622
@Override
623
public void channelActive(TransportClient client) {
624
// Mark client as authenticated after successful bootstrap
625
String clientId = client.getSocketAddress().toString();
626
authenticatedClients.add(clientId);
627
System.out.println("Client authenticated: " + clientId);
628
delegate.channelActive(client);
629
}
630
631
@Override
632
public void channelInactive(TransportClient client) {
633
String clientId = client.getSocketAddress().toString();
634
authenticatedClients.remove(clientId);
635
System.out.println("Client disconnected: " + clientId);
636
delegate.channelInactive(client);
637
}
638
639
@Override
640
public void exceptionCaught(Throwable cause, TransportClient client) {
641
if (cause instanceof SaslTimeoutException) {
642
System.err.println("SASL authentication timeout for client: " + client.getSocketAddress());
643
} else if (cause instanceof SecurityException) {
644
System.err.println("Security exception for client: " + client.getSocketAddress() + " - " + cause.getMessage());
645
}
646
delegate.exceptionCaught(cause, client);
647
}
648
}
649
```
650
651
## Types
652
653
### Authentication Message Types
654
655
```java { .api }
656
public class AuthMessage implements Encodable {
657
public static class Challenge extends AuthMessage {
658
public Challenge(byte[] challenge);
659
public byte[] challenge();
660
}
661
662
public static class Response extends AuthMessage {
663
public Response(byte[] response);
664
public byte[] response();
665
}
666
667
public static class Success extends AuthMessage {
668
public Success(byte[] payload);
669
public byte[] payload();
670
}
671
}
672
```
673
674
### Abstract Base Classes
675
676
```java { .api }
677
public abstract class AbstractAuthRpcHandler extends RpcHandler {
678
protected final TransportConf conf;
679
protected final Channel channel;
680
protected final RpcHandler delegate;
681
682
protected AbstractAuthRpcHandler(TransportConf conf, Channel channel, RpcHandler delegate);
683
684
protected abstract boolean isAuthenticated();
685
protected abstract void doAuthenticationHandshake(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
686
}
687
```