Core networking library for Apache Spark providing transport layer abstractions and utilities
—
Complete message protocol with encoding/decoding support for RPC, streaming, and one-way communication, providing a type-safe and efficient binary protocol for network communication.
Base interfaces defining the protocol structure with encoding capabilities and message type markers.
/**
* Base interface for all protocol messages, extends Encodable for serialization
*/
public interface Message extends Encodable {
}
/**
* Marker interface for request messages
*/
public interface RequestMessage extends Message {
}
/**
* Marker interface for response messages
*/
public interface ResponseMessage extends Message {
}
/**
* Interface for objects that can be encoded to ByteBuf for network transmission
*/
public interface Encodable {
/**
* Number of bytes this object would take up in encoding
* @return Encoded length in bytes
*/
int encodedLength();
/**
* Serializes this object by writing to the provided ByteBuf
* @param buf ByteBuf to write encoded data to
*/
void encode(ByteBuf buf);
}
Abstract base classes providing common functionality for message implementations.
/**
* Base implementation for all message types providing common functionality.
* Declares the three accessors every concrete message exposes: its type tag,
* its optional body, and whether that body travels in the same frame.
*/
public abstract class AbstractMessage implements Message {
/**
* Get the message type identifier
* @return Message.Type enum value
*/
public abstract Message.Type type();
/**
* Get the message body if present
* @return ManagedBuffer containing message body, or null when the message has no body
*/
public abstract ManagedBuffer body();
/**
* Whether the body is carried in the same frame as the message itself.
* This is not a "has a body" check; absence of a body is signalled by
* {@link #body()} returning null.
* NOTE(review): wording follows the method name and Spark's Message Javadoc —
* confirm against the library version you target.
* @return true if the body is part of the message's frame, false otherwise
*/
public abstract boolean isBodyInFrame();
}
/**
* Base class for response messages providing response-specific functionality
*/
public abstract class AbstractResponseMessage extends AbstractMessage implements ResponseMessage {
}
Protocol messages for initiating operations, including RPC calls, stream requests, and data uploads.
/**
* Request to fetch a specific chunk from a stream
*/
public final class ChunkFetchRequest extends AbstractMessage implements RequestMessage {
/**
* Create a chunk fetch request
* @param streamChunkId Identifier for the specific chunk to fetch
*/
public ChunkFetchRequest(StreamChunkId streamChunkId);
/**
* Get the stream chunk identifier
* @return StreamChunkId for the requested chunk
*/
public StreamChunkId streamChunkId();
}
/**
* RPC request with message payload
*/
public final class RpcRequest extends AbstractMessage implements RequestMessage {
/**
* Create an RPC request
* @param requestId Unique identifier for this request
* @param message Message payload as ManagedBuffer
*/
public RpcRequest(long requestId, ManagedBuffer message);
/**
* Get the request identifier
* @return Request ID
*/
public long requestId();
/**
* Get the message payload
* @return ManagedBuffer containing the message data
*/
public ManagedBuffer message();
}
/**
* Request to stream data with given stream ID
*/
public final class StreamRequest extends AbstractMessage implements RequestMessage {
/**
* Create a stream request
* @param streamId Identifier for the stream to request
*/
public StreamRequest(String streamId);
/**
* Get the stream identifier
* @return Stream ID string
*/
public String streamId();
}
/**
* One-way message that expects no response
*/
public final class OneWayMessage extends AbstractMessage implements RequestMessage {
/**
* Create a one-way message
* @param body Message body as ManagedBuffer
*/
public OneWayMessage(ManagedBuffer body);
/**
* Get the message body
* @return ManagedBuffer containing message data
*/
public ManagedBuffer body();
}
/**
* Request to upload streaming data with metadata
*/
public final class UploadStream extends AbstractMessage implements RequestMessage {
/**
* Create an upload stream request
* @param requestId Unique identifier for this upload request
* @param meta Metadata for the upload as ManagedBuffer
* @param data Data to upload as ManagedBuffer
*/
public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer data);
/**
* Get the request identifier
* @return Request ID
*/
public long requestId();
/**
* Get the upload metadata
* @return ManagedBuffer containing metadata
*/
public ManagedBuffer meta();
/**
* Get the data to upload
* @return ManagedBuffer containing data
*/
public ManagedBuffer data();
}
Usage Examples:
// RPC request
long requestId = System.currentTimeMillis();
ManagedBuffer payload = new NioManagedBuffer(ByteBuffer.wrap(requestData));
RpcRequest rpcReq = new RpcRequest(requestId, payload);
// Stream request
StreamRequest streamReq = new StreamRequest("my-stream-123");
// Chunk fetch request
StreamChunkId chunkId = new StreamChunkId(streamId, chunkIndex);
ChunkFetchRequest chunkReq = new ChunkFetchRequest(chunkId);
// One-way message
ManagedBuffer notificationData = new NioManagedBuffer(ByteBuffer.wrap(statusUpdate));
OneWayMessage oneWay = new OneWayMessage(notificationData);
// Upload stream
ManagedBuffer metadata = new NioManagedBuffer(ByteBuffer.wrap(metaBytes));
ManagedBuffer data = new FileSegmentManagedBuffer(conf, file, 0, file.length());
UploadStream upload = new UploadStream(requestId, metadata, data);
Protocol messages for responding to requests, including successful responses and error conditions.
/**
* Successful response to chunk fetch with data
*/
public final class ChunkFetchSuccess extends AbstractResponseMessage {
/**
* Create a successful chunk fetch response
* @param streamChunkId Identifier for the fetched chunk
* @param buffer Buffer containing the chunk data
*/
public ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer);
/**
* Get the stream chunk identifier
* @return StreamChunkId for the fetched chunk
*/
public StreamChunkId streamChunkId();
/**
* Get the chunk data
* @return ManagedBuffer containing chunk data
*/
public ManagedBuffer buffer();
}
/**
* Failed chunk fetch response with error message
*/
public final class ChunkFetchFailure extends AbstractResponseMessage {
/**
* Create a failed chunk fetch response
* @param streamChunkId Identifier for the chunk that failed
* @param errorString Error message describing the failure
*/
public ChunkFetchFailure(StreamChunkId streamChunkId, String errorString);
/**
* Get the stream chunk identifier
* @return StreamChunkId for the failed chunk
*/
public StreamChunkId streamChunkId();
/**
* Get the error message
* @return Error description string
*/
public String errorString();
}
/**
* RPC response with result data
*/
public final class RpcResponse extends AbstractResponseMessage {
/**
* Create an RPC response
* @param requestId Request ID this response corresponds to
* @param message Response payload as ManagedBuffer
*/
public RpcResponse(long requestId, ManagedBuffer message);
/**
* Get the request identifier this response is for
* @return Request ID
*/
public long requestId();
/**
* Get the response payload
* @return ManagedBuffer containing response data
*/
public ManagedBuffer message();
}
/**
* RPC failure response with error message
*/
public final class RpcFailure extends AbstractResponseMessage {
/**
* Create an RPC failure response
* @param requestId Request ID this failure corresponds to
* @param errorString Error message describing the failure
*/
public RpcFailure(long requestId, String errorString);
/**
* Get the request identifier this failure is for
* @return Request ID
*/
public long requestId();
/**
* Get the error message
* @return Error description string
*/
public String errorString();
}
/**
* Response to stream request with stream metadata
*/
public final class StreamResponse extends AbstractResponseMessage {
/**
* Create a stream response
* @param streamId Stream identifier
* @param byteCount Total number of bytes in the stream
* @param buffer Initial stream data buffer
*/
public StreamResponse(String streamId, long byteCount, ManagedBuffer buffer);
/**
* Get the stream identifier
* @return Stream ID string
*/
public String streamId();
/**
* Get the total byte count for the stream
* @return Total bytes in stream
*/
public long byteCount();
/**
* Get the initial stream data
* @return ManagedBuffer containing initial data
*/
public ManagedBuffer buffer();
}
/**
* Stream failure response with error message
*/
public final class StreamFailure extends AbstractResponseMessage {
/**
* Create a stream failure response
* @param streamId Stream identifier that failed
* @param errorString Error message describing the failure
*/
public StreamFailure(String streamId, String errorString);
/**
* Get the stream identifier
* @return Stream ID string
*/
public String streamId();
/**
* Get the error message
* @return Error description string
*/
public String errorString();
}
Usage Examples:
// Successful RPC response
ManagedBuffer responseData = new NioManagedBuffer(ByteBuffer.wrap(resultBytes));
RpcResponse rpcResp = new RpcResponse(originalRequestId, responseData);
// RPC failure
RpcFailure rpcFail = new RpcFailure(originalRequestId, "Processing failed: invalid input");
// Successful chunk fetch
ManagedBuffer chunkData = new FileSegmentManagedBuffer(conf, file, offset, length);
ChunkFetchSuccess chunkSuccess = new ChunkFetchSuccess(streamChunkId, chunkData);
// Chunk fetch failure
ChunkFetchFailure chunkFail = new ChunkFetchFailure(streamChunkId, "Chunk not found");
// Stream response
ManagedBuffer streamData = // ... initial stream data
StreamResponse streamResp = new StreamResponse("stream-456", totalBytes, streamData);
// Stream failure
StreamFailure streamFail = new StreamFailure("stream-456", "Stream source unavailable");
Netty-based encoder and decoder for efficient message serialization and deserialization with frame-based transport.
/**
* Netty encoder for Message objects, converts messages to ByteBuf for transmission
*/
public final class MessageEncoder extends MessageToByteEncoder<Message> {
/**
* Singleton instance for reuse
*/
public static final MessageEncoder INSTANCE = new MessageEncoder();
/**
* Encode a message to ByteBuf for transmission
* @param ctx Channel handler context
* @param msg Message to encode
* @param out Output ByteBuf to write encoded data
* @throws Exception if encoding fails
*/
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception;
}
/**
* Netty decoder for Message objects, converts ByteBuf frames to Message instances
*
* NOTE(review): this listing declares the class as a LengthFieldBasedFrameDecoder
* subclass and also exposes a shared INSTANCE, while the pipeline example below
* installs a separate LengthFieldBasedFrameDecoder in front of it. Netty's
* ByteToMessageDecoder family keeps per-channel buffering state and must not be
* shared between channels, so the superclass shown here is likely misdescribed —
* confirm against the library source before relying on it.
*/
public final class MessageDecoder extends LengthFieldBasedFrameDecoder {
/**
* Singleton instance for reuse
*/
public static final MessageDecoder INSTANCE = new MessageDecoder();
/**
* Decode a ByteBuf frame to Message object
* @param ctx Channel handler context
* @param in Input ByteBuf containing encoded message
* @return Decoded Message object
* @throws Exception if decoding fails
*/
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception;
}
Usage Examples:
// Adding to Netty pipeline
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("messageDecoder", MessageDecoder.INSTANCE);
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("messageEncoder", MessageEncoder.INSTANCE);
// Messages are automatically encoded/decoded by the pipeline
Utility classes for encoding common data types with consistent byte representation.
/**
* Utility classes for encoding common data types
*/
public class Encoders {
/**
* String encoding utilities
*/
public static class Strings {
/**
* Encode string length and content to ByteBuf
* @param buf ByteBuf to write to
* @param s String to encode
*/
public static void encode(ByteBuf buf, String s);
/**
* Decode string from ByteBuf
* @param buf ByteBuf to read from
* @return Decoded string
*/
public static String decode(ByteBuf buf);
/**
* Calculate encoded length of string
* @param s String to measure
* @return Encoded length in bytes
*/
public static int encodedLength(String s);
}
/**
* Byte array encoding utilities
*/
public static class ByteArrays {
/**
* Encode byte array length and content to ByteBuf
* @param buf ByteBuf to write to
* @param arr Byte array to encode
*/
public static void encode(ByteBuf buf, byte[] arr);
/**
* Decode byte array from ByteBuf
* @param buf ByteBuf to read from
* @return Decoded byte array
*/
public static byte[] decode(ByteBuf buf);
/**
* Calculate encoded length of byte array
* @param arr Byte array to measure
* @return Encoded length in bytes
*/
public static int encodedLength(byte[] arr);
}
/**
* String array encoding utilities
*/
public static class StringArrays {
/**
* Encode string array to ByteBuf
* @param buf ByteBuf to write to
* @param strings String array to encode
*/
public static void encode(ByteBuf buf, String[] strings);
/**
* Decode string array from ByteBuf
* @param buf ByteBuf to read from
* @return Decoded string array
*/
public static String[] decode(ByteBuf buf);
/**
* Calculate encoded length of string array
* @param strings String array to measure
* @return Encoded length in bytes
*/
public static int encodedLength(String[] strings);
}
}
Utility for identifying specific chunks within streams for efficient chunk-based data transfer.
/**
* Identifies a specific chunk within a stream for chunk-based data transfer.
* Pairs a numeric stream ID with a chunk index, and implements Encodable so it
* can report its encoded size and write itself to a ByteBuf.
*/
public final class StreamChunkId implements Encodable {
/**
* Create a stream chunk identifier
* @param streamId Numeric stream identifier
* @param chunkIndex Index of the chunk within the stream
*/
public StreamChunkId(long streamId, int chunkIndex);
/**
* Get the stream identifier
* @return Stream ID
*/
public long streamId();
/**
* Get the chunk index
* @return Chunk index within the stream
*/
public int chunkIndex();
/**
* Calculate encoded length of this identifier
* @return Encoded length in bytes
*/
public int encodedLength();
/**
* Encode this identifier to ByteBuf
* @param buf ByteBuf to write encoded data to
*/
public void encode(ByteBuf buf);
/**
* Decode a StreamChunkId from ByteBuffer
* NOTE(review): encode(...) writes to a Netty ByteBuf while this method is
* declared with a java.nio ByteBuffer parameter — confirm the parameter type
* against the library source.
* @param buffer ByteBuffer containing encoded data
* @return Decoded StreamChunkId
*/
public static StreamChunkId decode(ByteBuffer buffer);
}
Usage Examples:
// Create chunk identifier
StreamChunkId chunkId = new StreamChunkId(streamId, 5); // chunk at index 5 of the stream
// Use in requests
ChunkFetchRequest request = new ChunkFetchRequest(chunkId);
// Encoding for transmission
ByteBuf buf = Unpooled.buffer(chunkId.encodedLength());
chunkId.encode(buf);
// Decoding from received data
StreamChunkId decoded = StreamChunkId.decode(receivedBuffer);
// Creating and sending an RPC request
long requestId = generateRequestId();
ManagedBuffer requestData = new NioManagedBuffer(ByteBuffer.wrap(payload));
RpcRequest request = new RpcRequest(requestId, requestData);
// Handle RPC response
if (response instanceof RpcResponse) {
RpcResponse rpcResp = (RpcResponse) response;
if (rpcResp.requestId() == requestId) {
ManagedBuffer responseData = rpcResp.message();
processResponse(responseData);
}
} else if (response instanceof RpcFailure) {
RpcFailure failure = (RpcFailure) response;
handleError(failure.errorString());
}
// Comprehensive error handling
/**
 * Routes a failure message to its recovery action: logs the error, then
 * retries the chunk fetch, reports the RPC failure, or cleans up the stream.
 * Messages of any other type are ignored.
 *
 * @param message incoming protocol message to inspect
 */
public void handleMessage(Message message) {
    if (message instanceof ChunkFetchFailure) {
        ChunkFetchFailure chunkFailure = (ChunkFetchFailure) message;
        logger.error("Chunk fetch failed for {}: {}",
            chunkFailure.streamChunkId(), chunkFailure.errorString());
        retryChunkFetch(chunkFailure.streamChunkId());
        return;
    }

    if (message instanceof RpcFailure) {
        RpcFailure rpcFailure = (RpcFailure) message;
        logger.error("RPC {} failed: {}", rpcFailure.requestId(), rpcFailure.errorString());
        handleRpcFailure(rpcFailure.requestId(), rpcFailure.errorString());
        return;
    }

    if (message instanceof StreamFailure) {
        StreamFailure streamFailure = (StreamFailure) message;
        logger.error("Stream {} failed: {}", streamFailure.streamId(), streamFailure.errorString());
        cleanupStream(streamFailure.streamId());
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-common