Network Protocol Messages

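The protocol message classes define the communication format between shuffle clients and servers. All messages extend BlockTransferMessage and serialize to and from byte buffers for network transmission. ExecutorShuffleInfo is the exception: it is an Encodable payload carried inside RegisterExecutor rather than a message of its own.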

Base Protocol Class

BlockTransferMessage

public abstract class BlockTransferMessage implements Encodable {
    protected abstract Type type();
    
    public ByteBuffer toByteBuffer();
    
    public static class Decoder {
        public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
    }
    
    public static enum Type {
        OPEN_BLOCKS(0),
        UPLOAD_BLOCK(1), 
        REGISTER_EXECUTOR(2),
        STREAM_HANDLE(3),
        REGISTER_DRIVER(4);
        
        public byte id();
    }
}

Abstract base class for all network protocol messages. Provides serialization and deserialization capabilities.

Key Methods:

toByteBuffer

Serializes the message to a byte buffer for network transmission.

Returns:

  • ByteBuffer: Serialized message, prefixed with a one-byte type ID

fromByteBuffer (static)

Deserializes a message from a byte buffer received from the network.

Parameters:

  • msg (ByteBuffer): Serialized message data

Returns:

  • BlockTransferMessage: Deserialized message instance

Throws:

  • IllegalArgumentException: If the message type is unknown
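
The type-byte prefix and the rejection of unknown types can be illustrated without the real classes. The following is a minimal sketch under stated assumptions: FramingSketch, frame, and describe are hypothetical names, the payload is treated as plain UTF-8 text, and only two of the type IDs from the Type enum are handled. It is not the library's implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the message framing; not the library's actual classes.
final class FramingSketch {
    // Type ids mirror two entries of the Type enum listed above.
    static final byte OPEN_BLOCKS = 0;
    static final byte REGISTER_DRIVER = 4;

    /** Prefixes the payload with a single type byte. */
    static ByteBuffer frame(byte typeId, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + payload.length);
        buf.put(typeId);
        buf.put(payload);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    /** Reads the type byte and dispatches on it; unknown ids are rejected. */
    static String describe(ByteBuffer msg) {
        byte typeId = msg.get();
        byte[] payload = new byte[msg.remaining()];
        msg.get(payload);
        String body = new String(payload, StandardCharsets.UTF_8);
        switch (typeId) {
            case OPEN_BLOCKS:
                return "OPEN_BLOCKS:" + body;
            case REGISTER_DRIVER:
                return "REGISTER_DRIVER:" + body;
            default:
                throw new IllegalArgumentException("Unknown message type: " + typeId);
        }
    }

    public static void main(String[] args) {
        byte[] payload = "app-123".getBytes(StandardCharsets.UTF_8);
        System.out.println(describe(frame(REGISTER_DRIVER, payload)));
    }
}
```

Because the type travels as the first byte, a receiver can pick the right decoder before it looks at the rest of the buffer.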

Configuration Messages

ExecutorShuffleInfo

public class ExecutorShuffleInfo implements Encodable {
    public final String[] localDirs;
    public final int subDirsPerLocalDir;
    public final String shuffleManager;
    
    public ExecutorShuffleInfo(
        String[] localDirs,
        int subDirsPerLocalDir,
        String shuffleManager
    );
    
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
    
    public int encodedLength();
    public void encode(ByteBuf buf);
    public static ExecutorShuffleInfo decode(ByteBuf buf);
}

Configuration data describing where an executor stores its shuffle files and how they are organized.

Fields:

  • localDirs (String[]): Base local directories where shuffle files are stored
  • subDirsPerLocalDir (int): Number of subdirectories created within each local directory
  • shuffleManager (String): Fully qualified class name of the shuffle manager (e.g., "org.apache.spark.shuffle.sort.SortShuffleManager")
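
The fields above say where shuffle files live but not how a file name maps to a directory. One plausible scheme is hash-based selection, sketched below. The hashing rule, the two-digit hexadecimal subdirectory names, and the names ShuffleFileLocator and locate are all assumptions made for illustration; the actual resolver may differ.

```java
import java.io.File;

// Illustrative only: a hash-based directory layout, assumed rather than taken from the text above.
final class ShuffleFileLocator {
    /** Returns a non-negative hash; Math.abs alone would fail for Integer.MIN_VALUE. */
    static int nonNegativeHash(String s) {
        int h = s.hashCode();
        return h != Integer.MIN_VALUE ? Math.abs(h) : 0;
    }

    /** Picks a local directory and a subdirectory for the given file name. */
    static File locate(String[] localDirs, int subDirsPerLocalDir, String fileName) {
        int hash = nonNegativeHash(fileName);
        String localDir = localDirs[hash % localDirs.length];
        int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
        // Subdirectories are named with two hex digits in this sketch, e.g. "0a".
        File subDir = new File(localDir, String.format("%02x", subDirId));
        return new File(subDir, fileName);
    }

    public static void main(String[] args) {
        String[] dirs = {"/tmp/spark-shuffle"};
        System.out.println(locate(dirs, 64, "shuffle_1_0_0.data"));
    }
}
```

A deterministic mapping like this would let the server find a file from the registered localDirs and subDirsPerLocalDir alone, without asking the executor.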

Registration Messages

RegisterExecutor

public class RegisterExecutor extends BlockTransferMessage {
    public final String appId;
    public final String execId;
    public final ExecutorShuffleInfo executorInfo;
    
    public RegisterExecutor(
        String appId,
        String execId,
        ExecutorShuffleInfo executorInfo
    );
    
    protected Type type(); // Returns REGISTER_EXECUTOR
    
    public boolean equals(Object other);
    public int hashCode(); 
    public String toString();
    
    public int encodedLength();
    public void encode(ByteBuf buf);
    public static RegisterExecutor decode(ByteBuf buf);
}

Initial registration message sent from an executor to the shuffle server. Contains the information the server needs to locate the executor's shuffle files.

Fields:

  • appId (String): Spark application identifier
  • execId (String): Executor identifier within the application
  • executorInfo (ExecutorShuffleInfo): Configuration describing shuffle file locations
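
The encodedLength, encode, and decode trio can be illustrated with the string fields. The sketch below assumes a length-prefixed UTF-8 layout (a 4-byte length followed by the bytes) and uses java.nio.ByteBuffer in place of Netty's ByteBuf so that it runs without dependencies. StringCodecSketch is a hypothetical name, and the real wire format may differ.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of a length-prefixed string codec; ByteBuffer stands in for Netty's ByteBuf.
final class StringCodecSketch {
    /** 4 bytes for the length prefix plus the UTF-8 bytes of the string. */
    static int encodedLength(String s) {
        return Integer.BYTES + s.getBytes(StandardCharsets.UTF_8).length;
    }

    static void encode(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putInt(bytes.length);
        buf.put(bytes);
    }

    static String decode(ByteBuffer buf) {
        int length = buf.getInt();
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String appId = "app-123";
        String execId = "executor-1";
        // Size the buffer exactly from the encoded lengths of the fields.
        ByteBuffer buf = ByteBuffer.allocate(encodedLength(appId) + encodedLength(execId));
        encode(buf, appId);
        encode(buf, execId);
        buf.flip();
        System.out.println(decode(buf) + " / " + decode(buf));
    }
}
```

Fields have to be decoded in the order they were encoded, which is why each message class pairs its encode method with a static decode.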

RegisterDriver (Mesos)

public class RegisterDriver extends BlockTransferMessage {
    public final String appId;
    
    public RegisterDriver(String appId);
    
    protected Type type(); // Returns REGISTER_DRIVER
    
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
    
    public int encodedLength();
    public void encode(ByteBuf buf);
    public static RegisterDriver decode(ByteBuf buf);
}

Mesos-specific message that registers the Spark driver with the shuffle service so that the service can clean up the application's shuffle data when the application finishes.

Fields:

  • appId (String): Spark application identifier

Block Access Messages

OpenBlocks

public class OpenBlocks extends BlockTransferMessage {
    public final String appId;
    public final String execId;
    public final String[] blockIds;
    
    public OpenBlocks(String appId, String execId, String[] blockIds);
    
    protected Type type(); // Returns OPEN_BLOCKS
    
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
    
    public int encodedLength();
    public void encode(ByteBuf buf);
    public static OpenBlocks decode(ByteBuf buf);
}

Request to open and read a set of shuffle blocks. The server responds with a StreamHandle containing stream information.

Fields:

  • appId (String): Application that owns the blocks
  • execId (String): Executor that wrote the blocks
  • blockIds (String[]): Array of block identifiers to read
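
Block identifiers are plain strings. The examples later in this document use IDs such as "shuffle_1_0_0"; the sketch below assumes these follow a shuffle_<shuffleId>_<mapId>_<reduceId> pattern, which is an assumption about the naming scheme rather than something this page states. ShuffleBlockIdSketch is a hypothetical helper.

```java
// Hypothetical parser for shuffle block ids of the assumed form shuffle_<shuffleId>_<mapId>_<reduceId>.
final class ShuffleBlockIdSketch {
    final int shuffleId;
    final int mapId;
    final int reduceId;

    ShuffleBlockIdSketch(int shuffleId, int mapId, int reduceId) {
        this.shuffleId = shuffleId;
        this.mapId = mapId;
        this.reduceId = reduceId;
    }

    /** Splits the id on underscores and validates the prefix and part count. */
    static ShuffleBlockIdSketch parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length != 4 || !parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockId);
        }
        // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) on bad digits.
        return new ShuffleBlockIdSketch(
            Integer.parseInt(parts[1]),
            Integer.parseInt(parts[2]),
            Integer.parseInt(parts[3]));
    }

    public static void main(String[] args) {
        ShuffleBlockIdSketch id = parse("shuffle_1_0_2");
        System.out.println(id.shuffleId + " " + id.mapId + " " + id.reduceId);
    }
}
```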

StreamHandle

public class StreamHandle extends BlockTransferMessage {
    public final long streamId;
    public final int numChunks;
    
    public StreamHandle(long streamId, int numChunks);
    
    protected Type type(); // Returns STREAM_HANDLE
    
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
    
    public int encodedLength();
    public void encode(ByteBuf buf);
    public static StreamHandle decode(ByteBuf buf);
}

Response to an OpenBlocks request, containing the information the client needs to read block data from the opened stream.

Fields:

  • streamId (long): Unique identifier for the data stream
  • numChunks (int): Number of chunks (blocks) available in the stream
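
Since both fields are fixed-width primitives, a StreamHandle body has a constant encoded size. The sketch below assumes the fields are written in declaration order as an 8-byte long followed by a 4-byte int. StreamHandleSketch is a hypothetical stand-in that uses java.nio.ByteBuffer instead of Netty's ByteBuf.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in showing a fixed-width encoding of the two StreamHandle fields.
final class StreamHandleSketch {
    final long streamId;
    final int numChunks;

    StreamHandleSketch(long streamId, int numChunks) {
        this.streamId = streamId;
        this.numChunks = numChunks;
    }

    /** 8 bytes for the long plus 4 bytes for the int. */
    int encodedLength() {
        return Long.BYTES + Integer.BYTES;
    }

    void encode(ByteBuffer buf) {
        buf.putLong(streamId);
        buf.putInt(numChunks);
    }

    /** Reads the fields back in the same order they were written. */
    static StreamHandleSketch decode(ByteBuffer buf) {
        long streamId = buf.getLong();
        int numChunks = buf.getInt();
        return new StreamHandleSketch(streamId, numChunks);
    }

    public static void main(String[] args) {
        StreamHandleSketch handle = new StreamHandleSketch(42L, 3);
        ByteBuffer buf = ByteBuffer.allocate(handle.encodedLength());
        handle.encode(buf);
        buf.flip();
        StreamHandleSketch copy = decode(buf);
        System.out.println("Stream " + copy.streamId + " with " + copy.numChunks + " chunks");
    }
}
```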

UploadBlock

public class UploadBlock extends BlockTransferMessage {
    public final String appId;
    public final String execId; 
    public final String blockId;
    public final byte[] metadata;
    public final byte[] blockData;
    
    public UploadBlock(
        String appId,
        String execId,
        String blockId,
        byte[] metadata,
        byte[] blockData
    );
    
    protected Type type(); // Returns UPLOAD_BLOCK
    
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
    
    public int encodedLength();
    public void encode(ByteBuf buf);
    public static UploadBlock decode(ByteBuf buf);
}

Request to upload a block with associated metadata. Used by NettyBlockTransferService but not typically by the external shuffle service.

Fields:

  • appId (String): Application identifier
  • execId (String): Executor identifier
  • blockId (String): Block identifier
  • metadata (byte[]): Serialized block metadata (typically StorageLevel)
  • blockData (byte[]): Raw block data bytes

Usage Examples

Creating Registration Message

// Create executor shuffle info
ExecutorShuffleInfo info = new ExecutorShuffleInfo(
    new String[]{"/tmp/spark-shuffle"},
    64,
    "org.apache.spark.shuffle.sort.SortShuffleManager"
);

// Create registration message
RegisterExecutor registerMsg = new RegisterExecutor("app-123", "executor-1", info);

// Serialize for network transmission
ByteBuffer serialized = registerMsg.toByteBuffer();

Creating Block Request

// Request multiple blocks
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};
OpenBlocks openMsg = new OpenBlocks("app-123", "executor-1", blockIds);

// Serialize message
ByteBuffer requestData = openMsg.toByteBuffer();

Deserializing Messages

// Assumes java.nio.ByteBuffer and java.util.Arrays are imported
// Received message from network
ByteBuffer receivedData = ...; // from network

// Deserialize
BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(receivedData);

// Handle based on type
if (message instanceof OpenBlocks) {
    OpenBlocks openBlocks = (OpenBlocks) message;
    System.out.println("Request for blocks: " + Arrays.toString(openBlocks.blockIds));
    
} else if (message instanceof RegisterExecutor) {
    RegisterExecutor register = (RegisterExecutor) message;
    System.out.println("Executor registration: " + register.execId);
    
} else if (message instanceof StreamHandle) {
    StreamHandle handle = (StreamHandle) message;
    System.out.println("Stream " + handle.streamId + " with " + handle.numChunks + " chunks");
}

Mesos Driver Registration

// Mesos-specific driver registration
RegisterDriver driverMsg = new RegisterDriver("spark-app-123");
ByteBuffer driverRegistration = driverMsg.toByteBuffer();

// Send to shuffle service for cleanup coordination

Message Flow

Normal Block Fetch Flow:

  1. Executor sends RegisterExecutor to the shuffle server (one-time setup)
  2. Client sends OpenBlocks with block IDs to fetch
  3. Server responds with StreamHandle containing stream info
  4. Client reads block data from the stream using the handle
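
The four steps above can be modeled in memory to make the request and response pairing concrete. This is a sketch under stated assumptions, not the service's implementation: FetchFlowSketch and its methods are hypothetical, block contents are handed over directly at registration instead of being read from disk, and each requested block becomes exactly one chunk.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the fetch flow. Every name here is hypothetical; nothing is sent over a network.
final class FetchFlowSketch {
    private final Map<String, Map<String, byte[]>> blocksByExecutor = new HashMap<>();
    private final Map<Long, List<byte[]>> openStreams = new HashMap<>();
    private long nextStreamId = 0;

    private static String key(String appId, String execId) {
        return appId + "/" + execId;
    }

    /** Step 1: one-time registration (block contents stand in for on-disk shuffle files). */
    void registerExecutor(String appId, String execId, Map<String, byte[]> blocks) {
        blocksByExecutor.put(key(appId, execId), blocks);
    }

    /** Steps 2 and 3: open the requested blocks and return {streamId, numChunks}. */
    long[] openBlocks(String appId, String execId, String[] blockIds) {
        Map<String, byte[]> blocks = blocksByExecutor.get(key(appId, execId));
        if (blocks == null) {
            throw new IllegalStateException("Executor is not registered: " + key(appId, execId));
        }
        List<byte[]> chunks = new ArrayList<>();
        for (String blockId : blockIds) {
            byte[] data = blocks.get(blockId);
            if (data == null) {
                throw new IllegalArgumentException("Unknown block: " + blockId);
            }
            chunks.add(data);
        }
        long streamId = nextStreamId++;
        openStreams.put(streamId, chunks);
        return new long[] {streamId, chunks.size()};
    }

    /** Step 4: read one chunk of an open stream. */
    byte[] fetchChunk(long streamId, int chunkIndex) {
        return openStreams.get(streamId).get(chunkIndex);
    }

    public static void main(String[] args) {
        FetchFlowSketch server = new FetchFlowSketch();
        Map<String, byte[]> blocks = new HashMap<>();
        blocks.put("shuffle_1_0_0", new byte[] {1, 2, 3});
        blocks.put("shuffle_1_0_1", new byte[] {4, 5});
        server.registerExecutor("app-123", "executor-1", blocks);

        long[] handle = server.openBlocks("app-123", "executor-1",
            new String[] {"shuffle_1_0_0", "shuffle_1_0_1"});
        for (int i = 0; i < handle[1]; i++) {
            System.out.println("chunk " + i + ": " + server.fetchChunk(handle[0], i).length + " bytes");
        }
    }
}
```

In this model an OpenBlocks request for an executor that never registered fails immediately, which shows why registration has to come first.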

Mesos Cleanup Flow:

  1. Driver sends RegisterDriver for cleanup coordination
  2. Shuffle service tracks driver registration
  3. On application completion, service can properly clean up shuffle data

Block Upload Flow (for NettyBlockTransferService):

  1. Client sends UploadBlock with block data and metadata
  2. Server stores the block and responds with acknowledgment

Error Handling

Protocol-level errors are typically handled through:

  • Message validation: Invalid message formats throw IllegalArgumentException
  • Serialization errors: Encoding/decoding failures may indicate a protocol version mismatch between client and server
  • Network errors: Transport layer handles connection failures and retries

try {
    BlockTransferMessage msg = BlockTransferMessage.Decoder.fromByteBuffer(data);
    // Process message...
} catch (IllegalArgumentException e) {
    System.err.println("Unknown message type or corrupted data: " + e.getMessage());
    // Handle protocol error
}