CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-spark--spark-network-common-2-10

Core networking infrastructure for Apache Spark cluster computing with transport layer, RPC, chunk fetching, and SASL authentication

Pending
Overview
Eval results
Files

sasl-authentication.mddocs/

SASL Authentication

Security framework providing SASL-based authentication and encryption for secure network communication in Apache Spark. The SASL integration enables secure connections with pluggable authentication mechanisms and optional data encryption.

Capabilities

SecretKeyHolder Interface

Interface for providing SASL authentication credentials and secret keys.

/**
 * SecretKeyHolder provides access to SASL credentials for authentication.
 * Applications implement this interface to integrate with their credential management systems.
 */
public interface SecretKeyHolder {
  /**
   * Gets the SASL username for a specific application ID.
   * 
   * @param appId The application identifier
   * @return The SASL username to use for authentication
   */
  String getSaslUser(String appId);
  
  /**
   * Gets the secret key for a specific application ID.
   * The secret key is used for SASL authentication and optionally for encryption.
   * 
   * @param appId The application identifier  
   * @return The secret key as a string
   */
  String getSecretKey(String appId);
}

Bootstrap Classes

Bootstrap implementations for integrating SASL authentication into the transport layer.

SaslClientBootstrap

/**
 * Client-side SASL authentication bootstrap that configures client channels
 * for SASL authentication with the server.
 */
public class SaslClientBootstrap implements TransportClientBootstrap {
  /**
   * Creates a SASL client bootstrap.
   * 
   * @param conf Transport configuration containing SASL settings
   * @param appId Application identifier for credential lookup
   * @param secretKeyHolder Provider for SASL credentials
   */
  public SaslClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);
  
  @Override
  public void doBootstrap(TransportClient client, Channel channel) {
    // Configures channel for SASL authentication
    // Adds SASL handlers to the Netty pipeline
    // Performs SASL handshake with server
  }
}

SaslServerBootstrap

/**
 * Server-side SASL authentication bootstrap that configures server channels
 * to require SASL authentication from clients.
 */
public class SaslServerBootstrap implements TransportServerBootstrap {
  /**
   * Creates a SASL server bootstrap.
   * 
   * @param conf Transport configuration containing SASL settings
   * @param secretKeyHolder Provider for SASL credentials
   */
  public SaslServerBootstrap(TransportConf conf, SecretKeyHolder secretKeyHolder);
  
  @Override
  public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
    // Configures channel for SASL authentication
    // Wraps the original RPC handler with SASL authentication
    // Returns a SaslRpcHandler that enforces authentication
    return new SaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder);
  }
}

SASL Implementation Classes

SparkSaslClient

/**
 * Spark's SASL client implementation providing authentication capabilities.
 * Handles the client side of SASL authentication protocols.
 */
public class SparkSaslClient {
  /**
   * Creates a SASL client for authentication.
   * 
   * @param protocol The protocol name (typically "spark")
   * @param serverName The server name for authentication
   * @param props Additional SASL properties
   * @param cbh Callback handler for authentication credentials
   */
  public SparkSaslClient(String protocol, String serverName, Map<String, String> props, 
                        CallbackHandler cbh);
  
  /**
   * Checks if the SASL client is complete (authentication finished).
   * 
   * @return true if authentication is complete
   */
  public boolean isComplete();
  
  /**
   * Gets the negotiated security layer (QOP - Quality of Protection).
   * 
   * @return The negotiated QOP (auth, auth-int, or auth-conf)  
   */
  public String getNegotiatedProperty(String propName);
  
  /**
   * Processes a challenge from the server during authentication.
   * 
   * @param challenge The challenge bytes from the server
   * @return Response bytes to send back to server
   * @throws SaslException if authentication fails
   */
  public byte[] evaluateChallenge(byte[] challenge) throws SaslException;
  
  /**
   * Wraps outgoing data with security layer (encryption/integrity).
   * 
   * @param outgoing The data to wrap
   * @param offset Starting offset in the data
   * @param len Length of data to wrap
   * @return Wrapped data with security applied
   * @throws SaslException if wrapping fails
   */
  public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException;
  
  /**
   * Unwraps incoming data from security layer.
   * 
   * @param incoming The wrapped data from server
   * @param offset Starting offset in the data
   * @param len Length of data to unwrap
   * @return Unwrapped original data
   * @throws SaslException if unwrapping fails
   */
  public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException;
  
  /**
   * Disposes of the SASL client and releases resources.
   * 
   * @throws SaslException if disposal fails
   */
  public void dispose() throws SaslException;
}

SparkSaslServer

/**
 * Spark's SASL server implementation providing authentication capabilities.
 * Handles the server side of SASL authentication protocols.
 */
public class SparkSaslServer {
  /**
   * Creates a SASL server for authentication.
   * 
   * @param protocol The protocol name (typically "spark")
   * @param serverName The server name for authentication
   * @param props Additional SASL properties
   * @param cbh Callback handler for authentication credentials
   */
  public SparkSaslServer(String protocol, String serverName, Map<String, String> props,
                        CallbackHandler cbh);
  
  /**
   * Checks if the SASL server is complete (authentication finished).
   * 
   * @return true if authentication is complete
   */
  public boolean isComplete();
  
  /**
   * Gets the authenticated user's authorization ID.
   * 
   * @return The authorization ID of the authenticated user
   */
  public String getAuthorizationID();
  
  /**
   * Processes a response from the client during authentication.
   * 
   * @param response The response bytes from the client
   * @return Challenge bytes to send to client (null if complete)
   * @throws SaslException if authentication fails
   */
  public byte[] evaluateResponse(byte[] response) throws SaslException;
  
  /**
   * Wraps outgoing data with security layer (encryption/integrity).
   * 
   * @param outgoing The data to wrap
   * @param offset Starting offset in the data
   * @param len Length of data to wrap
   * @return Wrapped data with security applied
   * @throws SaslException if wrapping fails
   */
  public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException;
  
  /**
   * Unwraps incoming data from security layer.
   * 
   * @param incoming The wrapped data from client
   * @param offset Starting offset in the data
   * @param len Length of data to unwrap
   * @return Unwrapped original data
   * @throws SaslException if unwrapping fails
   */
  public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException;
  
  /**
   * Disposes of the SASL server and releases resources.
   * 
   * @throws SaslException if disposal fails
   */
  public void dispose() throws SaslException;
}

SASL RPC Handler

SaslRpcHandler

/**
 * RPC handler that enforces SASL authentication before delegating to the wrapped handler.
 * All RPC requests must be authenticated before processing.
 */
public class SaslRpcHandler extends RpcHandler {
  /**
   * Creates a SASL-protected RPC handler.
   * 
   * @param conf Transport configuration
   * @param channel The Netty channel for this connection
   * @param wrappedHandler The underlying RPC handler to protect
   * @param secretKeyHolder Provider for authentication credentials
   */
  public SaslRpcHandler(TransportConf conf, Channel channel, RpcHandler wrappedHandler,
                       SecretKeyHolder secretKeyHolder);
  
  @Override
  public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
    // Ensures client is authenticated before processing RPC
    if (!isAuthenticated(client)) {
      callback.onFailure(new SecurityException("Client not authenticated"));
      return;
    }
    
    // Delegate to wrapped handler if authenticated
    wrappedHandler.receive(client, message, callback);
  }
  
  @Override
  public StreamManager getStreamManager() {
    return new AuthenticatedStreamManager(wrappedHandler.getStreamManager());
  }
  
  /**
   * Checks if a client has completed SASL authentication.
   * 
   * @param client The client to check
   * @return true if client is authenticated
   */
  public boolean isAuthenticated(TransportClient client);
}

Encryption Support

SaslEncryptionBackend

/**
 * Interface for SASL encryption backend implementations.
 * Provides encryption/decryption services when SASL QOP includes confidentiality.
 */
public interface SaslEncryptionBackend {
  /**
   * Gets the maximum size of data that can be encrypted in a single operation.
   * 
   * @return Maximum encryption block size in bytes
   */
  int maxEncryptedSize();
  
  /**
   * Encrypts data using the negotiated SASL security layer.
   * 
   * @param data The plaintext data to encrypt
   * @return Encrypted data
   * @throws IOException if encryption fails
   */
  byte[] encrypt(byte[] data) throws IOException;
  
  /**
   * Decrypts data using the negotiated SASL security layer.
   * 
   * @param encryptedData The encrypted data to decrypt
   * @return Plaintext data
   * @throws IOException if decryption fails
   */
  byte[] decrypt(byte[] encryptedData) throws IOException;
}

SaslEncryption

/**
 * Utility class for SASL encryption operations.
 * Provides helper methods for encrypting and decrypting data streams.
 */
public class SaslEncryption {
  /**
   * Creates an encryption backend from a completed SASL client.
   * 
   * @param saslClient The completed SASL client
   * @param maxOutboundBlockSize Maximum size for outbound encrypted blocks
   * @return SaslEncryptionBackend for encryption operations
   */
  public static SaslEncryptionBackend createEncryptionBackend(SparkSaslClient saslClient,
                                                            int maxOutboundBlockSize);
  
  /**
   * Creates an encryption backend from a completed SASL server.
   * 
   * @param saslServer The completed SASL server
   * @param maxOutboundBlockSize Maximum size for outbound encrypted blocks
   * @return SaslEncryptionBackend for encryption operations
   */
  public static SaslEncryptionBackend createEncryptionBackend(SparkSaslServer saslServer,
                                                            int maxOutboundBlockSize);
  
  /**
   * Encrypts a managed buffer using SASL encryption.
   * 
   * @param backend The encryption backend to use
   * @param buffer The buffer to encrypt
   * @return New managed buffer containing encrypted data
   * @throws IOException if encryption fails
   */
  public static ManagedBuffer encryptBuffer(SaslEncryptionBackend backend, ManagedBuffer buffer)
      throws IOException;
  
  /**
   * Decrypts a managed buffer using SASL encryption.
   * 
   * @param backend The encryption backend to use
   * @param encryptedBuffer The encrypted buffer to decrypt
   * @return New managed buffer containing decrypted data
   * @throws IOException if decryption fails
   */
  public static ManagedBuffer decryptBuffer(SaslEncryptionBackend backend, ManagedBuffer encryptedBuffer)
      throws IOException;
}

SASL Messages

SaslMessage

/**
 * Message wrapper for SASL protocol messages during authentication handshake.
 * Used internally by the SASL bootstrap implementations.
 */
public class SaslMessage extends AbstractMessage {
  /** The SASL message payload */
  public final ManagedBuffer body;
  
  /**
   * Creates a SASL protocol message.
   * 
   * @param body The SASL message data
   */
  public SaslMessage(ManagedBuffer body);
  
  @Override
  public Type type() {
    return Type.User; // SASL messages use User type
  }
}

Usage Examples

Basic SASL Setup

import org.apache.spark.network.sasl.*;
import org.apache.spark.network.TransportContext;
import java.util.Arrays;

// Implement SecretKeyHolder
public class SimpleSecretKeyHolder implements SecretKeyHolder {
  private final Map<String, String> secrets = new HashMap<>();
  
  public SimpleSecretKeyHolder() {
    // In practice, load from secure configuration
    secrets.put("spark-app-1", "secret-key-123");
    secrets.put("spark-app-2", "secret-key-456");
  }
  
  @Override
  public String getSaslUser(String appId) {
    return "spark-user-" + appId;
  }
  
  @Override
  public String getSecretKey(String appId) {
    return secrets.get(appId);
  }
}

// Create server with SASL authentication
SecretKeyHolder secretKeyHolder = new SimpleSecretKeyHolder();
SaslServerBootstrap serverBootstrap = new SaslServerBootstrap(conf, secretKeyHolder);

TransportContext context = new TransportContext(conf, rpcHandler);
TransportServer server = context.createServer(8080, Arrays.asList(serverBootstrap));

// Create client with SASL authentication
String appId = "spark-app-1";
SaslClientBootstrap clientBootstrap = new SaslClientBootstrap(conf, appId, secretKeyHolder);

TransportClientFactory clientFactory = context.createClientFactory(Arrays.asList(clientBootstrap));
TransportClient client = clientFactory.createClient("localhost", 8080);

Advanced SASL Configuration

import org.apache.spark.network.util.MapConfigProvider;

public class SaslConfiguration {
  public static TransportConf createSaslConfig() {
    Map<String, String> config = new HashMap<>();
    
    // Enable SASL authentication
    config.put("spark.authenticate", "true");
    config.put("spark.authenticate.secret", "default-secret");
    
    // Configure SASL properties
    config.put("spark.network.sasl.serverAlwaysEncrypt", "true");
    config.put("spark.network.sasl.maxEncryptedBlockSize", "65536");
    config.put("spark.network.sasl.timeout", "30s");
    
    // Configure encryption (QOP - Quality of Protection)
    config.put("javax.security.sasl.qop", "auth-conf"); // auth, auth-int, or auth-conf
    
    return new TransportConf("spark.network", new MapConfigProvider(config));
  }
}

Custom Secret Key Management

public class DatabaseSecretKeyHolder implements SecretKeyHolder {
  private final DataSource dataSource;
  private final Cache<String, String> secretCache;
  
  public DatabaseSecretKeyHolder(DataSource dataSource) {
    this.dataSource = dataSource;
    this.secretCache = CacheBuilder.newBuilder()
        .maximumSize(1000)
        .expireAfterWrite(10, TimeUnit.MINUTES)
        .build();
  }
  
  @Override
  public String getSaslUser(String appId) {
    return "spark-user"; // Could be app-specific
  }
  
  @Override
  public String getSecretKey(String appId) {
    try {
      return secretCache.get(appId, () -> loadSecretFromDatabase(appId));
    } catch (Exception e) {
      throw new RuntimeException("Failed to load secret for app: " + appId, e);
    }
  }
  
  private String loadSecretFromDatabase(String appId) throws SQLException {
    try (Connection conn = dataSource.getConnection();
         PreparedStatement stmt = conn.prepareStatement(
             "SELECT secret_key FROM app_secrets WHERE app_id = ?")) {
      
      stmt.setString(1, appId);
      
      try (ResultSet rs = stmt.executeQuery()) {
        if (rs.next()) {
          return rs.getString("secret_key");
        } else {
          throw new SecurityException("No secret found for app: " + appId);
        }
      }
    }
  }
}

Handling SASL Authentication Errors

public class SaslErrorHandler {
  public void handleClientCreation() {
    try {
      TransportClient client = clientFactory.createClient("remote-host", 9090);
      
      // Test connection with a simple RPC
      ByteBuffer testMessage = ByteBuffer.wrap("ping".getBytes());
      ByteBuffer response = client.sendRpcSync(testMessage, 5000);
      
      System.out.println("SASL authentication successful");
      
    } catch (Exception e) {
      if (e.getCause() instanceof SaslException) {
        System.err.println("SASL authentication failed: " + e.getCause().getMessage());
        handleSaslFailure((SaslException) e.getCause());
      } else {
        System.err.println("Connection failed: " + e.getMessage());
      }
    }
  }
  
  private void handleSaslFailure(SaslException e) {
    String message = e.getMessage();
    
    if (message.contains("authentication failed")) {
      System.err.println("Invalid credentials - check secret key");
    } else if (message.contains("timeout")) {
      System.err.println("SASL handshake timed out - check network connectivity");
    } else if (message.contains("mechanism")) {
      System.err.println("SASL mechanism not supported - check configuration");
    } else {
      System.err.println("Unknown SASL error: " + message);
    }
  }
}

SASL with Encryption

public class EncryptedCommunicationExample {
  public void setupEncryptedCommunication() {
    // Configure for encryption (confidentiality)
    Map<String, String> config = new HashMap<>();
    config.put("javax.security.sasl.qop", "auth-conf"); // Enable encryption
    config.put("spark.network.sasl.serverAlwaysEncrypt", "true");
    config.put("spark.network.sasl.maxEncryptedBlockSize", "65536"); // 64KB blocks
    
    TransportConf conf = new TransportConf("spark.network", new MapConfigProvider(config));
    
    // Rest of setup same as basic SASL
    SecretKeyHolder secretKeyHolder = new SimpleSecretKeyHolder();
    
    // Server with encryption
    SaslServerBootstrap serverBootstrap = new SaslServerBootstrap(conf, secretKeyHolder);
    TransportServer server = context.createServer(8080, Arrays.asList(serverBootstrap));
    
    // Client with encryption
    SaslClientBootstrap clientBootstrap = new SaslClientBootstrap(conf, "app-1", secretKeyHolder);
    TransportClientFactory clientFactory = context.createClientFactory(Arrays.asList(clientBootstrap));
    
    // All communication is now encrypted automatically
    TransportClient client = clientFactory.createClient("localhost", 8080);
  }
}

SASL Authentication Monitoring

public class SaslMonitoring {
  private final AtomicLong successfulAuth = new AtomicLong(0);
  private final AtomicLong failedAuth = new AtomicLong(0);
  
  public class MonitoringSecretKeyHolder implements SecretKeyHolder {
    private final SecretKeyHolder delegate;
    
    public MonitoringSecretKeyHolder(SecretKeyHolder delegate) {
      this.delegate = delegate;
    }
    
    @Override
    public String getSaslUser(String appId) {
      try {
        String user = delegate.getSaslUser(appId);
        System.out.println("SASL user lookup for app " + appId + ": " + user);
        return user;
      } catch (Exception e) {
        System.err.println("Failed to get SASL user for app " + appId + ": " + e.getMessage());
        throw e;
      }
    }
    
    @Override
    public String getSecretKey(String appId) {
      try {
        String key = delegate.getSecretKey(appId);
        successfulAuth.incrementAndGet();
        System.out.println("SASL secret key retrieved for app: " + appId);
        return key;
      } catch (Exception e) {
        failedAuth.incrementAndGet();
        System.err.println("Failed to get secret key for app " + appId + ": " + e.getMessage());
        throw e;
      }
    }
  }
  
  public void printStats() {
    System.out.println("SASL Authentication Stats:");
    System.out.println("  Successful: " + successfulAuth.get());
    System.out.println("  Failed: " + failedAuth.get());
  }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-10

docs

buffer-management.md

client-operations.md

configuration-utilities.md

index.md

message-protocol.md

sasl-authentication.md

server-operations.md

transport-setup.md

tile.json