CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-spark--spark-network-common-2-12

Core networking functionality for Apache Spark including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support using Netty for high-performance network I/O.

Pending
Overview
Eval results
Files

message-protocol.mddocs/

Message Protocol

The message protocol API defines the comprehensive communication system for Apache Spark's networking layer. It provides a structured approach to different types of network communication including RPC calls, streaming operations, and chunk fetching through a type-safe message system built on Netty.

Capabilities

Message Interface

Base interface for all network messages in the Spark transport protocol.

public interface Message extends Encodable {
    /**
     * Get the type of this message
     * @return Type enum indicating the specific message type
     */
    Type type();
    
    /**
     * Get the body data of this message
     * @return ManagedBuffer containing the message payload
     */
    ManagedBuffer body();
    
    /**
     * Check if the message body is included in the frame
     * @return boolean indicating if body is in-frame (true) or separate (false)
     */
    boolean isBodyInFrame();
    
    /**
     * Enumeration of all supported message types
     */
    enum Type {
        ChunkFetchRequest(0), ChunkFetchSuccess(1), ChunkFetchFailure(2),
        RpcRequest(3), RpcResponse(4), RpcFailure(5),
        StreamRequest(6), StreamResponse(7), StreamFailure(8),
        OneWayMessage(9), UploadStream(10),
        MergedBlockMetaRequest(11), MergedBlockMetaSuccess(12),
        User(-1);
        
        private final byte id;
        Type(int id) { this.id = (byte) id; }
        public byte id() { return id; }
    }
}

Encodable Interface

Interface for objects that can be encoded to ByteBuf for network transmission.

public interface Encodable {
    /**
     * Get the encoded length of this object in bytes
     * @return int representing the number of bytes needed for encoding
     */
    int encodedLength();
    
    /**
     * Encode this object into the provided ByteBuf
     * @param buf - ByteBuf to write the encoded data to
     */
    void encode(ByteBuf buf);
}

Message Categories

Messages are categorized into request and response types for structured communication patterns.

/**
 * Marker interface for request messages
 */
public interface RequestMessage extends Message {
}

/**
 * Marker interface for response messages  
 */
public interface ResponseMessage extends Message {
}

RPC Messages

RpcRequest

Message for sending remote procedure calls to the server.

public final class RpcRequest extends AbstractMessage implements RequestMessage {
    /**
     * Create an RPC request message
     * @param requestId - Unique identifier for this RPC request
     * @param message - ManagedBuffer containing the RPC data
     */
    public RpcRequest(long requestId, ManagedBuffer message);
    
    /**
     * Get the request identifier
     * @return long representing the unique request ID
     */
    public long requestId();
    
    @Override
    public Type type();
    
    @Override
    public ManagedBuffer body();
    
    @Override
    public boolean isBodyInFrame();
}

RpcResponse

Message for returning successful RPC call results.

public final class RpcResponse extends AbstractResponseMessage {
    /**
     * Create an RPC response message
     * @param requestId - ID of the original RPC request
     * @param message - ManagedBuffer containing the response data
     */
    public RpcResponse(long requestId, ManagedBuffer message);
    
    /**
     * Get the request identifier this response corresponds to
     * @return long representing the original request ID
     */
    public long requestId();
    
    @Override
    public Type type();
    
    @Override
    public ManagedBuffer body();
}

RpcFailure

Message for returning RPC call failures and error information.

public final class RpcFailure extends AbstractResponseMessage {
    /**
     * Create an RPC failure message
     * @param requestId - ID of the failed RPC request
     * @param errorString - String describing the error that occurred
     */
    public RpcFailure(long requestId, String errorString);
    
    /**
     * Get the request identifier this failure corresponds to
     * @return long representing the original request ID
     */
    public long requestId();
    
    /**
     * Get the error message describing the failure
     * @return String containing error details
     */
    public String errorString();
    
    @Override
    public Type type();
}

Chunk Transfer Messages

ChunkFetchRequest

Message for requesting specific chunks of data from a stream.

public final class ChunkFetchRequest extends AbstractMessage implements RequestMessage {
    /**
     * Create a chunk fetch request
     * @param streamChunkId - StreamChunkId identifying the chunk to fetch
     */
    public ChunkFetchRequest(StreamChunkId streamChunkId);
    
    /**
     * Get the stream chunk identifier
     * @return StreamChunkId specifying which chunk to fetch
     */
    public StreamChunkId streamChunkId();
    
    @Override
    public Type type();
}

ChunkFetchSuccess

Message for returning successfully fetched chunk data.

public final class ChunkFetchSuccess extends AbstractResponseMessage {
    /**
     * Create a successful chunk fetch response
     * @param streamChunkId - StreamChunkId of the fetched chunk
     * @param buffer - ManagedBuffer containing the chunk data
     */
    public ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer);
    
    /**
     * Get the stream chunk identifier
     * @return StreamChunkId of the returned chunk
     */
    public StreamChunkId streamChunkId();
    
    @Override
    public Type type();
    
    @Override
    public ManagedBuffer body();
}

ChunkFetchFailure

Message for reporting chunk fetch failures.

public final class ChunkFetchFailure extends AbstractResponseMessage {
    /**
     * Create a chunk fetch failure response
     * @param streamChunkId - StreamChunkId of the failed chunk
     * @param errorString - String describing the fetch error
     */
    public ChunkFetchFailure(StreamChunkId streamChunkId, String errorString);
    
    /**
     * Get the stream chunk identifier
     * @return StreamChunkId of the failed chunk
     */
    public StreamChunkId streamChunkId();
    
    /**
     * Get the error message describing the failure
     * @return String containing error details
     */
    public String errorString();
    
    @Override
    public Type type();
}

StreamChunkId

Identifier for specific chunks within streams.

public final class StreamChunkId implements Encodable {
    /**
     * Create a stream chunk identifier
     * @param streamId - Identifier of the stream
     * @param chunkIndex - Index of the chunk within the stream
     */
    public StreamChunkId(long streamId, int chunkIndex);
    
    /**
     * Get the stream identifier
     * @return long representing the stream ID
     */
    public long streamId();
    
    /**
     * Get the chunk index
     * @return int representing the chunk index within the stream
     */
    public int chunkIndex();
    
    @Override
    public int encodedLength();
    
    @Override
    public void encode(ByteBuf buf);
}

Streaming Messages

StreamRequest

Message for requesting to open a named stream.

public final class StreamRequest extends AbstractMessage implements RequestMessage {
    /**
     * Create a stream request
     * @param streamId - String identifier of the stream to open
     */
    public StreamRequest(String streamId);
    
    /**
     * Get the stream identifier
     * @return String representing the stream ID
     */
    public String streamId();
    
    @Override
    public Type type();
}

StreamResponse

Message for returning streaming data.

public final class StreamResponse extends AbstractResponseMessage {
    /**
     * Create a stream response
     * @param streamId - String identifier of the stream
     * @param byteCount - Number of bytes in the stream
     * @param body - ManagedBuffer containing the stream data
     */
    public StreamResponse(String streamId, long byteCount, ManagedBuffer body);
    
    /**
     * Get the stream identifier
     * @return String representing the stream ID
     */
    public String streamId();
    
    /**
     * Get the number of bytes in the stream
     * @return long representing the byte count
     */
    public long byteCount();
    
    @Override
    public Type type();
    
    @Override
    public ManagedBuffer body();
}

StreamFailure

Message for reporting stream operation failures.

public final class StreamFailure extends AbstractResponseMessage {
    /**
     * Create a stream failure response
     * @param streamId - String identifier of the failed stream
     * @param errorString - String describing the stream error
     */
    public StreamFailure(String streamId, String errorString);
    
    /**
     * Get the stream identifier
     * @return String representing the failed stream ID
     */
    public String streamId();
    
    /**
     * Get the error message
     * @return String containing error details
     */
    public String errorString();
    
    @Override
    public Type type();
}

Upload and One-Way Messages

UploadStream

Message for uploading stream data to the server.

public final class UploadStream extends AbstractMessage implements RequestMessage {
    /**
     * Create an upload stream message
     * @param requestId - Unique identifier for this upload request
     * @param meta - ManagedBuffer containing metadata about the upload
     * @param body - ManagedBuffer containing the data to upload
     */
    public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body);
    
    /**
     * Get the request identifier
     * @return long representing the upload request ID
     */
    public long requestId();
    
    /**
     * Get the metadata buffer
     * @return ManagedBuffer containing upload metadata
     */
    public ManagedBuffer meta();
    
    @Override
    public Type type();
    
    @Override
    public ManagedBuffer body();
}

OneWayMessage

Message for one-way communication where no response is expected.

public final class OneWayMessage extends AbstractMessage implements RequestMessage {
    /**
     * Create a one-way message
     * @param body - ManagedBuffer containing the message data
     */
    public OneWayMessage(ManagedBuffer body);
    
    @Override
    public Type type();
    
    @Override
    public ManagedBuffer body();
    
    @Override
    public boolean isBodyInFrame();
}

Merged Block Messages

MergedBlockMetaRequest

Message for requesting metadata about merged blocks in shuffle operations.

public final class MergedBlockMetaRequest extends AbstractMessage implements RequestMessage {
    /**
     * Create a merged block metadata request
     * @param requestId - Unique identifier for this request
     * @param appId - Application identifier
     * @param shuffleId - Shuffle operation identifier
     * @param shuffleMergeId - Shuffle merge identifier
     * @param reduceId - Reducer task identifier
     */
    public MergedBlockMetaRequest(long requestId, String appId, int shuffleId, int shuffleMergeId, int reduceId);
    
    /**
     * Get the request identifier
     * @return long representing the request ID
     */
    public long requestId();
    
    /**
     * Get the application identifier
     * @return String representing the app ID
     */
    public String appId();
    
    /**
     * Get the shuffle identifier
     * @return int representing the shuffle ID
     */
    public int shuffleId();
    
    /**
     * Get the shuffle merge identifier
     * @return int representing the shuffle merge ID
     */
    public int shuffleMergeId();
    
    /**
     * Get the reduce task identifier
     * @return int representing the reduce ID
     */
    public int reduceId();
    
    @Override
    public Type type();
}

MergedBlockMetaSuccess

Message for returning successful merged block metadata.

public final class MergedBlockMetaSuccess extends AbstractResponseMessage {
    /**
     * Create a successful merged block metadata response
     * @param requestId - ID of the original request
     * @param numChunks - Number of chunks in the merged block
     * @param buffer - ManagedBuffer containing the metadata
     */
    public MergedBlockMetaSuccess(long requestId, int numChunks, ManagedBuffer buffer);
    
    /**
     * Get the request identifier
     * @return long representing the original request ID
     */
    public long requestId();
    
    /**
     * Get the number of chunks
     * @return int representing the chunk count
     */
    public int numChunks();
    
    @Override
    public Type type();
    
    @Override
    public ManagedBuffer body();
}

Message Encoding and Decoding

Encoders Utility Class

Utility class for encoding and decoding protocol messages.

public class Encoders {
    /**
     * Decode a message from ByteBuf
     * @param msgType - Message type byte
     * @param in - ByteBuf containing the encoded message
     * @return Message instance decoded from the buffer
     */
    public static Message decode(Message.Type msgType, ByteBuf in);
    
    /**
     * Encode a message to ByteBuf
     * @param msg - Message to encode
     * @param out - ByteBuf to write the encoded message to
     */
    public static void encode(Message msg, ByteBuf out);
}

MessageEncoder

Netty encoder for converting Message objects to ByteBuf for network transmission.

public final class MessageEncoder extends MessageToByteEncoder<Message> {
    public static final MessageEncoder INSTANCE = new MessageEncoder();
    
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception;
}

MessageDecoder

Netty decoder for converting ByteBuf data into Message objects.

public final class MessageDecoder extends LengthFieldBasedFrameDecoder {
    public static final MessageDecoder INSTANCE = new MessageDecoder();
    
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception;
}

Usage Examples

Creating and Sending RPC Messages

import org.apache.spark.network.protocol.*;
import org.apache.spark.network.buffer.NioManagedBuffer;

// Create RPC request
String rpcData = "{ \"method\": \"process\", \"params\": [1, 2, 3] }";
ManagedBuffer requestBuffer = new NioManagedBuffer(ByteBuffer.wrap(rpcData.getBytes()));
long requestId = System.currentTimeMillis();
RpcRequest request = new RpcRequest(requestId, requestBuffer);

System.out.println("Created RPC request with ID: " + request.requestId());
System.out.println("Message type: " + request.type());
System.out.println("Body in frame: " + request.isBodyInFrame());

// Create RPC response
String responseData = "{ \"result\": 6, \"status\": \"success\" }";
ManagedBuffer responseBuffer = new NioManagedBuffer(ByteBuffer.wrap(responseData.getBytes()));
RpcResponse response = new RpcResponse(requestId, responseBuffer);

// Create RPC failure
RpcFailure failure = new RpcFailure(requestId, "Method not found: process");
System.out.println("Failure message: " + failure.errorString());

Working with Chunk Fetch Messages

import org.apache.spark.network.protocol.*;

// Create stream chunk identifier
long streamId = 12345L;
int chunkIndex = 0;
StreamChunkId chunkId = new StreamChunkId(streamId, chunkIndex);

System.out.println("Stream ID: " + chunkId.streamId());
System.out.println("Chunk index: " + chunkId.chunkIndex());

// Create chunk fetch request
ChunkFetchRequest fetchRequest = new ChunkFetchRequest(chunkId);
System.out.println("Fetch request type: " + fetchRequest.type());

// Create successful chunk response
byte[] chunkData = "This is chunk data".getBytes();
ManagedBuffer chunkBuffer = new NioManagedBuffer(ByteBuffer.wrap(chunkData));
ChunkFetchSuccess fetchSuccess = new ChunkFetchSuccess(chunkId, chunkBuffer);

System.out.println("Success response type: " + fetchSuccess.type());
System.out.println("Chunk size: " + fetchSuccess.body().size());

// Create chunk fetch failure
ChunkFetchFailure fetchFailure = new ChunkFetchFailure(chunkId, "Chunk not found");
System.out.println("Failure error: " + fetchFailure.errorString());

Streaming Message Examples

// Create stream request
String streamId = "data-stream-001";
StreamRequest streamRequest = new StreamRequest(streamId);
System.out.println("Requesting stream: " + streamRequest.streamId());

// Create stream response
String streamData = "Line 1\nLine 2\nLine 3\n";
ManagedBuffer streamBuffer = new NioManagedBuffer(ByteBuffer.wrap(streamData.getBytes()));
StreamResponse streamResponse = new StreamResponse(streamId, streamData.length(), streamBuffer);

System.out.println("Stream response for: " + streamResponse.streamId());
System.out.println("Byte count: " + streamResponse.byteCount());

// Create stream failure
StreamFailure streamFailure = new StreamFailure(streamId, "Stream corrupted");
System.out.println("Stream failure: " + streamFailure.errorString());

Upload and One-Way Messages

// Create upload stream message
String metadata = "{ \"filename\": \"data.txt\", \"size\": 1024 }";
ManagedBuffer metaBuffer = new NioManagedBuffer(ByteBuffer.wrap(metadata.getBytes()));

String uploadData = "File content to upload";
ManagedBuffer dataBuffer = new NioManagedBuffer(ByteBuffer.wrap(uploadData.getBytes()));

long uploadRequestId = System.currentTimeMillis();
UploadStream uploadMessage = new UploadStream(uploadRequestId, metaBuffer, dataBuffer);

System.out.println("Upload request ID: " + uploadMessage.requestId());
System.out.println("Upload metadata size: " + uploadMessage.meta().size());
System.out.println("Upload data size: " + uploadMessage.body().size());

// Create one-way message
String oneWayData = "Fire and forget message";
ManagedBuffer oneWayBuffer = new NioManagedBuffer(ByteBuffer.wrap(oneWayData.getBytes()));
OneWayMessage oneWayMessage = new OneWayMessage(oneWayBuffer);

System.out.println("One-way message type: " + oneWayMessage.type());
System.out.println("One-way body in frame: " + oneWayMessage.isBodyInFrame());

Message Encoding and Decoding

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

// Encode a message
RpcRequest request = new RpcRequest(123L, new NioManagedBuffer(ByteBuffer.wrap("test".getBytes())));
ByteBuf encodedBuffer = Unpooled.buffer();

try {
    Encoders.encode(request, encodedBuffer);
    System.out.println("Encoded message size: " + encodedBuffer.readableBytes());
    
    // Decode the message back
    encodedBuffer.resetReaderIndex();
    byte msgTypeByte = encodedBuffer.readByte();
    Message.Type msgType = Message.Type.values()[msgTypeByte];
    
    Message decodedMessage = Encoders.decode(msgType, encodedBuffer);
    System.out.println("Decoded message type: " + decodedMessage.type());
    
    if (decodedMessage instanceof RpcRequest) {
        RpcRequest decodedRequest = (RpcRequest) decodedMessage;
        System.out.println("Decoded request ID: " + decodedRequest.requestId());
    }
    
} finally {
    encodedBuffer.release();
}

Custom Message Handling

// Process different message types
public void handleMessage(Message message) {
    switch (message.type()) {
        case RpcRequest:
            RpcRequest rpcReq = (RpcRequest) message;
            System.out.println("Handling RPC request: " + rpcReq.requestId());
            break;
            
        case ChunkFetchRequest:
            ChunkFetchRequest chunkReq = (ChunkFetchRequest) message;
            System.out.println("Handling chunk fetch for stream: " + chunkReq.streamChunkId().streamId());
            break;
            
        case StreamRequest:
            StreamRequest streamReq = (StreamRequest) message;
            System.out.println("Handling stream request: " + streamReq.streamId());
            break;
            
        case OneWayMessage:
            OneWayMessage oneWay = (OneWayMessage) message;
            System.out.println("Handling one-way message with body size: " + oneWay.body().size());
            break;
            
        default:
            System.out.println("Unknown message type: " + message.type());
    }
}

Abstract Base Classes

AbstractMessage

Base implementation for common message functionality.

public abstract class AbstractMessage implements Message {
    @Override
    public boolean isBodyInFrame();
    
    @Override
    public int encodedLength();
    
    @Override
    public void encode(ByteBuf buf);
}

AbstractResponseMessage

Base implementation for response messages.

public abstract class AbstractResponseMessage extends AbstractMessage implements ResponseMessage {
    @Override
    public ManagedBuffer body();
    
    @Override
    public boolean isBodyInFrame();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-12

docs

buffer-management.md

client-operations.md

configuration-management.md

index.md

message-protocol.md

security-authentication.md

server-operations.md

shuffle-database.md

transport-context.md

tile.json