# Authentication and Security

Authentication mechanisms, TLS configuration, message encryption, and comprehensive security features for secure messaging in Pulsar clusters.

## Capabilities

### Authentication Interface

Base interface for authentication mechanisms with support for various authentication methods.

```java { .api }
/**
 * Base interface for authentication mechanisms
 * Supports TLS, token-based, OAuth, and custom authentication methods
 */
interface Authentication extends Serializable, Closeable {
    /** Get authentication method name */
    String getAuthMethodName();

    /** Get authentication data provider */
    AuthenticationDataProvider getAuthData() throws PulsarClientException;

    /** Configure authentication with encoded parameter string */
    void configure(String encodedAuthParamString);

    /** Configure authentication with parameter map */
    void configure(Map<String, String> authParams);

    /** Start authentication process */
    void start() throws PulsarClientException;

    /** Close authentication resources */
    void close() throws IOException;
}
```
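
The `configure(String encodedAuthParamString)` overload receives all parameters packed into a single string. As a rough illustration of what such an encoding can look like, the sketch below parses a comma-separated `key:value` list into the map form accepted by `configure(Map<String, String>)`. The `key:value,key:value` layout and the `AuthParamParser` class are assumptions made for this example, not something the interface above specifies; a given plugin may expect a different encoding, such as JSON.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Pulsar client API.
final class AuthParamParser {
    private AuthParamParser() {}

    /**
     * Parses "key1:value1,key2:value2" into an insertion-ordered map.
     * Only the first ':' in each entry separates key from value, so values
     * such as URLs may themselves contain ':'. Values containing ',' are
     * not supported by this simple format.
     */
    static Map<String, String> parse(String encoded) {
        Map<String, String> params = new LinkedHashMap<>();
        if (encoded == null || encoded.trim().isEmpty()) {
            return params;
        }
        for (String entry : encoded.split(",")) {
            int sep = entry.indexOf(':');
            if (sep <= 0) {
                throw new IllegalArgumentException("Malformed auth parameter: " + entry);
            }
            params.put(entry.substring(0, sep).trim(), entry.substring(sep + 1).trim());
        }
        return params;
    }
}
```

A plugin's string-based `configure` could delegate to its map-based `configure` through a parser of this kind, keeping a single code path for validation.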

### AuthenticationFactory

Factory class for creating authentication instances with built-in support for common authentication methods.

```java { .api }
/**
 * Factory for creating authentication instances
 * Provides convenient methods for common authentication types
 */
class AuthenticationFactory {
    /** Create TLS authentication using certificate and key files */
    static Authentication TLS(String certFilePath, String keyFilePath);

    /** Create token-based authentication with static token */
    static Authentication token(String token);

    /** Create token-based authentication with token supplier */
    static Authentication token(Supplier<String> tokenSupplier);

    /** Create OAuth 2.0 authentication */
    static Authentication oauth2(String issuerUrl, String privateKeyUrl, String audience);

    /** Create OAuth 2.0 authentication with credentials URL */
    static Authentication oauth2(String issuerUrl, String credentialsUrl, String audience, String scope);

    /** Create SASL authentication */
    static Authentication sasl(String saslJaasClientSectionName, String serverType);

    /** Create custom authentication using plugin class name and parameters */
    static Authentication create(String authPluginClassName, String encodedAuthParamString)
        throws UnsupportedAuthenticationException;

    /** Create custom authentication using plugin class name and parameter map */
    static Authentication create(String authPluginClassName, Map<String, String> authParams)
        throws UnsupportedAuthenticationException;
}
```

**Authentication Examples:**

```java
import org.apache.pulsar.client.api.*;

import java.util.HashMap;
import java.util.Map;

// TLS authentication
Authentication tlsAuth = AuthenticationFactory.TLS(
    "/path/to/client.cert.pem",
    "/path/to/client.key.pem"
);

PulsarClient tlsClient = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .authentication(tlsAuth)
    .build();

// Token authentication
// Note: pulsar:// is unencrypted; prefer pulsar+ssl:// so the token is not sent in clear text
Authentication tokenAuth = AuthenticationFactory.token("eyJhbGciOiJIUzI1NiJ9...");

PulsarClient tokenClient = PulsarClient.builder()
    .serviceUrl("pulsar://broker:6650")
    .authentication(tokenAuth)
    .build();

// Token authentication with supplier (for token refresh);
// refreshAndGetNewToken() stands for an application-defined helper
Authentication refreshingTokenAuth = AuthenticationFactory.token(() -> {
    return refreshAndGetNewToken();
});

// OAuth 2.0 authentication
Authentication oauthAuth = AuthenticationFactory.oauth2(
    "https://issuer.example.com",
    "/path/to/private.key",
    "audience-value"
);

// Custom authentication (create() throws UnsupportedAuthenticationException
// if the plugin class cannot be loaded)
Map<String, String> authParams = new HashMap<>();
authParams.put("username", "myuser");
authParams.put("password", "mypassword");

Authentication customAuth = AuthenticationFactory.create(
    "com.example.MyAuthPlugin",
    authParams
);
```

### AuthenticationDataProvider

Interface for providing authentication data for different transport protocols.

```java { .api }
/**
 * Provider for authentication data across different transport protocols
 * Supports TLS, HTTP, and command-based authentication data
 */
interface AuthenticationDataProvider {
    /** Check if TLS authentication data is available */
    boolean hasDataForTls();

    /** Get TLS certificate file path */
    String getTlsCertificateFilePath();

    /** Get TLS private key file path */
    String getTlsPrivateKeyFilePath();

    /** Check if HTTP authentication data is available */
    boolean hasDataForHttp();

    /** Get HTTP headers for authentication */
    Set<Map.Entry<String, String>> getHttpHeaders() throws PulsarClientException;

    /** Check if command authentication data is available */
    boolean hasDataFromCommand();

    /** Get command data for authentication */
    String getCommandData();

    /** Check if token authentication data is available */
    boolean hasDataForToken();

    /** Get authentication token */
    String getToken() throws PulsarClientException;
}
```
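
To make the HTTP branch of this interface more concrete, the following sketch builds the kind of value `getHttpHeaders()` returns for a bearer token: a set of header name/value entries. The `BearerHeaders` class is hypothetical, and the `Authorization: Bearer <token>` header follows the conventional bearer-token scheme, which is assumed here rather than taken from the interface above.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration; not part of the Pulsar client API.
final class BearerHeaders {
    private BearerHeaders() {}

    /** Builds an immutable single-entry header set: Authorization -> "Bearer <token>". */
    static Set<Map.Entry<String, String>> forToken(String token) {
        if (token == null || token.trim().isEmpty()) {
            throw new IllegalArgumentException("token must not be empty");
        }
        // Trim so a trailing newline from a token file does not end up in the header
        Map.Entry<String, String> header =
            new AbstractMap.SimpleImmutableEntry<>("Authorization", "Bearer " + token.trim());
        return Collections.singleton(header);
    }
}
```

A custom provider could return such a set from `getHttpHeaders()` and report `true` from `hasDataForHttp()`.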

### Message Encryption

Comprehensive message encryption support with key management and crypto operations.

```java { .api }
/**
 * Interface for reading encryption keys
 * Supports both public and private key retrieval with metadata
 */
interface CryptoKeyReader {
    /** Get public key for encryption */
    EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta);

    /** Get private key for decryption */
    EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta);
}

/**
 * Container for encryption key information
 */
class EncryptionKeyInfo {
    /** Create encryption key info */
    EncryptionKeyInfo(byte[] key, Map<String, String> metadata);

    /** Get key bytes */
    byte[] getKey();

    /** Get key metadata */
    Map<String, String> getMetadata();
}

/**
 * Interface for message encryption/decryption operations
 */
interface MessageCrypto {
    /** Encrypt message payload */
    boolean encrypt(Set<String> encKeys, CryptoKeyReader keyReader,
                    MessageMetadata.Builder msgMetadata, ByteBuf payload);

    /** Decrypt message payload */
    boolean decrypt(MessageMetadata msgMetadata, ByteBuf payload,
                    CryptoKeyReader keyReader);

    /** Get decrypted data */
    ByteBuf getDecryptedData();

    /** Release resources */
    void close();
}
```

**Encryption Examples:**

```java
import org.apache.pulsar.client.api.*;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;

// Custom crypto key reader implementation
class FileCryptoKeyReader implements CryptoKeyReader {
    private final String publicKeyPath;
    private final String privateKeyPath;

    public FileCryptoKeyReader(String publicKeyPath, String privateKeyPath) {
        this.publicKeyPath = publicKeyPath;
        this.privateKeyPath = privateKeyPath;
    }

    @Override
    public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
        byte[] keyBytes = readKeyFromFile(publicKeyPath + "/" + keyName + ".pub");
        return new EncryptionKeyInfo(keyBytes, keyMeta);
    }

    @Override
    public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
        byte[] keyBytes = readKeyFromFile(privateKeyPath + "/" + keyName + ".key");
        return new EncryptionKeyInfo(keyBytes, keyMeta);
    }

    // Reads the raw key file; I/O failures surface as unchecked exceptions
    private static byte[] readKeyFromFile(String path) {
        try {
            return Files.readAllBytes(Paths.get(path));
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to read key file " + path, e);
        }
    }
}

// Producer with encryption
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("encrypted-topic")
    .addEncryptionKey("my-app-key")
    .cryptoKeyReader(new FileCryptoKeyReader("/keys/public", "/keys/private"))
    .cryptoFailureAction(ProducerCryptoFailureAction.FAIL)
    .create();

// Consumer with decryption
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("encrypted-topic")
    .subscriptionName("encrypted-sub")
    .cryptoKeyReader(new FileCryptoKeyReader("/keys/public", "/keys/private"))
    .cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
    .subscribe();

// Default crypto key reader (simplified); the key location is given as a URI
Producer<String> defaultReaderProducer = client.newProducer(Schema.STRING)
    .topic("encrypted-topic")
    .addEncryptionKey("my-app-key")
    .defaultCryptoKeyReader("file:///path/to/public/key")
    .create();
```
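
End-to-end encryption of this kind is commonly built on envelope encryption: each payload is encrypted with a fresh symmetric data key, and that data key is wrapped with the recipient's public key so that only the private-key holder can recover it. The sketch below illustrates the pattern with the JDK's `javax.crypto` APIs only. It shows the general idea under assumed algorithm choices (AES-256-GCM, RSA-OAEP, a generated 2048-bit key pair) and does not describe how `MessageCrypto` is implemented; `EnvelopeSketch` and its members are hypothetical names.

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.SecureRandom;

// Hypothetical illustration of envelope encryption; not Pulsar code.
final class EnvelopeSketch {
    private static final String RSA_WRAP = "RSA/ECB/OAEPWithSHA-1AndMGF1Padding";
    private static final String AES_GCM = "AES/GCM/NoPadding";

    /** Ciphertext plus everything the receiver needs except the private key. */
    static final class Envelope {
        final byte[] wrappedDataKey; // AES key encrypted with the RSA public key
        final byte[] iv;             // 12-byte GCM nonce, unique per message
        final byte[] ciphertext;     // payload encrypted with the AES key

        Envelope(byte[] wrappedDataKey, byte[] iv, byte[] ciphertext) {
            this.wrappedDataKey = wrappedDataKey;
            this.iv = iv;
            this.ciphertext = ciphertext;
        }
    }

    static Envelope encrypt(byte[] payload, PublicKey publicKey) throws GeneralSecurityException {
        // 1. Fresh data key for this message
        KeyGenerator keyGen = KeyGenerator.getInstance("AES");
        keyGen.init(256);
        SecretKey dataKey = keyGen.generateKey();

        // 2. Encrypt the payload with the data key
        byte[] iv = new byte[12];
        new SecureRandom().nextBytes(iv);
        Cipher aes = Cipher.getInstance(AES_GCM);
        aes.init(Cipher.ENCRYPT_MODE, dataKey, new GCMParameterSpec(128, iv));
        byte[] ciphertext = aes.doFinal(payload);

        // 3. Wrap the data key with the recipient's public key
        Cipher rsa = Cipher.getInstance(RSA_WRAP);
        rsa.init(Cipher.ENCRYPT_MODE, publicKey);
        byte[] wrapped = rsa.doFinal(dataKey.getEncoded());

        return new Envelope(wrapped, iv, ciphertext);
    }

    static byte[] decrypt(Envelope env, PrivateKey privateKey) throws GeneralSecurityException {
        // Unwrap the data key, then decrypt and authenticate the payload
        Cipher rsa = Cipher.getInstance(RSA_WRAP);
        rsa.init(Cipher.DECRYPT_MODE, privateKey);
        SecretKey dataKey = new SecretKeySpec(rsa.doFinal(env.wrappedDataKey), "AES");

        Cipher aes = Cipher.getInstance(AES_GCM);
        aes.init(Cipher.DECRYPT_MODE, dataKey, new GCMParameterSpec(128, env.iv));
        return aes.doFinal(env.ciphertext);
    }

    static KeyPair newRsaKeyPair() throws GeneralSecurityException {
        KeyPairGenerator gen = KeyPairGenerator.getInstance("RSA");
        gen.initialize(2048);
        return gen.generateKeyPair();
    }
}
```

In this picture, a `CryptoKeyReader` plays the role of the key lookup: `getPublicKey` supplies the wrapping key on the producer side and `getPrivateKey` supplies the unwrapping key on the consumer side.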

### TLS Configuration

Comprehensive TLS configuration for secure broker connections.

```java { .api }
/**
 * KeyStore parameters for TLS configuration
 */
class KeyStoreParams {
    /** Create KeyStore parameters */
    KeyStoreParams(String keyStorePath, String keyStorePassword, String keyStoreType);

    /** Get key store path */
    String getKeyStorePath();

    /** Get key store password */
    String getKeyStorePassword();

    /** Get key store type */
    String getKeyStoreType();
}
```

**TLS Configuration Examples:**

```java
// File-based TLS configuration
PulsarClient pemClient = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .tlsCertificateFilePath("/path/to/client.cert.pem")
    .tlsKeyFilePath("/path/to/client.key.pem")
    .tlsTrustCertsFilePath("/path/to/ca.cert.pem")
    .enableTlsHostnameVerification(true)
    .build();

// KeyStore-based TLS configuration
PulsarClient keyStoreClient = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .useKeyStoreTls(true)
    .tlsKeyStoreType("JKS")
    .tlsKeyStorePath("/path/to/client.keystore")
    .tlsKeyStorePassword("keystorePassword")
    .tlsTrustStoreType("JKS")
    .tlsTrustStorePath("/path/to/truststore.jks")
    .tlsTrustStorePassword("truststorePassword")
    .build();

// TLS with custom cipher suites and protocols
PulsarClient customTlsClient = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .tlsCiphers(Set.of("TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"))
    .tlsProtocols(Set.of("TLSv1.3", "TLSv1.2"))
    .sslProvider("Conscrypt")
    .build();

// TLS with insecure connection (for testing only: certificate and
// hostname checks are disabled, so the broker's identity is not verified)
PulsarClient insecureClient = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .allowTlsInsecureConnection(true)
    .enableTlsHostnameVerification(false)
    .build();
```
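
Protocol and cipher suite names passed to `tlsProtocols` and `tlsCiphers` are only useful if the underlying TLS provider supports them. As a hedged sketch, the helper below compares requested names against what the JVM's default `SSLContext` reports. The `TlsSupportCheck` class is hypothetical, and the check covers the JDK's default provider only; an alternative provider such as Conscrypt may support a different set.

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; not part of the Pulsar client API.
final class TlsSupportCheck {
    private TlsSupportCheck() {}

    /** Returns the requested protocol names that the default SSLContext does not support. */
    static Set<String> unsupportedProtocols(Set<String> requested) throws NoSuchAlgorithmException {
        SSLParameters supported = SSLContext.getDefault().getSupportedSSLParameters();
        return missing(requested, supported.getProtocols());
    }

    /** Returns the requested cipher suite names that the default SSLContext does not support. */
    static Set<String> unsupportedCiphers(Set<String> requested) throws NoSuchAlgorithmException {
        SSLParameters supported = SSLContext.getDefault().getSupportedSSLParameters();
        return missing(requested, supported.getCipherSuites());
    }

    // Sorted set difference: requested minus supported
    private static Set<String> missing(Set<String> requested, String[] supported) {
        Set<String> result = new TreeSet<>(requested);
        result.removeAll(new HashSet<>(Arrays.asList(supported)));
        return result;
    }
}
```

Running such a check at startup turns a misspelled or unavailable name into an explicit error message instead of a handshake failure later on.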

### Authorization and Access Control

Support for fine-grained access control and authorization policies.

```java { .api }
/**
 * Authorization data provider for role-based access control
 */
interface AuthorizationDataProvider {
    /** Get subject (user/role) for authorization */
    String getSubject();

    /** Get additional authorization properties */
    Map<String, String> getAuthorizationProperties();
}

/**
 * Interface for encoded authentication parameter support
 */
interface EncodedAuthenticationParameterSupport {
    /** Check if encoded parameters are supported */
    default boolean supportsEncodedAuthParamString() {
        return false;
    }
}
```

### JWT Token Support

Specialized support for JSON Web Token (JWT) authentication.

```java { .api }
/**
 * JWT token authentication support
 * Handles token validation, refresh, and expiration
 */
class JWTAuthenticationProvider implements Authentication {
    /** Create JWT authentication with static token */
    static Authentication createWithToken(String token);

    /** Create JWT authentication with token supplier */
    static Authentication createWithTokenSupplier(Supplier<String> tokenSupplier);

    /** Create JWT authentication with token file path */
    static Authentication createWithTokenFile(String tokenFilePath);
}
```

**JWT Examples:**

```java
import org.apache.pulsar.client.api.*;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

// JWT with static token
Authentication staticJwtAuth = AuthenticationFactory.token(
    "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
);

// JWT with token file (re-read whenever the supplier is invoked)
Authentication fileJwtAuth = AuthenticationFactory.token(() -> {
    try {
        // trim() drops the trailing newline that token files often end with
        return Files.readString(Paths.get("/path/to/token.jwt")).trim();
    } catch (IOException e) {
        throw new RuntimeException("Failed to read token", e);
    }
});

// JWT with automatic refresh
Authentication refreshingJwtAuth = AuthenticationFactory.token(() -> {
    // Implement token refresh logic; getRefreshedToken() is application-defined
    return getRefreshedToken();
});

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://broker:6650")
    .authentication(refreshingJwtAuth)
    .build();
```
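
A token supplier may be invoked repeatedly, so it often pays to cache the token and fetch a new one only after a chosen lifetime has elapsed. The following is a minimal sketch of such a wrapper built on `java.util.function.Supplier` alone. The `CachingTokenSupplier` class, its fixed time-to-live, and the injected clock are assumptions for illustration; a real implementation would more likely derive the lifetime from the token's `exp` claim.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical wrapper for illustration; not part of the Pulsar client API.
final class CachingTokenSupplier implements Supplier<String> {
    private final Supplier<String> delegate; // fetches a fresh token, e.g. from a secret store
    private final long ttlMillis;            // how long a fetched token is reused
    private final LongSupplier clock;        // injectable for tests; normally System::currentTimeMillis

    private String cached;
    private long expiresAtMillis;

    CachingTokenSupplier(Supplier<String> delegate, long ttlMillis, LongSupplier clock) {
        this.delegate = delegate;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns the cached token, refreshing it once the time-to-live has elapsed. */
    @Override
    public synchronized String get() {
        long now = clock.getAsLong();
        if (cached == null || now >= expiresAtMillis) {
            cached = delegate.get();
            expiresAtMillis = now + ttlMillis;
        }
        return cached;
    }
}
```

An instance could then be handed to `AuthenticationFactory.token(...)` in place of a plain lambda, for example wrapping an application-defined fetch function with a one-minute lifetime.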

### Security Exception Handling

Comprehensive exception hierarchy for security-related operations.

```java { .api }
/**
 * Security-related exceptions, nested in PulsarClientException
 */
class PulsarClientException extends IOException {
    /** Authentication failed */
    static class AuthenticationException extends PulsarClientException {
        AuthenticationException(String msg);
        AuthenticationException(Throwable t);
    }

    /** Authorization failed */
    static class AuthorizationException extends PulsarClientException {
        AuthorizationException(String msg);
    }

    /** Unsupported authentication method */
    static class UnsupportedAuthenticationException extends PulsarClientException {
        UnsupportedAuthenticationException(String msg, Throwable t);
    }

    /** Error getting authentication data */
    static class GettingAuthenticationDataException extends PulsarClientException {
        GettingAuthenticationDataException(String msg);
        GettingAuthenticationDataException(Throwable t);
    }

    /** Encryption/decryption errors */
    static class CryptoException extends PulsarClientException {
        CryptoException(String msg);
    }
}
```

### Security Best Practices Configuration

Configuration helpers for implementing security best practices.

```java { .api }
/**
 * Security configuration utilities
 */
class SecurityUtils {
    /** Validate TLS configuration */
    static boolean validateTlsConfiguration(ClientBuilder builder);

    /** Check if connection is secure */
    static boolean isSecureConnection(String serviceUrl);

    /** Validate authentication configuration */
    static boolean validateAuthentication(Authentication auth);
}
```

**Security Best Practices Examples:**

```java
// getTokenFromSecureStore(), SecureKeyVaultCryptoKeyReader, and
// SecurityAuditInterceptor below are application-defined placeholders.

// Production-ready secure client configuration
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .authentication(AuthenticationFactory.token(() -> getTokenFromSecureStore()))
    .tlsCertificateFilePath("/secure/path/client.cert.pem")
    .tlsKeyFilePath("/secure/path/client.key.pem")
    .tlsTrustCertsFilePath("/secure/path/ca-bundle.cert.pem")
    .enableTlsHostnameVerification(true)
    .tlsProtocols(Set.of("TLSv1.3"))
    .build();

// Encrypted producer with secure key management
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("sensitive-data")
    .addEncryptionKey("data-encryption-key")
    .cryptoKeyReader(new SecureKeyVaultCryptoKeyReader())
    .cryptoFailureAction(ProducerCryptoFailureAction.FAIL)
    .create();

// Consumer with decryption and access logging
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("sensitive-data")
    .subscriptionName("secure-processor")
    .cryptoKeyReader(new SecureKeyVaultCryptoKeyReader())
    .cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
    .intercept(new SecurityAuditInterceptor())
    .subscribe();
```
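
A check in the spirit of `isSecureConnection(String serviceUrl)` can be approximated by inspecting the URL scheme. The sketch below treats `pulsar+ssl` and `https` as the TLS-enabled schemes, in line with the `pulsar+ssl://` URLs used in the examples. `ServiceUrlCheck` is a hypothetical name, and the scheme list is an assumption made for this example rather than the documented behavior of `SecurityUtils`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper for illustration; not part of the Pulsar client API.
final class ServiceUrlCheck {
    // Assumed set of schemes that imply a TLS-protected connection
    private static final Set<String> SECURE_SCHEMES =
        new HashSet<>(Arrays.asList("pulsar+ssl", "https"));

    private ServiceUrlCheck() {}

    /** Returns true if the URL's scheme (text before "://") is one of the secure schemes. */
    static boolean isSecure(String serviceUrl) {
        if (serviceUrl == null) {
            return false;
        }
        int sep = serviceUrl.indexOf("://");
        if (sep <= 0) {
            return false; // no scheme present
        }
        String scheme = serviceUrl.substring(0, sep).trim().toLowerCase(Locale.ROOT);
        return SECURE_SCHEMES.contains(scheme);
    }
}
```

The scheme is extracted by plain string search rather than `java.net.URI` so that service URLs listing several comma-separated hosts are handled the same way as single-host URLs.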

## Supporting Types and Enums

```java { .api }
enum ProducerCryptoFailureAction {
    /** Fail the send operation */
    FAIL,
    /** Send message unencrypted */
    SEND
}

enum ConsumerCryptoFailureAction {
    /** Fail the receive operation */
    FAIL,
    /** Discard the message */
    DISCARD,
    /** Consume message as-is */
    CONSUME
}

/** Dummy implementation that returns null keys */
class DummyCryptoKeyReaderImpl implements CryptoKeyReader {
    static final CryptoKeyReader INSTANCE = new DummyCryptoKeyReaderImpl();
}

class KeyStoreParams {
    String keyStorePath;
    String keyStorePassword;
    String keyStoreType;
}
```