The server-side components handle incoming shuffle requests, manage executor registrations, and resolve shuffle blocks on the local filesystem. These components run as part of the external shuffle service.
public class ExternalShuffleBlockHandler extends RpcHandler {
  public ExternalShuffleBlockHandler(
      TransportConf conf,
      File registeredExecutorFile
  ) throws IOException;

  public ExternalShuffleBlockHandler(
      OneForOneStreamManager streamManager,
      ExternalShuffleBlockResolver blockManager
  );

  public void receive(
      TransportClient client,
      ByteBuffer message,
      RpcResponseCallback callback
  );

  public StreamManager getStreamManager();
}

RPC handler for the external shuffle service that processes client requests for shuffle blocks.
Constructor Parameters:
conf (TransportConf): Network transport configuration
registeredExecutorFile (File): File used for persistent executor registration storage
streamManager (OneForOneStreamManager): Stream manager for block transfers (testing constructor)
blockManager (ExternalShuffleBlockResolver): Block resolver instance (testing constructor)

Key Methods:
receive(client, message, callback)

Processes incoming RPC messages from shuffle clients. Handles executor registration, block open requests, and other protocol messages.
Parameters:
client (TransportClient): Client connection that sent the message
message (ByteBuffer): Serialized protocol message
callback (RpcResponseCallback): Callback for sending the response

getStreamManager()

Returns the stream manager used for managing block data streams.

Returns:
StreamManager: The stream manager instance
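For context on what receive() decodes, the messages it handles are serialized with the shuffle protocol classes in org.apache.spark.network.shuffle.protocol. The following is a minimal sketch of how a client-side RegisterExecutor and OpenBlocks request would be encoded into the ByteBuffer that receive() later decodes; the IDs and paths are placeholder values.

import java.nio.ByteBuffer;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.RegisterExecutor;

// Registration message a shuffle client would send to receive()
ExecutorShuffleInfo info = new ExecutorShuffleInfo(
    new String[]{"/tmp/spark-local-dir1"},
    64,
    "org.apache.spark.shuffle.sort.SortShuffleManager");
ByteBuffer registerMsg =
    new RegisterExecutor("app-123", "executor-1", info).toByteBuffer();

// Request to open two shuffle blocks written by that executor
ByteBuffer openMsg = new OpenBlocks(
    "app-123", "executor-1",
    new String[]{"shuffle_1_2_0", "shuffle_1_3_0"}).toByteBuffer();

// On the server side, receive() turns the ByteBuffer back into a protocol message
BlockTransferMessage decoded = BlockTransferMessage.Decoder.fromByteBuffer(registerMsg);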
public class ExternalShuffleBlockResolver {
  public ExternalShuffleBlockResolver(
      TransportConf conf,
      File registeredExecutorFile
  ) throws IOException;

  public void registerExecutor(
      String appId,
      String execId,
      ExecutorShuffleInfo executorInfo
  );

  public void applicationRemoved(String appId, boolean cleanupLocalDirs);

  public ManagedBuffer getBlockData(
      String appId,
      String execId,
      String blockId
  ) throws IOException;

  public void close();
}

Manages the mapping between shuffle block IDs and physical file segments on the local filesystem. Handles executor registration and block location resolution.
Constructor Parameters:
conf (TransportConf): Transport configuration for the resolver
registeredExecutorFile (File): File for persisting executor registrations across restarts

Throws:
IOException: If unable to initialize persistent storage

Key Methods:
registerExecutor(appId, execId, executorInfo)

Registers an executor's shuffle configuration, storing information about where it writes shuffle files.
Parameters:
appId (String): Spark application ID
execId (String): Executor ID
executorInfo (ExecutorShuffleInfo): Configuration describing shuffle file locations

applicationRemoved(appId, cleanupLocalDirs)

Cleans up data for a removed Spark application, optionally removing local shuffle directories.
Parameters:
appId (String): Application ID to clean up
cleanupLocalDirs (boolean): Whether to delete local shuffle directories

getBlockData(appId, execId, blockId)

Retrieves shuffle block data from the local filesystem.
Parameters:
appId (String): Application ID that owns the block
execId (String): Executor ID that wrote the block
blockId (String): Block identifier to retrieve

Returns:
ManagedBuffer: Buffer containing the block data

Throws:
IOException: If the block cannot be found or read

close()

Closes the resolver and releases resources, including persistent storage connections.
public static class AppExecId {
  public final String appId;
  public final String execId;

  public AppExecId(String appId, String execId);

  public boolean equals(Object other);
  public int hashCode();
  public String toString();
}

Internal identifier class combining application and executor IDs for tracking registered executors.
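Because AppExecId implements equals() and hashCode(), it works as a map key, which is how the resolver tracks registered executors. A small illustration follows; the map and values here are hypothetical and not the resolver's internal structure.

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;

// Two instances built from the same IDs compare equal, so they hit the same entry
Map<AppExecId, ExecutorShuffleInfo> executors = new HashMap<>();
AppExecId key1 = new AppExecId("app-123", "executor-1");
AppExecId key2 = new AppExecId("app-123", "executor-1");

executors.put(key1, new ExecutorShuffleInfo(
    new String[]{"/tmp/spark-local-dir1"}, 64,
    "org.apache.spark.shuffle.sort.SortShuffleManager"));

System.out.println(key1.equals(key2));           // true
System.out.println(executors.containsKey(key2)); // true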
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import java.io.File;
// Create transport configuration (TransportConf takes a module name and a ConfigProvider;
// in a Spark deployment this is typically derived from the SparkConf)
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
// Create block handler with persistent storage
File registrationFile = new File("/tmp/spark-shuffle-registrations");
ExternalShuffleBlockHandler handler = new ExternalShuffleBlockHandler(
conf,
registrationFile
);
// The handler can now be used with a TransportServer

import org.apache.spark.network.server.OneForOneStreamManager;
// Create components separately for testing or custom configuration
OneForOneStreamManager streamManager = new OneForOneStreamManager();
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(
conf,
new File("/tmp/registrations")
);
ExternalShuffleBlockHandler handler = new ExternalShuffleBlockHandler(
streamManager,
resolver
);

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import java.io.IOException;

// Register an executor manually
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
new String[]{"/tmp/spark-local-dir1", "/tmp/spark-local-dir2"},
64, // subdirs per local dir
"org.apache.spark.shuffle.sort.SortShuffleManager"
);
resolver.registerExecutor("app-123", "executor-1", executorInfo);
// Later, retrieve a block
try {
ManagedBuffer blockData = resolver.getBlockData(
"app-123",
"executor-1",
"shuffle_1_2_0"
);
System.out.println("Block data size: " + blockData.size());
// Process block data
byte[] data = new byte[(int) blockData.size()];
blockData.nioByteBuffer().get(data);
// Release buffer when done
blockData.release();
} catch (IOException e) {
System.err.println("Failed to retrieve block: " + e.getMessage());
}

// Clean up after application completion
resolver.applicationRemoved("app-123", true); // true = cleanup local directories
// Or clean up without removing directories (for debugging)
resolver.applicationRemoved("app-123", false);import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.TransportServer;
// Create transport context with the block handler
TransportContext context = new TransportContext(conf, handler);
// Create and start server
TransportServer server = context.createServer(7337, Collections.emptyList());
System.out.println("Shuffle service started on port 7337");
// Server will now handle incoming shuffle client requests
// Remember to close when done:
// server.close();
// resolver.close();

Persistent Storage:
The block resolver uses LevelDB for persistent storage of executor registrations, keyed by application and executor ID and written to the registeredExecutorFile supplied at construction.

Recovery Behavior:
Because registrations are persisted, executor metadata is recovered across service restarts: a restarted shuffle service can keep serving blocks for executors that registered before it went down.
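A short sketch of that recovery, reusing the conf, executorInfo, and registration file path from the earlier examples (error handling omitted): a fresh resolver built on the same file can serve blocks without the executor re-registering.

// First service instance: executor registers, registration is persisted to LevelDB
File registrationFile = new File("/tmp/spark-shuffle-registrations");
ExternalShuffleBlockResolver resolver =
    new ExternalShuffleBlockResolver(conf, registrationFile);
resolver.registerExecutor("app-123", "executor-1", executorInfo);
resolver.close();

// Simulated restart: a new resolver reloads registrations from the same file,
// so blocks written by executor-1 remain resolvable without re-registration
ExternalShuffleBlockResolver recovered =
    new ExternalShuffleBlockResolver(conf, registrationFile);
ManagedBuffer block = recovered.getBlockData("app-123", "executor-1", "shuffle_1_2_0");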
Common Error Scenarios:
The example below distinguishes a missing block (for instance, one already removed by applicationRemoved) from other I/O failures while reading:
try {
ManagedBuffer block = resolver.getBlockData("app-1", "exec-1", "shuffle_1_0_0");
// Process block...
} catch (IOException e) {
if (e.getMessage().contains("not found")) {
// Handle missing block
System.err.println("Block not found, may have been cleaned up");
} else {
// Handle other IO errors
System.err.println("IO error reading block: " + e.getMessage());
}
}