or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

authentication.md, block-fetching.md, file-management.md, index.md, mesos.md, protocol.md, shuffle-client.md, shuffle-server.md
tile.json

docs/shuffle-server.md

Shuffle Server Components

Server-side components for handling shuffle requests, including RPC handlers and block resolution mechanisms.

Capabilities

ExternalShuffleBlockHandler

RPC handler for serving shuffle blocks from external shuffle service.

/**
 * RPC handler for serving shuffle blocks from an external shuffle service.
 *
 * <p>Incoming RPC messages are handled by {@link #receive}. Application and executor
 * lifecycle events are reported through {@link #applicationRemoved} and
 * {@link #executorRemoved}.
 */
public class ExternalShuffleBlockHandler extends RpcHandler {
    /**
     * Creates an external shuffle block handler.
     *
     * @param conf transport configuration
     * @param registeredExecutorFile file in which registered-executor information is kept
     * @throws IOException if the registered-executor information cannot be opened or loaded
     *     from {@code registeredExecutorFile}
     */
    public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile)
        throws IOException;

    /**
     * Creates an external shuffle block handler from explicit collaborators.
     * Intended for testing, or for sharing a single resolver with other code.
     *
     * @param streamManager stream manager used to serve block streams
     * @param blockManager block resolver used to resolve block requests
     */
    public ExternalShuffleBlockHandler(
        OneForOneStreamManager streamManager, ExternalShuffleBlockResolver blockManager
    );

    /**
     * Handles an incoming RPC message.
     *
     * @param client transport client that sent the message
     * @param message the serialized message bytes
     * @param callback callback used to send the response
     */
    @Override
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);

    /**
     * Returns all metrics exposed by this handler.
     *
     * @return a {@code MetricSet} containing all shuffle server metrics
     */
    public MetricSet getAllMetrics();

    /**
     * Returns the stream manager used by this handler.
     *
     * @return the {@code StreamManager} instance
     */
    @Override
    public StreamManager getStreamManager();

    /**
     * Cleans up state for an application that has been removed.
     *
     * @param appId ID of the application to clean up
     * @param cleanupLocalDirs whether the application's local directories should also be deleted
     */
    public void applicationRemoved(String appId, boolean cleanupLocalDirs);

    /**
     * Cleans up state for an executor that has been removed.
     *
     * @param executorId ID of the executor to clean up
     * @param appId ID of the application the executor belongs to
     */
    public void executorRemoved(String executorId, String appId);

    /**
     * Re-registers an executor with updated information.
     *
     * @param appExecId combined application and executor ID
     * @param executorInfo updated executor shuffle information
     */
    public void reregisterExecutor(AppExecId appExecId, ExecutorShuffleInfo executorInfo);

    /**
     * Closes the handler and releases its resources.
     */
    public void close();
}

ExternalShuffleBlockResolver

Manages converting shuffle block IDs to physical file segments.

/**
 * Manages converting shuffle block IDs to physical file segments.
 */
public class ExternalShuffleBlockResolver {
    /**
     * Creates an external shuffle block resolver.
     *
     * @param conf transport configuration
     * @param registeredExecutorFile file in which registered-executor information is kept
     * @throws IOException if the registered-executor information cannot be opened or loaded
     *     from {@code registeredExecutorFile}
     */
    public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
        throws IOException;

    /**
     * Returns the number of registered executors.
     *
     * @return the number of currently registered executors
     */
    public int getRegisteredExecutorsSize();

    /**
     * Registers an executor together with its shuffle configuration.
     *
     * <p>NOTE: {@code executorInfo}'s shuffle-manager name is expected to be the fully
     * qualified shuffle-manager class name (for example
     * {@code org.apache.spark.shuffle.sort.SortShuffleManager}). Verify which names the
     * targeted Spark version accepts.
     *
     * @param appId application ID
     * @param execId executor ID
     * @param executorInfo the executor's shuffle configuration
     */
    public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);

    /**
     * Returns the data of one shuffle block.
     *
     * @param appId application ID
     * @param execId executor ID that wrote the block
     * @param shuffleId shuffle ID
     * @param mapId map task ID
     * @param reduceId reduce task ID
     * @return a {@code ManagedBuffer} containing the block data; release it when done
     */
    public ManagedBuffer getBlockData(
        String appId, String execId, int shuffleId, int mapId, int reduceId
    );

    /**
     * Handles removal of an application and cleans up its state.
     *
     * @param appId ID of the application to remove
     * @param cleanupLocalDirs whether the application's local directories should also be deleted
     */
    public void applicationRemoved(String appId, boolean cleanupLocalDirs);

    /**
     * Handles removal of an executor and cleans up its state.
     *
     * @param executorId ID of the executor to remove
     * @param appId ID of the application the executor belongs to
     */
    public void executorRemoved(String executorId, String appId);

    /**
     * Closes the resolver and releases its resources.
     */
    public void close();

    /**
     * Combined application and executor ID; identifies one registered executor.
     */
    public static class AppExecId {
        /** Application ID. */
        public final String appId;
        /** Executor ID within the application. */
        public final String execId;

        public AppExecId(String appId, String execId);
        public boolean equals(Object other);
        public int hashCode();
        public String toString();
    }
}

ShuffleIndexInformation

Keeps the index information for map output in an in-memory buffer.

/**
 * Keeps the index information for map output in an in-memory buffer.
 */
public class ShuffleIndexInformation {
    /**
     * Creates shuffle index information by reading an index file.
     *
     * @param indexFile the shuffle index file to read
     * @throws IOException if the index file cannot be read
     */
    public ShuffleIndexInformation(File indexFile) throws IOException;

    /**
     * Returns the size of the in-memory index data in bytes, i.e. the length of the index
     * file. This is a byte count, not the number of index entries.
     *
     * @return size of the index data in bytes
     */
    public int getSize();

    /**
     * Returns the index record for a specific reduce ID.
     *
     * @param reduceId reduce task ID
     * @return a {@code ShuffleIndexRecord} with the block's offset and length in the data file
     */
    public ShuffleIndexRecord getIndex(int reduceId);
}

ShuffleIndexRecord

Contains the offset and length of a shuffle block's data.

/**
 * Contains the offset and length of one shuffle block's data within a shuffle data file.
 */
public class ShuffleIndexRecord {
    /**
     * Creates a shuffle index record.
     * @param offset byte offset of the block in the shuffle data file
     * @param length length of the block's data in bytes
     */
    public ShuffleIndexRecord(long offset, long length);
    
    /**
     * Returns the byte offset of the block.
     * @return byte offset of the block in the shuffle data file
     */
    public long getOffset();
    
    /**
     * Returns the length of the block.
     * @return length of the block's data in bytes
     */
    public long getLength();
}

Usage Examples:

import java.io.File;
import java.io.InputStream;

import com.codahale.metrics.MetricSet;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Create the transport configuration for the shuffle server.
// TransportConf requires a ConfigProvider; MapConfigProvider.EMPTY uses the default settings.
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);

// File in which registered-executor information is kept
File registeredExecutorFile = new File("/tmp/registered-executors.db");

// Create ONE block resolver for handling block requests.
// The constructor throws IOException; the enclosing method must handle or declare it.
ExternalShuffleBlockResolver blockResolver =
    new ExternalShuffleBlockResolver(conf, registeredExecutorFile);

// Wrap that same resolver in the RPC handler. Calling
// new ExternalShuffleBlockHandler(conf, registeredExecutorFile) here as well would open the
// same registered-executor file a second time from a second resolver.
ExternalShuffleBlockHandler handler =
    new ExternalShuffleBlockHandler(new OneForOneStreamManager(), blockResolver);

// Register an executor. The shuffle manager is given by its fully qualified class name.
String appId = "app-20231201-001";
String execId = "executor-1";
String[] localDirs = {"/tmp/spark-local-20231201-001/1", "/tmp/spark-local-20231201-001/2"};
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
    localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");

blockResolver.registerExecutor(appId, execId, executorInfo);
System.out.println("Registered executors: " + blockResolver.getRegisteredExecutorsSize());

// Retrieve block data (shuffleId=1, mapId=0, reduceId=0)
ManagedBuffer blockData = null;
try {
    blockData = blockResolver.getBlockData(appId, execId, 1, 0, 0);
    System.out.println("Retrieved block data, size: " + blockData.size() + " bytes");

    // Process the block data; processShuffleBlock is an application-defined placeholder
    try (InputStream dataStream = blockData.createInputStream()) {
        processShuffleBlock(dataStream);
    }
} catch (Exception e) {
    System.err.println("Error retrieving block data: " + e.getMessage());
} finally {
    // Important: always release the buffer, even if reading it failed
    if (blockData != null) {
        blockData.release();
    }
}

// Monitor server metrics
MetricSet serverMetrics = handler.getAllMetrics();
System.out.println("Server metrics: " + serverMetrics);

// Handle application cleanup. The handler wraps blockResolver, so the cleanup is issued
// once, through the handler, rather than repeated on the resolver.
handler.applicationRemoved(appId, true);

// Clean up resources once, through the handler that wraps the resolver
handler.close();

Server Configuration and Deployment

The shuffle server components are configured through the following Spark configuration properties. Which properties are available depends on the Spark version.

  • spark.shuffle.service.enabled - Enable external shuffle service
  • spark.shuffle.service.port - Port for the external shuffle service
  • spark.shuffle.service.index.cache.size - Size of index cache
  • spark.shuffle.service.db.backend - Database backend for executor registration
  • spark.shuffle.maxChunksBeingTransferred - Maximum chunks being transferred simultaneously

Error Handling and Monitoring

The server components provide the following error-handling and monitoring facilities:

  1. Metrics Collection: Use getAllMetrics() to monitor server performance
  2. Resource Cleanup: Properly handle application and executor removal
  3. File Management: Local directories are cleaned up when applicationRemoved is called with cleanupLocalDirs set to true
  4. Exception Handling: Robust error handling for corrupt or missing files
  5. Authentication: Integration with SASL authentication for secure operations

Block Resolution Process

The block resolution process follows these steps:

  1. Registration: Executors register their shuffle configuration with the server
  2. Block Request: Clients request specific blocks using shuffle/map/reduce IDs
  3. File Location: Server resolves block IDs to physical file locations
  4. Index Lookup: Server uses shuffle index files to find the byte range of each block
  5. Data Retrieval: Read the specific byte range from shuffle data files
  6. Buffer Management: Return data as ManagedBuffer for efficient memory handling