Spark Project Shuffle Streaming Service - provides network shuffle functionality for Apache Spark's distributed computing engine
Serializable message classes for communication between shuffle clients and servers, including registration, block requests, and data transfers.
Base class for all shuffle protocol messages.
/**
* Base class for all shuffle protocol messages
*/
public abstract class BlockTransferMessage implements Encodable {
/**
* Convert the message to a ByteBuffer for network transmission
* @return ByteBuffer containing the serialized message
*/
public ByteBuffer toByteBuffer();
/**
* Enumeration of all supported message types
*/
public enum Type {
OPEN_BLOCKS, UPLOAD_BLOCK, REGISTER_EXECUTOR, STREAM_HANDLE,
REGISTER_DRIVER, HEARTBEAT, UPLOAD_BLOCK_STREAM
}
/**
* Decoder for deserializing messages from ByteBuffer
*/
public static class Decoder {
/**
* Deserialize a message from ByteBuffer
* @param msg - ByteBuffer containing serialized message
* @return Deserialized BlockTransferMessage
*/
public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
}
}

Contains executor configuration for locating shuffle files.
/**
* Contains executor configuration for locating shuffle files
*/
public class ExecutorShuffleInfo implements Encodable {
/**
* Local directories where shuffle files are stored
*/
public final String[] localDirs;
/**
* Number of subdirectories per local directory
*/
public final int subDirsPerLocalDir;
/**
* Shuffle manager class name
*/
public final String shuffleManager;
/**
* Create executor shuffle information
* @param localDirs - Array of local directory paths for shuffle files
* @param subDirsPerLocalDir - Number of subdirectories per local directory
* @param shuffleManager - Name of the shuffle manager implementation
*/
public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);
public boolean equals(Object other);
public int hashCode();
public String toString();
}

Request to read a set of blocks.
/**
* Request to read a set of blocks from the shuffle service
*/
public class OpenBlocks extends BlockTransferMessage {
/**
* Application ID
*/
public final String appId;
/**
* Executor ID
*/
public final String execId;
/**
* Array of block IDs to open
*/
public final String[] blockIds;
/**
* Create an open blocks request
* @param appId - Application ID
* @param execId - Executor ID
* @param blockIds - Array of block IDs to request
*/
public OpenBlocks(String appId, String execId, String[] blockIds);
public boolean equals(Object other);
public int hashCode();
public String toString();
}

Initial registration message between executor and shuffle server.
/**
* Initial registration message between executor and shuffle server
*/
public class RegisterExecutor extends BlockTransferMessage {
/**
* Application ID
*/
public final String appId;
/**
* Executor ID
*/
public final String execId;
/**
* Executor shuffle configuration information
*/
public final ExecutorShuffleInfo executorInfo;
/**
* Create an executor registration message
* @param appId - Application ID
* @param execId - Executor ID
* @param executorInfo - Executor shuffle configuration
*/
public RegisterExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
public boolean equals(Object other);
public int hashCode();
public String toString();
}

Identifier for a fixed number of chunks in a stream.
/**
* Identifier for a fixed number of chunks in a stream
*/
public class StreamHandle extends BlockTransferMessage {
/**
* Stream identifier
*/
public final long streamId;
/**
* Number of chunks in the stream
*/
public final int numChunks;
/**
* Create a stream handle
* @param streamId - Unique stream identifier
* @param numChunks - Number of chunks in the stream
*/
public StreamHandle(long streamId, int numChunks);
public boolean equals(Object other);
public int hashCode();
public String toString();
}

Request to upload a block with storage level.
/**
* Request to upload a block with storage level
*/
public class UploadBlock extends BlockTransferMessage {
/**
* Application ID
*/
public final String appId;
/**
* Executor ID
*/
public final String execId;
/**
* Block ID
*/
public final String blockId;
/**
* Block metadata
*/
public final byte[] metadata;
/**
* Block data
*/
public final byte[] blockData;
/**
* Create an upload block request
* @param appId - Application ID
* @param execId - Executor ID
* @param blockId - Block ID to upload
* @param metadata - Block metadata
* @param blockData - Block data bytes
*/
public UploadBlock(String appId, String execId, String blockId, byte[] metadata, byte[] blockData);
public boolean equals(Object other);
public int hashCode();
public String toString();
}

Request to upload a block as a stream.
/**
* Request to upload block as stream
*/
public class UploadBlockStream extends BlockTransferMessage {
/**
* Block ID
*/
public final String blockId;
/**
* Block metadata
*/
public final byte[] metadata;
/**
* Create an upload block stream request
* @param blockId - Block ID to upload
* @param metadata - Block metadata
*/
public UploadBlockStream(String blockId, byte[] metadata);
public boolean equals(Object other);
public int hashCode();
public String toString();
}

Message for driver registration with the Mesos external shuffle service.
/**
* Message for driver registration with Mesos external shuffle service
*/
public class RegisterDriver extends BlockTransferMessage {
/**
* Create a driver registration message
* @param appId - Application ID
* @param heartbeatTimeoutMs - Heartbeat timeout in milliseconds
*/
public RegisterDriver(String appId, long heartbeatTimeoutMs);
/**
* Get the application ID
* @return Application ID
*/
public String getAppId();
/**
* Get the heartbeat timeout
* @return Heartbeat timeout in milliseconds
*/
public long getHeartbeatTimeoutMs();
public boolean equals(Object other);
public int hashCode();
public String toString();
}

Heartbeat message from the driver to the Mesos external shuffle service.
/**
* Heartbeat message from driver to Mesos external shuffle service
*/
public class ShuffleServiceHeartbeat extends BlockTransferMessage {
/**
* Create a heartbeat message
* @param appId - Application ID
*/
public ShuffleServiceHeartbeat(String appId);
/**
* Get the application ID
* @return Application ID
*/
public String getAppId();
public boolean equals(Object other);
public int hashCode();
public String toString();
}

Usage Examples:
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.spark.network.shuffle.protocol.*;
import org.apache.spark.network.shuffle.protocol.mesos.*;
// Example 1: Executor registration
// Build the executor's shuffle-file layout and wrap it in a registration message.
String[] localDirs = {"/tmp/spark-local-1", "/tmp/spark-local-2"};
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(localDirs, 64, "sort");
RegisterExecutor registerMsg = new RegisterExecutor("app-001", "executor-1", executorInfo);

// Serialize for network transmission
ByteBuffer serialized = registerMsg.toByteBuffer();
System.out.println("Serialized registration message: " + serialized.remaining() + " bytes");

// Deserialize received message; the decoder returns the base type, so check
// the concrete class before reading message-specific fields.
BlockTransferMessage received = BlockTransferMessage.Decoder.fromByteBuffer(serialized);
if (received instanceof RegisterExecutor) {
    RegisterExecutor regMsg = (RegisterExecutor) received;
    System.out.println("Received registration for app: " + regMsg.appId +
        ", executor: " + regMsg.execId);
}

// Example 2: Block request
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_1_0"};
OpenBlocks openBlocks = new OpenBlocks("app-001", "executor-1", blockIds);
ByteBuffer openBlocksBuffer = openBlocks.toByteBuffer();
System.out.println("Open blocks request size: " + openBlocksBuffer.remaining() + " bytes");

// Example 3: Stream handling
StreamHandle streamHandle = new StreamHandle(12345L, 3);
System.out.println("Stream handle: ID=" + streamHandle.streamId +
    ", chunks=" + streamHandle.numChunks);

// Example 4: Block upload
// Encode with an explicit charset: the no-argument getBytes() uses the JVM's
// platform-default charset, which would make the payload bytes (and the sizes
// printed below) vary between machines.
byte[] metadata = "block-metadata".getBytes(StandardCharsets.UTF_8);
byte[] blockData = "actual-block-data-here".getBytes(StandardCharsets.UTF_8);
UploadBlock uploadBlock = new UploadBlock("app-001", "executor-1", "block-123", metadata, blockData);
System.out.println("Upload block: " + uploadBlock.blockId +
    ", metadata size: " + uploadBlock.metadata.length +
    ", data size: " + uploadBlock.blockData.length);

// Example 5: Mesos driver registration
RegisterDriver driverReg = new RegisterDriver("app-001", 30000L);
System.out.println("Mesos driver registration: app=" + driverReg.getAppId() +
    ", timeout=" + driverReg.getHeartbeatTimeoutMs() + "ms");
// Example 6: Heartbeat
ShuffleServiceHeartbeat heartbeat = new ShuffleServiceHeartbeat("app-001");
System.out.println("Heartbeat for app: " + heartbeat.getAppId());

The typical message flow between shuffle clients and servers:
Registration Phase:
- RegisterExecutor with executor configuration

Block Request Phase:
- OpenBlocks with list of required block IDs
- StreamHandle containing stream information

Data Transfer Phase:

Upload Operations (if needed):
- UploadBlock or UploadBlockStream for block storage

Mesos-Specific Flow:
- RegisterDriver to establish connection
- ShuffleServiceHeartbeat messages maintain connection

All protocol messages implement the Encodable interface and provide:

Protocol messages include built-in error handling:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-shuffle-2-11