or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

authentication.md block-fetching.md file-management.md index.md mesos.md protocol.md shuffle-client.md shuffle-server.md
tile.json

docs/protocol.md

Protocol Messages

Serializable message classes for communication between shuffle clients and servers, including registration, block requests, and data transfers.

Capabilities

BlockTransferMessage Base Class

Base class for all shuffle protocol messages.

/**
 * Base class for all shuffle protocol messages.
 *
 * <p>This is a signature listing: method bodies are not shown. The class
 * declares the serialization entry point ({@code toByteBuffer}), the
 * {@code Type} enumeration of message kinds, and a nested {@code Decoder}
 * for turning a received buffer back into a message.
 */
public abstract class BlockTransferMessage implements Encodable {
    /**
     * Convert the message to a ByteBuffer for network transmission.
     *
     * @return ByteBuffer containing the serialized message
     */
    public ByteBuffer toByteBuffer();
    
    /**
     * Enumeration of all supported message types.
     *
     * <p>NOTE(review): the constants appear to correspond by name to the
     * message classes documented alongside this one (HEARTBEAT presumably
     * to ShuffleServiceHeartbeat) -- confirm against the decoder
     * implementation, which is not shown here.
     */
    public enum Type {
        OPEN_BLOCKS, UPLOAD_BLOCK, REGISTER_EXECUTOR, STREAM_HANDLE,
        REGISTER_DRIVER, HEARTBEAT, UPLOAD_BLOCK_STREAM
    }
    
    /**
     * Decoder for deserializing messages from a ByteBuffer.
     */
    public static class Decoder {
        /**
         * Deserialize a message from a ByteBuffer.
         *
         * @param msg ByteBuffer containing a serialized message
         * @return the deserialized BlockTransferMessage; callers check the
         *         concrete subclass (e.g. with {@code instanceof}) before use
         */
        public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
    }
}

ExecutorShuffleInfo

Contains executor configuration for locating shuffle files.

/**
 * Contains executor configuration for locating shuffle files.
 *
 * <p>Unlike the message classes, this type implements {@code Encodable}
 * directly rather than extending BlockTransferMessage; it is carried as the
 * {@code executorInfo} field of a RegisterExecutor message.
 */
public class ExecutorShuffleInfo implements Encodable {
    /**
     * Local directories where shuffle files are stored.
     */
    public final String[] localDirs;
    
    /**
     * Number of subdirectories per local directory.
     */
    public final int subDirsPerLocalDir;
    
    /**
     * Shuffle manager class name.
     */
    public final String shuffleManager;
    
    /**
     * Create executor shuffle information.
     *
     * @param localDirs array of local directory paths for shuffle files
     * @param subDirsPerLocalDir number of subdirectories per local directory
     * @param shuffleManager name of the shuffle manager implementation
     */
    public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);
    
    // Object overrides; bodies are not shown in this listing.
    // Presumably value-based over the three fields above -- TODO confirm.
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
}

OpenBlocks Message

Request to read a set of blocks.

/**
 * Request to read a set of blocks from the shuffle service.
 *
 * <p>Identifies the requesting application and executor and lists the block
 * IDs to open. All fields are public and final.
 */
public class OpenBlocks extends BlockTransferMessage {
    /**
     * Application ID.
     */
    public final String appId;
    
    /**
     * Executor ID.
     */
    public final String execId;
    
    /**
     * Array of block IDs to open.
     */
    public final String[] blockIds;
    
    /**
     * Create an open blocks request.
     *
     * @param appId application ID
     * @param execId executor ID
     * @param blockIds array of block IDs to request
     */
    public OpenBlocks(String appId, String execId, String[] blockIds);
    
    // Object overrides; bodies are not shown in this listing.
    // Presumably value-based over the fields above -- TODO confirm.
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
}

RegisterExecutor Message

Initial registration message between executor and shuffle server.

/**
 * Initial registration message between executor and shuffle server.
 *
 * <p>Bundles the application and executor identifiers with the
 * ExecutorShuffleInfo describing where that executor keeps its shuffle files.
 */
public class RegisterExecutor extends BlockTransferMessage {
    /**
     * Application ID.
     */
    public final String appId;
    
    /**
     * Executor ID.
     */
    public final String execId;
    
    /**
     * Executor shuffle configuration information.
     */
    public final ExecutorShuffleInfo executorInfo;
    
    /**
     * Create an executor registration message.
     *
     * @param appId application ID
     * @param execId executor ID
     * @param executorInfo executor shuffle configuration
     */
    public RegisterExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
    
    // Object overrides; bodies are not shown in this listing.
    // Presumably value-based over the fields above -- TODO confirm.
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
}

StreamHandle Message

Identifier for a fixed number of chunks in a stream.

/**
 * Identifier for a fixed number of chunks in a stream.
 *
 * <p>Per the message flow described in this reference, the server sends a
 * StreamHandle in response to an OpenBlocks request.
 */
public class StreamHandle extends BlockTransferMessage {
    /**
     * Stream identifier.
     */
    public final long streamId;
    
    /**
     * Number of chunks in the stream.
     */
    public final int numChunks;
    
    /**
     * Create a stream handle.
     *
     * @param streamId unique stream identifier
     * @param numChunks number of chunks in the stream
     */
    public StreamHandle(long streamId, int numChunks);
    
    // Object overrides; bodies are not shown in this listing.
    // Presumably value-based over streamId and numChunks -- TODO confirm.
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
}

UploadBlock Message

Request to upload a block with storage level.

/**
 * Request to upload a block with storage level.
 *
 * <p>NOTE(review): no storage-level field is listed below; presumably the
 * storage level travels inside the opaque {@code metadata} bytes -- confirm
 * against the sender/receiver code.
 */
public class UploadBlock extends BlockTransferMessage {
    /**
     * Application ID.
     */
    public final String appId;
    
    /**
     * Executor ID.
     */
    public final String execId;
    
    /**
     * Block ID.
     */
    public final String blockId;
    
    /**
     * Block metadata, as raw bytes (format not specified in this listing).
     */
    public final byte[] metadata;
    
    /**
     * Block data, as raw bytes.
     */
    public final byte[] blockData;
    
    /**
     * Create an upload block request.
     *
     * @param appId application ID
     * @param execId executor ID
     * @param blockId block ID to upload
     * @param metadata block metadata
     * @param blockData block data bytes
     */
    public UploadBlock(String appId, String execId, String blockId, byte[] metadata, byte[] blockData);
    
    // Object overrides; bodies are not shown in this listing.
    // Presumably value-based over the fields above (array contents, not
    // array identity) -- TODO confirm.
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
}

UploadBlockStream Message

Request to upload a block as a stream.

/**
 * Request to upload a block as a stream.
 *
 * <p>Unlike UploadBlock, no block-data field is listed here: only the block
 * ID and its metadata are part of the message. Presumably the block bytes
 * follow separately on the stream -- TODO confirm.
 */
public class UploadBlockStream extends BlockTransferMessage {
    /**
     * Block ID.
     */
    public final String blockId;
    
    /**
     * Block metadata, as raw bytes (format not specified in this listing).
     */
    public final byte[] metadata;
    
    /**
     * Create an upload block stream request.
     *
     * @param blockId block ID to upload
     * @param metadata block metadata
     */
    public UploadBlockStream(String blockId, byte[] metadata);
    
    // Object overrides; bodies are not shown in this listing.
    // Presumably value-based over blockId and metadata -- TODO confirm.
    public boolean equals(Object other);
    public int hashCode();  
    public String toString();
}

Mesos Protocol Messages

RegisterDriver Message

Message for driver registration with Mesos external shuffle service.

/**
 * Message for driver registration with Mesos external shuffle service.
 *
 * <p>Unlike the other messages in this reference, no public fields are
 * listed; the application ID and heartbeat timeout are read through the
 * accessor methods below.
 */
public class RegisterDriver extends BlockTransferMessage {
    /**
     * Create a driver registration message.
     *
     * @param appId application ID
     * @param heartbeatTimeoutMs heartbeat timeout in milliseconds
     */
    public RegisterDriver(String appId, long heartbeatTimeoutMs);
    
    /**
     * Get the application ID.
     *
     * @return application ID
     */
    public String getAppId();
    
    /**
     * Get the heartbeat timeout.
     *
     * @return heartbeat timeout in milliseconds
     */
    public long getHeartbeatTimeoutMs();
    
    // Object overrides; bodies are not shown in this listing.
    // Presumably value-based over appId and the timeout -- TODO confirm.
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
}

ShuffleServiceHeartbeat Message

Heartbeat message from driver to Mesos external shuffle service.

/**
 * Heartbeat message from driver to Mesos external shuffle service.
 *
 * <p>Carries only the application ID, exposed through {@code getAppId()}.
 */
public class ShuffleServiceHeartbeat extends BlockTransferMessage {
    /**
     * Create a heartbeat message.
     *
     * @param appId application ID
     */
    public ShuffleServiceHeartbeat(String appId);
    
    /**
     * Get the application ID.
     *
     * @return application ID
     */
    public String getAppId();
    
    // Object overrides; bodies are not shown in this listing.
    // Presumably value-based over appId -- TODO confirm.
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
}

Usage Examples:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.spark.network.shuffle.protocol.*;
import org.apache.spark.network.shuffle.protocol.mesos.*;

// The statements below are illustrative snippets; place them inside a method
// to compile them.

// Example 1: Executor registration
String[] localDirs = {"/tmp/spark-local-1", "/tmp/spark-local-2"};
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(localDirs, 64, "sort");
RegisterExecutor registerMsg = new RegisterExecutor("app-001", "executor-1", executorInfo);

// Serialize for network transmission
ByteBuffer serialized = registerMsg.toByteBuffer();
// remaining() reports the readable byte count without consuming the buffer,
// so the same buffer can still be decoded below.
System.out.println("Serialized registration message: " + serialized.remaining() + " bytes");

// Deserialize received message
BlockTransferMessage received = BlockTransferMessage.Decoder.fromByteBuffer(serialized);
// The decoder returns the base type; check the concrete class before casting.
if (received instanceof RegisterExecutor) {
    RegisterExecutor regMsg = (RegisterExecutor) received;
    System.out.println("Received registration for app: " + regMsg.appId + 
                      ", executor: " + regMsg.execId);
}

// Example 2: Block request
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_1_0"};
OpenBlocks openBlocks = new OpenBlocks("app-001", "executor-1", blockIds);

ByteBuffer openBlocksBuffer = openBlocks.toByteBuffer();
System.out.println("Open blocks request size: " + openBlocksBuffer.remaining() + " bytes");

// Example 3: Stream handling
StreamHandle streamHandle = new StreamHandle(12345L, 3);
System.out.println("Stream handle: ID=" + streamHandle.streamId + 
                  ", chunks=" + streamHandle.numChunks);

// Example 4: Block upload
// Use an explicit charset: the no-argument getBytes() uses the platform
// default, so the byte lengths printed below could differ between machines.
byte[] metadata = "block-metadata".getBytes(StandardCharsets.UTF_8);
byte[] blockData = "actual-block-data-here".getBytes(StandardCharsets.UTF_8);
UploadBlock uploadBlock = new UploadBlock("app-001", "executor-1", "block-123", metadata, blockData);

System.out.println("Upload block: " + uploadBlock.blockId + 
                  ", metadata size: " + uploadBlock.metadata.length + 
                  ", data size: " + uploadBlock.blockData.length);

// Example 5: Mesos driver registration
RegisterDriver driverReg = new RegisterDriver("app-001", 30000L);
System.out.println("Mesos driver registration: app=" + driverReg.getAppId() + 
                  ", timeout=" + driverReg.getHeartbeatTimeoutMs() + "ms");

// Example 6: Heartbeat
ShuffleServiceHeartbeat heartbeat = new ShuffleServiceHeartbeat("app-001");
System.out.println("Heartbeat for app: " + heartbeat.getAppId());

Protocol Message Flow

The typical message flow between shuffle clients and servers:

  1. Registration Phase:

    • Client sends RegisterExecutor with executor configuration
    • Server acknowledges and stores executor information
  2. Block Request Phase:

    • Client sends OpenBlocks with list of required block IDs
    • Server responds with StreamHandle containing stream information
  3. Data Transfer Phase:

    • Client receives block data through established stream
    • Multiple blocks can be transferred through a single stream
  4. Upload Operations (if needed):

    • Client sends UploadBlock or UploadBlockStream for block storage
    • Server stores blocks according to shuffle configuration
  5. Mesos-Specific Flow:

    • Driver sends RegisterDriver to establish connection
    • Periodic ShuffleServiceHeartbeat messages maintain connection

Message Serialization

All protocol messages implement the Encodable interface and provide:

  • Encoding: Convert message objects to ByteBuffer for network transmission
  • Decoding: Reconstruct message objects from received ByteBuffer
  • Type Safety: Message type identification for proper deserialization
  • Efficiency: Optimized serialization for high-throughput shuffle operations

Error Handling

Protocol messages include built-in error handling:

  • Validation: Input parameter validation during message creation
  • Serialization Errors: Proper exception handling during encoding/decoding
  • Version Compatibility: Forward/backward compatibility for protocol evolution
  • Corruption Detection: Built-in mechanisms to detect corrupted messages