External shuffle service client for Apache Spark that enables reading shuffle blocks from external servers instead of from executors.
The protocol message classes define the communication format between shuffle clients and servers. All messages extend BlockTransferMessage and support serialization to/from byte buffers for network transmission.
public abstract class BlockTransferMessage implements Encodable {
  protected abstract Type type();
  public ByteBuffer toByteBuffer();

  public static class Decoder {
    public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
  }

  public static enum Type {
    OPEN_BLOCKS(0),
    UPLOAD_BLOCK(1),
    REGISTER_EXECUTOR(2),
    STREAM_HANDLE(3),
    REGISTER_DRIVER(4);

    public byte id();
  }
}

Abstract base class for all network protocol messages. Provides serialization and deserialization capabilities.
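toByteBuffer() prefixes the encoded message body with the message's type byte, and Decoder.fromByteBuffer() reads that byte back to dispatch to the right decoder. The self-contained sketch below mimics that framing with plain java.nio; the payload encoding shown here (a length-prefixed UTF-8 string) is illustrative only, not Spark's exact wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FramingSketch {
    // Hypothetical type id, mirroring Type.OPEN_BLOCKS(0) in the enum above.
    static final byte OPEN_BLOCKS = 0;

    // Encode: one type byte, then a length-prefixed UTF-8 payload.
    static ByteBuffer encode(byte typeId, String payload) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + bytes.length);
        buf.put(typeId);          // type byte prefix, as in toByteBuffer()
        buf.putInt(bytes.length); // length prefix
        buf.put(bytes);           // payload
        buf.flip();
        return buf;
    }

    // Decode: read the type byte first, then dispatch on it.
    static String decode(ByteBuffer buf) {
        byte typeId = buf.get();
        if (typeId != OPEN_BLOCKS) {
            throw new IllegalArgumentException("Unknown message type: " + typeId);
        }
        byte[] bytes = new byte[buf.getInt()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer wire = encode(OPEN_BLOCKS, "app-123");
        System.out.println(decode(wire)); // round-trips the payload
    }
}
```

An unrecognized type byte raises IllegalArgumentException, matching the contract documented for fromByteBuffer below.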
Key Methods:
toByteBuffer()
Serializes the message to a byte buffer for network transmission.
Returns:
ByteBuffer: Serialized message with type byte prefix

Decoder.fromByteBuffer(ByteBuffer msg)
Deserializes a message from a byte buffer received from the network.
Parameters:
msg (ByteBuffer): Serialized message data
Returns:
BlockTransferMessage: Deserialized message instance
Throws:
IllegalArgumentException: If message type is unknown

public class ExecutorShuffleInfo implements Encodable {
  public final String[] localDirs;
  public final int subDirsPerLocalDir;
  public final String shuffleManager;

  public ExecutorShuffleInfo(
      String[] localDirs,
      int subDirsPerLocalDir,
      String shuffleManager
  );

  public boolean equals(Object other);
  public int hashCode();
  public String toString();
  public int encodedLength();
  public void encode(ByteBuf buf);
  public static ExecutorShuffleInfo decode(ByteBuf buf);
}

Configuration data describing where an executor stores its shuffle files and how they are organized.
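The localDirs and subDirsPerLocalDir fields let the shuffle server reconstruct the file paths an executor wrote. The sketch below is modeled on Spark's hashed-subdirectory layout (hash the filename, pick a local directory, then a two-hex-digit subdirectory); treat the exact hashing details as an assumption made for illustration.

```java
import java.io.File;

public class ShuffleFileLayout {
    // Sketch of how a shuffle server might resolve a block file from
    // ExecutorShuffleInfo fields. The real resolution is Spark-internal;
    // this version only illustrates the hashed-subdirectory idea.
    static String resolve(String[] localDirs, int subDirsPerLocalDir, String filename) {
        int hash = filename.hashCode();
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        String localDir = localDirs[nonNegative % localDirs.length];
        int subDirId = (nonNegative / localDirs.length) % subDirsPerLocalDir;
        // Subdirectory names are rendered as two hex digits, e.g. "0a".
        return localDir + File.separator
                + String.format("%02x", subDirId) + File.separator + filename;
    }

    public static void main(String[] args) {
        String[] localDirs = {"/tmp/spark-shuffle"};
        // Same inputs always resolve to the same path, so client and server
        // agree on file locations without extra communication.
        System.out.println(resolve(localDirs, 64, "shuffle_1_0_0.data"));
    }
}
```

The key property is determinism: because both sides compute the same function of the filename, the server can locate files it never wrote itself.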
Fields:
localDirs (String[]): Base local directories where shuffle files are stored
subDirsPerLocalDir (int): Number of subdirectories created within each local directory
shuffleManager (String): Fully qualified class name of the shuffle manager (e.g., "org.apache.spark.shuffle.sort.SortShuffleManager")

public class RegisterExecutor extends BlockTransferMessage {
  public final String appId;
  public final String execId;
  public final ExecutorShuffleInfo executorInfo;

  public RegisterExecutor(
      String appId,
      String execId,
      ExecutorShuffleInfo executorInfo
  );

  protected Type type(); // Returns REGISTER_EXECUTOR
  public boolean equals(Object other);
  public int hashCode();
  public String toString();
  public int encodedLength();
  public void encode(ByteBuf buf);
  public static RegisterExecutor decode(ByteBuf buf);
}

Initial registration message sent from executor to shuffle server. Contains information needed to locate the executor's shuffle files.
Fields:
appId (String): Spark application identifier
execId (String): Executor identifier within the application
executorInfo (ExecutorShuffleInfo): Configuration describing shuffle file locations

public class RegisterDriver extends BlockTransferMessage {
  public final String appId;

  public RegisterDriver(String appId);

  protected Type type(); // Returns REGISTER_DRIVER
  public boolean equals(Object other);
  public int hashCode();
  public String toString();
  public int encodedLength();
  public void encode(ByteBuf buf);
  public static RegisterDriver decode(ByteBuf buf);
}

Mesos-specific message for registering the Spark driver with the shuffle service for proper cleanup.
Fields:
appId (String): Spark application identifier

public class OpenBlocks extends BlockTransferMessage {
  public final String appId;
  public final String execId;
  public final String[] blockIds;

  public OpenBlocks(String appId, String execId, String[] blockIds);

  protected Type type(); // Returns OPEN_BLOCKS
  public boolean equals(Object other);
  public int hashCode();
  public String toString();
  public int encodedLength();
  public void encode(ByteBuf buf);
  public static OpenBlocks decode(ByteBuf buf);
}

Request to open and read a set of shuffle blocks. The server responds with a StreamHandle containing stream information.
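Shuffle block identifiers conventionally follow the pattern shuffle_&lt;shuffleId&gt;_&lt;mapId&gt;_&lt;reduceId&gt;, as in the examples later in this document. Below is a minimal parser sketch; the helper name is hypothetical, and the real parsing lives inside Spark's block ID handling.

```java
public class ShuffleBlockId {
    // Parses "shuffle_<shuffleId>_<mapId>_<reduceId>" into its three
    // numeric components. Hypothetical helper for illustration only.
    static int[] parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length != 4 || !parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Not a shuffle block id: " + blockId);
        }
        return new int[] {
            Integer.parseInt(parts[1]), // shuffleId
            Integer.parseInt(parts[2]), // mapId
            Integer.parseInt(parts[3])  // reduceId
        };
    }

    public static void main(String[] args) {
        int[] ids = parse("shuffle_1_0_2");
        System.out.println("shuffle=" + ids[0] + " map=" + ids[1] + " reduce=" + ids[2]);
    }
}
```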
Fields:
appId (String): Application that owns the blocks
execId (String): Executor that wrote the blocks
blockIds (String[]): Array of block identifiers to read

public class StreamHandle extends BlockTransferMessage {
  public final long streamId;
  public final int numChunks;

  public StreamHandle(long streamId, int numChunks);

  protected Type type(); // Returns STREAM_HANDLE
  public boolean equals(Object other);
  public int hashCode();
  public String toString();
  public int encodedLength();
  public void encode(ByteBuf buf);
  public static StreamHandle decode(ByteBuf buf);
}

Response to OpenBlocks request containing information about the opened stream for reading block data.
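In the one-for-one fetch convention, chunk index i of the stream identified by streamId corresponds to blockIds[i] of the originating OpenBlocks request, so numChunks equals the number of requested blocks. That ordering is an assumption stated here for illustration; the toy registry below sketches the mapping a client might keep while fetching chunks.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ChunkMapping {
    // Builds the chunkIndex -> blockId mapping a client could use to match
    // fetched chunks back to the blocks it asked for. Illustrative only.
    static Map<Integer, String> chunkIndexToBlockId(String[] blockIds) {
        Map<Integer, String> mapping = new LinkedHashMap<>();
        for (int i = 0; i < blockIds.length; i++) {
            mapping.put(i, blockIds[i]); // chunk i carries blockIds[i]
        }
        return mapping;
    }

    public static void main(String[] args) {
        String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};
        // A StreamHandle for this request would report numChunks == 3.
        System.out.println(chunkIndexToBlockId(blockIds));
    }
}
```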
Fields:
streamId (long): Unique identifier for the data stream
numChunks (int): Number of chunks (blocks) available in the stream

public class UploadBlock extends BlockTransferMessage {
  public final String appId;
  public final String execId;
  public final String blockId;
  public final byte[] metadata;
  public final byte[] blockData;

  public UploadBlock(
      String appId,
      String execId,
      String blockId,
      byte[] metadata,
      byte[] blockData
  );

  protected Type type(); // Returns UPLOAD_BLOCK
  public boolean equals(Object other);
  public int hashCode();
  public String toString();
  public int encodedLength();
  public void encode(ByteBuf buf);
  public static UploadBlock decode(ByteBuf buf);
}

Request to upload a block with associated metadata. Used by NettyBlockTransferService but not typically by the external shuffle service.
Fields:
appId (String): Application identifier
execId (String): Executor identifier
blockId (String): Block identifier
metadata (byte[]): Serialized block metadata (typically StorageLevel)
blockData (byte[]): Raw block data bytes

// Create executor shuffle info
ExecutorShuffleInfo info = new ExecutorShuffleInfo(
    new String[]{"/tmp/spark-shuffle"},
    64,
    "org.apache.spark.shuffle.sort.SortShuffleManager"
);
// Create registration message
RegisterExecutor registerMsg = new RegisterExecutor("app-123", "executor-1", info);
// Serialize for network transmission
ByteBuffer serialized = registerMsg.toByteBuffer();

// Request multiple blocks
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};
OpenBlocks openMsg = new OpenBlocks("app-123", "executor-1", blockIds);
// Serialize message
ByteBuffer requestData = openMsg.toByteBuffer();

// Received message from network
ByteBuffer receivedData = ...; // from network
// Deserialize
BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(receivedData);
// Handle based on type
if (message instanceof OpenBlocks) {
  OpenBlocks openBlocks = (OpenBlocks) message;
  System.out.println("Request for blocks: " + Arrays.toString(openBlocks.blockIds));
} else if (message instanceof RegisterExecutor) {
  RegisterExecutor register = (RegisterExecutor) message;
  System.out.println("Executor registration: " + register.execId);
} else if (message instanceof StreamHandle) {
  StreamHandle handle = (StreamHandle) message;
  System.out.println("Stream " + handle.streamId + " with " + handle.numChunks + " chunks");
}

// Mesos-specific driver registration
RegisterDriver driverMsg = new RegisterDriver("spark-app-123");
ByteBuffer driverRegistration = driverMsg.toByteBuffer();
// Send to shuffle service for cleanup coordination

Normal Block Fetch Flow:
1. RegisterExecutor (one-time setup)
2. OpenBlocks with block IDs to fetch
3. StreamHandle containing stream info

Mesos Cleanup Flow:
RegisterDriver for cleanup coordination

Block Upload Flow (for NettyBlockTransferService):
UploadBlock with block data and metadata

Protocol-level errors are typically handled through:
IllegalArgumentException

try {
  BlockTransferMessage msg = BlockTransferMessage.Decoder.fromByteBuffer(data);
  // Process message...
} catch (IllegalArgumentException e) {
  System.err.println("Unknown message type or corrupted data: " + e.getMessage());
  // Handle protocol error
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-shuffle-2-10