Specialized components for Mesos deployment scenarios, including driver registration and heartbeat mechanisms.
External shuffle client for Mesos coarse-grained mode.
/**
 * Shuffle client for applications running on Mesos in coarse-grained mode.
 *
 * <p>Builds on {@link ExternalShuffleClient}, adding driver registration with the Mesos
 * external shuffle service and a heartbeat mechanism tied to that registration.
 */
public class MesosExternalShuffleClient extends ExternalShuffleClient {

  /**
   * Creates a new Mesos external shuffle client.
   *
   * @param conf transport configuration
   * @param secretKeyHolder holder of the secret keys used for authentication
   * @param authEnabled whether authentication is enabled
   * @param registrationTimeoutMs timeout for registration operations, in milliseconds
   */
  public MesosExternalShuffleClient(
      TransportConf conf,
      SecretKeyHolder secretKeyHolder,
      boolean authEnabled,
      long registrationTimeoutMs);

  /**
   * Registers the driver with the Mesos external shuffle service.
   *
   * @param host host name of the Mesos shuffle service
   * @param port port number of the Mesos shuffle service
   * @param heartbeatTimeoutMs heartbeat timeout, in milliseconds
   * @param heartbeatIntervalMs interval between heartbeats, in milliseconds
   * @throws IOException if the connection fails
   * @throws InterruptedException if registration is interrupted
   */
  public void registerDriverWithShuffleService(
      String host,
      int port,
      long heartbeatTimeoutMs,
      long heartbeatIntervalMs) throws IOException, InterruptedException;

  /**
   * Closes this client and cleans up its resources, stopping the heartbeat mechanism and
   * closing its connections.
   */
  @Override
  public void close();
}
Message for driver registration with Mesos external shuffle service.
/**
 * Message a driver sends to register with the Mesos external shuffle service.
 */
public class RegisterDriver extends BlockTransferMessage {

  /**
   * Creates a driver registration message.
   *
   * @param appId application ID of the registering driver
   * @param heartbeatTimeoutMs heartbeat timeout, in milliseconds
   */
  public RegisterDriver(String appId, long heartbeatTimeoutMs);

  /**
   * Returns the application ID carried by this message.
   *
   * @return the application ID
   */
  public String getAppId();

  /**
   * Returns the heartbeat timeout carried by this message.
   *
   * @return the heartbeat timeout, in milliseconds
   */
  public long getHeartbeatTimeoutMs();

  // These override java.lang.Object; @Override lets the compiler verify the signatures.
  @Override
  public boolean equals(Object other);

  @Override
  public int hashCode();

  @Override
  public String toString();
}
Heartbeat message from driver to Mesos external shuffle service.
/**
 * Heartbeat message a driver sends to the Mesos external shuffle service.
 */
public class ShuffleServiceHeartbeat extends BlockTransferMessage {

  /**
   * Creates a heartbeat message.
   *
   * @param appId application ID of the driver sending the heartbeat
   */
  public ShuffleServiceHeartbeat(String appId);

  /**
   * Returns the application ID carried by this heartbeat.
   *
   * @return the application ID
   */
  public String getAppId();

  // These override java.lang.Object; @Override lets the compiler verify the signatures.
  @Override
  public boolean equals(Object other);

  @Override
  public int hashCode();

  @Override
  public String toString();
}
Usage Examples:
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.mesos.*;
import org.apache.spark.network.util.TransportConf;
// Example 1: Basic Mesos shuffle client setup
public class MesosShuffleClientExample {

  /**
   * Sets up a Mesos external shuffle client and registers the driver with the shuffle service.
   * It then runs sample shuffle operations and cleans up.
   *
   * <p>The client is always closed and the application secret is always unregistered, even
   * when registration or a shuffle operation fails.
   *
   * @throws UncheckedIOException if communication with the shuffle service fails
   * @throws IllegalStateException if the thread is interrupted while using the client; the
   *     thread's interrupt status is restored before throwing
   */
  public void setupMesosShuffleClient() {
    // Create transport configuration for Mesos environment
    // NOTE(review): confirm TransportConf has a single-argument constructor in the targeted
    //   Spark version; it may also require a ConfigProvider.
    TransportConf conf = new TransportConf("shuffle");

    // Set up authentication for Mesos deployment
    ShuffleSecretManager secretManager = new ShuffleSecretManager();
    String appId = "mesos-app-20231201-001";
    String appSecret = "mesos-shuffle-secret-123";
    secretManager.registerApp(appId, appSecret);

    String mesosShuffleHost = "mesos-shuffle-service.cluster.local";
    int mesosShufflePort = 7337;
    long heartbeatTimeoutMs = 60000; // 60 seconds
    long heartbeatIntervalMs = 30000; // 30 seconds

    try {
      // Create Mesos external shuffle client
      MesosExternalShuffleClient mesosClient = new MesosExternalShuffleClient(
          conf, secretManager, true, 10000 // 10 second registration timeout
      );
      try {
        // Initialize the client for the application
        mesosClient.init(appId);

        // Register driver with Mesos shuffle service (declares IOException/InterruptedException)
        mesosClient.registerDriverWithShuffleService(
            mesosShuffleHost, mesosShufflePort,
            heartbeatTimeoutMs, heartbeatIntervalMs
        );
        System.out.println("Mesos shuffle client registered with service at " +
            mesosShuffleHost + ":" + mesosShufflePort);

        // Use the client for normal shuffle operations
        performShuffleOperations(mesosClient, appId);
      } finally {
        // Clean up even if registration or a shuffle operation failed
        mesosClient.close();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(
          "Mesos shuffle client failed while talking to "
              + mesosShuffleHost + ":" + mesosShufflePort, e);
    } catch (InterruptedException e) {
      // Preserve the interrupt for callers further up the stack
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while using the Mesos shuffle client", e);
    } finally {
      secretManager.unregisterApp(appId);
    }
  }

  /**
   * Registers two sample executors and fetches two shuffle blocks from the first one.
   *
   * @param client initialized client used for all operations
   * @param appId application ID (currently unused by this sample)
   * @throws IOException if communication with a shuffle server fails
   * @throws InterruptedException if an operation is interrupted
   */
  private void performShuffleOperations(MesosExternalShuffleClient client, String appId)
      throws IOException, InterruptedException {
    // Register executors
    String[] localDirs = {"/mesos/work/spark-1", "/mesos/work/spark-2"};
    ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(localDirs, 64, "sort");
    client.registerWithShuffleServer("executor-host-1", 7337, "executor-1", executorInfo);
    client.registerWithShuffleServer("executor-host-2", 7337, "executor-2", executorInfo);

    // Fetch shuffle blocks; MesosBlockFetchingListener only has a (taskId, metrics) constructor
    BlockFetchingListener listener =
        new MesosBlockFetchingListener("mesos-task-1", new MetricRegistry());
    String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};
    client.fetchBlocks("executor-host-1", 7337, "executor-1", blockIds, listener, null);
  }
}
// Example 2: Custom Mesos block fetching listener
public class MesosBlockFetchingListener implements BlockFetchingListener {
  /** Mesos task ID included in log messages. */
  private final String mesosTaskId;
  /** Registry that receives the success/failure counters and the block-size histogram. */
  private final MetricRegistry metrics;

  /**
   * Creates a listener that logs fetch outcomes for a Mesos task and records fetch metrics.
   *
   * @param mesosTaskId Mesos task ID used in log messages
   * @param metrics registry that receives fetch metrics
   */
  public MesosBlockFetchingListener(String mesosTaskId, MetricRegistry metrics) {
    this.mesosTaskId = mesosTaskId;
    this.metrics = metrics;
  }

  /**
   * Records a successful fetch and processes the fetched block.
   *
   * <p>The buffer is deliberately not released here. Under the {@code BlockFetchingListener}
   * contract, the fetcher releases {@code data} after this callback returns, so releasing it
   * here as well would drop the reference count twice. If processing is ever handed to another
   * thread, call {@code data.retain()} first and release the buffer when that work finishes.
   */
  @Override
  public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
    System.out.println("Mesos task " + mesosTaskId + " successfully fetched block: " + blockId);
    // Update Mesos-specific metrics
    metrics.counter("mesos.shuffle.blocks.success").inc();
    metrics.histogram("mesos.shuffle.block.size").update(data.size());
    // Process the shuffle data
    processShuffleBlock(blockId, data);
  }

  /**
   * Logs and counts a failed fetch, then dispatches to a network or authentication handler.
   * IOExceptions go to the network handler. Other exceptions whose message mentions
   * "authentication" go to the auth handler.
   */
  @Override
  public void onBlockFetchFailure(String blockId, Throwable exception) {
    // Throwable.getMessage() may be null; read it once and null-check before inspecting it
    String message = exception.getMessage();
    System.err.println("Mesos task " + mesosTaskId + " failed to fetch block: " + blockId);
    System.err.println("Error: " + message);
    // Update failure metrics
    metrics.counter("mesos.shuffle.blocks.failure").inc();
    // Handle Mesos-specific error scenarios
    if (exception instanceof IOException) {
      // Network issues in Mesos cluster
      handleMesosNetworkError(blockId, exception);
    } else if (message != null && message.contains("authentication")) {
      // Authentication issues with Mesos shuffle service
      handleMesosAuthError(blockId, exception);
    }
  }

  /** Placeholder for block processing; currently only logs the block ID. */
  private void processShuffleBlock(String blockId, ManagedBuffer data) {
    System.out.println("Processing block " + blockId + " in Mesos environment");
  }

  /** Placeholder for network-error handling; currently only logs the failure. */
  private void handleMesosNetworkError(String blockId, Throwable exception) {
    System.err.println("Mesos network error for block " + blockId + ": " + exception.getMessage());
    // Implement Mesos-specific retry or failover logic
  }

  /** Placeholder for authentication-error handling; currently only logs the failure. */
  private void handleMesosAuthError(String blockId, Throwable exception) {
    System.err.println("Mesos authentication error for block " + blockId + ": " + exception.getMessage());
    // Implement Mesos-specific authentication recovery
  }
}
// Example 3: Mesos protocol message handling
public class MesosProtocolExample {

  /**
   * Walks through the Mesos protocol messages:
   * <ul>
   *   <li>builds a driver registration message and a heartbeat message;</li>
   *   <li>prints their fields and serialized sizes;</li>
   *   <li>decodes the serialized registration and prints the decoded application ID.</li>
   * </ul>
   */
  public void demonstrateProtocolMessages() {
    final String applicationId = "mesos-app-001";
    final long timeoutMs = 60000; // 60 seconds

    // Driver registration: build, inspect, serialize.
    RegisterDriver registration = new RegisterDriver(applicationId, timeoutMs);
    System.out.println("Driver registration message:");
    System.out.println(" App ID: " + registration.getAppId());
    System.out.println(" Heartbeat Timeout: " + registration.getHeartbeatTimeoutMs() + "ms");
    ByteBuffer registrationBytes = registration.toByteBuffer();
    System.out.println("Serialized size: " + registrationBytes.remaining() + " bytes");

    // Heartbeat: build, inspect, serialize.
    ShuffleServiceHeartbeat ping = new ShuffleServiceHeartbeat(applicationId);
    System.out.println("Heartbeat message for app: " + ping.getAppId());
    ByteBuffer pingBytes = ping.toByteBuffer();
    System.out.println("Heartbeat serialized size: " + pingBytes.remaining() + " bytes");

    // Round-trip the registration bytes through the generic decoder.
    BlockTransferMessage decoded = BlockTransferMessage.Decoder.fromByteBuffer(registrationBytes);
    if (!(decoded instanceof RegisterDriver)) {
      return;
    }
    RegisterDriver decodedRegistration = (RegisterDriver) decoded;
    System.out.println("Deserialized driver registration: " + decodedRegistration.getAppId());
  }
}
// Example 4: Mesos deployment configuration
public class MesosDeploymentConfig {
  /** Environment variable that holds the shuffle authentication secret. */
  private static final String SECRET_ENV_VAR = "MESOS_SHUFFLE_SECRET";

  /**
   * Builds a Mesos external shuffle client with authentication enabled.
   *
   * <p>The secret for application "mesos-spark-app" is read from the
   * {@code MESOS_SHUFFLE_SECRET} environment variable. The client uses a 15-second
   * registration timeout.
   *
   * @return a new, uninitialized client
   * @throws IllegalStateException if {@code MESOS_SHUFFLE_SECRET} is unset or empty
   */
  public MesosExternalShuffleClient createConfiguredClient() {
    // Transport configuration with Mesos-specific settings
    Properties props = new Properties();
    props.setProperty("spark.shuffle.io.connectionTimeout", "30s");
    props.setProperty("spark.shuffle.io.numConnectionsPerPeer", "2");
    // NOTE(review): the spark.mesos.* keys below are not shuffle transport settings; presumably
    //   TransportConf("shuffle") ignores them. Confirm, or move them to the Spark configuration.
    props.setProperty("spark.mesos.executor.home", "/opt/spark");
    props.setProperty("spark.mesos.principal", "spark-principal");
    // NOTE(review): confirm ConfigProvider.fromProperties exists in the targeted Spark version.
    TransportConf conf = new TransportConf("shuffle", ConfigProvider.fromProperties(props));

    // Security configuration for Mesos
    ShuffleSecretManager secretManager = new ShuffleSecretManager();
    String appSecret = loadSecretFromMesosSecret();
    secretManager.registerApp("mesos-spark-app", appSecret);

    // Create client with Mesos-optimized settings
    return new MesosExternalShuffleClient(
        conf, secretManager, true, 15000 // Longer timeout for Mesos
    );
  }

  /**
   * Loads the shuffle secret from the {@code MESOS_SHUFFLE_SECRET} environment variable.
   *
   * @return the non-empty secret
   * @throws IllegalStateException if the variable is unset or empty
   */
  private String loadSecretFromMesosSecret() {
    // System.getenv returns null for an unset variable. Fail fast here rather than register a
    // null or empty secret that would surface later as a less descriptive error.
    String secret = System.getenv(SECRET_ENV_VAR);
    if (secret == null || secret.isEmpty()) {
      throw new IllegalStateException(
          "Environment variable " + SECRET_ENV_VAR + " must be set to the shuffle secret");
    }
    return secret;
  }
}
// Example 5: Mesos failure handling and recovery
public class MesosFailureHandling {
  /**
   * Current client. The health-check thread replaces it during recovery and the shutdown hook
   * reads it, so it is volatile to make the latest instance visible across threads.
   */
  private volatile MesosExternalShuffleClient client;

  /** Application ID given to setupClientWithFailureHandling; reused when recovering. */
  private volatile String appId;

  /**
   * Runs the periodic health checks.
   * NOTE(review): Executors' default thread factory creates non-daemon threads, so the JVM
   * will not exit on its own while this scheduler is running.
   */
  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

  /**
   * Sets up a client with failure handling. This method:
   * <ul>
   *   <li>creates and initializes a client for {@code appId};</li>
   *   <li>schedules a health check every 30 seconds, after an initial 30-second delay;</li>
   *   <li>registers a shutdown hook that stops the scheduler and closes the current client.</li>
   * </ul>
   *
   * @param appId application ID used for the client and for any recovered client
   */
  public void setupClientWithFailureHandling(String appId) {
    this.appId = appId;
    // Create client with failure detection
    client = createMesosClient(appId);
    // Schedule periodic health checks
    scheduler.scheduleAtFixedRate(
        () -> checkClientHealth(),
        30, 30, TimeUnit.SECONDS
    );
    // Register shutdown hook for cleanup
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      System.out.println("Shutting down Mesos shuffle client...");
      scheduler.shutdown();
      // Read the volatile field once so the null check and close() see the same instance
      MesosExternalShuffleClient current = client;
      if (current != null) {
        current.close();
      }
    }));
  }

  /** Treats a successful shuffleMetrics() call as healthy; otherwise attempts recovery. */
  private void checkClientHealth() {
    try {
      // Check if client is still responsive
      MetricSet metrics = client.shuffleMetrics();
      System.out.println("Client health check passed, metrics: " + metrics);
    } catch (Exception e) {
      System.err.println("Client health check failed: " + e.getMessage());
      // Implement recovery logic
      recoverClient();
    }
  }

  /**
   * Closes the current client, creates a replacement for the same application ID, and
   * re-registers the driver. Failures are logged rather than propagated. An interrupt is
   * logged and the thread's interrupt status is restored.
   */
  private void recoverClient() {
    System.out.println("Attempting to recover Mesos shuffle client...");
    try {
      // Close existing client
      MesosExternalShuffleClient previous = client;
      if (previous != null) {
        previous.close();
      }
      // Recreate client for the original application rather than a placeholder ID
      MesosExternalShuffleClient replacement = createMesosClient(appId);
      client = replacement;
      // Re-register with shuffle service
      replacement.registerDriverWithShuffleService("mesos-shuffle", 7337, 60000, 30000);
      System.out.println("Mesos shuffle client recovery successful");
    } catch (InterruptedException e) {
      // Do not swallow the interrupt: restore it so the scheduler thread can observe it
      Thread.currentThread().interrupt();
      System.err.println("Failed to recover Mesos shuffle client: " + e.getMessage());
    } catch (Exception e) {
      System.err.println("Failed to recover Mesos shuffle client: " + e.getMessage());
    }
  }

  /**
   * Creates an authenticated client for {@code appId} and initializes it for that application.
   *
   * @param appId application ID to register the secret for and to initialize the client with
   * @return an initialized client
   */
  private MesosExternalShuffleClient createMesosClient(String appId) {
    // NOTE(review): confirm TransportConf has a single-argument constructor in the targeted
    //   Spark version; it may also require a ConfigProvider.
    TransportConf conf = new TransportConf("shuffle");
    ShuffleSecretManager secretManager = new ShuffleSecretManager();
    secretManager.registerApp(appId, "recovery-secret");
    MesosExternalShuffleClient newClient =
        new MesosExternalShuffleClient(conf, secretManager, true, 10000);
    // Initialize for the application before the client is used to register the driver
    newClient.init(appId);
    return newClient;
  }
}
Key configuration parameters for Mesos deployments:
- spark.mesos.executor.home - Spark home directory in Mesos executors
- spark.mesos.principal - Mesos principal for authentication
- spark.mesos.secret - Mesos secret for authentication
- spark.shuffle.service.enabled - Enable the external shuffle service
- spark.dynamicAllocation.enabled - Enable dynamic allocation in Mesos

Driver Registration:
Heartbeat Mechanism:
Resource Management:
Security Integration:
Common issues and solutions:
Registration Failures:
Heartbeat Issues:
Task Failures:
Performance Issues: