Serializable message classes for communication between shuffle clients and servers, including registration, block requests, and data transfers.
Base class for all shuffle protocol messages.
/**
* Base class for all shuffle protocol messages
*/
public abstract class BlockTransferMessage implements Encodable {
/**
* Convert the message to a ByteBuffer for network transmission
* @return ByteBuffer containing the serialized message
*/
public ByteBuffer toByteBuffer();
/**
* Enumeration of all supported message types
*/
public enum Type {
OPEN_BLOCKS, UPLOAD_BLOCK, REGISTER_EXECUTOR, STREAM_HANDLE,
REGISTER_DRIVER, HEARTBEAT, UPLOAD_BLOCK_STREAM
}
/**
* Decoder for deserializing messages from ByteBuffer
*/
public static class Decoder {
/**
* Deserialize a message from ByteBuffer
* @param msg - ByteBuffer containing serialized message
* @return Deserialized BlockTransferMessage
*/
public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
}
}Contains executor configuration for locating shuffle files.
/**
* Contains executor configuration for locating shuffle files
*/
public class ExecutorShuffleInfo implements Encodable {
/**
* Local directories where shuffle files are stored
*/
public final String[] localDirs;
/**
* Number of subdirectories per local directory
*/
public final int subDirsPerLocalDir;
/**
* Shuffle manager class name
*/
public final String shuffleManager;
/**
* Create executor shuffle information
* @param localDirs - Array of local directory paths for shuffle files
* @param subDirsPerLocalDir - Number of subdirectories per local directory
* @param shuffleManager - Name of the shuffle manager implementation
*/
public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);
public boolean equals(Object other);
public int hashCode();
public String toString();
}Request to read a set of blocks.
/**
* Request to read a set of blocks from the shuffle service
*/
public class OpenBlocks extends BlockTransferMessage {
/**
* Application ID
*/
public final String appId;
/**
* Executor ID
*/
public final String execId;
/**
* Array of block IDs to open
*/
public final String[] blockIds;
/**
* Create an open blocks request
* @param appId - Application ID
* @param execId - Executor ID
* @param blockIds - Array of block IDs to request
*/
public OpenBlocks(String appId, String execId, String[] blockIds);
public boolean equals(Object other);
public int hashCode();
public String toString();
}Initial registration message between executor and shuffle server.
/**
* Initial registration message between executor and shuffle server
*/
public class RegisterExecutor extends BlockTransferMessage {
/**
* Application ID
*/
public final String appId;
/**
* Executor ID
*/
public final String execId;
/**
* Executor shuffle configuration information
*/
public final ExecutorShuffleInfo executorInfo;
/**
* Create an executor registration message
* @param appId - Application ID
* @param execId - Executor ID
* @param executorInfo - Executor shuffle configuration
*/
public RegisterExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
public boolean equals(Object other);
public int hashCode();
public String toString();
}Identifier for fixed number of chunks in a stream.
/**
* Identifier for a fixed number of chunks in a stream
*/
public class StreamHandle extends BlockTransferMessage {
/**
* Stream identifier
*/
public final long streamId;
/**
* Number of chunks in the stream
*/
public final int numChunks;
/**
* Create a stream handle
* @param streamId - Unique stream identifier
* @param numChunks - Number of chunks in the stream
*/
public StreamHandle(long streamId, int numChunks);
public boolean equals(Object other);
public int hashCode();
public String toString();
}Request to upload a block with storage level.
/**
* Request to upload a block with storage level
*/
public class UploadBlock extends BlockTransferMessage {
/**
* Application ID
*/
public final String appId;
/**
* Executor ID
*/
public final String execId;
/**
* Block ID
*/
public final String blockId;
/**
* Block metadata
*/
public final byte[] metadata;
/**
* Block data
*/
public final byte[] blockData;
/**
* Create an upload block request
* @param appId - Application ID
* @param execId - Executor ID
* @param blockId - Block ID to upload
* @param metadata - Block metadata
* @param blockData - Block data bytes
*/
public UploadBlock(String appId, String execId, String blockId, byte[] metadata, byte[] blockData);
public boolean equals(Object other);
public int hashCode();
public String toString();
}Request to upload block as stream.
/**
* Request to upload block as stream
*/
public class UploadBlockStream extends BlockTransferMessage {
/**
* Block ID
*/
public final String blockId;
/**
* Block metadata
*/
public final byte[] metadata;
/**
* Create an upload block stream request
* @param blockId - Block ID to upload
* @param metadata - Block metadata
*/
public UploadBlockStream(String blockId, byte[] metadata);
public boolean equals(Object other);
public int hashCode();
public String toString();
}Message for driver registration with Mesos external shuffle service.
/**
* Message for driver registration with Mesos external shuffle service
*/
public class RegisterDriver extends BlockTransferMessage {
/**
* Create a driver registration message
* @param appId - Application ID
* @param heartbeatTimeoutMs - Heartbeat timeout in milliseconds
*/
public RegisterDriver(String appId, long heartbeatTimeoutMs);
/**
* Get the application ID
* @return Application ID
*/
public String getAppId();
/**
* Get the heartbeat timeout
* @return Heartbeat timeout in milliseconds
*/
public long getHeartbeatTimeoutMs();
public boolean equals(Object other);
public int hashCode();
public String toString();
}Heartbeat message from driver to Mesos external shuffle service.
/**
* Heartbeat message from driver to Mesos external shuffle service
*/
public class ShuffleServiceHeartbeat extends BlockTransferMessage {
/**
* Create a heartbeat message
* @param appId - Application ID
*/
public ShuffleServiceHeartbeat(String appId);
/**
* Get the application ID
* @return Application ID
*/
public String getAppId();
public boolean equals(Object other);
public int hashCode();
public String toString();
}Usage Examples:
import org.apache.spark.network.shuffle.protocol.*;
import org.apache.spark.network.shuffle.protocol.mesos.*;
// Example 1: Executor registration
String[] localDirs = {"/tmp/spark-local-1", "/tmp/spark-local-2"};
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(localDirs, 64, "sort");
RegisterExecutor registerMsg = new RegisterExecutor("app-001", "executor-1", executorInfo);
// Serialize for network transmission
ByteBuffer serialized = registerMsg.toByteBuffer();
System.out.println("Serialized registration message: " + serialized.remaining() + " bytes");
// Deserialize received message
BlockTransferMessage received = BlockTransferMessage.Decoder.fromByteBuffer(serialized);
if (received instanceof RegisterExecutor) {
RegisterExecutor regMsg = (RegisterExecutor) received;
System.out.println("Received registration for app: " + regMsg.appId +
", executor: " + regMsg.execId);
}
// Example 2: Block request
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_1_0"};
OpenBlocks openBlocks = new OpenBlocks("app-001", "executor-1", blockIds);
ByteBuffer openBlocksBuffer = openBlocks.toByteBuffer();
System.out.println("Open blocks request size: " + openBlocksBuffer.remaining() + " bytes");
// Example 3: Stream handling
StreamHandle streamHandle = new StreamHandle(12345L, 3);
System.out.println("Stream handle: ID=" + streamHandle.streamId +
", chunks=" + streamHandle.numChunks);
// Example 4: Block upload
byte[] metadata = "block-metadata".getBytes();
byte[] blockData = "actual-block-data-here".getBytes();
UploadBlock uploadBlock = new UploadBlock("app-001", "executor-1", "block-123", metadata, blockData);
System.out.println("Upload block: " + uploadBlock.blockId +
", metadata size: " + uploadBlock.metadata.length +
", data size: " + uploadBlock.blockData.length);
// Example 5: Mesos driver registration
RegisterDriver driverReg = new RegisterDriver("app-001", 30000L);
System.out.println("Mesos driver registration: app=" + driverReg.getAppId() +
", timeout=" + driverReg.getHeartbeatTimeoutMs() + "ms");
// Example 6: Heartbeat
ShuffleServiceHeartbeat heartbeat = new ShuffleServiceHeartbeat("app-001");
System.out.println("Heartbeat for app: " + heartbeat.getAppId());The typical message flow between shuffle clients and servers:
Registration Phase:
RegisterExecutor with executor configurationBlock Request Phase:
OpenBlocks with list of required block IDsStreamHandle containing stream informationData Transfer Phase:
Upload Operations (if needed):
UploadBlock or UploadBlockStream for block storageMesos-Specific Flow:
RegisterDriver to establish connectionShuffleServiceHeartbeat messages maintain connectionAll protocol messages implement the Encodable interface and provide:
Protocol messages include built-in error handling: