The protocol message classes define the communication format between shuffle clients and servers. All messages extend BlockTransferMessage and support serialization to/from byte buffers for network transmission.
public abstract class BlockTransferMessage implements Encodable {
protected abstract Type type();
public ByteBuffer toByteBuffer();
public static class Decoder {
public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
}
public static enum Type {
OPEN_BLOCKS(0),
UPLOAD_BLOCK(1),
REGISTER_EXECUTOR(2),
STREAM_HANDLE(3),
REGISTER_DRIVER(4);
public byte id();
}
}

Abstract base class for all network protocol messages. Provides serialization and deserialization capabilities.
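The type-byte framing this class relies on can be sketched with plain `java.nio`. This is a minimal illustration, not the library's implementation: `SketchStreamHandle` and `SketchDecoder` are hypothetical stand-ins, and the body layout (a long followed by an int) is an assumption made for the example.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a protocol message; not the real StreamHandle class.
final class SketchStreamHandle {
    static final byte TYPE_ID = 3; // mirrors STREAM_HANDLE(3) in the Type enum

    final long streamId;
    final int numChunks;

    SketchStreamHandle(long streamId, int numChunks) {
        this.streamId = streamId;
        this.numChunks = numChunks;
    }

    // Body size only: 8 bytes for the long plus 4 bytes for the int.
    int encodedLength() {
        return Long.BYTES + Integer.BYTES;
    }

    // Writes one type byte followed by the message body.
    ByteBuffer toByteBuffer() {
        ByteBuffer buf = ByteBuffer.allocate(1 + encodedLength());
        buf.put(TYPE_ID).putLong(streamId).putInt(numChunks);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }
}

// Hypothetical decoder: reads the type byte, then dispatches on it.
final class SketchDecoder {
    static SketchStreamHandle fromByteBuffer(ByteBuffer msg) {
        byte type = msg.get();
        switch (type) {
            case SketchStreamHandle.TYPE_ID:
                return new SketchStreamHandle(msg.getLong(), msg.getInt());
            default:
                throw new IllegalArgumentException("Unknown message type: " + type);
        }
    }
}
```

Keeping the type byte outside `encodedLength()` lets each message describe only its own body while the base class owns the framing.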
Key Methods:

toByteBuffer()
Serializes the message to a byte buffer for network transmission.
Returns:
- ByteBuffer: Serialized message, prefixed with its type byte

Decoder.fromByteBuffer(ByteBuffer msg)
Deserializes a message from a byte buffer received from the network.
Parameters:
- msg (ByteBuffer): Serialized message data
Returns:
- BlockTransferMessage: Deserialized message instance
Throws:
- IllegalArgumentException: If the message type is unknown

public class ExecutorShuffleInfo implements Encodable {
public final String[] localDirs;
public final int subDirsPerLocalDir;
public final String shuffleManager;
public ExecutorShuffleInfo(
String[] localDirs,
int subDirsPerLocalDir,
String shuffleManager
);
public boolean equals(Object other);
public int hashCode();
public String toString();
public int encodedLength();
public void encode(ByteBuf buf);
public static ExecutorShuffleInfo decode(ByteBuf buf);
}

Configuration data describing where an executor stores its shuffle files and how they are organized.
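To make the role of `localDirs` and `subDirsPerLocalDir` concrete, the sketch below resolves a file name to a path under one of the local directories. It assumes a hash-based layout (hash the file name, pick a local directory, then pick a two-digit hexadecimal subdirectory); that scheme, the class name `ShuffleDirLayoutSketch`, and the example file name are assumptions for illustration, and the actual resolver may differ between versions.

```java
import java.io.File;

// Hypothetical helper; assumes a hash-based directory layout.
final class ShuffleDirLayoutSketch {
    // Math.abs(Integer.MIN_VALUE) is still negative, so map that case to 0.
    static int nonNegativeHash(String s) {
        int h = s.hashCode();
        return h != Integer.MIN_VALUE ? Math.abs(h) : 0;
    }

    // Picks a local directory and a subdirectory from the file name's hash.
    static File locate(String[] localDirs, int subDirsPerLocalDir, String filename) {
        int hash = nonNegativeHash(filename);
        String localDir = localDirs[hash % localDirs.length];
        int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
        // Subdirectories are named with two hex digits, e.g. "0a" or "3f".
        return new File(new File(localDir, String.format("%02x", subDirId)), filename);
    }

    public static void main(String[] args) {
        String[] dirs = {"/tmp/spark-shuffle-a", "/tmp/spark-shuffle-b"};
        System.out.println(locate(dirs, 64, "shuffle_1_0_0.data"));
    }
}
```

Because the path depends only on the file name and the registered configuration, a server that knows an executor's `ExecutorShuffleInfo` can find its files without asking the executor.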
Fields:
- localDirs (String[]): Base local directories where shuffle files are stored
- subDirsPerLocalDir (int): Number of subdirectories created within each local directory
- shuffleManager (String): Fully qualified class name of the shuffle manager (e.g., "org.apache.spark.shuffle.sort.SortShuffleManager")

public class RegisterExecutor extends BlockTransferMessage {
public final String appId;
public final String execId;
public final ExecutorShuffleInfo executorInfo;
public RegisterExecutor(
String appId,
String execId,
ExecutorShuffleInfo executorInfo
);
protected Type type(); // Returns REGISTER_EXECUTOR
public boolean equals(Object other);
public int hashCode();
public String toString();
public int encodedLength();
public void encode(ByteBuf buf);
public static RegisterExecutor decode(ByteBuf buf);
}

Initial registration message sent from an executor to the shuffle server. Contains the information needed to locate the executor's shuffle files.
Fields:
- appId (String): Spark application identifier
- execId (String): Executor identifier within the application
- executorInfo (ExecutorShuffleInfo): Configuration describing shuffle file locations

public class RegisterDriver extends BlockTransferMessage {
public final String appId;
public RegisterDriver(String appId);
protected Type type(); // Returns REGISTER_DRIVER
public boolean equals(Object other);
public int hashCode();
public String toString();
public int encodedLength();
public void encode(ByteBuf buf);
public static RegisterDriver decode(ByteBuf buf);
}

Mesos-specific message for registering the Spark driver with the shuffle service for proper cleanup.
Fields:
- appId (String): Spark application identifier

public class OpenBlocks extends BlockTransferMessage {
public final String appId;
public final String execId;
public final String[] blockIds;
public OpenBlocks(String appId, String execId, String[] blockIds);
protected Type type(); // Returns OPEN_BLOCKS
public boolean equals(Object other);
public int hashCode();
public String toString();
public int encodedLength();
public void encode(ByteBuf buf);
public static OpenBlocks decode(ByteBuf buf);
}

Request to open and read a set of shuffle blocks. The server responds with a StreamHandle containing stream information.
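The `encodedLength()` / `encode()` / `decode()` trio has to agree byte for byte, which is easiest to see for a variable-length field such as `blockIds`. The sketch below assumes a length-prefixed layout (an int count, then each string as an int byte length followed by UTF-8 bytes) and uses `java.nio.ByteBuffer` in place of Netty's `ByteBuf`; the class `StringArrayCodecSketch` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec for a String[] field; the wire layout is an assumption.
final class StringArrayCodecSketch {
    // Must return exactly the number of bytes that encode() writes.
    static int encodedLength(String[] strings) {
        int total = Integer.BYTES; // element count
        for (String s : strings) {
            total += Integer.BYTES + s.getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }

    // Writes the element count, then each string as (length, UTF-8 bytes).
    static void encode(ByteBuffer buf, String[] strings) {
        buf.putInt(strings.length);
        for (String s : strings) {
            byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
            buf.putInt(bytes.length);
            buf.put(bytes);
        }
    }

    // Reads back what encode() wrote, in the same order.
    static String[] decode(ByteBuffer buf) {
        String[] out = new String[buf.getInt()];
        for (int i = 0; i < out.length; i++) {
            byte[] bytes = new byte[buf.getInt()];
            buf.get(bytes);
            out[i] = new String(bytes, StandardCharsets.UTF_8);
        }
        return out;
    }
}
```

Measuring the UTF-8 byte length rather than `String.length()` matters once identifiers contain non-ASCII characters, since the two values then differ.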
Fields:
- appId (String): Application that owns the blocks
- execId (String): Executor that wrote the blocks
- blockIds (String[]): Array of block identifiers to read

public class StreamHandle extends BlockTransferMessage {
public final long streamId;
public final int numChunks;
public StreamHandle(long streamId, int numChunks);
protected Type type(); // Returns STREAM_HANDLE
public boolean equals(Object other);
public int hashCode();
public String toString();
public int encodedLength();
public void encode(ByteBuf buf);
public static StreamHandle decode(ByteBuf buf);
}

Response to an OpenBlocks request, containing information about the stream opened for reading block data.
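A client typically needs to relate the chunks of the returned stream to the blocks it asked for. The sketch below assumes one chunk per requested block, with chunk index i corresponding to the i-th entry of `blockIds`; that ordering and the helper `ChunkMappingSketch` are assumptions for illustration rather than something this section states.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; assumes chunk i carries the data for blockIds[i].
final class ChunkMappingSketch {
    // Pairs each chunk index with the block ID requested at the same position.
    static Map<Integer, String> chunkToBlock(String[] blockIds, int numChunks) {
        if (numChunks != blockIds.length) {
            throw new IllegalStateException(
                "Requested " + blockIds.length + " blocks but handle reports " + numChunks + " chunks");
        }
        Map<Integer, String> mapping = new LinkedHashMap<>();
        for (int i = 0; i < numChunks; i++) {
            mapping.put(i, blockIds[i]);
        }
        return mapping;
    }
}
```

Checking `numChunks` against the request length up front turns a mismatched response into an immediate failure instead of a fetch for a chunk that does not exist.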
Fields:
- streamId (long): Unique identifier for the data stream
- numChunks (int): Number of chunks (blocks) available in the stream

public class UploadBlock extends BlockTransferMessage {
public final String appId;
public final String execId;
public final String blockId;
public final byte[] metadata;
public final byte[] blockData;
public UploadBlock(
String appId,
String execId,
String blockId,
byte[] metadata,
byte[] blockData
);
protected Type type(); // Returns UPLOAD_BLOCK
public boolean equals(Object other);
public int hashCode();
public String toString();
public int encodedLength();
public void encode(ByteBuf buf);
public static UploadBlock decode(ByteBuf buf);
}

Request to upload a block with associated metadata. Used by NettyBlockTransferService, but not typically by the external shuffle service.
Fields:
- appId (String): Application identifier
- execId (String): Executor identifier
- blockId (String): Block identifier
- metadata (byte[]): Serialized block metadata (typically StorageLevel)
- blockData (byte[]): Raw block data bytes

// Create executor shuffle info
ExecutorShuffleInfo info = new ExecutorShuffleInfo(
new String[]{"/tmp/spark-shuffle"},
64,
"org.apache.spark.shuffle.sort.SortShuffleManager"
);
// Create registration message
RegisterExecutor registerMsg = new RegisterExecutor("app-123", "executor-1", info);
// Serialize for network transmission
ByteBuffer serialized = registerMsg.toByteBuffer();

// Request multiple blocks
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};
OpenBlocks openMsg = new OpenBlocks("app-123", "executor-1", blockIds);
// Serialize message
ByteBuffer requestData = openMsg.toByteBuffer();

// Received message from network
ByteBuffer receivedData = ...; // from network
// Deserialize
BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(receivedData);
// Handle based on type
if (message instanceof OpenBlocks) {
OpenBlocks openBlocks = (OpenBlocks) message;
System.out.println("Request for blocks: " + Arrays.toString(openBlocks.blockIds));
} else if (message instanceof RegisterExecutor) {
RegisterExecutor register = (RegisterExecutor) message;
System.out.println("Executor registration: " + register.execId);
} else if (message instanceof StreamHandle) {
StreamHandle handle = (StreamHandle) message;
System.out.println("Stream " + handle.streamId + " with " + handle.numChunks + " chunks");
}

// Mesos-specific driver registration
RegisterDriver driverMsg = new RegisterDriver("spark-app-123");
ByteBuffer driverRegistration = driverMsg.toByteBuffer();
// Send to shuffle service for cleanup coordination

Normal Block Fetch Flow:
1. RegisterExecutor (one-time setup)
2. OpenBlocks with block IDs to fetch
3. StreamHandle containing stream info

Mesos Cleanup Flow:
1. RegisterDriver for cleanup coordination

Block Upload Flow (for NettyBlockTransferService):
1. UploadBlock with block data and metadata

Protocol-level errors are typically handled through:
- IllegalArgumentException, thrown by Decoder.fromByteBuffer when the message type is unknown

try {
BlockTransferMessage msg = BlockTransferMessage.Decoder.fromByteBuffer(data);
// Process message...
} catch (IllegalArgumentException e) {
System.err.println("Unknown message type or corrupted data: " + e.getMessage());
// Handle protocol error
}