Core networking infrastructure for Apache Spark cluster computing with transport layer, RPC, chunk fetching, and SASL authentication
—
Type-safe message protocol for network communication in Apache Spark. The protocol defines specific message types for RPC requests/responses, chunk fetching, and streaming operations, ensuring reliable and structured communication between clients and servers.
Base interface for all network messages in the Spark transport protocol.
/**
* Message represents a unit of communication in the Spark transport protocol.
* All network messages implement this interface to provide type safety and structure.
* Every message exposes three things: its {@link Type}, its body, and a flag telling
* whether that body is included in the message frame.
*/
public interface Message {
/**
* Gets the message type identifier.
*
* @return The Type enum value for this message
*/
Type type();
/**
* Gets the message body as a ManagedBuffer.
*
* @return ManagedBuffer containing the message payload
*/
ManagedBuffer body();
/**
* Indicates whether the message body is included in the message frame.
*
* @return true if body is in frame, false if sent separately
*/
boolean isBodyInFrame();
/**
* Enumeration of all message types in the protocol.
* Each constant is declared with a numeric identifier that is kept as a byte.
*/
enum Type {
// Identifiers 0 through 9 are assigned to the named protocol messages.
ChunkFetchRequest(0),
ChunkFetchSuccess(1),
ChunkFetchFailure(2),
RpcRequest(3),
RpcResponse(4),
RpcFailure(5),
StreamRequest(6),
StreamResponse(7),
StreamFailure(8),
OneWayMessage(9),
// The only constant declared with a negative identifier.
User(-1);
/** Identifier of this type, held as a single byte. */
private final byte id;
/**
* Stores the identifier for a constant.
* The int argument is narrowed with a plain cast, so a value outside the byte
* range would be truncated without any error.
*
* @param id identifier for the constant being declared
*/
Type(int id) {
this.id = (byte) id;
}
/**
* Gets the identifier of this message type.
*
* @return the byte stored by the constructor
*/
public byte id() { return id; }
}
}Marker interfaces for categorizing messages as requests or responses.
/**
* Marker interface for request messages.
* All messages that initiate communication implement this interface.
* It declares no members of its own; everything is inherited from {@link Message}.
*/
public interface RequestMessage extends Message {}
/**
* Marker interface for response messages.
* All messages that respond to requests implement this interface.
* It declares no members of its own; everything is inherited from {@link Message}.
*/
public interface ResponseMessage extends Message {}Interface for objects that can be encoded to ByteBuf for network transmission.
/**
* Interface for objects that can be encoded to ByteBuf.
* Used by the message protocol for efficient serialization.
* An implementation reports its size through {@link #encodedLength()} and writes
* itself through {@link #encode(ByteBuf)}.
*/
public interface Encodable {
/**
* Gets the encoded length of this object in bytes.
* NOTE(review): presumably this must equal the number of bytes written by
* {@link #encode(ByteBuf)}; confirm against the implementations.
*
* @return The number of bytes needed to encode this object
*/
int encodedLength();
/**
* Encodes this object to the provided ByteBuf.
*
* @param buf The ByteBuf to encode to
*/
void encode(ByteBuf buf);
}Messages for Remote Procedure Call operations.
/**
* RPC request message containing a request ID and payload.
* The encode and decode methods shown here handle the request ID only; the payload
* is not read or written by them.
*/
public class RpcRequest extends AbstractMessage implements RequestMessage {
/** Unique identifier for this RPC request */
public final long requestId;
/**
* Creates an RPC request message.
* Shown as a signature only; the constructor body is not part of this listing.
*
* @param requestId Unique identifier for the request
* @param message The request payload as ManagedBuffer
*/
public RpcRequest(long requestId, ManagedBuffer message);
/**
* Identifies this message kind.
*
* @return {@link Type#RpcRequest}
*/
@Override
public Type type() { return Type.RpcRequest; }
/**
* Reports the encoded size of this message.
* NOTE(review): only the request ID is counted and no payload length field is
* included; confirm this matches the wire format of the targeted Spark version.
*
* @return 8, the size of the long request ID
*/
@Override
public int encodedLength() { return 8; } // requestId is 8 bytes
/**
* Writes the request ID to the buffer as a long.
*
* @param buf The ByteBuf to encode to
*/
@Override
public void encode(ByteBuf buf) {
buf.writeLong(requestId);
}
/**
* Reads one long from the buffer and builds a request around it.
* The returned request is constructed with a null body.
*
* @param buf ByteBuf positioned at an encoded request ID
* @return a new RpcRequest carrying the decoded request ID
*/
public static RpcRequest decode(ByteBuf buf) {
long requestId = buf.readLong();
return new RpcRequest(requestId, null); // Body handled separately
}
}
/**
* RPC response message containing the response to a previous request.
* The request ID ties the response back to the request it answers.
*/
public class RpcResponse extends AbstractResponseMessage {
/** Request ID this response corresponds to */
public final long requestId;
/**
* Creates an RPC response message.
* Shown as a signature only; the constructor body is not part of this listing.
*
* @param requestId The request ID this response corresponds to
* @param message The response payload as ManagedBuffer
*/
public RpcResponse(long requestId, ManagedBuffer message);
/**
* Identifies this message kind.
*
* @return {@link Type#RpcResponse}
*/
@Override
public Type type() { return Type.RpcResponse; }
}
/**
* RPC failure message indicating an RPC request failed.
* Carries the ID of the failed request together with a textual error description.
*/
public class RpcFailure extends AbstractResponseMessage {
/** Request ID this failure corresponds to */
public final long requestId;
/** Error message describing the failure */
public final String errorString;
/**
* Creates an RPC failure message.
* Shown as a signature only; the constructor body is not part of this listing.
*
* @param requestId The request ID that failed
* @param errorString Description of the error
*/
public RpcFailure(long requestId, String errorString);
/**
* Identifies this message kind.
*
* @return {@link Type#RpcFailure}
*/
@Override
public Type type() { return Type.RpcFailure; }
}Messages for fetching individual chunks of data from streams.
/**
* Request to fetch a specific chunk from a stream.
* The encoded form is the stream ID as a long followed by the chunk index as an int.
*/
public class ChunkFetchRequest extends AbstractMessage implements RequestMessage {
/** Identifier for the stream and chunk */
public final StreamChunkId streamChunkId;
/**
* Creates a chunk fetch request.
* Shown as a signature only; the constructor body is not part of this listing.
*
* @param streamChunkId Identifies the stream and chunk to fetch
*/
public ChunkFetchRequest(StreamChunkId streamChunkId);
/**
* Identifies this message kind.
*
* @return {@link Type#ChunkFetchRequest}
*/
@Override
public Type type() { return Type.ChunkFetchRequest; }
/**
* Reports the encoded size of this message.
*
* @return 12, matching the long and the int written by {@link #encode(ByteBuf)}
*/
@Override
public int encodedLength() { return 12; } // streamId (8) + chunkIndex (4)
/**
* Writes the stream ID as a long, then the chunk index as an int.
*
* @param buf The ByteBuf to encode to
*/
@Override
public void encode(ByteBuf buf) {
buf.writeLong(streamChunkId.streamId);
buf.writeInt(streamChunkId.chunkIndex);
}
/**
* Reads a long and an int from the buffer, in the same order that
* {@link #encode(ByteBuf)} writes them, and rebuilds the request.
*
* @param buf ByteBuf positioned at an encoded chunk fetch request
* @return a new ChunkFetchRequest for the decoded stream ID and chunk index
*/
public static ChunkFetchRequest decode(ByteBuf buf) {
long streamId = buf.readLong();
int chunkIndex = buf.readInt();
return new ChunkFetchRequest(new StreamChunkId(streamId, chunkIndex));
}
}
/**
* Successful response to a chunk fetch request containing the chunk data.
* The chunk is identified by the same {@link StreamChunkId} type used in the request.
*/
public class ChunkFetchSuccess extends AbstractResponseMessage {
/** Identifier for the stream and chunk */
public final StreamChunkId streamChunkId;
/**
* Creates a successful chunk fetch response.
* Shown as a signature only; the constructor body is not part of this listing.
*
* @param streamChunkId Identifies the stream and chunk
* @param buffer The chunk data
*/
public ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer);
/**
* Identifies this message kind.
*
* @return {@link Type#ChunkFetchSuccess}
*/
@Override
public Type type() { return Type.ChunkFetchSuccess; }
}
/**
* Failure response to a chunk fetch request indicating the fetch failed.
* Carries the identifier of the chunk together with a textual error description.
*/
public class ChunkFetchFailure extends AbstractResponseMessage {
/** Identifier for the stream and chunk that failed */
public final StreamChunkId streamChunkId;
/** Error message describing the failure */
public final String errorString;
/**
* Creates a chunk fetch failure response.
* Shown as a signature only; the constructor body is not part of this listing.
*
* @param streamChunkId Identifies the stream and chunk that failed
* @param errorString Description of the error
*/
public ChunkFetchFailure(StreamChunkId streamChunkId, String errorString);
/**
* Identifies this message kind.
*
* @return {@link Type#ChunkFetchFailure}
*/
@Override
public Type type() { return Type.ChunkFetchFailure; }
}Messages for stream-based data transfer operations.
/**
* Request to open a stream for data transfer.
* The stream is named by a String, unlike chunk fetches which use a numeric stream ID.
*/
public class StreamRequest extends AbstractMessage implements RequestMessage {
/** String identifier for the stream to open */
public final String streamId;
/**
* Creates a stream request.
* Shown as a signature only; the constructor body is not part of this listing.
*
* @param streamId String identifier for the stream
*/
public StreamRequest(String streamId);
/**
* Identifies this message kind.
*
* @return {@link Type#StreamRequest}
*/
@Override
public Type type() { return Type.StreamRequest; }
}
/**
* Successful response to a stream request indicating the stream is ready.
* Reports the stream identifier and the total size of the stream in bytes.
*/
public class StreamResponse extends AbstractResponseMessage {
/** String identifier for the opened stream */
public final String streamId;
/** Total number of bytes in the stream */
public final long byteCount;
/**
* Creates a stream response.
* Shown as a signature only; the constructor body is not part of this listing.
*
* @param streamId String identifier for the stream
* @param byteCount Total number of bytes in the stream
*/
public StreamResponse(String streamId, long byteCount);
/**
* Identifies this message kind.
*
* @return {@link Type#StreamResponse}
*/
@Override
public Type type() { return Type.StreamResponse; }
}
/**
* Failure response to a stream request indicating the stream could not be opened.
* Carries the stream identifier together with a textual error description.
*/
public class StreamFailure extends AbstractResponseMessage {
/** String identifier for the stream that failed */
public final String streamId;
/** Error message describing the failure */
public final String errorString;
/**
* Creates a stream failure response.
* Shown as a signature only; the constructor body is not part of this listing.
*
* @param streamId String identifier for the stream that failed
* @param errorString Description of the error
*/
public StreamFailure(String streamId, String errorString);
/**
* Identifies this message kind.
*
* @return {@link Type#StreamFailure}
*/
@Override
public Type type() { return Type.StreamFailure; }
}Messages that don't expect a response.
/**
* One-way message that doesn't expect a response.
* Used for notifications, heartbeats, or fire-and-forget operations.
* It carries a payload but, unlike the RPC messages, no request ID.
*/
public class OneWayMessage extends AbstractMessage implements RequestMessage {
/**
* Creates a one-way message.
* Shown as a signature only; the constructor body is not part of this listing.
*
* @param body The message payload as ManagedBuffer
*/
public OneWayMessage(ManagedBuffer body);
/**
* Identifies this message kind.
*
* @return {@link Type#OneWayMessage}
*/
@Override
public Type type() { return Type.OneWayMessage; }
/**
* Reports whether the body is included in the message frame.
* NOTE(review): this listing reports false even though the payload is handed
* directly to the constructor; confirm against the actual Spark implementation,
* which may report the body as in-frame.
*
* @return false in this listing
*/
@Override
public boolean isBodyInFrame() { return false; }
}Utility classes for identifying streams and chunks.
/**
* Identifier for a specific chunk within a stream.
* Combines stream ID and chunk index for unique chunk identification.
* Equality, hashing and the string form are all derived from those two fields.
*/
public class StreamChunkId {
/** Numeric identifier for the stream */
public final long streamId;
/** Index of the chunk within the stream (0-based) */
public final int chunkIndex;
/**
* Creates a stream chunk identifier.
* Shown as a signature only; the constructor body is not part of this listing.
*
* @param streamId Numeric identifier for the stream
* @param chunkIndex Index of the chunk within the stream
*/
public StreamChunkId(long streamId, int chunkIndex);
/**
* Renders both fields in the form StreamChunkId{streamId=..., chunkIndex=...}.
*
* @return the textual form of this identifier
*/
@Override
public String toString() {
return "StreamChunkId{streamId=" + streamId + ", chunkIndex=" + chunkIndex + "}";
}
/**
* Compares this identifier with another object.
* Two identifiers are equal when they have exactly the same runtime class and
* both the stream ID and the chunk index match.
*
* @param other object to compare with, may be null
* @return true if other denotes the same stream and chunk
*/
@Override
public boolean equals(Object other) {
// Identity shortcut before any field comparison.
if (this == other) return true;
// getClass comparison means an instance of a subclass is never equal to a StreamChunkId.
if (other == null || getClass() != other.getClass()) return false;
StreamChunkId that = (StreamChunkId) other;
return streamId == that.streamId && chunkIndex == that.chunkIndex;
}
/**
* Computes a hash from the same two fields that {@link #equals(Object)} compares.
*
* @return hash code combining streamId and chunkIndex
*/
@Override
public int hashCode() {
return Objects.hash(streamId, chunkIndex);
}
}Utilities for message serialization and deserialization.
/**
* Utility class providing encoding and decoding functions for protocol messages.
* Both operations are static and are shown as signatures only in this listing.
*/
public class Encoders {
/**
* Decodes a message from a ByteBuf based on message type.
* NOTE(review): the behavior for an unrecognized type is not visible here;
* confirm against the implementation.
*
* @param msgType The message type to decode
* @param in ByteBuf containing the encoded message
* @return Decoded Message instance
*/
public static Message decode(Message.Type msgType, ByteBuf in);
/**
* Encodes a message to a ByteBuf.
*
* @param msg The message to encode
* @param out ByteBuf to write the encoded message to
*/
public static void encode(Message msg, ByteBuf out);
}
/**
* Netty decoder for converting ByteBuf to Message objects.
* Overrides the decode hook of MessageToMessageDecoder for ByteBuf input.
*/
public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
/**
* Decode hook, shown as a signature only in this listing.
* NOTE(review): presumably the decoded Message is appended to out; confirm
* against the implementation.
*
* @param ctx channel handler context supplied by Netty
* @param msg buffer holding the encoded message
* @param out list receiving the decoding results
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out);
}
/**
* Netty encoder for converting Message objects to ByteBuf.
* Overrides the encode hook of MessageToByteEncoder for Message input.
* NOTE(review): confirm the base class against the targeted Spark version.
*/
public class MessageEncoder extends MessageToByteEncoder<Message> {
/**
* Encode hook, shown as a signature only in this listing.
*
* @param ctx channel handler context supplied by Netty
* @param msg message to encode
* @param out buffer receiving the encoded bytes
*/
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out);
}import org.apache.spark.network.protocol.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
// Usage example: building each kind of request message.
// Text payloads are encoded explicitly as UTF-8 so that the bytes sent do not
// depend on the default charset of the JVM that runs this code.

// Create an RPC request
ByteBuffer requestData = ByteBuffer.wrap("Hello Server".getBytes(StandardCharsets.UTF_8));
ManagedBuffer requestBuffer = new NioManagedBuffer(requestData);
RpcRequest rpcRequest = new RpcRequest(12345L, requestBuffer);

// Create a chunk fetch request (stream 1001, chunk index 5)
StreamChunkId chunkId = new StreamChunkId(1001L, 5);
ChunkFetchRequest chunkRequest = new ChunkFetchRequest(chunkId);

// Create a stream request
StreamRequest streamRequest = new StreamRequest("data-stream-001");

// Create a one-way message
ByteBuffer notificationData = ByteBuffer.wrap("Status update".getBytes(StandardCharsets.UTF_8));
ManagedBuffer notificationBuffer = new NioManagedBuffer(notificationData);
OneWayMessage notification = new OneWayMessage(notificationBuffer);

// Handle different response types
/**
 * Example: reports on a response message according to its type.
 *
 * <p>Successful RPC and chunk responses have their body handed on to
 * {@code processRpcResponse} / {@code processChunkData}; every other known
 * response type is only logged. Types without a dedicated case are reported
 * as unknown on standard error.
 *
 * @param response the response message to handle; must not be null
 */
public void handleResponse(Message response) {
  switch (response.type()) {
    case RpcResponse:
      RpcResponse rpcResp = (RpcResponse) response;
      System.out.println("RPC response for request: " + rpcResp.requestId);
      processRpcResponse(rpcResp.body());
      break;
    case RpcFailure:
      RpcFailure rpcFail = (RpcFailure) response;
      System.err.println("RPC " + rpcFail.requestId + " failed: " + rpcFail.errorString);
      break;
    case ChunkFetchSuccess:
      ChunkFetchSuccess chunkSuccess = (ChunkFetchSuccess) response;
      System.out.println("Received chunk: " + chunkSuccess.streamChunkId);
      processChunkData(chunkSuccess.body());
      break;
    case ChunkFetchFailure:
      ChunkFetchFailure chunkFail = (ChunkFetchFailure) response;
      System.err.println("Chunk fetch failed: " + chunkFail.streamChunkId +
          " - " + chunkFail.errorString);
      break;
    case StreamResponse:
      StreamResponse streamResp = (StreamResponse) response;
      System.out.println("Stream " + streamResp.streamId +
          " opened with " + streamResp.byteCount + " bytes");
      break;
    case StreamFailure:
      StreamFailure streamFail = (StreamFailure) response;
      System.err.println("Stream " + streamFail.streamId +
          " failed: " + streamFail.errorString);
      break;
    default:
      System.err.println("Unknown response type: " + response.type());
      break;
  }
}

/**
 * Example: routes an incoming request message to a handler chosen by its
 * concrete class.
 */
public class MessageProcessor {

  /**
   * Dispatches a message to the matching handler and releases its body.
   *
   * <p>Chunk fetch and stream requests are dispatched whether or not they
   * carry a body, because they are fully described by their own fields. RPC
   * requests and one-way messages are dispatched only when a non-empty body
   * is present, since their handlers consume the payload.
   *
   * <p>Any exception raised while reading the body or inside a handler is
   * reported on standard error and not rethrown. A non-null body is released
   * exactly once, whatever the outcome.
   *
   * @param message the message to process; must not be null
   */
  public void processMessage(Message message) {
    // Read the body once so the same buffer is inspected, consumed and released.
    ManagedBuffer body = message.body();
    try {
      if (message instanceof ChunkFetchRequest) {
        processChunkRequest((ChunkFetchRequest) message);
      } else if (message instanceof StreamRequest) {
        processStreamRequest((StreamRequest) message);
      } else if (hasPayload(body)) {
        if (message instanceof RpcRequest) {
          processRpcRequest((RpcRequest) message, body.nioByteBuffer());
        } else if (message instanceof OneWayMessage) {
          processOneWayMessage(body.nioByteBuffer());
        }
      }
    } catch (Exception e) {
      System.err.println("Error processing message: " + e.getMessage());
    } finally {
      // Important: Release buffer when done, including when it was empty.
      if (body != null) {
        body.release();
      }
    }
  }

  /** Returns true when the buffer exists and holds at least one byte. */
  private static boolean hasPayload(ManagedBuffer body) {
    return body != null && body.size() > 0;
  }

  /**
   * Handles an RPC request by decoding its payload as UTF-8 text.
   *
   * @param request the RPC request being processed
   * @param body the request payload; its remaining bytes are consumed
   */
  private void processRpcRequest(RpcRequest request, ByteBuffer body) {
    System.out.println("Processing RPC request " + request.requestId);
    // Extract request data
    byte[] requestBytes = new byte[body.remaining()];
    body.get(requestBytes);
    String requestString = new String(requestBytes, StandardCharsets.UTF_8);
    // Process request and generate response
    // (Response would be sent via callback in actual implementation;
    // requestString is the decoded input that logic would work on)
  }

  /**
   * Handles a chunk fetch request.
   *
   * @param request the chunk fetch request being processed
   */
  private void processChunkRequest(ChunkFetchRequest request) {
    System.out.println("Processing chunk request: " + request.streamChunkId);
    // Look up chunk data and prepare response
    // (Would use StreamManager in actual implementation)
  }

  /**
   * Handles a request to open a stream.
   *
   * @param request the stream request being processed
   */
  private void processStreamRequest(StreamRequest request) {
    System.out.println("Processing stream request: " + request.streamId);
    // Open stream and prepare response
    // (Would use StreamManager in actual implementation)
  }

  /**
   * Handles a one-way message by decoding its payload as UTF-8 text and logging it.
   *
   * @param body the message payload; its remaining bytes are consumed
   */
  private void processOneWayMessage(ByteBuffer body) {
    byte[] messageBytes = new byte[body.remaining()];
    body.get(messageBytes);
    String message = new String(messageBytes, StandardCharsets.UTF_8);
    System.out.println("Received one-way message: " + message);
    // Process notification without sending response
  }
}

/**
 * Example: sanity checks for incoming request messages.
 */
public class MessageValidator {

  /** Largest RPC body accepted by {@link #validateMessage(Message)}: 16MB. */
  private static final long MAX_RPC_SIZE = 16L * 1024 * 1024;

  /**
   * Validates a message according to its type.
   *
   * <p>RPC, chunk fetch and stream requests get type-specific checks; all
   * other types are accepted as long as the message and its type are non-null.
   * A message whose concrete class does not match the type it reports is
   * rejected rather than causing a ClassCastException.
   *
   * @param message the message to validate; may be null
   * @return true if the message passes every applicable check
   */
  public static boolean validateMessage(Message message) {
    if (message == null || message.type() == null) {
      return false;
    }
    // Type-specific validation
    switch (message.type()) {
      case RpcRequest:
        return message instanceof RpcRequest
            && validateRpcRequest((RpcRequest) message);
      case ChunkFetchRequest:
        return message instanceof ChunkFetchRequest
            && validateChunkFetchRequest((ChunkFetchRequest) message);
      case StreamRequest:
        return message instanceof StreamRequest
            && validateStreamRequest((StreamRequest) message);
      default:
        return true; // Other types are valid by default
    }
  }

  /** Accepts a positive request ID with a non-null body of at most MAX_RPC_SIZE bytes. */
  private static boolean validateRpcRequest(RpcRequest request) {
    if (request.requestId <= 0) {
      return false;
    }
    ManagedBuffer body = request.body();
    return body != null && body.size() <= MAX_RPC_SIZE;
  }

  /** Accepts a non-null chunk identifier with a positive stream ID and a non-negative index. */
  private static boolean validateChunkFetchRequest(ChunkFetchRequest request) {
    StreamChunkId id = request.streamChunkId;
    return id != null && id.streamId > 0 && id.chunkIndex >= 0;
  }

  /** Accepts a stream ID that is non-null and not blank after trimming. */
  private static boolean validateStreamRequest(StreamRequest request) {
    return request.streamId != null && !request.streamId.trim().isEmpty();
  }
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-10