Apache Pulsar Java client library for distributed pub-sub messaging platform
—
Authentication mechanisms, TLS configuration, message encryption, and comprehensive security features for secure messaging in Pulsar clusters.
Base interface for authentication mechanisms with support for various authentication methods.
/**
* Base interface for authentication mechanisms
* Supports TLS, token-based, OAuth, and custom authentication methods
*/
interface Authentication extends Serializable, Closeable {
/** Get authentication method name */
String getAuthMethodName();
/** Get authentication data provider */
AuthenticationDataProvider getAuthData() throws PulsarClientException;
/** Configure authentication with encoded parameter string */
void configure(String encodedAuthParamString);
/** Configure authentication with parameter map */
void configure(Map<String, String> authParams);
/** Start authentication process */
void start() throws PulsarClientException;
/** Close authentication resources */
void close() throws IOException;
}Factory class for creating authentication instances with built-in support for common authentication methods.
/**
* Factory for creating authentication instances
* Provides convenient methods for common authentication types
*/
class AuthenticationFactory {
/** Create TLS authentication using certificate and key files */
static Authentication TLS(String certFilePath, String keyFilePath);
/** Create token-based authentication with static token */
static Authentication token(String token);
/** Create token-based authentication with token supplier */
static Authentication token(Supplier<String> tokenSupplier);
/** Create OAuth 2.0 authentication */
static Authentication oauth2(String issuerUrl, String privateKeyUrl, String audience);
/** Create OAuth 2.0 authentication with credentials URL */
static Authentication oauth2(String issuerUrl, String credentialsUrl, String audience, String scope);
/** Create SASL authentication */
static Authentication sasl(String saslJaasClientSectionName, String serverType);
/** Create custom authentication using plugin class name and parameters */
static Authentication create(String authPluginClassName, String encodedAuthParamString) throws UnsupportedAuthenticationException;
/** Create custom authentication using plugin class name and parameter map */
static Authentication create(String authPluginClassName, Map<String, String> authParams) throws UnsupportedAuthenticationException;
}Authentication Examples:
import org.apache.pulsar.client.api.*;
// TLS authentication
Authentication tlsAuth = AuthenticationFactory.TLS(
"/path/to/client.cert.pem",
"/path/to/client.key.pem"
);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://broker:6651")
.authentication(tlsAuth)
.build();
// Token authentication
Authentication tokenAuth = AuthenticationFactory.token("eyJhbGciOiJIUzI1NiJ9...");
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://broker:6650")
.authentication(tokenAuth)
.build();
// Token authentication with supplier (for token refresh)
Authentication tokenAuth = AuthenticationFactory.token(() -> {
return refreshAndGetNewToken();
});
// OAuth 2.0 authentication
Authentication oauthAuth = AuthenticationFactory.oauth2(
"https://issuer.example.com",
"/path/to/private.key",
"audience-value"
);
// Custom authentication
Map<String, String> authParams = new HashMap<>();
authParams.put("username", "myuser");
authParams.put("password", "mypassword");
Authentication customAuth = AuthenticationFactory.create(
"com.example.MyAuthPlugin",
authParams
);Interface for providing authentication data for different transport protocols.
/**
* Provider for authentication data across different transport protocols
* Supports TLS, HTTP, and command-based authentication data
*/
interface AuthenticationDataProvider {
/** Check if TLS authentication data is available */
boolean hasDataForTls();
/** Get TLS certificate file path */
String getTlsCertificateFilePath();
/** Get TLS private key file path */
String getTlsPrivateKeyFilePath();
/** Check if HTTP authentication data is available */
boolean hasDataForHttp();
/** Get HTTP headers for authentication */
Set<Map.Entry<String, String>> getHttpHeaders() throws PulsarClientException;
/** Check if command authentication data is available */
boolean hasDataFromCommand();
/** Get command data for authentication */
String getCommandData();
/** Check if token authentication data is available */
boolean hasDataForToken();
/** Get authentication token */
String getToken() throws PulsarClientException;
}Comprehensive message encryption support with key management and crypto operations.
/**
* Interface for reading encryption keys
* Supports both public and private key retrieval with metadata
*/
interface CryptoKeyReader {
/** Get public key for encryption */
EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta);
/** Get private key for decryption */
EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta);
}
/**
* Container for encryption key information
*/
class EncryptionKeyInfo {
/** Create encryption key info */
EncryptionKeyInfo(byte[] key, Map<String, String> metadata);
/** Get key bytes */
byte[] getKey();
/** Get key metadata */
Map<String, String> getMetadata();
}
/**
* Interface for message encryption/decryption operations
*/
interface MessageCrypto {
/** Encrypt message payload */
boolean encrypt(Set<String> encKeys, CryptoKeyReader keyReader,
MessageMetadata.Builder msgMetadata, ByteBuf payload);
/** Decrypt message payload */
boolean decrypt(MessageMetadata msgMetadata, ByteBuf payload,
CryptoKeyReader keyReader);
/** Get decrypted data */
ByteBuf getDecryptedData();
/** Release resources */
void close();
}Encryption Examples:
// Custom crypto key reader implementation
class FileCryptoKeyReader implements CryptoKeyReader {
private String publicKeyPath;
private String privateKeyPath;
public FileCryptoKeyReader(String publicKeyPath, String privateKeyPath) {
this.publicKeyPath = publicKeyPath;
this.privateKeyPath = privateKeyPath;
}
@Override
public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
byte[] keyBytes = readKeyFromFile(publicKeyPath + "/" + keyName + ".pub");
return new EncryptionKeyInfo(keyBytes, keyMeta);
}
@Override
public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
byte[] keyBytes = readKeyFromFile(privateKeyPath + "/" + keyName + ".key");
return new EncryptionKeyInfo(keyBytes, keyMeta);
}
}
// Producer with encryption
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("encrypted-topic")
.addEncryptionKey("my-app-key")
.cryptoKeyReader(new FileCryptoKeyReader("/keys/public", "/keys/private"))
.cryptoFailureAction(ProducerCryptoFailureAction.FAIL)
.create();
// Consumer with decryption
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("encrypted-topic")
.subscriptionName("encrypted-sub")
.cryptoKeyReader(new FileCryptoKeyReader("/keys/public", "/keys/private"))
.cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
.subscribe();
// Default crypto key reader (simplified)
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("encrypted-topic")
.addEncryptionKey("my-app-key")
.defaultCryptoKeyReader("/path/to/public/key")
.create();Comprehensive TLS configuration for secure broker connections.
/**
* KeyStore parameters for TLS configuration
*/
class KeyStoreParams {
/** Create KeyStore parameters */
KeyStoreParams(String keyStorePath, String keyStorePassword, String keyStoreType);
/** Get key store path */
String getKeyStorePath();
/** Get key store password */
String getKeyStorePassword();
/** Get key store type */
String getKeyStoreType();
}TLS Configuration Examples:
// File-based TLS configuration
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://broker:6651")
.tlsCertificateFilePath("/path/to/client.cert.pem")
.tlsKeyFilePath("/path/to/client.key.pem")
.tlsTrustCertsFilePath("/path/to/ca.cert.pem")
.enableTlsHostnameVerification(true)
.build();
// KeyStore-based TLS configuration
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://broker:6651")
.useKeyStoreTls(true)
.tlsKeyStoreType("JKS")
.tlsKeyStorePath("/path/to/client.keystore")
.tlsKeyStorePassword("keystorePassword")
.tlsTrustStoreType("JKS")
.tlsTrustStorePath("/path/to/truststore.jks")
.tlsTrustStorePassword("truststorePassword")
.build();
// TLS with custom cipher suites and protocols
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://broker:6651")
.tlsCiphers(Set.of("TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"))
.tlsProtocols(Set.of("TLSv1.3", "TLSv1.2"))
.sslProvider("Conscrypt")
.build();
// TLS with insecure connection (for testing)
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://broker:6651")
.allowTlsInsecureConnection(true)
.enableTlsHostnameVerification(false)
.build();Support for fine-grained access control and authorization policies.
/**
* Authorization data provider for role-based access control
*/
interface AuthorizationDataProvider {
/** Get subject (user/role) for authorization */
String getSubject();
/** Get additional authorization properties */
Map<String, String> getAuthorizationProperties();
}
/**
* Interface for encoded authentication parameter support
*/
interface EncodedAuthenticationParameterSupport {
/** Check if encoded parameters are supported */
default boolean supportsEncodedAuthParamString() {
return false;
}
}Specialized support for JSON Web Token (JWT) authentication.
/**
* JWT token authentication support
* Handles token validation, refresh, and expiration
*/
class JWTAuthenticationProvider implements Authentication {
/** Create JWT authentication with static token */
static Authentication createWithToken(String token);
/** Create JWT authentication with token supplier */
static Authentication createWithTokenSupplier(Supplier<String> tokenSupplier);
/** Create JWT authentication with token file path */
static Authentication createWithTokenFile(String tokenFilePath);
}JWT Examples:
// JWT with static token
Authentication jwtAuth = AuthenticationFactory.token(
"eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
);
// JWT with token file (automatically reloaded)
Authentication jwtAuth = AuthenticationFactory.token(() -> {
try {
return Files.readString(Paths.get("/path/to/token.jwt"));
} catch (IOException e) {
throw new RuntimeException("Failed to read token", e);
}
});
// JWT with automatic refresh
Authentication jwtAuth = AuthenticationFactory.token(() -> {
// Implement token refresh logic
return getRefreshedToken();
});
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://broker:6650")
.authentication(jwtAuth)
.build();Comprehensive exception hierarchy for security-related operations.
/**
* Authentication-related exceptions
*/
class PulsarClientException {
/** Authentication failed */
static class AuthenticationException extends PulsarClientException {
AuthenticationException(String msg);
AuthenticationException(Throwable t);
}
/** Authorization failed */
static class AuthorizationException extends PulsarClientException {
AuthorizationException(String msg);
}
/** Unsupported authentication method */
static class UnsupportedAuthenticationException extends PulsarClientException {
UnsupportedAuthenticationException(String msg, Throwable t);
}
/** Error getting authentication data */
static class GettingAuthenticationDataException extends PulsarClientException {
GettingAuthenticationDataException(String msg);
GettingAuthenticationDataException(Throwable t);
}
/** Encryption/decryption errors */
static class CryptoException extends PulsarClientException {
CryptoException(String msg);
}
}Configuration helpers for implementing security best practices.
/**
* Security configuration utilities
*/
class SecurityUtils {
/** Validate TLS configuration */
static boolean validateTlsConfiguration(ClientBuilder builder);
/** Check if connection is secure */
static boolean isSecureConnection(String serviceUrl);
/** Validate authentication configuration */
static boolean validateAuthentication(Authentication auth);
}Security Best Practices Examples:
// Production-ready secure client configuration
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://broker:6651")
.authentication(AuthenticationFactory.token(() -> getTokenFromSecureStore()))
.tlsCertificateFilePath("/secure/path/client.cert.pem")
.tlsKeyFilePath("/secure/path/client.key.pem")
.tlsTrustCertsFilePath("/secure/path/ca-bundle.cert.pem")
.enableTlsHostnameVerification(true)
.tlsProtocols(Set.of("TLSv1.3"))
.build();
// Encrypted producer with secure key management
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("sensitive-data")
.addEncryptionKey("data-encryption-key")
.cryptoKeyReader(new SecureKeyVaultCryptoKeyReader())
.cryptoFailureAction(ProducerCryptoFailureAction.FAIL)
.create();
// Consumer with decryption and access logging
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("sensitive-data")
.subscriptionName("secure-processor")
.cryptoKeyReader(new SecureKeyVaultCryptoKeyReader())
.cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
.intercept(new SecurityAuditInterceptor())
.subscribe();enum ProducerCryptoFailureAction {
/** Fail the send operation */
FAIL,
/** Send message unencrypted */
SEND
}
enum ConsumerCryptoFailureAction {
/** Fail the receive operation */
FAIL,
/** Discard the message */
DISCARD,
/** Consume message as-is */
CONSUME
}
interface DummyCryptoKeyReaderImpl extends CryptoKeyReader {
/** Dummy implementation that returns null keys */
static CryptoKeyReader INSTANCE = new DummyCryptoKeyReaderImpl();
}
class KeyStoreParams {
String keyStorePath;
String keyStorePassword;
String keyStoreType;
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-pulsar--pulsar-client