CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-spark--spark-network-common-2-12

Core networking functionality for Apache Spark including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support using Netty for high-performance network I/O.

Pending
Overview
Eval results
Files

security-authentication.mddocs/

Security and Authentication

The security and authentication API provides comprehensive protection for Apache Spark's network communications through SASL authentication mechanisms and AES encryption protocols. This layer ensures secure data transmission and proper authentication of clients and servers in distributed Spark environments.

Capabilities

Transport Encryption

Core interface for transport layer encryption supporting multiple cipher algorithms.

public interface TransportCipher {
    /**
     * Get the key identifier for this cipher
     * @return String representing the key ID used for encryption
     * @throws GeneralSecurityException if key retrieval fails
     */
    String getKeyId() throws GeneralSecurityException;
    
    /**
     * Add encryption/decryption handlers to the Netty channel pipeline
     * @param channel - Netty Channel to add cipher handlers to
     * @throws IOException if I/O operations fail during setup
     * @throws GeneralSecurityException if cryptographic operations fail
     */
    void addToChannel(Channel channel) throws IOException, GeneralSecurityException;
}

AES-CTR Cipher Implementation

Counter mode AES cipher implementation for transport layer encryption.

public class CtrTransportCipher implements TransportCipher {
    /**
     * Create CTR transport cipher with specified key and initialization vector
     * @param key - Secret key for AES encryption
     * @param iv - Initialization vector for CTR mode
     * @throws GeneralSecurityException if cipher initialization fails
     */
    public CtrTransportCipher(SecretKey key, byte[] iv) throws GeneralSecurityException;
    
    @Override
    public String getKeyId() throws GeneralSecurityException;
    
    @Override
    public void addToChannel(Channel channel) throws IOException, GeneralSecurityException;
}

AES-GCM Cipher Implementation

Galois/Counter Mode AES cipher implementation providing authenticated encryption.

public class GcmTransportCipher implements TransportCipher {
    /**
     * Create GCM transport cipher with specified key
     * @param key - Secret key for AES-GCM encryption
     * @throws GeneralSecurityException if cipher initialization fails
     */
    public GcmTransportCipher(SecretKey key) throws GeneralSecurityException;
    
    @Override
    public String getKeyId() throws GeneralSecurityException;
    
    @Override
    public void addToChannel(Channel channel) throws IOException, GeneralSecurityException;
}

Authentication Protocol

Authentication Client Bootstrap

Client bootstrap for setting up authentication protocol with servers.

public class AuthClientBootstrap implements TransportClientBootstrap {
    /**
     * Create authentication client bootstrap
     * @param conf - Transport configuration containing auth settings
     * @param appId - Application identifier for authentication
     * @param secretKeyHolder - Provider for authentication secrets
     */
    public AuthClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);
    
    @Override
    public void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

Authentication Server Bootstrap

Server bootstrap for handling authentication protocol from clients.

public class AuthServerBootstrap implements TransportServerBootstrap {
    /**
     * Create authentication server bootstrap
     * @param conf - Transport configuration containing auth settings
     * @param secretKeyHolder - Provider for validating authentication secrets
     */
    public AuthServerBootstrap(TransportConf conf, SecretKeyHolder secretKeyHolder);
    
    @Override
    public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

SASL Authentication

SASL Client Implementation

SASL client implementation for Spark's authentication needs.

public class SparkSaslClient implements SaslEncryptionBackend {
    /**
     * Create SASL client for authentication
     * @param secretKeyId - Identifier for the secret key
     * @param secretKey - Secret key for SASL authentication
     * @param encryptionEnabled - Whether to enable SASL encryption
     */
    public SparkSaslClient(String secretKeyId, String secretKey, boolean encryptionEnabled);
    
    /**
     * Create and configure SASL client
     * @return SaslClient configured for Spark authentication
     * @throws IOException if SASL client creation fails
     */
    public SaslClient createSaslClient() throws IOException;
    
    /**
     * Check if SASL negotiation is complete
     * @return boolean indicating if authentication is complete
     */
    public boolean isComplete();
    
    /**
     * Process a SASL challenge from the server
     * @param challenge - byte array containing the server challenge
     * @return byte array containing the client response
     * @throws IOException if challenge processing fails
     */
    public byte[] response(byte[] challenge) throws IOException;
}

SASL Server Implementation

SASL server implementation for validating client authentications.

public class SparkSaslServer implements SaslEncryptionBackend {
    /**
     * Create SASL server for authentication
     * @param secretKeyId - Identifier for the secret key
     * @param secretKey - Secret key for SASL authentication
     * @param encryptionEnabled - Whether to enable SASL encryption
     */
    public SparkSaslServer(String secretKeyId, String secretKey, boolean encryptionEnabled);
    
    /**
     * Create and configure SASL server
     * @return SaslServer configured for Spark authentication
     * @throws IOException if SASL server creation fails
     */
    public SaslServer createSaslServer() throws IOException;
    
    /**
     * Check if SASL negotiation is complete
     * @return boolean indicating if authentication is complete
     */
    public boolean isComplete();
    
    /**
     * Process a SASL response from the client
     * @param response - byte array containing the client response
     * @return byte array containing the server challenge
     * @throws IOException if response processing fails
     */
    public byte[] response(byte[] response) throws IOException;
}

SASL Bootstrap Components

Bootstrap implementations for integrating SASL authentication into the transport layer.

public class SaslClientBootstrap implements TransportClientBootstrap {
    /**
     * Create SASL client bootstrap
     * @param conf - Transport configuration
     * @param appId - Application identifier
     * @param secretKeyHolder - Provider for authentication secrets
     */
    public SaslClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);
    
    @Override
    public void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

public class SaslServerBootstrap implements TransportServerBootstrap {
    /**
     * Create SASL server bootstrap
     * @param conf - Transport configuration
     * @param secretKeyHolder - Provider for validating authentication secrets
     */
    public SaslServerBootstrap(TransportConf conf, SecretKeyHolder secretKeyHolder);
    
    @Override
    public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

SASL RPC Handler

RPC handler that provides SASL authentication capabilities.

public class SaslRpcHandler extends AbstractAuthRpcHandler {
    /**
     * Create SASL RPC handler
     * @param conf - Transport configuration
     * @param channel - Netty channel for communication
     * @param delegate - Underlying RPC handler to delegate to after authentication
     * @param secretKeyHolder - Provider for authentication secrets
     */
    public SaslRpcHandler(TransportConf conf, Channel channel, RpcHandler delegate, SecretKeyHolder secretKeyHolder);
    
    @Override
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
    
    @Override
    public StreamManager getStreamManager();
    
    @Override
    public void channelActive(TransportClient client);
    
    @Override
    public void channelInactive(TransportClient client);
}

Secret Management

SecretKeyHolder Interface

Interface for managing authentication secrets and user mappings.

public interface SecretKeyHolder {
    /**
     * Get the SASL user identifier for the given application
     * @param appId - Application identifier
     * @return String representing the SASL user, or null if not found
     */
    String getSaslUser(String appId);
    
    /**
     * Get the secret key for the given application
     * @param appId - Application identifier
     * @return String representing the secret key, or null if not found
     */
    String getSecretKey(String appId);
}

SASL Messages and Encryption

SASL Message Protocol

Message types used in SASL authentication exchange.

public class SaslMessage implements Encodable {
    /**
     * Create a SASL message
     * @param payload - byte array containing SASL data
     */
    public SaslMessage(byte[] payload);
    
    /**
     * Get the SASL payload
     * @return byte array containing the SASL data
     */
    public byte[] payload();
    
    @Override
    public int encodedLength();
    
    @Override
    public void encode(ByteBuf buf);
}

SASL Encryption Backend

Backend interface for SASL encryption operations.

public interface SaslEncryptionBackend {
    /**
     * Encrypt data using SASL encryption
     * @param data - byte array to encrypt
     * @param offset - starting offset in the data array
     * @param len - number of bytes to encrypt
     * @return byte array containing encrypted data
     * @throws IOException if encryption fails
     */
    byte[] wrap(byte[] data, int offset, int len) throws IOException;
    
    /**
     * Decrypt data using SASL encryption
     * @param data - byte array to decrypt
     * @param offset - starting offset in the data array
     * @param len - number of bytes to decrypt
     * @return byte array containing decrypted data
     * @throws IOException if decryption fails
     */
    byte[] unwrap(byte[] data, int offset, int len) throws IOException;
    
    /**
     * Dispose of the encryption backend and clean up resources
     * @throws IOException if cleanup fails
     */
    void dispose() throws IOException;
}

SASL Encryption Implementation

Implementation of SASL encryption for secure data transmission.

public class SaslEncryption implements SaslEncryptionBackend {
    /**
     * Create SASL encryption with the specified backend
     * @param backend - SaslEncryptionBackend for actual encryption operations
     */
    public SaslEncryption(SaslEncryptionBackend backend);
    
    @Override
    public byte[] wrap(byte[] data, int offset, int len) throws IOException;
    
    @Override
    public byte[] unwrap(byte[] data, int offset, int len) throws IOException;
    
    @Override
    public void dispose() throws IOException;
}

Exception Classes

SaslTimeoutException

Exception thrown when SASL authentication operations timeout.

public class SaslTimeoutException extends RuntimeException {
    /**
     * Create SASL timeout exception
     * @param message - Description of the timeout condition
     */
    public SaslTimeoutException(String message);
    
    /**
     * Create SASL timeout exception with cause
     * @param message - Description of the timeout condition
     * @param cause - Underlying cause of the timeout
     */
    public SaslTimeoutException(String message, Throwable cause);
}

Usage Examples

Setting Up Transport Encryption

import org.apache.spark.network.crypto.*;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import java.security.SecureRandom;

// Generate AES key for encryption
KeyGenerator keyGen = KeyGenerator.getInstance("AES");
keyGen.init(256);
SecretKey secretKey = keyGen.generateKey();

// Create CTR cipher
byte[] iv = new byte[16];
new SecureRandom().nextBytes(iv);
CtrTransportCipher ctrCipher = new CtrTransportCipher(secretKey, iv);

// Create GCM cipher
GcmTransportCipher gcmCipher = new GcmTransportCipher(secretKey);

// Apply cipher to channel
Channel channel = getNettyChannel(); // Obtain channel from transport
ctrCipher.addToChannel(channel);

System.out.println("Encryption enabled with key ID: " + ctrCipher.getKeyId());

SASL Authentication Setup

import org.apache.spark.network.sasl.*;

// Create secret key holder
SecretKeyHolder secretKeyHolder = new SecretKeyHolder() {
    @Override
    public String getSaslUser(String appId) {
        return "spark-user-" + appId;
    }
    
    @Override
    public String getSecretKey(String appId) {
        return "secret-key-for-" + appId;
    }
};

// Client-side SASL bootstrap
SaslClientBootstrap clientBootstrap = new SaslClientBootstrap(conf, "myapp", secretKeyHolder);

// Server-side SASL bootstrap
SaslServerBootstrap serverBootstrap = new SaslServerBootstrap(conf, secretKeyHolder);

// Create authenticated transport context
List<TransportClientBootstrap> clientBootstraps = Arrays.asList(clientBootstrap);
List<TransportServerBootstrap> serverBootstraps = Arrays.asList(serverBootstrap);

TransportContext context = new TransportContext(conf, rpcHandler);
TransportClientFactory clientFactory = context.createClientFactory(clientBootstraps);
TransportServer server = context.createServer(9999, serverBootstraps);

System.out.println("SASL-authenticated server started on port: " + server.getPort());

Authentication Protocol Implementation

import org.apache.spark.network.crypto.*;

// Client-side authentication
AuthClientBootstrap authClient = new AuthClientBootstrap(conf, "spark-app", secretKeyHolder);

// Server-side authentication
AuthServerBootstrap authServer = new AuthServerBootstrap(conf, secretKeyHolder);

// Create transport with authentication
TransportContext authContext = new TransportContext(conf, rpcHandler);
TransportServer authServerInstance = authContext.createServer(8443, Arrays.asList(authServer));
TransportClientFactory authClientFactory = authContext.createClientFactory(Arrays.asList(authClient));

// Connect authenticated client
TransportClient authenticatedClient = authClientFactory.createClient("localhost", 8443);
System.out.println("Authenticated client connected: " + authenticatedClient.isActive());

Custom Secret Key Management

import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;

// Custom secret key holder implementation
public class DatabaseSecretKeyHolder implements SecretKeyHolder {
    private final Map<String, String> userCache = new ConcurrentHashMap<>();
    private final Map<String, String> keyCache = new ConcurrentHashMap<>();
    
    @Override
    public String getSaslUser(String appId) {
        return userCache.computeIfAbsent(appId, id -> {
            // In real implementation, query database
            return "user-" + id;
        });
    }
    
    @Override
    public String getSecretKey(String appId) {
        return keyCache.computeIfAbsent(appId, id -> {
            // In real implementation, securely retrieve from key store
            return generateSecretKey(id);
        });
    }
    
    private String generateSecretKey(String appId) {
        // Generate or retrieve secure key for application
        return "generated-key-" + appId.hashCode();
    }
}

// Usage
SecretKeyHolder customKeyHolder = new DatabaseSecretKeyHolder();
SaslServerBootstrap customSaslServer = new SaslServerBootstrap(conf, customKeyHolder);

SASL Encryption Usage

import org.apache.spark.network.sasl.*;

// Create SASL client and server for testing
String secretKey = "shared-secret-key";
SparkSaslClient saslClient = new SparkSaslClient("app1", secretKey, true);
SparkSaslServer saslServer = new SparkSaslServer("app1", secretKey, true);

try {
    // Perform SASL handshake
    SaslClient client = saslClient.createSaslClient();
    SaslServer server = saslServer.createSaslServer();
    
    byte[] challenge = null;
    byte[] response = client.hasInitialResponse() ? client.evaluateChallenge(new byte[0]) : null;
    
    while (!client.isComplete() || !server.isComplete()) {
        if (!server.isComplete()) {
            challenge = server.evaluateResponse(response != null ? response : new byte[0]);
        }
        if (!client.isComplete()) {
            response = client.evaluateChallenge(challenge != null ? challenge : new byte[0]);
        }
    }
    
    System.out.println("SASL handshake completed successfully");
    
    // Test encryption if enabled
    if (saslClient.isComplete() && server.getQop() != null) {
        String testData = "Hello, encrypted world!";
        byte[] encrypted = saslClient.wrap(testData.getBytes(), 0, testData.getBytes().length);
        byte[] decrypted = saslServer.unwrap(encrypted, 0, encrypted.length);
        
        System.out.println("Original: " + testData);
        System.out.println("Decrypted: " + new String(decrypted));
    }
    
} catch (Exception e) {
    System.err.println("SASL authentication failed: " + e.getMessage());
} finally {
    saslClient.dispose();
    saslServer.dispose();
}

Integrated Security Setup

// Complete security setup with both authentication and encryption
public TransportContext createSecureTransportContext(TransportConf conf, RpcHandler rpcHandler) throws Exception {
    // Create secret key holder
    SecretKeyHolder secretKeyHolder = new SecretKeyHolder() {
        @Override
        public String getSaslUser(String appId) {
            return System.getProperty("spark.app.user", "spark-user");
        }
        
        @Override
        public String getSecretKey(String appId) {
            return System.getProperty("spark.app.secret", "default-secret");
        }
    };
    
    // Create bootstraps for authentication and encryption
    List<TransportClientBootstrap> clientBootstraps = Arrays.asList(
        new SaslClientBootstrap(conf, "secure-app", secretKeyHolder),
        new AuthClientBootstrap(conf, "secure-app", secretKeyHolder)
    );
    
    List<TransportServerBootstrap> serverBootstraps = Arrays.asList(
        new SaslServerBootstrap(conf, secretKeyHolder),
        new AuthServerBootstrap(conf, secretKeyHolder)
    );
    
    // Create secure transport context
    TransportContext secureContext = new TransportContext(conf, rpcHandler);
    
    // Test the secure setup
    TransportServer secureServer = secureContext.createServer(0, serverBootstraps);
    TransportClientFactory secureClientFactory = secureContext.createClientFactory(clientBootstraps);
    
    System.out.println("Secure transport context created successfully");
    System.out.println("Server port: " + secureServer.getPort());
    
    return secureContext;
}

Handling Authentication Failures

// Custom RPC handler with authentication error handling
public class SecureRpcHandler extends RpcHandler {
    private final RpcHandler delegate;
    private final Set<String> authenticatedClients = ConcurrentHashMap.newKeySet();
    
    public SecureRpcHandler(RpcHandler delegate) {
        this.delegate = delegate;
    }
    
    @Override
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
        String clientId = client.getSocketAddress().toString();
        
        if (!authenticatedClients.contains(clientId)) {
            callback.onFailure(new SecurityException("Client not authenticated: " + clientId));
            return;
        }
        
        delegate.receive(client, message, callback);
    }
    
    @Override
    public StreamManager getStreamManager() {
        return delegate.getStreamManager();
    }
    
    @Override
    public void channelActive(TransportClient client) {
        // Mark client as authenticated after successful bootstrap
        String clientId = client.getSocketAddress().toString();
        authenticatedClients.add(clientId);
        System.out.println("Client authenticated: " + clientId);
        delegate.channelActive(client);
    }
    
    @Override
    public void channelInactive(TransportClient client) {
        String clientId = client.getSocketAddress().toString();
        authenticatedClients.remove(clientId);
        System.out.println("Client disconnected: " + clientId);
        delegate.channelInactive(client);
    }
    
    @Override
    public void exceptionCaught(Throwable cause, TransportClient client) {
        if (cause instanceof SaslTimeoutException) {
            System.err.println("SASL authentication timeout for client: " + client.getSocketAddress());
        } else if (cause instanceof SecurityException) {
            System.err.println("Security exception for client: " + client.getSocketAddress() + " - " + cause.getMessage());
        }
        delegate.exceptionCaught(cause, client);
    }
}

Types

Authentication Message Types

public class AuthMessage implements Encodable {
    public static class Challenge extends AuthMessage {
        public Challenge(byte[] challenge);
        public byte[] challenge();
    }
    
    public static class Response extends AuthMessage {
        public Response(byte[] response);
        public byte[] response();
    }
    
    public static class Success extends AuthMessage {
        public Success(byte[] payload);
        public byte[] payload();
    }
}

Abstract Base Classes

public abstract class AbstractAuthRpcHandler extends RpcHandler {
    protected final TransportConf conf;
    protected final Channel channel;
    protected final RpcHandler delegate;
    
    protected AbstractAuthRpcHandler(TransportConf conf, Channel channel, RpcHandler delegate);
    
    protected abstract boolean isAuthenticated();
    protected abstract void doAuthenticationHandshake(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-12

docs

buffer-management.md

client-operations.md

configuration-management.md

index.md

message-protocol.md

security-authentication.md

server-operations.md

shuffle-database.md

transport-context.md

tile.json