Core networking functionality for Apache Spark including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support using Netty for high-performance network I/O.
—
The message protocol API defines the comprehensive communication system for Apache Spark's networking layer. It provides a structured approach to different types of network communication including RPC calls, streaming operations, and chunk fetching through a type-safe message system built on Netty.
Base interface for all network messages in the Spark transport protocol.
public interface Message extends Encodable {
/**
* Get the type of this message
* @return Type enum indicating the specific message type
*/
Type type();
/**
* Get the body data of this message
* @return ManagedBuffer containing the message payload
*/
ManagedBuffer body();
/**
* Check if the message body is included in the frame
* @return boolean indicating if body is in-frame (true) or separate (false)
*/
boolean isBodyInFrame();
/**
* Enumeration of all supported message types
*/
enum Type {
ChunkFetchRequest(0), ChunkFetchSuccess(1), ChunkFetchFailure(2),
RpcRequest(3), RpcResponse(4), RpcFailure(5),
StreamRequest(6), StreamResponse(7), StreamFailure(8),
OneWayMessage(9), UploadStream(10),
MergedBlockMetaRequest(11), MergedBlockMetaSuccess(12),
User(-1);
private final byte id;
Type(int id) { this.id = (byte) id; }
public byte id() { return id; }
}
}Interface for objects that can be encoded to ByteBuf for network transmission.
public interface Encodable {
/**
* Get the encoded length of this object in bytes
* @return int representing the number of bytes needed for encoding
*/
int encodedLength();
/**
* Encode this object into the provided ByteBuf
* @param buf - ByteBuf to write the encoded data to
*/
void encode(ByteBuf buf);
}Messages are categorized into request and response types for structured communication patterns.
/**
* Marker interface for request messages
*/
public interface RequestMessage extends Message {
}
/**
* Marker interface for response messages
*/
public interface ResponseMessage extends Message {
}Message for sending remote procedure calls to the server.
public final class RpcRequest extends AbstractMessage implements RequestMessage {
/**
* Create an RPC request message
* @param requestId - Unique identifier for this RPC request
* @param message - ManagedBuffer containing the RPC data
*/
public RpcRequest(long requestId, ManagedBuffer message);
/**
* Get the request identifier
* @return long representing the unique request ID
*/
public long requestId();
@Override
public Type type();
@Override
public ManagedBuffer body();
@Override
public boolean isBodyInFrame();
}Message for returning successful RPC call results.
public final class RpcResponse extends AbstractResponseMessage {
/**
* Create an RPC response message
* @param requestId - ID of the original RPC request
* @param message - ManagedBuffer containing the response data
*/
public RpcResponse(long requestId, ManagedBuffer message);
/**
* Get the request identifier this response corresponds to
* @return long representing the original request ID
*/
public long requestId();
@Override
public Type type();
@Override
public ManagedBuffer body();
}Message for returning RPC call failures and error information.
public final class RpcFailure extends AbstractResponseMessage {
/**
* Create an RPC failure message
* @param requestId - ID of the failed RPC request
* @param errorString - String describing the error that occurred
*/
public RpcFailure(long requestId, String errorString);
/**
* Get the request identifier this failure corresponds to
* @return long representing the original request ID
*/
public long requestId();
/**
* Get the error message describing the failure
* @return String containing error details
*/
public String errorString();
@Override
public Type type();
}Message for requesting specific chunks of data from a stream.
public final class ChunkFetchRequest extends AbstractMessage implements RequestMessage {
/**
* Create a chunk fetch request
* @param streamChunkId - StreamChunkId identifying the chunk to fetch
*/
public ChunkFetchRequest(StreamChunkId streamChunkId);
/**
* Get the stream chunk identifier
* @return StreamChunkId specifying which chunk to fetch
*/
public StreamChunkId streamChunkId();
@Override
public Type type();
}Message for returning successfully fetched chunk data.
public final class ChunkFetchSuccess extends AbstractResponseMessage {
/**
* Create a successful chunk fetch response
* @param streamChunkId - StreamChunkId of the fetched chunk
* @param buffer - ManagedBuffer containing the chunk data
*/
public ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer);
/**
* Get the stream chunk identifier
* @return StreamChunkId of the returned chunk
*/
public StreamChunkId streamChunkId();
@Override
public Type type();
@Override
public ManagedBuffer body();
}Message for reporting chunk fetch failures.
public final class ChunkFetchFailure extends AbstractResponseMessage {
/**
* Create a chunk fetch failure response
* @param streamChunkId - StreamChunkId of the failed chunk
* @param errorString - String describing the fetch error
*/
public ChunkFetchFailure(StreamChunkId streamChunkId, String errorString);
/**
* Get the stream chunk identifier
* @return StreamChunkId of the failed chunk
*/
public StreamChunkId streamChunkId();
/**
* Get the error message describing the failure
* @return String containing error details
*/
public String errorString();
@Override
public Type type();
}Identifier for specific chunks within streams.
public final class StreamChunkId implements Encodable {
/**
* Create a stream chunk identifier
* @param streamId - Identifier of the stream
* @param chunkIndex - Index of the chunk within the stream
*/
public StreamChunkId(long streamId, int chunkIndex);
/**
* Get the stream identifier
* @return long representing the stream ID
*/
public long streamId();
/**
* Get the chunk index
* @return int representing the chunk index within the stream
*/
public int chunkIndex();
@Override
public int encodedLength();
@Override
public void encode(ByteBuf buf);
}Message for requesting to open a named stream.
public final class StreamRequest extends AbstractMessage implements RequestMessage {
/**
* Create a stream request
* @param streamId - String identifier of the stream to open
*/
public StreamRequest(String streamId);
/**
* Get the stream identifier
* @return String representing the stream ID
*/
public String streamId();
@Override
public Type type();
}Message for returning streaming data.
public final class StreamResponse extends AbstractResponseMessage {
/**
* Create a stream response
* @param streamId - String identifier of the stream
* @param byteCount - Number of bytes in the stream
* @param body - ManagedBuffer containing the stream data
*/
public StreamResponse(String streamId, long byteCount, ManagedBuffer body);
/**
* Get the stream identifier
* @return String representing the stream ID
*/
public String streamId();
/**
* Get the number of bytes in the stream
* @return long representing the byte count
*/
public long byteCount();
@Override
public Type type();
@Override
public ManagedBuffer body();
}Message for reporting stream operation failures.
public final class StreamFailure extends AbstractResponseMessage {
/**
* Create a stream failure response
* @param streamId - String identifier of the failed stream
* @param errorString - String describing the stream error
*/
public StreamFailure(String streamId, String errorString);
/**
* Get the stream identifier
* @return String representing the failed stream ID
*/
public String streamId();
/**
* Get the error message
* @return String containing error details
*/
public String errorString();
@Override
public Type type();
}Message for uploading stream data to the server.
public final class UploadStream extends AbstractMessage implements RequestMessage {
/**
* Create an upload stream message
* @param requestId - Unique identifier for this upload request
* @param meta - ManagedBuffer containing metadata about the upload
* @param body - ManagedBuffer containing the data to upload
*/
public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body);
/**
* Get the request identifier
* @return long representing the upload request ID
*/
public long requestId();
/**
* Get the metadata buffer
* @return ManagedBuffer containing upload metadata
*/
public ManagedBuffer meta();
@Override
public Type type();
@Override
public ManagedBuffer body();
}Message for one-way communication where no response is expected.
public final class OneWayMessage extends AbstractMessage implements RequestMessage {
/**
* Create a one-way message
* @param body - ManagedBuffer containing the message data
*/
public OneWayMessage(ManagedBuffer body);
@Override
public Type type();
@Override
public ManagedBuffer body();
@Override
public boolean isBodyInFrame();
}Message for requesting metadata about merged blocks in shuffle operations.
public final class MergedBlockMetaRequest extends AbstractMessage implements RequestMessage {
/**
* Create a merged block metadata request
* @param requestId - Unique identifier for this request
* @param appId - Application identifier
* @param shuffleId - Shuffle operation identifier
* @param shuffleMergeId - Shuffle merge identifier
* @param reduceId - Reducer task identifier
*/
public MergedBlockMetaRequest(long requestId, String appId, int shuffleId, int shuffleMergeId, int reduceId);
/**
* Get the request identifier
* @return long representing the request ID
*/
public long requestId();
/**
* Get the application identifier
* @return String representing the app ID
*/
public String appId();
/**
* Get the shuffle identifier
* @return int representing the shuffle ID
*/
public int shuffleId();
/**
* Get the shuffle merge identifier
* @return int representing the shuffle merge ID
*/
public int shuffleMergeId();
/**
* Get the reduce task identifier
* @return int representing the reduce ID
*/
public int reduceId();
@Override
public Type type();
}Message for returning successful merged block metadata.
public final class MergedBlockMetaSuccess extends AbstractResponseMessage {
/**
* Create a successful merged block metadata response
* @param requestId - ID of the original request
* @param numChunks - Number of chunks in the merged block
* @param buffer - ManagedBuffer containing the metadata
*/
public MergedBlockMetaSuccess(long requestId, int numChunks, ManagedBuffer buffer);
/**
* Get the request identifier
* @return long representing the original request ID
*/
public long requestId();
/**
* Get the number of chunks
* @return int representing the chunk count
*/
public int numChunks();
@Override
public Type type();
@Override
public ManagedBuffer body();
}Utility class for encoding and decoding protocol messages.
public class Encoders {
/**
* Decode a message from ByteBuf
* @param msgType - Message type byte
* @param in - ByteBuf containing the encoded message
* @return Message instance decoded from the buffer
*/
public static Message decode(Message.Type msgType, ByteBuf in);
/**
* Encode a message to ByteBuf
* @param msg - Message to encode
* @param out - ByteBuf to write the encoded message to
*/
public static void encode(Message msg, ByteBuf out);
}Netty encoder for converting Message objects to ByteBuf for network transmission.
public final class MessageEncoder extends MessageToByteEncoder<Message> {
public static final MessageEncoder INSTANCE = new MessageEncoder();
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception;
}Netty decoder for converting ByteBuf data into Message objects.
public final class MessageDecoder extends LengthFieldBasedFrameDecoder {
public static final MessageDecoder INSTANCE = new MessageDecoder();
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception;
}import org.apache.spark.network.protocol.*;
import org.apache.spark.network.buffer.NioManagedBuffer;
// Create RPC request
String rpcData = "{ \"method\": \"process\", \"params\": [1, 2, 3] }";
ManagedBuffer requestBuffer = new NioManagedBuffer(ByteBuffer.wrap(rpcData.getBytes()));
long requestId = System.currentTimeMillis();
RpcRequest request = new RpcRequest(requestId, requestBuffer);
System.out.println("Created RPC request with ID: " + request.requestId());
System.out.println("Message type: " + request.type());
System.out.println("Body in frame: " + request.isBodyInFrame());
// Create RPC response
String responseData = "{ \"result\": 6, \"status\": \"success\" }";
ManagedBuffer responseBuffer = new NioManagedBuffer(ByteBuffer.wrap(responseData.getBytes()));
RpcResponse response = new RpcResponse(requestId, responseBuffer);
// Create RPC failure
RpcFailure failure = new RpcFailure(requestId, "Method not found: process");
System.out.println("Failure message: " + failure.errorString());import org.apache.spark.network.protocol.*;
// Create stream chunk identifier
long streamId = 12345L;
int chunkIndex = 0;
StreamChunkId chunkId = new StreamChunkId(streamId, chunkIndex);
System.out.println("Stream ID: " + chunkId.streamId());
System.out.println("Chunk index: " + chunkId.chunkIndex());
// Create chunk fetch request
ChunkFetchRequest fetchRequest = new ChunkFetchRequest(chunkId);
System.out.println("Fetch request type: " + fetchRequest.type());
// Create successful chunk response
byte[] chunkData = "This is chunk data".getBytes();
ManagedBuffer chunkBuffer = new NioManagedBuffer(ByteBuffer.wrap(chunkData));
ChunkFetchSuccess fetchSuccess = new ChunkFetchSuccess(chunkId, chunkBuffer);
System.out.println("Success response type: " + fetchSuccess.type());
System.out.println("Chunk size: " + fetchSuccess.body().size());
// Create chunk fetch failure
ChunkFetchFailure fetchFailure = new ChunkFetchFailure(chunkId, "Chunk not found");
System.out.println("Failure error: " + fetchFailure.errorString());// Create stream request
String streamId = "data-stream-001";
StreamRequest streamRequest = new StreamRequest(streamId);
System.out.println("Requesting stream: " + streamRequest.streamId());
// Create stream response
String streamData = "Line 1\nLine 2\nLine 3\n";
ManagedBuffer streamBuffer = new NioManagedBuffer(ByteBuffer.wrap(streamData.getBytes()));
StreamResponse streamResponse = new StreamResponse(streamId, streamData.length(), streamBuffer);
System.out.println("Stream response for: " + streamResponse.streamId());
System.out.println("Byte count: " + streamResponse.byteCount());
// Create stream failure
StreamFailure streamFailure = new StreamFailure(streamId, "Stream corrupted");
System.out.println("Stream failure: " + streamFailure.errorString());// Create upload stream message
String metadata = "{ \"filename\": \"data.txt\", \"size\": 1024 }";
ManagedBuffer metaBuffer = new NioManagedBuffer(ByteBuffer.wrap(metadata.getBytes()));
String uploadData = "File content to upload";
ManagedBuffer dataBuffer = new NioManagedBuffer(ByteBuffer.wrap(uploadData.getBytes()));
long uploadRequestId = System.currentTimeMillis();
UploadStream uploadMessage = new UploadStream(uploadRequestId, metaBuffer, dataBuffer);
System.out.println("Upload request ID: " + uploadMessage.requestId());
System.out.println("Upload metadata size: " + uploadMessage.meta().size());
System.out.println("Upload data size: " + uploadMessage.body().size());
// Create one-way message
String oneWayData = "Fire and forget message";
ManagedBuffer oneWayBuffer = new NioManagedBuffer(ByteBuffer.wrap(oneWayData.getBytes()));
OneWayMessage oneWayMessage = new OneWayMessage(oneWayBuffer);
System.out.println("One-way message type: " + oneWayMessage.type());
System.out.println("One-way body in frame: " + oneWayMessage.isBodyInFrame());import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
// Encode a message
RpcRequest request = new RpcRequest(123L, new NioManagedBuffer(ByteBuffer.wrap("test".getBytes())));
ByteBuf encodedBuffer = Unpooled.buffer();
try {
Encoders.encode(request, encodedBuffer);
System.out.println("Encoded message size: " + encodedBuffer.readableBytes());
// Decode the message back
encodedBuffer.resetReaderIndex();
byte msgTypeByte = encodedBuffer.readByte();
Message.Type msgType = Message.Type.values()[msgTypeByte];
Message decodedMessage = Encoders.decode(msgType, encodedBuffer);
System.out.println("Decoded message type: " + decodedMessage.type());
if (decodedMessage instanceof RpcRequest) {
RpcRequest decodedRequest = (RpcRequest) decodedMessage;
System.out.println("Decoded request ID: " + decodedRequest.requestId());
}
} finally {
encodedBuffer.release();
}// Process different message types
public void handleMessage(Message message) {
switch (message.type()) {
case RpcRequest:
RpcRequest rpcReq = (RpcRequest) message;
System.out.println("Handling RPC request: " + rpcReq.requestId());
break;
case ChunkFetchRequest:
ChunkFetchRequest chunkReq = (ChunkFetchRequest) message;
System.out.println("Handling chunk fetch for stream: " + chunkReq.streamChunkId().streamId());
break;
case StreamRequest:
StreamRequest streamReq = (StreamRequest) message;
System.out.println("Handling stream request: " + streamReq.streamId());
break;
case OneWayMessage:
OneWayMessage oneWay = (OneWayMessage) message;
System.out.println("Handling one-way message with body size: " + oneWay.body().size());
break;
default:
System.out.println("Unknown message type: " + message.type());
}
}Base implementation for common message functionality.
public abstract class AbstractMessage implements Message {
@Override
public boolean isBodyInFrame();
@Override
public int encodedLength();
@Override
public void encode(ByteBuf buf);
}Base implementation for response messages.
public abstract class AbstractResponseMessage extends AbstractMessage implements ResponseMessage {
@Override
public ManagedBuffer body();
@Override
public boolean isBodyInFrame();
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-12