0
# Authentication
1
2
A pluggable authentication system for Spark's network transport. It supports SASL and a custom challenge-response authentication protocol, both with encryption support, and provides secure communication channels for distributed Spark applications.
3
4
## Capabilities
5
6
### SASL Authentication
7
8
SASL (Simple Authentication and Security Layer) support for secure authentication with multiple mechanisms and encryption.
9
10
```java { .api }
11
/**
12
* Client-side SASL authentication bootstrap for secure connections
13
*/
14
public class SaslClientBootstrap implements TransportClientBootstrap {
15
/**
16
* Create a SASL client bootstrap
17
* @param conf Transport configuration containing SASL settings
18
* @param appId Application identifier for authentication
19
* @param secretKeyHolder Provider for secret keys and user information
20
*/
21
public SaslClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);
22
23
/**
24
* Bootstrap the client channel with SASL authentication
25
* @param client Transport client instance
26
* @param channel Netty channel to authenticate
27
* @throws RuntimeException if authentication fails
28
*/
29
public void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
30
}
31
32
/**
33
* Server-side SASL authentication bootstrap for accepting secure connections
34
*/
35
public class SaslServerBootstrap implements TransportServerBootstrap {
36
/**
37
* Create a SASL server bootstrap
38
* @param conf Transport configuration containing SASL settings
39
* @param secretKeyHolder Provider for secret keys and user information
40
*/
41
public SaslServerBootstrap(TransportConf conf, SecretKeyHolder secretKeyHolder);
42
43
/**
44
* Bootstrap the server channel with SASL authentication
45
* @param channel Netty channel to authenticate
46
* @param rpcHandler Original RPC handler to wrap with authentication
47
* @return RPC handler with SASL authentication support
48
*/
49
public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
50
}
51
```
52
53
**Usage Examples:**
54
55
```java
56
// Client-side SASL setup (assumes an existing TransportConf `conf` and TransportContext `context`)
57
SecretKeyHolder keyHolder = new MySecretKeyHolder();
58
List<TransportClientBootstrap> clientBootstraps = Arrays.asList(
59
new SaslClientBootstrap(conf, "my-app-id", keyHolder)
60
);
61
TransportClientFactory factory = context.createClientFactory(clientBootstraps);
62
TransportClient client = factory.createClient("spark-server", 7337);
63
64
// Server-side SASL setup
65
List<TransportServerBootstrap> serverBootstraps = Arrays.asList(
66
new SaslServerBootstrap(conf, keyHolder)
67
);
68
TransportServer server = context.createServer(7337, serverBootstraps);
69
```
70
71
### Secret Key Management
72
73
Interface for providing authentication credentials and managing secret keys securely.
74
75
```java { .api }
76
/**
77
* Interface for holding and retrieving secret keys for authentication
78
* Implementations should provide secure storage and retrieval of credentials
79
*/
80
public interface SecretKeyHolder {
81
/**
82
* Get the SASL user identifier for the given application
83
* @param appId Application identifier
84
* @return SASL username for authentication
85
*/
86
String getSaslUser(String appId);
87
88
/**
89
* Get the secret key for the given application
90
* @param appId Application identifier
91
* @return Secret key for authentication
92
*/
93
String getSecretKey(String appId);
94
}
95
```
96
97
**Usage Examples:**
98
99
```java
100
// Implementing SecretKeyHolder
101
public class FileBasedSecretKeyHolder implements SecretKeyHolder {
102
private final Properties secrets;
103
104
public FileBasedSecretKeyHolder(String secretsFile) throws IOException {
105
secrets = new Properties();
106
try (InputStream is = new FileInputStream(secretsFile)) {
107
secrets.load(is);
108
}
109
}
110
111
@Override
112
public String getSaslUser(String appId) {
113
return secrets.getProperty(appId + ".user", appId);
114
}
115
116
@Override
117
public String getSecretKey(String appId) {
118
return secrets.getProperty(appId + ".secret");
119
}
120
}
121
122
// In-memory implementation
123
public class MapBasedSecretKeyHolder implements SecretKeyHolder {
124
private final Map<String, String> users = new HashMap<>();
125
private final Map<String, String> secrets = new HashMap<>();
126
127
public void addCredential(String appId, String user, String secret) {
128
users.put(appId, user);
129
secrets.put(appId, secret);
130
}
131
132
@Override
133
public String getSaslUser(String appId) {
134
return users.get(appId);
135
}
136
137
@Override
138
public String getSecretKey(String appId) {
139
return secrets.get(appId);
140
}
141
}
142
```
143
144
### SASL Implementation Classes
145
146
Internal SASL client and server implementations providing the authentication protocol mechanics.
147
148
```java { .api }
149
/**
150
* SASL client implementation for Spark providing authentication handshake
151
*/
152
public class SparkSaslClient {
153
/**
154
* Create a SASL client for authentication
155
* @param secretKeyId Application identifier for key lookup
156
* @param secretKeyHolder Provider for authentication credentials
157
* @param encrypt Whether to enable SASL encryption after authentication
158
*/
159
public SparkSaslClient(String secretKeyId, SecretKeyHolder secretKeyHolder, boolean encrypt);
160
161
/**
162
* Check if client is complete (authentication finished)
163
* @return true if authentication is complete
164
*/
165
public boolean isComplete();
166
167
/**
168
* Process challenge from server and generate response
169
* @param challenge Challenge bytes from server
170
* @return Response bytes to send to server
171
* @throws Exception if processing fails
172
*/
173
public byte[] response(byte[] challenge) throws Exception;
174
175
/**
176
* Dispose of client resources and cleanup
177
*/
178
public void dispose();
179
}
180
181
/**
182
* SASL server implementation for Spark providing authentication verification
183
*/
184
public class SparkSaslServer {
185
/**
186
* Create a SASL server for authentication
187
* @param secretKeyId Application identifier for key lookup
188
* @param secretKeyHolder Provider for authentication credentials
189
* @param encrypt Whether to enable SASL encryption after authentication
190
*/
191
public SparkSaslServer(String secretKeyId, SecretKeyHolder secretKeyHolder, boolean encrypt);
192
193
/**
194
* Check if server is complete (authentication finished)
195
* @return true if authentication is complete
196
*/
197
public boolean isComplete();
198
199
/**
200
* Process response from client and generate challenge
201
* @param response Response bytes from client
202
* @return Challenge bytes to send to client, or null if complete
203
* @throws Exception if processing fails
204
*/
205
public byte[] response(byte[] response) throws Exception;
206
207
/**
208
* Get the authenticated user identifier
209
* @return User ID that was authenticated
210
*/
211
public String getAuthenticatedUser();
212
213
/**
214
* Dispose of server resources and cleanup
215
*/
216
public void dispose();
217
}
218
```
219
220
### SASL RPC Handler
221
222
RPC handler with integrated SASL authentication support, wrapping application RPC handlers with security.
223
224
```java { .api }
225
/**
226
* RPC handler with SASL authentication support
227
* Wraps application RPC handlers to provide authentication before processing
228
*/
229
public class SaslRpcHandler extends RpcHandler {
230
/**
231
* Create a SASL RPC handler
232
* @param conf Transport configuration
233
* @param wrapped Application RPC handler to protect with authentication
234
* @param secretKeyHolder Provider for authentication credentials
235
*/
236
public SaslRpcHandler(TransportConf conf, RpcHandler wrapped, SecretKeyHolder secretKeyHolder);
237
238
/**
239
* Receive and process RPC message with authentication check
240
* @param client Transport client connection
241
* @param message RPC message content
242
* @param callback Callback for sending response
243
*/
244
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
245
246
/**
247
* Get stream manager with authentication support
248
* @return StreamManager that checks authentication before serving streams
249
*/
250
public StreamManager getStreamManager();
251
252
/**
253
* Handle channel activation with authentication setup
254
* @param client Transport client that connected
255
*/
256
public void channelActive(TransportClient client);
257
258
/**
259
* Handle channel deactivation with cleanup
260
* @param client Transport client that disconnected
261
*/
262
public void channelInactive(TransportClient client);
263
}
264
```
265
266
### Custom Authentication Protocol
267
268
Custom authentication protocol support with challenge-response mechanism and encryption setup.
269
270
```java { .api }
271
/**
272
* Client bootstrap for custom authentication protocol
273
*/
274
public class AuthClientBootstrap implements TransportClientBootstrap {
275
/**
276
* Create custom auth client bootstrap
277
* @param conf Transport configuration
278
* @param appId Application identifier
279
* @param secretKeyHolder Provider for authentication credentials
280
*/
281
public AuthClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);
282
283
/**
284
* Bootstrap client channel with custom authentication
285
* @param client Transport client instance
286
* @param channel Netty channel to authenticate
287
* @throws RuntimeException if authentication fails
288
*/
289
public void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
290
}
291
292
/**
293
* Server bootstrap for custom authentication protocol
294
*/
295
public class AuthServerBootstrap implements TransportServerBootstrap {
296
/**
297
* Create custom auth server bootstrap
298
* @param conf Transport configuration
299
* @param secretKeyHolder Provider for authentication credentials
300
*/
301
public AuthServerBootstrap(TransportConf conf, SecretKeyHolder secretKeyHolder);
302
303
/**
304
* Bootstrap server channel with custom authentication
305
* @param channel Netty channel to authenticate
306
* @param rpcHandler Original RPC handler to protect
307
* @return RPC handler with authentication support
308
*/
309
public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
310
}
311
```
312
313
### Authentication Messages
314
315
Protocol messages for custom authentication handshake with challenge-response mechanism.
316
317
```java { .api }
318
/**
319
* Client challenge message for authentication negotiation
320
*/
321
public class ClientChallenge implements Encodable {
322
/**
323
* Create a client challenge message
324
* @param appId Application identifier
325
* @param kdf Key derivation function name
326
* @param iterations Number of iterations for key derivation
327
* @param cipher Cipher algorithm name
328
* @param keyLength Key length in bits
329
* @param nonce Random nonce for challenge
330
* @param challenge Challenge data
331
*/
332
public ClientChallenge(String appId, String kdf, int iterations, String cipher,
333
int keyLength, byte[] nonce, byte[] challenge);
334
335
/**
336
* Get application identifier
337
* @return Application ID string
338
*/
339
public String appId();
340
341
/**
342
* Get key derivation function name
343
* @return KDF algorithm name
344
*/
345
public String kdf();
346
347
/**
348
* Get key derivation iterations
349
* @return Number of iterations
350
*/
351
public int iterations();
352
353
/**
354
* Get cipher algorithm name
355
* @return Cipher algorithm
356
*/
357
public String cipher();
358
359
/**
360
* Get key length in bits
361
* @return Key length
362
*/
363
public int keyLength();
364
365
/**
366
* Get challenge nonce
367
* @return Nonce bytes
368
*/
369
public byte[] nonce();
370
371
/**
372
* Get challenge data
373
* @return Challenge bytes
374
*/
375
public byte[] challenge();
376
377
/**
378
* Calculate encoded length
379
* @return Length in bytes
380
*/
381
public int encodedLength();
382
383
/**
384
* Encode to ByteBuf
385
* @param buf Output buffer
386
*/
387
public void encode(ByteBuf buf);
388
389
/**
390
* Decode from ByteBuffer
391
* @param buffer Input buffer
392
* @return Decoded ClientChallenge
393
*/
394
public static ClientChallenge decodeMessage(ByteBuffer buffer);
395
}
396
397
/**
398
* Server response message for authentication negotiation
399
*/
400
public class ServerResponse implements Encodable {
401
/**
402
* Create a server response message
403
* @param response Response data to client challenge
404
* @param inputIv Input initialization vector for encryption
405
* @param outputIv Output initialization vector for encryption
406
*/
407
public ServerResponse(byte[] response, byte[] inputIv, byte[] outputIv);
408
409
/**
410
* Get server response data
411
* @return Response bytes
412
*/
413
public byte[] response();
414
415
/**
416
* Get input initialization vector
417
* @return Input IV bytes
418
*/
419
public byte[] inputIv();
420
421
/**
422
* Get output initialization vector
423
* @return Output IV bytes
424
*/
425
public byte[] outputIv();
426
427
/**
428
* Calculate encoded length
429
* @return Length in bytes
430
*/
431
public int encodedLength();
432
433
/**
434
* Encode to ByteBuf
435
* @param buf Output buffer
436
*/
437
public void encode(ByteBuf buf);
438
439
/**
440
* Decode from ByteBuffer
441
* @param buffer Input buffer
442
* @return Decoded ServerResponse
443
*/
444
public static ServerResponse decodeMessage(ByteBuffer buffer);
445
}
446
```
447
448
### Transport Cipher
449
450
Encryption management for authenticated channels, providing transparent encryption/decryption after authentication.
451
452
```java { .api }
453
/**
454
* Manages encryption/decryption for transport channels after authentication
455
*/
456
public class TransportCipher {
457
/**
458
* Create a transport cipher with encryption parameters
459
* @param cryptoConf Crypto configuration properties
460
* @param cipher Cipher transformation string
461
* @param inKey Key for decrypting incoming data
462
* @param outKey Key for encrypting outgoing data
463
* @param inIv Initialization vector for incoming data
464
* @param outIv Initialization vector for outgoing data
465
* @throws IOException if cipher setup fails
466
*/
467
public TransportCipher(Properties cryptoConf, String cipher, byte[] inKey, byte[] outKey,
468
byte[] inIv, byte[] outIv) throws IOException;
469
470
/**
471
* Get the cipher transformation string
472
* @return Cipher transformation (e.g., "AES/CTR/NoPadding")
473
*/
474
public String getCipherTransformation();
475
476
/**
477
* Get input initialization vector
478
* @return Input IV bytes
479
*/
480
public byte[] getInputIv();
481
482
/**
483
* Get output initialization vector
484
* @return Output IV bytes
485
*/
486
public byte[] getOutputIv();
487
488
/**
489
* Add encryption handlers to the Netty channel pipeline
490
* @param ch Channel to add encryption to
491
* @throws IOException if handlers cannot be added
492
*/
493
public void addToChannel(Channel ch) throws IOException;
494
}
495
```
496
497
**Usage Examples:**
498
499
```java
500
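// Setting up encrypted transport after custom auth (inKey, outKey, inIv and outIv are byte[] values that must already be available, e.g. from the authentication handshake)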
501
Properties cryptoConf = new Properties();
502
cryptoConf.setProperty("spark.network.crypto.keyLength", "128");
503
cryptoConf.setProperty("spark.network.crypto.keyFactoryAlgorithm", "PBKDF2WithHmacSHA1");
504
505
TransportCipher cipher = new TransportCipher(
506
cryptoConf,
507
"AES/CTR/NoPadding",
508
inKey, outKey,
509
inIv, outIv
510
);
511
512
// Add encryption to channel after authentication
513
cipher.addToChannel(channel);
514
```
515
516
## Authentication Usage Patterns
517
518
### Complete SASL Setup
519
520
```java
521
// Server setup with SASL
522
SecretKeyHolder secretKeyHolder = new MySecretKeyHolder();
523
List<TransportServerBootstrap> serverBootstraps = Arrays.asList(
524
new SaslServerBootstrap(conf, secretKeyHolder)
525
);
526
527
TransportContext context = new TransportContext(conf, new MyRpcHandler());
528
TransportServer server = context.createServer(7337, serverBootstraps);
529
530
// Client setup with SASL
531
List<TransportClientBootstrap> clientBootstraps = Arrays.asList(
532
new SaslClientBootstrap(conf, "my-app", secretKeyHolder)
533
);
534
535
TransportClientFactory factory = context.createClientFactory(clientBootstraps);
536
TransportClient client = factory.createClient("localhost", 7337);
537
538
// Client is now authenticated and can send RPCs
539
client.sendRpc(message, callback);
540
```
541
542
### Custom Authentication Flow
543
544
```java
545
// Custom auth server (conf, context and secretKeyHolder are created as in the SASL setup above)
546
List<TransportServerBootstrap> bootstraps = Arrays.asList(
547
new AuthServerBootstrap(conf, secretKeyHolder)
548
);
549
TransportServer server = context.createServer(8080, bootstraps);
550
551
// Custom auth client
552
List<TransportClientBootstrap> clientBootstraps = Arrays.asList(
553
new AuthClientBootstrap(conf, "custom-app", secretKeyHolder)
554
);
555
TransportClientFactory factory = context.createClientFactory(clientBootstraps);
556
TransportClient client = factory.createClient("localhost", 8080);
557
```
558
559
### Secure Configuration
560
561
```java
562
// Configure authentication and encryption settings
563
Map<String, String> configMap = new HashMap<>();
564
configMap.put("spark.authenticate", "true");
565
configMap.put("spark.network.sasl.serverAlwaysEncrypt", "true");
566
configMap.put("spark.network.crypto.enabled", "true");
567
configMap.put("spark.network.crypto.keyLength", "256");
568
569
TransportConf conf = new TransportConf("secure-app", new MapConfigProvider(configMap));
570
```