Core networking infrastructure for Apache Spark cluster computing, providing the transport layer, RPC messaging, chunk fetching, and SASL authentication.
—
Security framework providing SASL-based authentication and encryption for secure network communication in Apache Spark. The SASL integration enables secure connections with pluggable authentication mechanisms and optional data encryption.
Interface for providing SASL authentication credentials and secret keys.
/**
* SecretKeyHolder provides access to SASL credentials for authentication.
* Applications implement this interface to integrate with their credential management systems.
* NOTE(review): implementations may be invoked from multiple connection threads —
* confirm thread-safety requirements before caching mutable state in an implementation.
*/
public interface SecretKeyHolder {
/**
* Gets the SASL username for a specific application ID.
*
* @param appId The application identifier
* @return The SASL username to use for authentication
*/
String getSaslUser(String appId);
/**
* Gets the secret key for a specific application ID.
* The secret key is used for SASL authentication and optionally for encryption.
*
* @param appId The application identifier
* @return The secret key as a string (behavior for unknown appIds is implementation-defined)
*/
String getSecretKey(String appId);
}Bootstrap implementations for integrating SASL authentication into the transport layer.
/**
* Client-side SASL authentication bootstrap that configures client channels
* for SASL authentication with the server.
* NOTE(review): this listing is an API outline — the constructor is a declaration only
* and the method body below is a descriptive placeholder, not the real implementation.
*/
public class SaslClientBootstrap implements TransportClientBootstrap {
/**
* Creates a SASL client bootstrap.
*
* @param conf Transport configuration containing SASL settings
* @param appId Application identifier for credential lookup
* @param secretKeyHolder Provider for SASL credentials
*/
public SaslClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);
@Override
public void doBootstrap(TransportClient client, Channel channel) {
// Configures channel for SASL authentication
// Adds SASL handlers to the Netty pipeline
// Performs SASL handshake with server
}
}/**
* Server-side SASL authentication bootstrap that configures server channels
* to require SASL authentication from clients.
* NOTE(review): API outline — the constructor is a declaration only; the doBootstrap
* body shows the intended behavior (wrapping the handler in a SaslRpcHandler).
*/
public class SaslServerBootstrap implements TransportServerBootstrap {
/**
* Creates a SASL server bootstrap.
*
* @param conf Transport configuration containing SASL settings
* @param secretKeyHolder Provider for SASL credentials
*/
public SaslServerBootstrap(TransportConf conf, SecretKeyHolder secretKeyHolder);
@Override
public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
// Configures channel for SASL authentication
// Wraps the original RPC handler with SASL authentication
// Returns a SaslRpcHandler that enforces authentication
return new SaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder);
}
}/**
* Spark's SASL client implementation providing authentication capabilities.
* Handles the client side of SASL authentication protocols.
* NOTE(review): API outline — method declarations only; implementations are not shown.
*/
public class SparkSaslClient {
/**
* Creates a SASL client for authentication.
*
* @param protocol The protocol name (typically "spark")
* @param serverName The server name for authentication
* @param props Additional SASL properties
* @param cbh Callback handler for authentication credentials
*/
public SparkSaslClient(String protocol, String serverName, Map<String, String> props,
CallbackHandler cbh);
/**
* Checks if the SASL client is complete (authentication finished).
*
* @return true if authentication is complete
*/
public boolean isComplete();
/**
* Gets a negotiated SASL property, such as the Quality of Protection (QOP).
*
* @param propName The SASL property name to look up (e.g. {@code javax.security.sasl.qop})
* @return The negotiated property value; for QOP this is auth, auth-int, or auth-conf
*/
public String getNegotiatedProperty(String propName);
/**
* Processes a challenge from the server during authentication.
*
* @param challenge The challenge bytes from the server
* @return Response bytes to send back to server
* @throws SaslException if authentication fails
*/
public byte[] evaluateChallenge(byte[] challenge) throws SaslException;
/**
* Wraps outgoing data with security layer (encryption/integrity).
*
* @param outgoing The data to wrap
* @param offset Starting offset in the data
* @param len Length of data to wrap
* @return Wrapped data with security applied
* @throws SaslException if wrapping fails
*/
public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException;
/**
* Unwraps incoming data from security layer.
*
* @param incoming The wrapped data from server
* @param offset Starting offset in the data
* @param len Length of data to unwrap
* @return Unwrapped original data
* @throws SaslException if unwrapping fails
*/
public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException;
/**
* Disposes of the SASL client and releases resources.
*
* @throws SaslException if disposal fails
*/
public void dispose() throws SaslException;
}/**
* Spark's SASL server implementation providing authentication capabilities.
* Handles the server side of SASL authentication protocols.
* NOTE(review): API outline — method declarations only; implementations are not shown.
*/
public class SparkSaslServer {
/**
* Creates a SASL server for authentication.
*
* @param protocol The protocol name (typically "spark")
* @param serverName The server name for authentication
* @param props Additional SASL properties
* @param cbh Callback handler for authentication credentials
*/
public SparkSaslServer(String protocol, String serverName, Map<String, String> props,
CallbackHandler cbh);
/**
* Checks if the SASL server is complete (authentication finished).
*
* @return true if authentication is complete
*/
public boolean isComplete();
/**
* Gets the authenticated user's authorization ID.
*
* @return The authorization ID of the authenticated user
*/
public String getAuthorizationID();
/**
* Processes a response from the client during authentication.
*
* @param response The response bytes from the client
* @return Challenge bytes to send to client (null if complete)
* @throws SaslException if authentication fails
*/
public byte[] evaluateResponse(byte[] response) throws SaslException;
/**
* Wraps outgoing data with security layer (encryption/integrity).
*
* @param outgoing The data to wrap
* @param offset Starting offset in the data
* @param len Length of data to wrap
* @return Wrapped data with security applied
* @throws SaslException if wrapping fails
*/
public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException;
/**
* Unwraps incoming data from security layer.
*
* @param incoming The wrapped data from client
* @param offset Starting offset in the data
* @param len Length of data to unwrap
* @return Unwrapped original data
* @throws SaslException if unwrapping fails
*/
public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException;
/**
* Disposes of the SASL server and releases resources.
*
* @throws SaslException if disposal fails
*/
public void dispose() throws SaslException;
}/**
* RPC handler that enforces SASL authentication before delegating to the wrapped handler.
* All RPC requests must be authenticated before processing.
* NOTE(review): API outline — the fields referenced below (wrappedHandler) and the
* AuthenticatedStreamManager type are not declared in this excerpt; confirm against the
* full source before relying on them.
*/
public class SaslRpcHandler extends RpcHandler {
/**
* Creates a SASL-protected RPC handler.
*
* @param conf Transport configuration
* @param channel The Netty channel for this connection
* @param wrappedHandler The underlying RPC handler to protect
* @param secretKeyHolder Provider for authentication credentials
*/
public SaslRpcHandler(TransportConf conf, Channel channel, RpcHandler wrappedHandler,
SecretKeyHolder secretKeyHolder);
@Override
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
// Ensures client is authenticated before processing RPC
if (!isAuthenticated(client)) {
// Fail fast: reject the RPC rather than processing an unauthenticated request.
callback.onFailure(new SecurityException("Client not authenticated"));
return;
}
// Delegate to wrapped handler if authenticated
wrappedHandler.receive(client, message, callback);
}
@Override
public StreamManager getStreamManager() {
return new AuthenticatedStreamManager(wrappedHandler.getStreamManager());
}
/**
* Checks if a client has completed SASL authentication.
*
* @param client The client to check
* @return true if client is authenticated
*/
public boolean isAuthenticated(TransportClient client);
}/**
* Interface for SASL encryption backend implementations.
* Provides encryption/decryption services when SASL QOP includes confidentiality.
*/
public interface SaslEncryptionBackend {
/**
* Gets the maximum size of data that can be encrypted in a single operation.
* Callers must chunk larger payloads before encrypting.
*
* @return Maximum encryption block size in bytes
*/
int maxEncryptedSize();
/**
* Encrypts data using the negotiated SASL security layer.
*
* @param data The plaintext data to encrypt
* @return Encrypted data
* @throws IOException if encryption fails
*/
byte[] encrypt(byte[] data) throws IOException;
/**
* Decrypts data using the negotiated SASL security layer.
*
* @param encryptedData The encrypted data to decrypt
* @return Plaintext data
* @throws IOException if decryption fails
*/
byte[] decrypt(byte[] encryptedData) throws IOException;
}/**
* Utility class for SASL encryption operations.
* Provides helper methods for encrypting and decrypting data streams.
* NOTE(review): API outline — static method declarations only; implementations not shown.
*/
public class SaslEncryption {
/**
* Creates an encryption backend from a completed SASL client.
* The client must have finished authentication before its security layer can be used.
*
* @param saslClient The completed SASL client
* @param maxOutboundBlockSize Maximum size for outbound encrypted blocks
* @return SaslEncryptionBackend for encryption operations
*/
public static SaslEncryptionBackend createEncryptionBackend(SparkSaslClient saslClient,
int maxOutboundBlockSize);
/**
* Creates an encryption backend from a completed SASL server.
*
* @param saslServer The completed SASL server
* @param maxOutboundBlockSize Maximum size for outbound encrypted blocks
* @return SaslEncryptionBackend for encryption operations
*/
public static SaslEncryptionBackend createEncryptionBackend(SparkSaslServer saslServer,
int maxOutboundBlockSize);
/**
* Encrypts a managed buffer using SASL encryption.
*
* @param backend The encryption backend to use
* @param buffer The buffer to encrypt
* @return New managed buffer containing encrypted data
* @throws IOException if encryption fails
*/
public static ManagedBuffer encryptBuffer(SaslEncryptionBackend backend, ManagedBuffer buffer)
throws IOException;
/**
* Decrypts a managed buffer using SASL encryption.
*
* @param backend The encryption backend to use
* @param encryptedBuffer The encrypted buffer to decrypt
* @return New managed buffer containing decrypted data
* @throws IOException if decryption fails
*/
public static ManagedBuffer decryptBuffer(SaslEncryptionBackend backend, ManagedBuffer encryptedBuffer)
throws IOException;
}/**
* Message wrapper for SASL protocol messages during authentication handshake.
* Used internally by the SASL bootstrap implementations.
* NOTE(review): simplified outline — confirm wire format (e.g. any magic/tag bytes)
* against the upstream Spark implementation before reusing.
*/
public class SaslMessage extends AbstractMessage {
/** The SASL message payload */
public final ManagedBuffer body;
/**
* Creates a SASL protocol message.
*
* @param body The SASL message data
*/
public SaslMessage(ManagedBuffer body);
@Override
public Type type() {
return Type.User; // SASL messages use User type
}
}import org.apache.spark.network.sasl.*;
import org.apache.spark.network.TransportContext;
import java.util.Arrays;
// Implement SecretKeyHolder
/**
 * Minimal in-memory {@link SecretKeyHolder} used by the examples above.
 * Holds a fixed appId-to-secret mapping; a real deployment would load these
 * from secure configuration instead of hard-coding them.
 */
public class SimpleSecretKeyHolder implements SecretKeyHolder {

    /** appId -> shared secret; populated once at construction time. */
    private final Map<String, String> appSecrets;

    public SimpleSecretKeyHolder() {
        // In practice, load from secure configuration
        appSecrets = new HashMap<>();
        appSecrets.put("spark-app-1", "secret-key-123");
        appSecrets.put("spark-app-2", "secret-key-456");
    }

    /** Derives a SASL user name from the application id. */
    @Override
    public String getSaslUser(String appId) {
        return "spark-user-" + appId;
    }

    /** Returns the secret for the given appId, or null when the app is unknown. */
    @Override
    public String getSecretKey(String appId) {
        return appSecrets.get(appId);
    }
}
// Create server with SASL authentication
// NOTE(review): illustrative snippet — these are top-level statements that assume
// `conf` and `rpcHandler` are already in scope; not compilable as-is.
SecretKeyHolder secretKeyHolder = new SimpleSecretKeyHolder();
SaslServerBootstrap serverBootstrap = new SaslServerBootstrap(conf, secretKeyHolder);
TransportContext context = new TransportContext(conf, rpcHandler);
TransportServer server = context.createServer(8080, Arrays.asList(serverBootstrap));
// Create client with SASL authentication
String appId = "spark-app-1";
SaslClientBootstrap clientBootstrap = new SaslClientBootstrap(conf, appId, secretKeyHolder);
// The client bootstrap runs the SASL handshake when the connection is created below.
TransportClientFactory clientFactory = context.createClientFactory(Arrays.asList(clientBootstrap));
TransportClient client = clientFactory.createClient("localhost", 8080);import org.apache.spark.network.util.MapConfigProvider;
/**
 * Builds a TransportConf with SASL authentication and encryption enabled.
 * NOTE(review): verify these configuration keys against the Spark version in use;
 * key names have changed between releases.
 */
public class SaslConfiguration {
public static TransportConf createSaslConfig() {
Map<String, String> config = new HashMap<>();
// Enable SASL authentication
config.put("spark.authenticate", "true");
config.put("spark.authenticate.secret", "default-secret");
// Configure SASL properties
config.put("spark.network.sasl.serverAlwaysEncrypt", "true");
config.put("spark.network.sasl.maxEncryptedBlockSize", "65536");
config.put("spark.network.sasl.timeout", "30s");
// Configure encryption (QOP - Quality of Protection)
config.put("javax.security.sasl.qop", "auth-conf"); // auth, auth-int, or auth-conf
return new TransportConf("spark.network", new MapConfigProvider(config));
}
}public class DatabaseSecretKeyHolder implements SecretKeyHolder {
private final DataSource dataSource;
// Short-lived cache of appId -> secret to avoid a DB round-trip per lookup
// (presumably Guava's CacheBuilder — confirm the dependency).
private final Cache<String, String> secretCache;
public DatabaseSecretKeyHolder(DataSource dataSource) {
this.dataSource = dataSource;
this.secretCache = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
}
@Override
public String getSaslUser(String appId) {
return "spark-user"; // Could be app-specific
}
/**
 * Returns the secret for appId, loading from the database on cache miss.
 * Wraps any lookup failure in a RuntimeException with the appId for context.
 */
@Override
public String getSecretKey(String appId) {
try {
return secretCache.get(appId, () -> loadSecretFromDatabase(appId));
} catch (Exception e) {
throw new RuntimeException("Failed to load secret for app: " + appId, e);
}
}
// Parameterized query (no string concatenation) — safe against SQL injection.
// Throws SecurityException when the app has no registered secret.
private String loadSecretFromDatabase(String appId) throws SQLException {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"SELECT secret_key FROM app_secrets WHERE app_id = ?")) {
stmt.setString(1, appId);
try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
return rs.getString("secret_key");
} else {
throw new SecurityException("No secret found for app: " + appId);
}
}
}
}
}public class SaslErrorHandler {
// NOTE(review): `clientFactory` is referenced but not declared in this excerpt.
public void handleClientCreation() {
try {
TransportClient client = clientFactory.createClient("remote-host", 9090);
// Test connection with a simple RPC
ByteBuffer testMessage = ByteBuffer.wrap("ping".getBytes());
ByteBuffer response = client.sendRpcSync(testMessage, 5000);
System.out.println("SASL authentication successful");
} catch (Exception e) {
// SASL failures typically surface as the cause of a connection exception.
if (e.getCause() instanceof SaslException) {
System.err.println("SASL authentication failed: " + e.getCause().getMessage());
handleSaslFailure((SaslException) e.getCause());
} else {
System.err.println("Connection failed: " + e.getMessage());
}
}
}
// Classifies a SASL failure from its message text.
// NOTE(review): matching on exception message strings is brittle — message wording
// is not part of any API contract and may change between JDK/Spark versions.
private void handleSaslFailure(SaslException e) {
String message = e.getMessage();
if (message.contains("authentication failed")) {
System.err.println("Invalid credentials - check secret key");
} else if (message.contains("timeout")) {
System.err.println("SASL handshake timed out - check network connectivity");
} else if (message.contains("mechanism")) {
System.err.println("SASL mechanism not supported - check configuration");
} else {
System.err.println("Unknown SASL error: " + message);
}
}
}public class EncryptedCommunicationExample {
// Demonstrates enabling SASL confidentiality (auth-conf) so payloads are encrypted.
// NOTE(review): `context` is used below but never declared in this class — the
// snippet assumes a TransportContext built elsewhere from `conf` and an RpcHandler.
public void setupEncryptedCommunication() {
// Configure for encryption (confidentiality)
Map<String, String> config = new HashMap<>();
config.put("javax.security.sasl.qop", "auth-conf"); // Enable encryption
config.put("spark.network.sasl.serverAlwaysEncrypt", "true");
config.put("spark.network.sasl.maxEncryptedBlockSize", "65536"); // 64KB blocks
TransportConf conf = new TransportConf("spark.network", new MapConfigProvider(config));
// Rest of setup same as basic SASL
SecretKeyHolder secretKeyHolder = new SimpleSecretKeyHolder();
// Server with encryption
SaslServerBootstrap serverBootstrap = new SaslServerBootstrap(conf, secretKeyHolder);
TransportServer server = context.createServer(8080, Arrays.asList(serverBootstrap));
// Client with encryption
SaslClientBootstrap clientBootstrap = new SaslClientBootstrap(conf, "app-1", secretKeyHolder);
TransportClientFactory clientFactory = context.createClientFactory(Arrays.asList(clientBootstrap));
// All communication is now encrypted automatically
TransportClient client = clientFactory.createClient("localhost", 8080);
}
}public class SaslMonitoring {
// Counters shared by the inner holder; AtomicLong keeps increments thread-safe.
// NOTE(review): these count secret-key *retrievals*, not actual SASL handshake
// outcomes — a retrieved key can still fail authentication. Rename or re-hook
// to real handshake results if accurate auth metrics are needed.
private final AtomicLong successfulAuth = new AtomicLong(0);
private final AtomicLong failedAuth = new AtomicLong(0);
// Decorator that logs and counts credential lookups around a delegate holder.
public class MonitoringSecretKeyHolder implements SecretKeyHolder {
private final SecretKeyHolder delegate;
public MonitoringSecretKeyHolder(SecretKeyHolder delegate) {
this.delegate = delegate;
}
@Override
public String getSaslUser(String appId) {
try {
String user = delegate.getSaslUser(appId);
System.out.println("SASL user lookup for app " + appId + ": " + user);
return user;
} catch (Exception e) {
System.err.println("Failed to get SASL user for app " + appId + ": " + e.getMessage());
throw e;
}
}
@Override
public String getSecretKey(String appId) {
try {
String key = delegate.getSecretKey(appId);
successfulAuth.incrementAndGet();
System.out.println("SASL secret key retrieved for app: " + appId);
return key;
} catch (Exception e) {
failedAuth.incrementAndGet();
System.err.println("Failed to get secret key for app " + appId + ": " + e.getMessage());
throw e;
}
}
}
// Dumps the current counters to stdout.
public void printStats() {
System.out.println("SASL Authentication Stats:");
System.out.println(" Successful: " + successfulAuth.get());
System.out.println(" Failed: " + failedAuth.get());
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-10