
Mesos Integration

Specialized components for Mesos deployment scenarios, including driver registration and heartbeat mechanisms.

Capabilities

MesosExternalShuffleClient

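Client used by the Spark driver in Mesos coarse-grained mode. It extends ExternalShuffleClient with driver registration and periodic heartbeats to the Mesos external shuffle service.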

/**
 * External shuffle client for Mesos coarse-grained mode
 * Extends ExternalShuffleClient with Mesos-specific functionality
 */
public class MesosExternalShuffleClient extends ExternalShuffleClient {
    /**
     * Create a Mesos external shuffle client
     * @param conf - Transport configuration
     * @param secretKeyHolder - Secret key holder for authentication
     * @param authEnabled - Whether authentication is enabled
     * @param registrationTimeoutMs - Timeout for registration operations in milliseconds
     */
    public MesosExternalShuffleClient(
        TransportConf conf, SecretKeyHolder secretKeyHolder,
        boolean authEnabled, long registrationTimeoutMs
    );
    
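    /**
     * Register the driver with the Mesos external shuffle service.
     * init(appId) must be called first. The request is sent asynchronously; once the
     * service acknowledges it, the client sends a ShuffleServiceHeartbeat every
     * heartbeatIntervalMs until close() is called.
     * @param host - Host name of the Mesos shuffle service
     * @param port - Port number of the Mesos shuffle service
     * @param heartbeatTimeoutMs - How long the service waits for a heartbeat before treating the driver as gone, in milliseconds
     * @param heartbeatIntervalMs - Interval between heartbeats in milliseconds
     * @throws IOException if a connection to the service cannot be created
     * @throws InterruptedException if registration is interrupted
     */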
    public void registerDriverWithShuffleService(
        String host, int port, long heartbeatTimeoutMs, long heartbeatIntervalMs
    ) throws IOException, InterruptedException;
    
    /**
     * Close the client and clean up resources
     * Stops heartbeat mechanism and closes connections
     */
    @Override
    public void close();
}

Mesos Protocol Messages

RegisterDriver Message

Message for driver registration with Mesos external shuffle service.

/**
 * Message for driver registration with Mesos external shuffle service
 */
public class RegisterDriver extends BlockTransferMessage {
    /**
     * Create a driver registration message
     * @param appId - Application ID
     * @param heartbeatTimeoutMs - Heartbeat timeout in milliseconds
     */
    public RegisterDriver(String appId, long heartbeatTimeoutMs);
    
    /**
     * Get the application ID
     * @return Application ID
     */
    public String getAppId();
    
    /**
     * Get the heartbeat timeout
     * @return Heartbeat timeout in milliseconds
     */
    public long getHeartbeatTimeoutMs();
    
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
}
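To make the message concrete, here is a self-contained sketch of one plausible RegisterDriver wire layout. It uses a one-byte type tag, then the app ID as a length-prefixed UTF-8 string, then the heartbeat timeout as an 8-byte big-endian long. This layout and the tag value 4 are assumptions based on Spark's `BlockTransferMessage` encoders, so verify them against your Spark version. `RegisterDriverCodec` is a hypothetical helper and is not part of the library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical codec mirroring the assumed RegisterDriver wire layout. */
class RegisterDriverCodec {
    // Assumed type tag for RegisterDriver; verify against your Spark version.
    static final byte REGISTER_DRIVER_TAG = 4;

    /** Decoded fields of a RegisterDriver message. */
    static final class Decoded {
        final String appId;
        final long heartbeatTimeoutMs;

        Decoded(String appId, long heartbeatTimeoutMs) {
            this.appId = appId;
            this.heartbeatTimeoutMs = heartbeatTimeoutMs;
        }
    }

    /** Encodes [tag:1][appId length:4][appId:UTF-8][heartbeatTimeoutMs:8], big-endian. */
    static ByteBuffer encode(String appId, long heartbeatTimeoutMs) {
        byte[] idBytes = appId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(1 + Integer.BYTES + idBytes.length + Long.BYTES);
        buf.put(REGISTER_DRIVER_TAG);
        buf.putInt(idBytes.length);
        buf.put(idBytes);
        buf.putLong(heartbeatTimeoutMs);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    /** Decodes from a duplicate so the caller's buffer position is left unchanged. */
    static Decoded decode(ByteBuffer input) {
        ByteBuffer buf = input.duplicate();
        byte tag = buf.get();
        if (tag != REGISTER_DRIVER_TAG) {
            throw new IllegalArgumentException("Unexpected message tag: " + tag);
        }
        byte[] idBytes = new byte[buf.getInt()];
        buf.get(idBytes);
        return new Decoded(new String(idBytes, StandardCharsets.UTF_8), buf.getLong());
    }

    public static void main(String[] args) {
        ByteBuffer encoded = encode("mesos-app-001", 60000L);
        System.out.println("Encoded size: " + encoded.remaining() + " bytes");
        Decoded decoded = decode(encoded);
        System.out.println(decoded.appId + " / " + decoded.heartbeatTimeoutMs + " ms");
    }
}
```

Under this layout, "mesos-app-001" encodes to 26 bytes (1 + 4 + 13 + 8). If the assumption holds, that figure should match the serialized size printed in Example 3 for the same app ID.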

ShuffleServiceHeartbeat Message

Heartbeat message from driver to Mesos external shuffle service.

/**
 * Heartbeat message from driver to Mesos external shuffle service
 */
public class ShuffleServiceHeartbeat extends BlockTransferMessage {
    /**
     * Create a heartbeat message
     * @param appId - Application ID
     */
    public ShuffleServiceHeartbeat(String appId);
    
    /**
     * Get the application ID
     * @return Application ID
     */
    public String getAppId();
    
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
}
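Heartbeats only matter if the receiving side checks them. The sketch below shows one way a shuffle service could record the last heartbeat per application and flag applications that have been silent for longer than the timeout announced in RegisterDriver. `HeartbeatTracker` is hypothetical and is not Spark's shuffle-service implementation. It takes the current time as a parameter so the expiry logic stays deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-application heartbeat bookkeeping for a shuffle service. */
class HeartbeatTracker {
    private static final class AppState {
        final long timeoutMs;
        volatile long lastHeartbeatMs;

        AppState(long timeoutMs, long nowMs) {
            this.timeoutMs = timeoutMs;
            this.lastHeartbeatMs = nowMs;
        }
    }

    private final Map<String, AppState> apps = new ConcurrentHashMap<>();

    /** On RegisterDriver: remember the app and its timeout; registration counts as a heartbeat. */
    void onRegisterDriver(String appId, long heartbeatTimeoutMs, long nowMs) {
        apps.put(appId, new AppState(heartbeatTimeoutMs, nowMs));
    }

    /** On ShuffleServiceHeartbeat: refresh the timestamp; unknown apps are ignored. */
    void onHeartbeat(String appId, long nowMs) {
        AppState state = apps.get(appId);
        if (state != null) {
            state.lastHeartbeatMs = nowMs;
        }
    }

    /** Removes and returns the apps that have been silent for longer than their timeout. */
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, AppState> entry : apps.entrySet()) {
            AppState state = entry.getValue();
            if (nowMs - state.lastHeartbeatMs > state.timeoutMs) {
                expired.add(entry.getKey());
            }
        }
        // Simplification: a heartbeat arriving between the scan and this removal is ignored.
        expired.forEach(apps::remove);
        return expired;
    }

    boolean isRegistered(String appId) {
        return apps.containsKey(appId);
    }
}
```

In a real service, a periodic cleaner thread would call `expire` and then delete the shuffle files of each returned application.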

Usage Examples:

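import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Example 1: Basic Mesos shuffle client setup
public class MesosShuffleClientExample {
    public void setupMesosShuffleClient() throws IOException, InterruptedException {
        // Transport configuration for the "shuffle" module, using default settings
        TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
        
        // Register the application's shuffle secret for SASL authentication
        ShuffleSecretManager secretManager = new ShuffleSecretManager();
        String appId = "mesos-app-20231201-001";
        String appSecret = "mesos-shuffle-secret-123";
        secretManager.registerApp(appId, appSecret);
        
        // Create Mesos external shuffle client
        MesosExternalShuffleClient mesosClient = new MesosExternalShuffleClient(
            conf, secretManager, true, 10000  // 10 second registration timeout
        );
        
        try {
            // init() must be called before registration or any fetch
            mesosClient.init(appId);
            
            // Register driver with Mesos shuffle service
            String mesosShuffleHost = "mesos-shuffle-service.cluster.local";
            int mesosShufflePort = 7337;
            long heartbeatTimeoutMs = 60000;  // 60 seconds
            long heartbeatIntervalMs = 30000; // 30 seconds
            
            // Sent asynchronously; heartbeats start once the service acknowledges it
            mesosClient.registerDriverWithShuffleService(
                mesosShuffleHost, mesosShufflePort,
                heartbeatTimeoutMs, heartbeatIntervalMs
            );
            
            System.out.println("Registration requested with Mesos shuffle service at " +
                              mesosShuffleHost + ":" + mesosShufflePort);
            
            // Use the client for normal shuffle operations
            performShuffleOperations(mesosClient);
        } finally {
            // Stop heartbeats, close connections, and drop the secret
            mesosClient.close();
            secretManager.unregisterApp(appId);
        }
    }
    
    private void performShuffleOperations(MesosExternalShuffleClient client)
            throws IOException, InterruptedException {
        // Register executors (in a real deployment each executor registers itself)
        String[] localDirs = {"/mesos/work/spark-1", "/mesos/work/spark-2"};
        ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(localDirs, 64, "sort");
        
        client.registerWithShuffleServer("executor-host-1", 7337, "executor-1", executorInfo);
        client.registerWithShuffleServer("executor-host-2", 7337, "executor-2", executorInfo);
        
        // Fetch shuffle blocks; the last argument (temporary file manager) may be null
        BlockFetchingListener listener =
            new MesosBlockFetchingListener("mesos-task-1", new MetricRegistry());
        String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};
        
        client.fetchBlocks("executor-host-1", 7337, "executor-1", blockIds, listener, null);
    }
}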

// Example 2: Custom Mesos block fetching listener
public class MesosBlockFetchingListener implements BlockFetchingListener {
    private final String mesosTaskId;
    private final MetricRegistry metrics;
    
    public MesosBlockFetchingListener(String mesosTaskId, MetricRegistry metrics) {
        this.mesosTaskId = mesosTaskId;
        this.metrics = metrics;
    }
    
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Mesos task " + mesosTaskId + " successfully fetched block: " + blockId);
        
        // Update Mesos-specific metrics
        metrics.counter("mesos.shuffle.blocks.success").inc();
        metrics.histogram("mesos.shuffle.block.size").update(data.size());
        
        // The fetcher releases the buffer after this callback returns, so do not call
        // data.release() here; call data.retain() first if another thread will use it.
        processShuffleBlock(blockId, data);
    }
    
    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        String message = exception.getMessage(); // may be null
        System.err.println("Mesos task " + mesosTaskId + " failed to fetch block: " + blockId);
        System.err.println("Error: " + message);
        
        // Update failure metrics
        metrics.counter("mesos.shuffle.blocks.failure").inc();
        
        // Handle Mesos-specific error scenarios
        if (exception instanceof IOException) {
            // Network issues in Mesos cluster
            handleMesosNetworkError(blockId, exception);
        } else if (message != null && message.contains("authentication")) {
            // Authentication issues with Mesos shuffle service
            handleMesosAuthError(blockId, exception);
        }
    }
    
    private void processShuffleBlock(String blockId, ManagedBuffer data) {
        // Mesos-specific block processing logic
        System.out.println("Processing block " + blockId + " in Mesos environment");
    }
    
    private void handleMesosNetworkError(String blockId, Throwable exception) {
        System.err.println("Mesos network error for block " + blockId + ": " + exception.getMessage());
        // Implement Mesos-specific retry or failover logic
    }
    
    private void handleMesosAuthError(String blockId, Throwable exception) {
        System.err.println("Mesos authentication error for block " + blockId + ": " + exception.getMessage());
        // Implement Mesos-specific authentication recovery
    }
}

// Example 3: Mesos protocol message handling
public class MesosProtocolExample {
    public void demonstrateProtocolMessages() {
        String appId = "mesos-app-001";
        
        // Create driver registration message
        long heartbeatTimeout = 60000; // 60 seconds
        RegisterDriver registerDriver = new RegisterDriver(appId, heartbeatTimeout);
        
        System.out.println("Driver registration message:");
        System.out.println("  App ID: " + registerDriver.getAppId());
        System.out.println("  Heartbeat Timeout: " + registerDriver.getHeartbeatTimeoutMs() + "ms");
        
        // Serialize for network transmission
        ByteBuffer serializedRegister = registerDriver.toByteBuffer();
        System.out.println("Serialized size: " + serializedRegister.remaining() + " bytes");
        
        // Create heartbeat message
        ShuffleServiceHeartbeat heartbeat = new ShuffleServiceHeartbeat(appId);
        System.out.println("Heartbeat message for app: " + heartbeat.getAppId());
        
        // Serialize heartbeat
        ByteBuffer serializedHeartbeat = heartbeat.toByteBuffer();
        System.out.println("Heartbeat serialized size: " + serializedHeartbeat.remaining() + " bytes");
        
        // Demonstrate message deserialization
        BlockTransferMessage deserializedRegister = 
            BlockTransferMessage.Decoder.fromByteBuffer(serializedRegister);
        
        if (deserializedRegister instanceof RegisterDriver) {
            RegisterDriver reg = (RegisterDriver) deserializedRegister;
            System.out.println("Deserialized driver registration: " + reg.getAppId());
        }
    }
}

// Example 4: Mesos deployment configuration
public class MesosDeploymentConfig {
    public MesosExternalShuffleClient createConfiguredClient(String appId) {
        // Transport settings read by TransportConf. Mesos scheduler options such as
        // spark.mesos.executor.home or spark.mesos.principal belong in SparkConf and
        // have no effect here.
        Map<String, String> settings = new HashMap<>();
        settings.put("spark.shuffle.io.connectionTimeout", "30s");
        settings.put("spark.shuffle.io.numConnectionsPerPeer", "2");
        
        TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(settings));
        
        // Security configuration: register the application's shuffle secret
        ShuffleSecretManager secretManager = new ShuffleSecretManager();
        String appSecret = loadShuffleSecret();
        secretManager.registerApp(appId, appSecret);
        
        // Create client with a longer registration timeout, then bind it to the app
        MesosExternalShuffleClient client = new MesosExternalShuffleClient(
            conf, secretManager, true, 15000  // 15 second registration timeout
        );
        client.init(appId);
        return client;
    }
    
    private String loadShuffleSecret() {
        // MESOS_SHUFFLE_SECRET is an example variable name; use your own secret source
        String secret = System.getenv("MESOS_SHUFFLE_SECRET");
        if (secret == null || secret.isEmpty()) {
            throw new IllegalStateException("MESOS_SHUFFLE_SECRET is not set");
        }
        return secret;
    }
}

// Example 5: Mesos failure handling and recovery
public class MesosFailureHandling {
    private final String appId;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private volatile MesosExternalShuffleClient client;
    
    public MesosFailureHandling(String appId) {
        this.appId = appId;
    }
    
    public void setupClientWithFailureHandling() {
        client = createMesosClient();
        
        // Schedule periodic health checks
        scheduler.scheduleAtFixedRate(this::checkClientHealth, 30, 30, TimeUnit.SECONDS);
        
        // Register shutdown hook for cleanup
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutting down Mesos shuffle client...");
            scheduler.shutdown();
            if (client != null) {
                client.close();
            }
        }));
    }
    
    private void checkClientHealth() {
        try {
            // shuffleMetrics() only reads local client metrics and does not contact the
            // service; replace it with a real probe in production
            MetricSet metrics = client.shuffleMetrics();
            System.out.println("Client health check passed, metrics: " + metrics.getMetrics().keySet());
        } catch (Exception e) {
            System.err.println("Client health check failed: " + e.getMessage());
            recoverClient();
        }
    }
    
    private synchronized void recoverClient() {
        System.out.println("Attempting to recover Mesos shuffle client...");
        try {
            // Close existing client (this also stops its heartbeats)
            if (client != null) {
                client.close();
            }
            
            // Recreate the client for the same application ID and re-register the driver
            client = createMesosClient();
            client.registerDriverWithShuffleService("mesos-shuffle", 7337, 60000, 30000);
            
            System.out.println("Mesos shuffle client re-registration requested");
        } catch (Exception e) {
            System.err.println("Failed to recover Mesos shuffle client: " + e.getMessage());
        }
    }
    
    private MesosExternalShuffleClient createMesosClient() {
        TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
        ShuffleSecretManager secretManager = new ShuffleSecretManager();
        secretManager.registerApp(appId, "recovery-secret"); // placeholder secret
        
        MesosExternalShuffleClient newClient =
            new MesosExternalShuffleClient(conf, secretManager, true, 10000);
        newClient.init(appId); // required before registerDriverWithShuffleService
        return newClient;
    }
}

Mesos-Specific Configuration

Key configuration parameters for Mesos deployments:

  • spark.mesos.executor.home - Spark home directory in Mesos executors
  • spark.mesos.principal - Mesos principal for authentication
  • spark.mesos.secret - Mesos secret for authentication
  • spark.shuffle.service.enabled - Enable external shuffle service
  • spark.dynamicAllocation.enabled - Enable dynamic allocation in Mesos
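The last two settings interact. In Spark 2.x, enabling dynamic allocation without the external shuffle service fails at startup, although newer releases may relax this rule. Below is a minimal sketch of a pre-flight check over a plain key/value map. `MesosShuffleConfCheck` is hypothetical and is not Spark's own validation code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check for shuffle-related Spark settings on Mesos. */
class MesosShuffleConfCheck {
    /** Returns a list of problems; an empty list means none were found. */
    static List<String> check(Map<String, String> conf) {
        List<String> problems = new ArrayList<>();
        boolean dynamicAllocation = Boolean.parseBoolean(
            conf.getOrDefault("spark.dynamicAllocation.enabled", "false").trim());
        boolean shuffleService = Boolean.parseBoolean(
            conf.getOrDefault("spark.shuffle.service.enabled", "false").trim());
        if (dynamicAllocation && !shuffleService) {
            problems.add("spark.dynamicAllocation.enabled=true requires "
                + "spark.shuffle.service.enabled=true");
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.mesos.executor.home", "/opt/spark");
        conf.put("spark.dynamicAllocation.enabled", "true");
        System.out.println(check(conf)); // reports the missing shuffle service flag
        conf.put("spark.shuffle.service.enabled", "true");
        System.out.println(check(conf)); // prints an empty list
    }
}
```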

Deployment Considerations

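  1. Driver Registration:

    • The driver registers by sending a RegisterDriver message with its application ID and heartbeat timeout; init(appId) must be called on the client first
    • Registration is asynchronous: if it fails, the client logs a warning, and the application's shuffle data must then be removed manually after the driver exits
  2. Heartbeat Mechanism:

    • After registration succeeds, the client sends a ShuffleServiceHeartbeat every heartbeatIntervalMs
    • If no heartbeat arrives within heartbeatTimeoutMs, the shuffle service treats the driver as finished and cleans up the application's shuffle data
    • Keep heartbeatIntervalMs well below heartbeatTimeoutMs so that a single delayed heartbeat does not trigger premature cleanup
  3. Resource Management:

    • The external shuffle service keeps serving shuffle files after an executor exits, so Mesos executors can be released (for example by dynamic allocation) without losing shuffle output
    • Call close() when the driver shuts down to stop heartbeats and release connections
    • Shuffle files written by a failed executor remain available as long as the shuffle service on that host is running
  4. Security Integration:

    • The client authenticates to the shuffle service with Spark's SASL mechanism, using the SecretKeyHolder passed to its constructor (for example ShuffleSecretManager)
    • This is separate from Mesos framework authentication, which is configured with spark.mesos.principal and spark.mesos.secret
    • Encryption of shuffle traffic is controlled by Spark's network settings, not by Mesos SSL/TLS configuration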
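The client side of the registration and heartbeat steps above can be sketched with a single-threaded scheduler. It assumes heartbeats begin once registration succeeds, repeat every heartbeatIntervalMs, and stop when the client is closed. `HeartbeatLoop` and its `Runnable` sender are hypothetical stand-ins for the client's internals. The interval-versus-timeout check is a guard assumed here, not a documented requirement.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical client-side heartbeat loop, started after a successful RegisterDriver. */
class HeartbeatLoop implements AutoCloseable {
    private final long heartbeatIntervalMs;
    private final ScheduledExecutorService scheduler;

    HeartbeatLoop(long heartbeatTimeoutMs, long heartbeatIntervalMs) {
        // Assumed guard: with interval >= timeout the service could expire the app between beats.
        if (heartbeatIntervalMs <= 0 || heartbeatIntervalMs >= heartbeatTimeoutMs) {
            throw new IllegalArgumentException(
                "heartbeatIntervalMs must be positive and smaller than heartbeatTimeoutMs");
        }
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "shuffle-heartbeater");
            thread.setDaemon(true); // heartbeats alone should not keep the JVM alive
            return thread;
        });
    }

    /**
     * Call from the registration success callback. sendHeartbeat stands in for sending a
     * ShuffleServiceHeartbeat. It should catch its own exceptions, because an uncaught
     * exception cancels all later runs of a fixed-rate task.
     */
    void onRegistered(Runnable sendHeartbeat) {
        scheduler.scheduleAtFixedRate(sendHeartbeat, 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
    }

    /** Stops heartbeats; corresponds to the client's close(). */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```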

Troubleshooting Mesos Integration

Common issues and solutions:

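  1. Registration Failures:

    • Verify that the Mesos shuffle service is running on the target host and listening on the configured port (7337 by default)
    • Check network connectivity between the driver and the shuffle service
    • Confirm that the driver and the shuffle service use matching authentication settings and the same shared secret
  2. Heartbeat Issues:

    • Check that heartbeatIntervalMs is well below heartbeatTimeoutMs
    • Check for network instability that delays or drops heartbeats
    • If shuffle data disappears while the driver is still running, check whether the shuffle service timed out the application's heartbeats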
  3. Task Failures:

    • Handle Mesos task preemption gracefully
    • Implement proper cleanup for failed executors
    • Monitor Mesos cluster resource availability
  4. Performance Issues:

    • Tune transport settings such as spark.shuffle.io.connectionTimeout and spark.shuffle.io.numConnectionsPerPeer
    • Optimize shuffle block sizes for Mesos network
    • Monitor Mesos cluster network performance
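For registration failures, a plain TCP connect to the shuffle service port separates "service not reachable" from "registration rejected". The sketch below uses only `java.net.Socket`, and `ShuffleServiceProbe` is hypothetical. A successful connect only shows that something is listening on that port. It does not show that the listener is a healthy Mesos shuffle service or that authentication will succeed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical TCP reachability probe for an external shuffle service endpoint. */
class ShuffleServiceProbe {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host: report as unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 7337; // default shuffle service port
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```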