Server-side components for handling shuffle requests, including RPC handlers and block resolution mechanisms.
RPC handler for serving shuffle blocks from the external shuffle service.
/**
 * RPC handler for serving shuffle blocks from the external shuffle service.
 *
 * <p>Incoming RPC messages are delivered to {@link #receive}; the stream manager this handler
 * uses is exposed through {@link #getStreamManager()}.
 */
public class ExternalShuffleBlockHandler extends RpcHandler {
  /**
   * Creates an external shuffle block handler.
   *
   * <p>NOTE(review): this constructor takes the same arguments as the
   * {@code ExternalShuffleBlockResolver} constructor and presumably builds its own resolver over
   * {@code registeredExecutorFile} — confirm. If so, do not also open the same file with a
   * separate resolver; use the stream-manager/resolver constructor to share a single resolver.
   *
   * @param conf transport configuration
   * @param registeredExecutorFile file containing registered executor information
   */
  public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile);

  /**
   * Creates an external shuffle block handler from an existing stream manager and block
   * resolver. Documented as intended for testing. Because the caller supplies the resolver, the
   * caller and this handler operate on the same resolver instance.
   *
   * @param streamManager stream manager for handling streams
   * @param blockManager block resolver used to resolve block requests
   */
  public ExternalShuffleBlockHandler(
      OneForOneStreamManager streamManager, ExternalShuffleBlockResolver blockManager);

  /**
   * Handles an incoming RPC message.
   *
   * @param client transport client that sent the message
   * @param message the raw message bytes
   * @param callback callback used to send the response
   */
  @Override
  public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);

  /**
   * Returns all metrics exposed by this handler.
   *
   * @return a {@code MetricSet} containing all shuffle server metrics
   */
  public MetricSet getAllMetrics();

  /**
   * Returns the stream manager used by this handler.
   *
   * @return the {@code StreamManager} instance
   */
  @Override
  public StreamManager getStreamManager();

  /**
   * Cleans up state belonging to an application that has been removed.
   *
   * @param appId ID of the application to clean up
   * @param cleanupLocalDirs whether to also clean up the application's local directories
   */
  public void applicationRemoved(String appId, boolean cleanupLocalDirs);

  /**
   * Cleans up state belonging to an executor that has been removed.
   *
   * @param executorId ID of the executor to clean up
   * @param appId ID of the application the executor belongs to
   */
  public void executorRemoved(String executorId, String appId);

  /**
   * Re-registers an executor with updated shuffle information.
   *
   * @param appExecId combined application and executor ID
   *     ({@code ExternalShuffleBlockResolver.AppExecId})
   * @param executorInfo updated executor shuffle information
   */
  public void reregisterExecutor(AppExecId appExecId, ExecutorShuffleInfo executorInfo);

  /**
   * Closes the handler and releases its resources.
   */
  public void close();
}
Manages converting shuffle block IDs to physical file segments.
/**
 * Manages converting shuffle block IDs to physical file segments.
 *
 * <p>Executors register their shuffle configuration through {@link #registerExecutor}; block
 * lookups through {@link #getBlockData} are keyed by application ID, executor ID, shuffle ID,
 * map ID and reduce ID.
 */
public class ExternalShuffleBlockResolver {
  /**
   * Creates an external shuffle block resolver.
   *
   * @param conf transport configuration
   * @param registeredExecutorFile file containing registered executor information
   */
  public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile);

  /**
   * Returns the number of registered executors.
   *
   * @return number of currently registered executors
   */
  public int getRegisteredExecutorsSize();

  /**
   * Registers an executor together with its shuffle configuration.
   *
   * @param appId application ID
   * @param execId executor ID
   * @param executorInfo the executor's shuffle configuration
   */
  public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);

  /**
   * Returns the data of a single shuffle block.
   *
   * @param appId application ID
   * @param execId executor ID
   * @param shuffleId shuffle ID
   * @param mapId map task ID
   * @param reduceId reduce task ID
   * @return a {@code ManagedBuffer} containing the block data; release it when done
   */
  public ManagedBuffer getBlockData(
      String appId, String execId, int shuffleId, int mapId, int reduceId);

  /**
   * Cleans up state belonging to an application that has been removed.
   *
   * @param appId ID of the application to remove
   * @param cleanupLocalDirs whether to also clean up the application's local directories
   */
  public void applicationRemoved(String appId, boolean cleanupLocalDirs);

  /**
   * Cleans up state belonging to an executor that has been removed.
   *
   * @param executorId ID of the executor to remove
   * @param appId ID of the application the executor belongs to
   */
  public void executorRemoved(String executorId, String appId);

  /**
   * Closes the resolver and releases its resources.
   */
  public void close();

  /**
   * Combined application and executor ID, used as a single key for one executor of one
   * application.
   */
  public static class AppExecId {
    /** Application ID. */
    public final String appId;
    /** Executor ID. */
    public final String execId;

    public AppExecId(String appId, String execId);

    // Value-based equality over appId and execId; overrides Object#equals.
    @Override
    public boolean equals(Object other);

    // Must stay consistent with equals; overrides Object#hashCode.
    @Override
    public int hashCode();

    @Override
    public String toString();
  }
}
Keeps index information for map output as an in-memory buffer.
/**
 * Keeps index information for map output as an in-memory buffer.
 */
public class ShuffleIndexInformation {
  /**
   * Creates shuffle index information by reading the given index file.
   *
   * @param indexFile the shuffle index file to read
   */
  public ShuffleIndexInformation(File indexFile);

  /**
   * Returns the size of this index information, documented here as the number of index entries.
   *
   * <p>NOTE(review): in some Spark versions this value is the index file's length in bytes
   * rather than an entry count — confirm before using it as an entry count.
   *
   * @return number of index entries (see note above)
   */
  public int getSize();

  /**
   * Returns the index record for a specific reducer.
   *
   * @param reduceId reduce task ID
   * @return a {@code ShuffleIndexRecord} holding the offset and length of that reducer's block
   */
  public ShuffleIndexRecord getIndex(int reduceId);
}
Contains offset and length of shuffle block data.
/**
 * Contains the offset and length of a shuffle block's data.
 *
 * <p>Instances are returned by {@code ShuffleIndexInformation#getIndex(int)}.
 */
public class ShuffleIndexRecord {
  /**
   * Creates a shuffle index record.
   *
   * @param offset byte offset of the block in the shuffle data file
   * @param length length of the data block in bytes
   */
  public ShuffleIndexRecord(long offset, long length);

  /**
   * Returns the byte offset of the block.
   *
   * @return byte offset in the shuffle data file
   */
  public long getOffset();

  /**
   * Returns the length of the block.
   *
   * @return length of the data block in bytes
   */
  public long getLength();
}
Usage Examples:
import java.io.File;
import java.io.InputStream;

import com.codahale.metrics.MetricSet;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
// Create transport configuration for the shuffle server.
// TransportConf needs a module name plus a ConfigProvider; MapConfigProvider.EMPTY means
// every setting falls back to its default.
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);

// Create file for storing registered executor information
File registeredExecutorFile = new File("/tmp/registered-executors.db");

// Create the block resolver that maps shuffle block IDs to file segments.
ExternalShuffleBlockResolver blockResolver =
    new ExternalShuffleBlockResolver(conf, registeredExecutorFile);

// Create the RPC handler on top of the SAME resolver instance. A handler built with
// (conf, registeredExecutorFile) cannot see blockResolver, so executors registered below
// would be unknown to it, and the registration file would be opened a second time.
ExternalShuffleBlockHandler handler =
    new ExternalShuffleBlockHandler(new OneForOneStreamManager(), blockResolver);

// Register an executor.
String appId = "app-20231201-001";
String execId = "executor-1";
String[] localDirs = {"/tmp/spark-local-20231201-001/1", "/tmp/spark-local-20231201-001/2"};
// The resolver validates the shuffle manager by its fully qualified class name; a short
// alias such as "sort" is rejected at registration time.
ExecutorShuffleInfo executorInfo =
    new ExecutorShuffleInfo(localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");
blockResolver.registerExecutor(appId, execId, executorInfo);
System.out.println("Registered executors: " + blockResolver.getRegisteredExecutorsSize());

// Retrieve block data.
try {
  ManagedBuffer blockData = blockResolver.getBlockData(appId, execId, 1, 0, 0);
  try {
    System.out.println("Retrieved block data, size: " + blockData.size() + " bytes");
    try (InputStream dataStream = blockData.createInputStream()) {
      // processShuffleBlock is a placeholder for application-specific processing.
      processShuffleBlock(dataStream);
    }
  } finally {
    // Important: always release the buffer, even if reading or processing fails.
    blockData.release();
  }
} catch (Exception e) {
  // Print the exception itself (type + message); getMessage() alone may be null.
  System.err.println("Error retrieving block data: " + e);
}

// Monitor server metrics.
MetricSet serverMetrics = handler.getAllMetrics();
System.out.println("Server metrics: " + serverMetrics);

// Clean up through the handler only: it was built on blockResolver, and its
// applicationRemoved/close forward to that resolver (as in Spark's implementation; confirm
// for your version), so calling the resolver directly as well would clean up and close it twice.
handler.applicationRemoved(appId, true);
handler.close();
The shuffle server components can be configured through various Transport configuration parameters:
- spark.shuffle.service.enabled - Enable the external shuffle service
- spark.shuffle.service.port - Port for the external shuffle service
- spark.shuffle.service.index.cache.size - Size of the shuffle index cache
- spark.shuffle.service.db.backend - Database backend used to persist executor registrations
- spark.shuffle.maxChunksBeingTransferred - Maximum number of chunks being transferred simultaneously

The server components provide comprehensive error handling and metrics:
- Use getAllMetrics() to monitor server performance.

The block resolution process follows these steps: