CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-pulsar--pulsar-client

Apache Pulsar Java client library for distributed pub-sub messaging platform

Pending
Overview
Eval results
Files

authentication-security.mddocs/

Authentication and Security

Authentication mechanisms, TLS configuration, message encryption, and comprehensive security features for secure messaging in Pulsar clusters.

Capabilities

Authentication Interface

Base interface for authentication mechanisms with support for various authentication methods.

/**
 * Base interface for authentication mechanisms
 * Supports TLS, token-based, OAuth, and custom authentication methods
 */
interface Authentication extends Serializable, Closeable {
    /** Get authentication method name */
    String getAuthMethodName();
    
    /** Get authentication data provider */
    AuthenticationDataProvider getAuthData() throws PulsarClientException;
    
    /** Configure authentication with encoded parameter string */
    void configure(String encodedAuthParamString);
    
    /** Configure authentication with parameter map */
    void configure(Map<String, String> authParams);
    
    /** Start authentication process */
    void start() throws PulsarClientException;
    
    /** Close authentication resources */
    void close() throws IOException;
}

AuthenticationFactory

Factory class for creating authentication instances with built-in support for common authentication methods.

/**
 * Factory for creating authentication instances
 * Provides convenient methods for common authentication types
 */
class AuthenticationFactory {
    /** Create TLS authentication using certificate and key files */
    static Authentication TLS(String certFilePath, String keyFilePath);
    
    /** Create token-based authentication with static token */
    static Authentication token(String token);
    
    /** Create token-based authentication with token supplier */
    static Authentication token(Supplier<String> tokenSupplier);
    
    /** Create OAuth 2.0 authentication */
    static Authentication oauth2(String issuerUrl, String privateKeyUrl, String audience);
    
    /** Create OAuth 2.0 authentication with credentials URL */
    static Authentication oauth2(String issuerUrl, String credentialsUrl, String audience, String scope);
    
    /** Create SASL authentication */
    static Authentication sasl(String saslJaasClientSectionName, String serverType);
    
    /** Create custom authentication using plugin class name and parameters */
    static Authentication create(String authPluginClassName, String encodedAuthParamString) throws UnsupportedAuthenticationException;
    
    /** Create custom authentication using plugin class name and parameter map */
    static Authentication create(String authPluginClassName, Map<String, String> authParams) throws UnsupportedAuthenticationException;
}

Authentication Examples:

import org.apache.pulsar.client.api.*;

// TLS authentication
Authentication tlsAuth = AuthenticationFactory.TLS(
    "/path/to/client.cert.pem",
    "/path/to/client.key.pem"
);

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .authentication(tlsAuth)
    .build();

// Token authentication
Authentication tokenAuth = AuthenticationFactory.token("eyJhbGciOiJIUzI1NiJ9...");

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://broker:6650")
    .authentication(tokenAuth)
    .build();

// Token authentication with supplier (for token refresh)
Authentication tokenAuth = AuthenticationFactory.token(() -> {
    return refreshAndGetNewToken();
});

// OAuth 2.0 authentication
Authentication oauthAuth = AuthenticationFactory.oauth2(
    "https://issuer.example.com",
    "/path/to/private.key",
    "audience-value"
);

// Custom authentication
Map<String, String> authParams = new HashMap<>();
authParams.put("username", "myuser");
authParams.put("password", "mypassword");

Authentication customAuth = AuthenticationFactory.create(
    "com.example.MyAuthPlugin",
    authParams
);

AuthenticationDataProvider

Interface for providing authentication data for different transport protocols.

/**
 * Provider for authentication data across different transport protocols
 * Supports TLS, HTTP, and command-based authentication data
 */
interface AuthenticationDataProvider {
    /** Check if TLS authentication data is available */
    boolean hasDataForTls();
    
    /** Get TLS certificate file path */
    String getTlsCertificateFilePath();
    
    /** Get TLS private key file path */
    String getTlsPrivateKeyFilePath();
    
    /** Check if HTTP authentication data is available */
    boolean hasDataForHttp();
    
    /** Get HTTP headers for authentication */
    Set<Map.Entry<String, String>> getHttpHeaders() throws PulsarClientException;
    
    /** Check if command authentication data is available */
    boolean hasDataFromCommand();
    
    /** Get command data for authentication */
    String getCommandData();
    
    /** Check if token authentication data is available */
    boolean hasDataForToken();
    
    /** Get authentication token */
    String getToken() throws PulsarClientException;
}

Message Encryption

Comprehensive message encryption support with key management and crypto operations.

/**
 * Interface for reading encryption keys
 * Supports both public and private key retrieval with metadata
 */
interface CryptoKeyReader {
    /** Get public key for encryption */
    EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta);
    
    /** Get private key for decryption */
    EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta);
}

/**
 * Container for encryption key information
 */
class EncryptionKeyInfo {
    /** Create encryption key info */
    EncryptionKeyInfo(byte[] key, Map<String, String> metadata);
    
    /** Get key bytes */
    byte[] getKey();
    
    /** Get key metadata */
    Map<String, String> getMetadata();
}

/**
 * Interface for message encryption/decryption operations
 */
interface MessageCrypto {
    /** Encrypt message payload */
    boolean encrypt(Set<String> encKeys, CryptoKeyReader keyReader, 
                   MessageMetadata.Builder msgMetadata, ByteBuf payload);
    
    /** Decrypt message payload */
    boolean decrypt(MessageMetadata msgMetadata, ByteBuf payload, 
                   CryptoKeyReader keyReader);
    
    /** Get decrypted data */
    ByteBuf getDecryptedData();
    
    /** Release resources */
    void close();
}

Encryption Examples:

// Custom crypto key reader implementation
class FileCryptoKeyReader implements CryptoKeyReader {
    private String publicKeyPath;
    private String privateKeyPath;
    
    public FileCryptoKeyReader(String publicKeyPath, String privateKeyPath) {
        this.publicKeyPath = publicKeyPath;
        this.privateKeyPath = privateKeyPath;
    }
    
    @Override
    public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
        byte[] keyBytes = readKeyFromFile(publicKeyPath + "/" + keyName + ".pub");
        return new EncryptionKeyInfo(keyBytes, keyMeta);
    }
    
    @Override
    public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
        byte[] keyBytes = readKeyFromFile(privateKeyPath + "/" + keyName + ".key");
        return new EncryptionKeyInfo(keyBytes, keyMeta);
    }
}

// Producer with encryption
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("encrypted-topic")
    .addEncryptionKey("my-app-key")
    .cryptoKeyReader(new FileCryptoKeyReader("/keys/public", "/keys/private"))
    .cryptoFailureAction(ProducerCryptoFailureAction.FAIL)
    .create();

// Consumer with decryption
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("encrypted-topic")
    .subscriptionName("encrypted-sub")
    .cryptoKeyReader(new FileCryptoKeyReader("/keys/public", "/keys/private"))
    .cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
    .subscribe();

// Default crypto key reader (simplified)
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("encrypted-topic")
    .addEncryptionKey("my-app-key")
    .defaultCryptoKeyReader("/path/to/public/key")
    .create();

TLS Configuration

Comprehensive TLS configuration for secure broker connections.

/**
 * KeyStore parameters for TLS configuration
 */
class KeyStoreParams {
    /** Create KeyStore parameters */
    KeyStoreParams(String keyStorePath, String keyStorePassword, String keyStoreType);
    
    /** Get key store path */
    String getKeyStorePath();
    
    /** Get key store password */
    String getKeyStorePassword();
    
    /** Get key store type */
    String getKeyStoreType();
}

TLS Configuration Examples:

// File-based TLS configuration
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .tlsCertificateFilePath("/path/to/client.cert.pem")
    .tlsKeyFilePath("/path/to/client.key.pem")
    .tlsTrustCertsFilePath("/path/to/ca.cert.pem")
    .enableTlsHostnameVerification(true)
    .build();

// KeyStore-based TLS configuration
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .useKeyStoreTls(true)
    .tlsKeyStoreType("JKS")
    .tlsKeyStorePath("/path/to/client.keystore")
    .tlsKeyStorePassword("keystorePassword")
    .tlsTrustStoreType("JKS")
    .tlsTrustStorePath("/path/to/truststore.jks")
    .tlsTrustStorePassword("truststorePassword")
    .build();

// TLS with custom cipher suites and protocols
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .tlsCiphers(Set.of("TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"))
    .tlsProtocols(Set.of("TLSv1.3", "TLSv1.2"))
    .sslProvider("Conscrypt")
    .build();

// TLS with insecure connection (for testing)
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .allowTlsInsecureConnection(true)
    .enableTlsHostnameVerification(false)
    .build();

Authorization and Access Control

Support for fine-grained access control and authorization policies.

/**
 * Authorization data provider for role-based access control
 */
interface AuthorizationDataProvider {
    /** Get subject (user/role) for authorization */
    String getSubject();
    
    /** Get additional authorization properties */
    Map<String, String> getAuthorizationProperties();
}

/**
 * Interface for encoded authentication parameter support
 */
interface EncodedAuthenticationParameterSupport {
    /** Check if encoded parameters are supported */
    default boolean supportsEncodedAuthParamString() {
        return false;
    }
}

JWT Token Support

Specialized support for JSON Web Token (JWT) authentication.

/**
 * JWT token authentication support
 * Handles token validation, refresh, and expiration
 */
class JWTAuthenticationProvider implements Authentication {
    /** Create JWT authentication with static token */
    static Authentication createWithToken(String token);
    
    /** Create JWT authentication with token supplier */
    static Authentication createWithTokenSupplier(Supplier<String> tokenSupplier);
    
    /** Create JWT authentication with token file path */
    static Authentication createWithTokenFile(String tokenFilePath);
}

JWT Examples:

// JWT with static token
Authentication jwtAuth = AuthenticationFactory.token(
    "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
);

// JWT with token file (automatically reloaded)
Authentication jwtAuth = AuthenticationFactory.token(() -> {
    try {
        return Files.readString(Paths.get("/path/to/token.jwt"));
    } catch (IOException e) {
        throw new RuntimeException("Failed to read token", e);
    }
});

// JWT with automatic refresh
Authentication jwtAuth = AuthenticationFactory.token(() -> {
    // Implement token refresh logic
    return getRefreshedToken();
});

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://broker:6650")
    .authentication(jwtAuth)
    .build();

Security Exception Handling

Comprehensive exception hierarchy for security-related operations.

/**
 * Authentication-related exceptions
 */
class PulsarClientException {
    /** Authentication failed */
    static class AuthenticationException extends PulsarClientException {
        AuthenticationException(String msg);
        AuthenticationException(Throwable t);
    }
    
    /** Authorization failed */
    static class AuthorizationException extends PulsarClientException {
        AuthorizationException(String msg);
    }
    
    /** Unsupported authentication method */
    static class UnsupportedAuthenticationException extends PulsarClientException {
        UnsupportedAuthenticationException(String msg, Throwable t);
    }
    
    /** Error getting authentication data */
    static class GettingAuthenticationDataException extends PulsarClientException {
        GettingAuthenticationDataException(String msg);
        GettingAuthenticationDataException(Throwable t);
    }
    
    /** Encryption/decryption errors */
    static class CryptoException extends PulsarClientException {
        CryptoException(String msg);
    }
}

Security Best Practices Configuration

Configuration helpers for implementing security best practices.

/**
 * Security configuration utilities
 */
class SecurityUtils {
    /** Validate TLS configuration */
    static boolean validateTlsConfiguration(ClientBuilder builder);
    
    /** Check if connection is secure */
    static boolean isSecureConnection(String serviceUrl);
    
    /** Validate authentication configuration */
    static boolean validateAuthentication(Authentication auth);
}

Security Best Practices Examples:

// Production-ready secure client configuration
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .authentication(AuthenticationFactory.token(() -> getTokenFromSecureStore()))
    .tlsCertificateFilePath("/secure/path/client.cert.pem")
    .tlsKeyFilePath("/secure/path/client.key.pem")
    .tlsTrustCertsFilePath("/secure/path/ca-bundle.cert.pem")
    .enableTlsHostnameVerification(true)
    .tlsProtocols(Set.of("TLSv1.3"))
    .build();

// Encrypted producer with secure key management
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("sensitive-data")
    .addEncryptionKey("data-encryption-key")
    .cryptoKeyReader(new SecureKeyVaultCryptoKeyReader())
    .cryptoFailureAction(ProducerCryptoFailureAction.FAIL)
    .create();

// Consumer with decryption and access logging
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("sensitive-data")
    .subscriptionName("secure-processor")
    .cryptoKeyReader(new SecureKeyVaultCryptoKeyReader())
    .cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
    .intercept(new SecurityAuditInterceptor())
    .subscribe();

Supporting Types and Enums

enum ProducerCryptoFailureAction {
    /** Fail the send operation */
    FAIL,
    /** Send message unencrypted */
    SEND
}

enum ConsumerCryptoFailureAction {
    /** Fail the receive operation */
    FAIL,
    /** Discard the message */
    DISCARD,
    /** Consume message as-is */
    CONSUME
}

interface DummyCryptoKeyReaderImpl extends CryptoKeyReader {
    /** Dummy implementation that returns null keys */
    static CryptoKeyReader INSTANCE = new DummyCryptoKeyReaderImpl();
}

class KeyStoreParams {
    String keyStorePath;
    String keyStorePassword;
    String keyStoreType;
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--pulsar-client

docs

authentication-security.md

client-management.md

index.md

message-consumption.md

message-production.md

message-reading.md

schema-serialization.md

transaction-support.md

tile.json