0
# SASL Authentication
1
2
Security framework providing SASL-based authentication and encryption for secure network communication in Apache Spark. The SASL integration enables secure connections with pluggable authentication mechanisms and optional data encryption.
3
4
## Capabilities
5
6
### SecretKeyHolder Interface
7
8
Interface for providing SASL authentication credentials and secret keys.
9
10
```java { .api }
11
/**
12
* SecretKeyHolder provides access to SASL credentials for authentication.
13
* Applications implement this interface to integrate with their credential management systems.
14
*/
15
public interface SecretKeyHolder {
16
/**
17
* Gets the SASL username for a specific application ID.
18
*
19
* @param appId The application identifier
20
* @return The SASL username to use for authentication
21
*/
22
String getSaslUser(String appId);
23
24
/**
25
* Gets the secret key for a specific application ID.
26
* The secret key is used for SASL authentication and optionally for encryption.
27
*
28
* @param appId The application identifier
29
* @return The secret key as a string
30
*/
31
String getSecretKey(String appId);
32
}
33
```
34
35
### Bootstrap Classes
36
37
Bootstrap implementations for integrating SASL authentication into the transport layer.
38
39
#### SaslClientBootstrap
40
41
```java { .api }
42
/**
43
* Client-side SASL authentication bootstrap that configures client channels
44
* for SASL authentication with the server.
45
*/
46
public class SaslClientBootstrap implements TransportClientBootstrap {
47
/**
48
* Creates a SASL client bootstrap.
49
*
50
* @param conf Transport configuration containing SASL settings
51
* @param appId Application identifier for credential lookup
52
* @param secretKeyHolder Provider for SASL credentials
53
*/
54
public SaslClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);
55
56
@Override
57
public void doBootstrap(TransportClient client, Channel channel) {
58
// Configures channel for SASL authentication
59
// Adds SASL handlers to the Netty pipeline
60
// Performs SASL handshake with server
61
}
62
}
63
```
64
65
#### SaslServerBootstrap
66
67
```java { .api }
68
/**
69
* Server-side SASL authentication bootstrap that configures server channels
70
* to require SASL authentication from clients.
71
*/
72
public class SaslServerBootstrap implements TransportServerBootstrap {
73
/**
74
* Creates a SASL server bootstrap.
75
*
76
* @param conf Transport configuration containing SASL settings
77
* @param secretKeyHolder Provider for SASL credentials
78
*/
79
public SaslServerBootstrap(TransportConf conf, SecretKeyHolder secretKeyHolder);
80
81
@Override
82
public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
83
// Configures channel for SASL authentication
84
// Wraps the original RPC handler with SASL authentication
85
// Returns a SaslRpcHandler that enforces authentication
86
return new SaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder);
87
}
88
}
89
```
90
91
### SASL Implementation Classes
92
93
#### SparkSaslClient
94
95
```java { .api }
96
/**
97
* Spark's SASL client implementation providing authentication capabilities.
98
* Handles the client side of SASL authentication protocols.
99
*/
100
public class SparkSaslClient {
101
/**
102
* Creates a SASL client for authentication.
103
*
104
* @param protocol The protocol name (typically "spark")
105
* @param serverName The server name for authentication
106
* @param props Additional SASL properties
107
* @param cbh Callback handler for authentication credentials
108
*/
109
public SparkSaslClient(String protocol, String serverName, Map<String, String> props,
110
CallbackHandler cbh);
111
112
/**
113
* Checks if the SASL client is complete (authentication finished).
114
*
115
* @return true if authentication is complete
116
*/
117
public boolean isComplete();
118
119
/**
120
* Gets the negotiated security layer (QOP - Quality of Protection).
121
*
122
* @return The negotiated QOP (auth, auth-int, or auth-conf)
123
*/
124
public String getNegotiatedProperty(String propName);
125
126
/**
127
* Processes a challenge from the server during authentication.
128
*
129
* @param challenge The challenge bytes from the server
130
* @return Response bytes to send back to server
131
* @throws SaslException if authentication fails
132
*/
133
public byte[] evaluateChallenge(byte[] challenge) throws SaslException;
134
135
/**
136
* Wraps outgoing data with security layer (encryption/integrity).
137
*
138
* @param outgoing The data to wrap
139
* @param offset Starting offset in the data
140
* @param len Length of data to wrap
141
* @return Wrapped data with security applied
142
* @throws SaslException if wrapping fails
143
*/
144
public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException;
145
146
/**
147
* Unwraps incoming data from security layer.
148
*
149
* @param incoming The wrapped data from server
150
* @param offset Starting offset in the data
151
* @param len Length of data to unwrap
152
* @return Unwrapped original data
153
* @throws SaslException if unwrapping fails
154
*/
155
public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException;
156
157
/**
158
* Disposes of the SASL client and releases resources.
159
*
160
* @throws SaslException if disposal fails
161
*/
162
public void dispose() throws SaslException;
163
}
164
```
165
166
#### SparkSaslServer
167
168
```java { .api }
169
/**
170
* Spark's SASL server implementation providing authentication capabilities.
171
* Handles the server side of SASL authentication protocols.
172
*/
173
public class SparkSaslServer {
174
/**
175
* Creates a SASL server for authentication.
176
*
177
* @param protocol The protocol name (typically "spark")
178
* @param serverName The server name for authentication
179
* @param props Additional SASL properties
180
* @param cbh Callback handler for authentication credentials
181
*/
182
public SparkSaslServer(String protocol, String serverName, Map<String, String> props,
183
CallbackHandler cbh);
184
185
/**
186
* Checks if the SASL server is complete (authentication finished).
187
*
188
* @return true if authentication is complete
189
*/
190
public boolean isComplete();
191
192
/**
193
* Gets the authenticated user's authorization ID.
194
*
195
* @return The authorization ID of the authenticated user
196
*/
197
public String getAuthorizationID();
198
199
/**
200
* Processes a response from the client during authentication.
201
*
202
* @param response The response bytes from the client
203
* @return Challenge bytes to send to client (null if complete)
204
* @throws SaslException if authentication fails
205
*/
206
public byte[] evaluateResponse(byte[] response) throws SaslException;
207
208
/**
209
* Wraps outgoing data with security layer (encryption/integrity).
210
*
211
* @param outgoing The data to wrap
212
* @param offset Starting offset in the data
213
* @param len Length of data to wrap
214
* @return Wrapped data with security applied
215
* @throws SaslException if wrapping fails
216
*/
217
public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException;
218
219
/**
220
* Unwraps incoming data from security layer.
221
*
222
* @param incoming The wrapped data from client
223
* @param offset Starting offset in the data
224
* @param len Length of data to unwrap
225
* @return Unwrapped original data
226
* @throws SaslException if unwrapping fails
227
*/
228
public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException;
229
230
/**
231
* Disposes of the SASL server and releases resources.
232
*
233
* @throws SaslException if disposal fails
234
*/
235
public void dispose() throws SaslException;
236
}
237
```
238
239
### SASL RPC Handler
240
241
#### SaslRpcHandler
242
243
```java { .api }
244
/**
245
* RPC handler that enforces SASL authentication before delegating to the wrapped handler.
246
* All RPC requests must be authenticated before processing.
247
*/
248
public class SaslRpcHandler extends RpcHandler {
249
/**
250
* Creates a SASL-protected RPC handler.
251
*
252
* @param conf Transport configuration
253
* @param channel The Netty channel for this connection
254
* @param wrappedHandler The underlying RPC handler to protect
255
* @param secretKeyHolder Provider for authentication credentials
256
*/
257
public SaslRpcHandler(TransportConf conf, Channel channel, RpcHandler wrappedHandler,
258
SecretKeyHolder secretKeyHolder);
259
260
@Override
261
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
262
// Ensures client is authenticated before processing RPC
263
if (!isAuthenticated(client)) {
264
callback.onFailure(new SecurityException("Client not authenticated"));
265
return;
266
}
267
268
// Delegate to wrapped handler if authenticated
269
wrappedHandler.receive(client, message, callback);
270
}
271
272
@Override
273
public StreamManager getStreamManager() {
274
return new AuthenticatedStreamManager(wrappedHandler.getStreamManager());
275
}
276
277
/**
278
* Checks if a client has completed SASL authentication.
279
*
280
* @param client The client to check
281
* @return true if client is authenticated
282
*/
283
public boolean isAuthenticated(TransportClient client);
284
}
285
```
286
287
### Encryption Support
288
289
#### SaslEncryptionBackend
290
291
```java { .api }
292
/**
293
* Interface for SASL encryption backend implementations.
294
* Provides encryption/decryption services when SASL QOP includes confidentiality.
295
*/
296
public interface SaslEncryptionBackend {
297
/**
298
* Gets the maximum size of data that can be encrypted in a single operation.
299
*
300
* @return Maximum encryption block size in bytes
301
*/
302
int maxEncryptedSize();
303
304
/**
305
* Encrypts data using the negotiated SASL security layer.
306
*
307
* @param data The plaintext data to encrypt
308
* @return Encrypted data
309
* @throws IOException if encryption fails
310
*/
311
byte[] encrypt(byte[] data) throws IOException;
312
313
/**
314
* Decrypts data using the negotiated SASL security layer.
315
*
316
* @param encryptedData The encrypted data to decrypt
317
* @return Plaintext data
318
* @throws IOException if decryption fails
319
*/
320
byte[] decrypt(byte[] encryptedData) throws IOException;
321
}
322
```
323
324
#### SaslEncryption
325
326
```java { .api }
327
/**
328
* Utility class for SASL encryption operations.
329
* Provides helper methods for encrypting and decrypting data streams.
330
*/
331
public class SaslEncryption {
332
/**
333
* Creates an encryption backend from a completed SASL client.
334
*
335
* @param saslClient The completed SASL client
336
* @param maxOutboundBlockSize Maximum size for outbound encrypted blocks
337
* @return SaslEncryptionBackend for encryption operations
338
*/
339
public static SaslEncryptionBackend createEncryptionBackend(SparkSaslClient saslClient,
340
int maxOutboundBlockSize);
341
342
/**
343
* Creates an encryption backend from a completed SASL server.
344
*
345
* @param saslServer The completed SASL server
346
* @param maxOutboundBlockSize Maximum size for outbound encrypted blocks
347
* @return SaslEncryptionBackend for encryption operations
348
*/
349
public static SaslEncryptionBackend createEncryptionBackend(SparkSaslServer saslServer,
350
int maxOutboundBlockSize);
351
352
/**
353
* Encrypts a managed buffer using SASL encryption.
354
*
355
* @param backend The encryption backend to use
356
* @param buffer The buffer to encrypt
357
* @return New managed buffer containing encrypted data
358
* @throws IOException if encryption fails
359
*/
360
public static ManagedBuffer encryptBuffer(SaslEncryptionBackend backend, ManagedBuffer buffer)
361
throws IOException;
362
363
/**
364
* Decrypts a managed buffer using SASL encryption.
365
*
366
* @param backend The encryption backend to use
367
* @param encryptedBuffer The encrypted buffer to decrypt
368
* @return New managed buffer containing decrypted data
369
* @throws IOException if decryption fails
370
*/
371
public static ManagedBuffer decryptBuffer(SaslEncryptionBackend backend, ManagedBuffer encryptedBuffer)
372
throws IOException;
373
}
374
```
375
376
### SASL Messages
377
378
#### SaslMessage
379
380
```java { .api }
381
/**
382
* Message wrapper for SASL protocol messages during authentication handshake.
383
* Used internally by the SASL bootstrap implementations.
384
*/
385
public class SaslMessage extends AbstractMessage {
386
/** The SASL message payload */
387
public final ManagedBuffer body;
388
389
/**
390
* Creates a SASL protocol message.
391
*
392
* @param body The SASL message data
393
*/
394
public SaslMessage(ManagedBuffer body);
395
396
@Override
397
public Type type() {
398
return Type.User; // SASL messages use User type
399
}
400
}
401
```
402
403
## Usage Examples
404
405
### Basic SASL Setup
406
407
```java
408
import org.apache.spark.network.sasl.*;
409
import org.apache.spark.network.TransportContext;
410
import java.util.Arrays;
411
412
// Implement SecretKeyHolder
413
public class SimpleSecretKeyHolder implements SecretKeyHolder {
414
private final Map<String, String> secrets = new HashMap<>();
415
416
public SimpleSecretKeyHolder() {
417
// In practice, load from secure configuration
418
secrets.put("spark-app-1", "secret-key-123");
419
secrets.put("spark-app-2", "secret-key-456");
420
}
421
422
@Override
423
public String getSaslUser(String appId) {
424
return "spark-user-" + appId;
425
}
426
427
@Override
428
public String getSecretKey(String appId) {
429
return secrets.get(appId);
430
}
431
}
432
433
// Create server with SASL authentication
434
SecretKeyHolder secretKeyHolder = new SimpleSecretKeyHolder();
435
SaslServerBootstrap serverBootstrap = new SaslServerBootstrap(conf, secretKeyHolder);
436
437
TransportContext context = new TransportContext(conf, rpcHandler);
438
TransportServer server = context.createServer(8080, Arrays.asList(serverBootstrap));
439
440
// Create client with SASL authentication
441
String appId = "spark-app-1";
442
SaslClientBootstrap clientBootstrap = new SaslClientBootstrap(conf, appId, secretKeyHolder);
443
444
TransportClientFactory clientFactory = context.createClientFactory(Arrays.asList(clientBootstrap));
445
TransportClient client = clientFactory.createClient("localhost", 8080);
446
```
447
448
### Advanced SASL Configuration
449
450
```java
451
import org.apache.spark.network.util.MapConfigProvider;
452
453
public class SaslConfiguration {
454
public static TransportConf createSaslConfig() {
455
Map<String, String> config = new HashMap<>();
456
457
// Enable SASL authentication
458
config.put("spark.authenticate", "true");
459
config.put("spark.authenticate.secret", "default-secret");
460
461
// Configure SASL properties
462
config.put("spark.network.sasl.serverAlwaysEncrypt", "true");
463
config.put("spark.network.sasl.maxEncryptedBlockSize", "65536");
464
config.put("spark.network.sasl.timeout", "30s");
465
466
// Configure encryption (QOP - Quality of Protection)
467
config.put("javax.security.sasl.qop", "auth-conf"); // auth, auth-int, or auth-conf
468
469
return new TransportConf("spark.network", new MapConfigProvider(config));
470
}
471
}
472
```
473
474
### Custom Secret Key Management
475
476
```java
477
public class DatabaseSecretKeyHolder implements SecretKeyHolder {
478
private final DataSource dataSource;
479
private final Cache<String, String> secretCache;
480
481
public DatabaseSecretKeyHolder(DataSource dataSource) {
482
this.dataSource = dataSource;
483
this.secretCache = CacheBuilder.newBuilder()
484
.maximumSize(1000)
485
.expireAfterWrite(10, TimeUnit.MINUTES)
486
.build();
487
}
488
489
@Override
490
public String getSaslUser(String appId) {
491
return "spark-user"; // Could be app-specific
492
}
493
494
@Override
495
public String getSecretKey(String appId) {
496
try {
497
return secretCache.get(appId, () -> loadSecretFromDatabase(appId));
498
} catch (Exception e) {
499
throw new RuntimeException("Failed to load secret for app: " + appId, e);
500
}
501
}
502
503
private String loadSecretFromDatabase(String appId) throws SQLException {
504
try (Connection conn = dataSource.getConnection();
505
PreparedStatement stmt = conn.prepareStatement(
506
"SELECT secret_key FROM app_secrets WHERE app_id = ?")) {
507
508
stmt.setString(1, appId);
509
510
try (ResultSet rs = stmt.executeQuery()) {
511
if (rs.next()) {
512
return rs.getString("secret_key");
513
} else {
514
throw new SecurityException("No secret found for app: " + appId);
515
}
516
}
517
}
518
}
519
}
520
```
521
522
### Handling SASL Authentication Errors
523
524
```java
525
public class SaslErrorHandler {
526
public void handleClientCreation() {
527
try {
528
TransportClient client = clientFactory.createClient("remote-host", 9090);
529
530
// Test connection with a simple RPC
531
ByteBuffer testMessage = ByteBuffer.wrap("ping".getBytes());
532
ByteBuffer response = client.sendRpcSync(testMessage, 5000);
533
534
System.out.println("SASL authentication successful");
535
536
} catch (Exception e) {
537
if (e.getCause() instanceof SaslException) {
538
System.err.println("SASL authentication failed: " + e.getCause().getMessage());
539
handleSaslFailure((SaslException) e.getCause());
540
} else {
541
System.err.println("Connection failed: " + e.getMessage());
542
}
543
}
544
}
545
546
private void handleSaslFailure(SaslException e) {
547
String message = e.getMessage();
548
549
if (message.contains("authentication failed")) {
550
System.err.println("Invalid credentials - check secret key");
551
} else if (message.contains("timeout")) {
552
System.err.println("SASL handshake timed out - check network connectivity");
553
} else if (message.contains("mechanism")) {
554
System.err.println("SASL mechanism not supported - check configuration");
555
} else {
556
System.err.println("Unknown SASL error: " + message);
557
}
558
}
559
}
560
```
561
562
### SASL with Encryption
563
564
```java
565
public class EncryptedCommunicationExample {
566
public void setupEncryptedCommunication() {
567
// Configure for encryption (confidentiality)
568
Map<String, String> config = new HashMap<>();
569
config.put("javax.security.sasl.qop", "auth-conf"); // Enable encryption
570
config.put("spark.network.sasl.serverAlwaysEncrypt", "true");
571
config.put("spark.network.sasl.maxEncryptedBlockSize", "65536"); // 64KB blocks
572
573
TransportConf conf = new TransportConf("spark.network", new MapConfigProvider(config));
574
575
// Rest of setup same as basic SASL
576
SecretKeyHolder secretKeyHolder = new SimpleSecretKeyHolder();
577
578
// Server with encryption
579
SaslServerBootstrap serverBootstrap = new SaslServerBootstrap(conf, secretKeyHolder);
580
TransportServer server = context.createServer(8080, Arrays.asList(serverBootstrap));
581
582
// Client with encryption
583
SaslClientBootstrap clientBootstrap = new SaslClientBootstrap(conf, "app-1", secretKeyHolder);
584
TransportClientFactory clientFactory = context.createClientFactory(Arrays.asList(clientBootstrap));
585
586
// All communication is now encrypted automatically
587
TransportClient client = clientFactory.createClient("localhost", 8080);
588
}
589
}
590
```
591
592
### SASL Authentication Monitoring
593
594
```java
595
public class SaslMonitoring {
596
private final AtomicLong successfulAuth = new AtomicLong(0);
597
private final AtomicLong failedAuth = new AtomicLong(0);
598
599
public class MonitoringSecretKeyHolder implements SecretKeyHolder {
600
private final SecretKeyHolder delegate;
601
602
public MonitoringSecretKeyHolder(SecretKeyHolder delegate) {
603
this.delegate = delegate;
604
}
605
606
@Override
607
public String getSaslUser(String appId) {
608
try {
609
String user = delegate.getSaslUser(appId);
610
System.out.println("SASL user lookup for app " + appId + ": " + user);
611
return user;
612
} catch (Exception e) {
613
System.err.println("Failed to get SASL user for app " + appId + ": " + e.getMessage());
614
throw e;
615
}
616
}
617
618
@Override
619
public String getSecretKey(String appId) {
620
try {
621
String key = delegate.getSecretKey(appId);
622
successfulAuth.incrementAndGet();
623
System.out.println("SASL secret key retrieved for app: " + appId);
624
return key;
625
} catch (Exception e) {
626
failedAuth.incrementAndGet();
627
System.err.println("Failed to get secret key for app " + appId + ": " + e.getMessage());
628
throw e;
629
}
630
}
631
}
632
633
public void printStats() {
634
System.out.println("SASL Authentication Stats:");
635
System.out.println(" Successful: " + successfulAuth.get());
636
System.out.println(" Failed: " + failedAuth.get());
637
}
638
}