CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-spark--spark-network-common

Core networking library for Apache Spark providing transport layer abstractions and utilities

Pending
Overview
Eval results
Files

authentication.mddocs/

Authentication

Pluggable authentication system supporting SASL and custom authentication protocols with encryption capabilities, providing secure communication channels for distributed Spark applications.

Capabilities

SASL Authentication

SASL (Simple Authentication and Security Layer) support for secure authentication with multiple mechanisms and encryption.

/**
 * Client-side SASL authentication bootstrap for secure connections
 */
public class SaslClientBootstrap implements TransportClientBootstrap {
    /**
     * Create a SASL client bootstrap
     * @param conf Transport configuration containing SASL settings
     * @param appId Application identifier for authentication
     * @param secretKeyHolder Provider for secret keys and user information
     */
    public SaslClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);
    
    /**
     * Bootstrap the client channel with SASL authentication
     * @param client Transport client instance
     * @param channel Netty channel to authenticate
     * @throws RuntimeException if authentication fails
     */
    public void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

/**
 * Server-side SASL authentication bootstrap for accepting secure connections
 */
public class SaslServerBootstrap implements TransportServerBootstrap {
    /**
     * Create a SASL server bootstrap
     * @param conf Transport configuration containing SASL settings
     * @param secretKeyHolder Provider for secret keys and user information
     */
    public SaslServerBootstrap(TransportConf conf, SecretKeyHolder secretKeyHolder);
    
    /**
     * Bootstrap the server channel with SASL authentication
     * @param channel Netty channel to authenticate
     * @param rpcHandler Original RPC handler to wrap with authentication
     * @return RPC handler with SASL authentication support
     */
    public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

Usage Examples:

// Client-side SASL setup
SecretKeyHolder keyHolder = new MySecretKeyHolder();
List<TransportClientBootstrap> clientBootstraps = Arrays.asList(
    new SaslClientBootstrap(conf, "my-app-id", keyHolder)
);
TransportClientFactory factory = context.createClientFactory(clientBootstraps);
TransportClient client = factory.createClient("spark-server", 7337);

// Server-side SASL setup
List<TransportServerBootstrap> serverBootstraps = Arrays.asList(
    new SaslServerBootstrap(conf, keyHolder)
);
TransportServer server = context.createServer(7337, serverBootstraps);

Secret Key Management

Interface for providing authentication credentials and managing secret keys securely.

/**
 * Interface for holding and retrieving secret keys for authentication
 * Implementations should provide secure storage and retrieval of credentials
 */
public interface SecretKeyHolder {
    /**
     * Get the SASL user identifier for the given application
     * @param appId Application identifier
     * @return SASL username for authentication
     */
    String getSaslUser(String appId);
    
    /**
     * Get the secret key for the given application
     * @param appId Application identifier  
     * @return Secret key for authentication
     */
    String getSecretKey(String appId);
}

Usage Examples:

// Implementing SecretKeyHolder
public class FileBasedSecretKeyHolder implements SecretKeyHolder {
    private final Properties secrets;
    
    public FileBasedSecretKeyHolder(String secretsFile) throws IOException {
        secrets = new Properties();
        try (InputStream is = new FileInputStream(secretsFile)) {
            secrets.load(is);
        }
    }
    
    @Override
    public String getSaslUser(String appId) {
        return secrets.getProperty(appId + ".user", appId);
    }
    
    @Override
    public String getSecretKey(String appId) {
        return secrets.getProperty(appId + ".secret");
    }
}

// In-memory implementation
public class MapBasedSecretKeyHolder implements SecretKeyHolder {
    private final Map<String, String> users = new HashMap<>();
    private final Map<String, String> secrets = new HashMap<>();
    
    public void addCredential(String appId, String user, String secret) {
        users.put(appId, user);
        secrets.put(appId, secret);
    }
    
    @Override
    public String getSaslUser(String appId) {
        return users.get(appId);
    }
    
    @Override
    public String getSecretKey(String appId) {
        return secrets.get(appId);
    }
}

SASL Implementation Classes

Internal SASL client and server implementations providing the authentication protocol mechanics.

/**
 * SASL client implementation for Spark providing authentication handshake
 */
public class SparkSaslClient {
    /**
     * Create a SASL client for authentication
     * @param secretKeyId Application identifier for key lookup
     * @param secretKeyHolder Provider for authentication credentials
     * @param encrypt Whether to enable SASL encryption after authentication
     */
    public SparkSaslClient(String secretKeyId, SecretKeyHolder secretKeyHolder, boolean encrypt);
    
    /**
     * Check if client is complete (authentication finished)
     * @return true if authentication is complete
     */
    public boolean isComplete();
    
    /**
     * Process challenge from server and generate response
     * @param challenge Challenge bytes from server
     * @return Response bytes to send to server
     * @throws Exception if processing fails
     */
    public byte[] response(byte[] challenge) throws Exception;
    
    /**
     * Dispose of client resources and cleanup
     */
    public void dispose();
}

/**
 * SASL server implementation for Spark providing authentication verification
 */
public class SparkSaslServer {
    /**
     * Create a SASL server for authentication
     * @param secretKeyId Application identifier for key lookup
     * @param secretKeyHolder Provider for authentication credentials
     * @param encrypt Whether to enable SASL encryption after authentication
     */
    public SparkSaslServer(String secretKeyId, SecretKeyHolder secretKeyHolder, boolean encrypt);
    
    /**
     * Check if server is complete (authentication finished)
     * @return true if authentication is complete
     */
    public boolean isComplete();
    
    /**
     * Process response from client and generate challenge
     * @param response Response bytes from client
     * @return Challenge bytes to send to client, or null if complete
     * @throws Exception if processing fails
     */
    public byte[] response(byte[] response) throws Exception;
    
    /**
     * Get the authenticated user identifier
     * @return User ID that was authenticated
     */
    public String getAuthenticatedUser();
    
    /**
     * Dispose of server resources and cleanup
     */
    public void dispose();
}

SASL RPC Handler

RPC handler with integrated SASL authentication support, wrapping application RPC handlers with security.

/**
 * RPC handler with SASL authentication support
 * Wraps application RPC handlers to provide authentication before processing
 */
public class SaslRpcHandler extends RpcHandler {
    /**
     * Create a SASL RPC handler
     * @param conf Transport configuration
     * @param wrapped Application RPC handler to protect with authentication
     * @param secretKeyHolder Provider for authentication credentials
     */
    public SaslRpcHandler(TransportConf conf, RpcHandler wrapped, SecretKeyHolder secretKeyHolder);
    
    /**
     * Receive and process RPC message with authentication check
     * @param client Transport client connection
     * @param message RPC message content
     * @param callback Callback for sending response
     */
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
    
    /**
     * Get stream manager with authentication support
     * @return StreamManager that checks authentication before serving streams
     */
    public StreamManager getStreamManager();
    
    /**
     * Handle channel activation with authentication setup
     * @param client Transport client that connected
     */
    public void channelActive(TransportClient client);
    
    /**
     * Handle channel deactivation with cleanup
     * @param client Transport client that disconnected
     */
    public void channelInactive(TransportClient client);
}

Custom Authentication Protocol

Custom authentication protocol support with challenge-response mechanism and encryption setup.

/**
 * Client bootstrap for custom authentication protocol
 */
public class AuthClientBootstrap implements TransportClientBootstrap {
    /**
     * Create custom auth client bootstrap
     * @param conf Transport configuration
     * @param appId Application identifier
     * @param secretKeyHolder Provider for authentication credentials
     */
    public AuthClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);
    
    /**
     * Bootstrap client channel with custom authentication
     * @param client Transport client instance
     * @param channel Netty channel to authenticate
     * @throws RuntimeException if authentication fails
     */
    public void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

/**
 * Server bootstrap for custom authentication protocol
 */
public class AuthServerBootstrap implements TransportServerBootstrap {
    /**
     * Create custom auth server bootstrap
     * @param conf Transport configuration
     * @param secretKeyHolder Provider for authentication credentials
     */
    public AuthServerBootstrap(TransportConf conf, SecretKeyHolder secretKeyHolder);
    
    /**
     * Bootstrap server channel with custom authentication
     * @param channel Netty channel to authenticate
     * @param rpcHandler Original RPC handler to protect
     * @return RPC handler with authentication support
     */
    public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

Authentication Messages

Protocol messages for custom authentication handshake with challenge-response mechanism.

/**
 * Client challenge message for authentication negotiation
 */
public class ClientChallenge implements Encodable {
    /**
     * Create a client challenge message
     * @param appId Application identifier
     * @param kdf Key derivation function name
     * @param iterations Number of iterations for key derivation
     * @param cipher Cipher algorithm name
     * @param keyLength Key length in bits
     * @param nonce Random nonce for challenge
     * @param challenge Challenge data
     */
    public ClientChallenge(String appId, String kdf, int iterations, String cipher, 
                          int keyLength, byte[] nonce, byte[] challenge);
    
    /**
     * Get application identifier
     * @return Application ID string
     */
    public String appId();
    
    /**
     * Get key derivation function name
     * @return KDF algorithm name
     */
    public String kdf();
    
    /**
     * Get key derivation iterations
     * @return Number of iterations
     */
    public int iterations();
    
    /**
     * Get cipher algorithm name
     * @return Cipher algorithm
     */
    public String cipher();
    
    /**
     * Get key length in bits
     * @return Key length
     */
    public int keyLength();
    
    /**
     * Get challenge nonce
     * @return Nonce bytes
     */
    public byte[] nonce();
    
    /**
     * Get challenge data
     * @return Challenge bytes
     */
    public byte[] challenge();
    
    /**
     * Calculate encoded length
     * @return Length in bytes
     */
    public int encodedLength();
    
    /**
     * Encode to ByteBuf
     * @param buf Output buffer
     */
    public void encode(ByteBuf buf);
    
    /**
     * Decode from ByteBuffer
     * @param buffer Input buffer
     * @return Decoded ClientChallenge
     */
    public static ClientChallenge decodeMessage(ByteBuffer buffer);
}

/**
 * Server response message for authentication negotiation
 */
public class ServerResponse implements Encodable {
    /**
     * Create a server response message
     * @param response Response data to client challenge
     * @param inputIv Input initialization vector for encryption
     * @param outputIv Output initialization vector for encryption
     */
    public ServerResponse(byte[] response, byte[] inputIv, byte[] outputIv);
    
    /**
     * Get server response data
     * @return Response bytes
     */
    public byte[] response();
    
    /**
     * Get input initialization vector
     * @return Input IV bytes
     */
    public byte[] inputIv();
    
    /**
     * Get output initialization vector
     * @return Output IV bytes
     */
    public byte[] outputIv();
    
    /**
     * Calculate encoded length
     * @return Length in bytes
     */
    public int encodedLength();
    
    /**
     * Encode to ByteBuf
     * @param buf Output buffer
     */
    public void encode(ByteBuf buf);
    
    /**
     * Decode from ByteBuffer
     * @param buffer Input buffer
     * @return Decoded ServerResponse
     */
    public static ServerResponse decodeMessage(ByteBuffer buffer);
}

Transport Cipher

Encryption management for authenticated channels, providing transparent encryption/decryption after authentication.

/**
 * Manages encryption/decryption for transport channels after authentication
 */
public class TransportCipher {
    /**
     * Create a transport cipher with encryption parameters
     * @param cryptoConf Crypto configuration properties
     * @param cipher Cipher transformation string
     * @param inKey Key for decrypting incoming data
     * @param outKey Key for encrypting outgoing data
     * @param inIv Initialization vector for incoming data
     * @param outIv Initialization vector for outgoing data
     * @throws IOException if cipher setup fails
     */
    public TransportCipher(Properties cryptoConf, String cipher, byte[] inKey, byte[] outKey, 
                          byte[] inIv, byte[] outIv) throws IOException;
    
    /**
     * Get the cipher transformation string
     * @return Cipher transformation (e.g., "AES/CTR/NoPadding")
     */
    public String getCipherTransformation();
    
    /**
     * Get input initialization vector
     * @return Input IV bytes
     */
    public byte[] getInputIv();
    
    /**
     * Get output initialization vector
     * @return Output IV bytes
     */
    public byte[] getOutputIv();
    
    /**
     * Add encryption handlers to the Netty channel pipeline
     * @param ch Channel to add encryption to
     * @throws IOException if handlers cannot be added
     */
    public void addToChannel(Channel ch) throws IOException;
}

Usage Examples:

// Setting up encrypted transport after custom auth
Properties cryptoConf = new Properties();
cryptoConf.setProperty("spark.network.crypto.keyLength", "128");
cryptoConf.setProperty("spark.network.crypto.keyFactoryAlgorithm", "PBKDF2WithHmacSHA1");

TransportCipher cipher = new TransportCipher(
    cryptoConf,
    "AES/CTR/NoPadding", 
    inKey, outKey, 
    inIv, outIv
);

// Add encryption to channel after authentication
cipher.addToChannel(channel);

Authentication Usage Patterns

Complete SASL Setup

// Server setup with SASL
SecretKeyHolder secretKeyHolder = new MySecretKeyHolder();
List<TransportServerBootstrap> serverBootstraps = Arrays.asList(
    new SaslServerBootstrap(conf, secretKeyHolder)
);

TransportContext context = new TransportContext(conf, new MyRpcHandler());
TransportServer server = context.createServer(7337, serverBootstraps);

// Client setup with SASL
List<TransportClientBootstrap> clientBootstraps = Arrays.asList(
    new SaslClientBootstrap(conf, "my-app", secretKeyHolder)
);

TransportClientFactory factory = context.createClientFactory(clientBootstraps);
TransportClient client = factory.createClient("localhost", 7337);

// Client is now authenticated and can send RPCs
client.sendRpc(message, callback);

Custom Authentication Flow

// Custom auth server
List<TransportServerBootstrap> bootstraps = Arrays.asList(
    new AuthServerBootstrap(conf, secretKeyHolder)
);
TransportServer server = context.createServer(8080, bootstraps);

// Custom auth client
List<TransportClientBootstrap> clientBootstraps = Arrays.asList(
    new AuthClientBootstrap(conf, "custom-app", secretKeyHolder)
);
TransportClientFactory factory = context.createClientFactory(clientBootstraps);
TransportClient client = factory.createClient("localhost", 8080);

Secure Configuration

// Configure authentication and encryption settings
Map<String, String> configMap = new HashMap<>();
configMap.put("spark.authenticate", "true");
configMap.put("spark.network.sasl.serverAlwaysEncrypt", "true");
configMap.put("spark.network.crypto.enabled", "true");
configMap.put("spark.network.crypto.keyLength", "256");

TransportConf conf = new TransportConf("secure-app", new MapConfigProvider(configMap));

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common

docs

authentication.md

buffers.md

configuration.md

index.md

protocol.md

streaming.md

transport.md

tile.json