CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-network-common

Core networking library for Apache Spark providing transport layer abstractions and utilities

Pending
Overview
Eval results
Files

docs/protocol.md

Protocol Handling

Complete message protocol with encoding/decoding support for RPC, streaming, and one-way communication, providing a type-safe and efficient binary protocol for network communication.

Capabilities

Core Protocol Interfaces

Base interfaces defining the protocol structure with encoding capabilities and message type markers.

/**
 * Root type of every protocol message. This listing declares no members of its own;
 * the encoding contract ({@code encodedLength()} and {@code encode(ByteBuf)}) is
 * inherited from {@link Encodable}.
 *
 * NOTE(review): {@code AbstractMessage.type()} returns {@code Message.Type}, which is
 * not declared in this listing; the nested enum is presumably omitted. Verify.
 */
public interface Message extends Encodable {
}

/**
 * Marker interface for request messages. It declares no methods; implementing it only
 * tags a {@link Message} as one that initiates an operation.
 */
public interface RequestMessage extends Message {
}

/**
 * Marker interface for response messages. It declares no methods; implementing it only
 * tags a {@link Message} as one sent in reply to a request.
 */
public interface ResponseMessage extends Message {
}

/**
 * Contract for objects that can be serialized into a Netty {@code ByteBuf} for network
 * transmission. The two methods are meant to be used together: {@link #encodedLength()}
 * to size the destination buffer, then {@link #encode(ByteBuf)} to fill it.
 */
public interface Encodable {
    /**
     * Number of bytes this object would take up in encoding. Callers can use it to
     * allocate a buffer before encoding, e.g. {@code Unpooled.buffer(x.encodedLength())}.
     * @return Encoded length in bytes
     */
    int encodedLength();
    
    /**
     * Serializes this object by writing to the provided ByteBuf
     * @param buf ByteBuf to write encoded data to
     */
    void encode(ByteBuf buf);
}

Message Base Classes

Abstract base classes providing common functionality for message implementations.

/**
 * Abstract base for message implementations. In this listing it declares three
 * accessors that every concrete message must provide: its type, its optional body,
 * and whether that body travels inside the message's own frame.
 */
public abstract class AbstractMessage implements Message {
    /**
     * Get the message type identifier
     * @return Message.Type enum value
     */
    public abstract Message.Type type();
    
    /**
     * Get the message body if present
     * @return ManagedBuffer containing message body, or null if the message has none
     */
    public abstract ManagedBuffer body();
    
    /**
     * Whether the body is carried in the same frame as the message itself.
     * This describes frame placement, not whether a body exists (use {@link #body()}
     * for that).
     * NOTE(review): wording follows the method name; confirm against Spark's upstream
     * Javadoc for {@code Message.isBodyInFrame()}.
     * @return true if the body is included in the message's frame, false otherwise
     */
    public abstract boolean isBodyInFrame();
}

/**
 * Common base class for response messages: combines {@link AbstractMessage} with the
 * {@link ResponseMessage} marker. This listing shows no additional members.
 */
public abstract class AbstractResponseMessage extends AbstractMessage implements ResponseMessage {
}

Request Messages

Protocol messages for initiating operations: chunk fetches, RPC calls, stream requests, one-way messages, and streaming data uploads.

/**
 * Request to fetch a specific chunk from a stream. The chunk is addressed by a
 * {@link StreamChunkId}; the reply is a {@link ChunkFetchSuccess} or
 * {@link ChunkFetchFailure} carrying the same identifier.
 */
public final class ChunkFetchRequest extends AbstractMessage implements RequestMessage {
    /**
     * Create a chunk fetch request
     * @param streamChunkId Identifier for the specific chunk to fetch
     */
    public ChunkFetchRequest(StreamChunkId streamChunkId);
    
    /**
     * Get the stream chunk identifier
     * @return StreamChunkId for the requested chunk
     */
    public StreamChunkId streamChunkId();
}

/**
 * RPC request with message payload. The request ID is echoed back by the matching
 * {@link RpcResponse} or {@link RpcFailure}, which is how a reply is correlated with
 * its request.
 */
public final class RpcRequest extends AbstractMessage implements RequestMessage {
    /**
     * Create an RPC request
     * @param requestId Unique identifier for this request
     * @param message Message payload as ManagedBuffer
     */
    public RpcRequest(long requestId, ManagedBuffer message);
    
    /**
     * Get the request identifier
     * @return Request ID supplied at construction
     */
    public long requestId();
    
    /**
     * Get the message payload
     * @return ManagedBuffer containing the message data
     */
    public ManagedBuffer message();
}

/**
 * Request to stream the data identified by a string stream ID. The reply is a
 * {@link StreamResponse} or {@link StreamFailure} for the same stream ID.
 */
public final class StreamRequest extends AbstractMessage implements RequestMessage {
    /**
     * Create a stream request
     * @param streamId Identifier for the stream to request
     */
    public StreamRequest(String streamId);
    
    /**
     * Get the stream identifier
     * @return Stream ID string
     */
    public String streamId();
}

/**
 * One-way message that expects no response. Unlike {@link RpcRequest} it carries no
 * request ID, since there is no reply to correlate.
 */
public final class OneWayMessage extends AbstractMessage implements RequestMessage {
    /**
     * Create a one-way message
     * @param body Message body as ManagedBuffer
     */
    public OneWayMessage(ManagedBuffer body);
    
    /**
     * Get the message body (implements the abstract accessor from AbstractMessage)
     * @return ManagedBuffer containing message data
     */
    public ManagedBuffer body();
}

/**
 * Request to upload streaming data with metadata. Carries two separate buffers:
 * {@code meta} describing the upload and {@code data} holding the content itself.
 */
public final class UploadStream extends AbstractMessage implements RequestMessage {
    /**
     * Create an upload stream request
     * @param requestId Unique identifier for this upload request
     * @param meta Metadata for the upload as ManagedBuffer
     * @param data Data to upload as ManagedBuffer
     */
    public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer data);
    
    /**
     * Get the request identifier
     * @return Request ID supplied at construction
     */
    public long requestId();
    
    /**
     * Get the upload metadata
     * @return ManagedBuffer containing metadata
     */
    public ManagedBuffer meta();
    
    /**
     * Get the data to upload
     * @return ManagedBuffer containing data
     */
    public ManagedBuffer data();
}

Usage Examples:

// RPC request
// A request ID must be unique among in-flight requests because the response echoes it
// back for correlation. A wall-clock timestamp repeats within the same millisecond,
// so a random 64-bit value is used instead.
long requestId = java.util.UUID.randomUUID().getLeastSignificantBits();
ManagedBuffer payload = new NioManagedBuffer(ByteBuffer.wrap(requestData));
RpcRequest rpcReq = new RpcRequest(requestId, payload);

// Stream request
StreamRequest streamReq = new StreamRequest("my-stream-123");

// Chunk fetch request
StreamChunkId chunkId = new StreamChunkId(streamId, chunkIndex);
ChunkFetchRequest chunkReq = new ChunkFetchRequest(chunkId);

// One-way message
ManagedBuffer notificationData = new NioManagedBuffer(ByteBuffer.wrap(statusUpdate));
OneWayMessage oneWay = new OneWayMessage(notificationData);

// Upload stream
// The upload gets its own request ID rather than reusing the RPC's ID above.
long uploadRequestId = java.util.UUID.randomUUID().getLeastSignificantBits();
ManagedBuffer metadata = new NioManagedBuffer(ByteBuffer.wrap(metaBytes));
ManagedBuffer data = new FileSegmentManagedBuffer(conf, file, 0, file.length());
UploadStream upload = new UploadStream(uploadRequestId, metadata, data);

Response Messages

Protocol messages for responding to requests, including successful responses and error conditions.

/**
 * Successful response to a chunk fetch, carrying the chunk's data together with the
 * {@link StreamChunkId} that identifies which chunk it is.
 */
public final class ChunkFetchSuccess extends AbstractResponseMessage {
    /**
     * Create a successful chunk fetch response
     * @param streamChunkId Identifier for the fetched chunk
     * @param buffer Buffer containing the chunk data
     */
    public ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer);
    
    /**
     * Get the stream chunk identifier
     * @return StreamChunkId for the fetched chunk
     */
    public StreamChunkId streamChunkId();
    
    /**
     * Get the chunk data
     * @return ManagedBuffer containing chunk data
     */
    public ManagedBuffer buffer();
}

/**
 * Failed response to a chunk fetch. Identifies the chunk that could not be fetched
 * and carries a human-readable error string instead of data.
 */
public final class ChunkFetchFailure extends AbstractResponseMessage {
    /**
     * Create a failed chunk fetch response
     * @param streamChunkId Identifier for the chunk that failed
     * @param errorString Error message describing the failure
     */
    public ChunkFetchFailure(StreamChunkId streamChunkId, String errorString);
    
    /**
     * Get the stream chunk identifier
     * @return StreamChunkId for the failed chunk
     */
    public StreamChunkId streamChunkId();
    
    /**
     * Get the error message
     * @return Error description string
     */
    public String errorString();
}

/**
 * Successful RPC response with result data. Its request ID is the ID of the
 * {@link RpcRequest} being answered, so the receiver can match reply to request.
 */
public final class RpcResponse extends AbstractResponseMessage {
    /**
     * Create an RPC response
     * @param requestId Request ID this response corresponds to
     * @param message Response payload as ManagedBuffer
     */
    public RpcResponse(long requestId, ManagedBuffer message);
    
    /**
     * Get the request identifier this response is for
     * @return Request ID
     */
    public long requestId();
    
    /**
     * Get the response payload
     * @return ManagedBuffer containing response data
     */
    public ManagedBuffer message();
}

/**
 * Failed RPC response. Carries the ID of the request that failed and an error string
 * in place of a result payload.
 */
public final class RpcFailure extends AbstractResponseMessage {
    /**
     * Create an RPC failure response
     * @param requestId Request ID this failure corresponds to
     * @param errorString Error message describing the failure
     */
    public RpcFailure(long requestId, String errorString);
    
    /**
     * Get the request identifier this failure is for
     * @return Request ID
     */
    public long requestId();
    
    /**
     * Get the error message
     * @return Error description string
     */
    public String errorString();
}

/**
 * Successful response to a stream request. Reports the stream's total size in bytes
 * and carries a buffer with the initial stream data.
 */
public final class StreamResponse extends AbstractResponseMessage {
    /**
     * Create a stream response
     * @param streamId Stream identifier
     * @param byteCount Total number of bytes in the stream
     * @param buffer Initial stream data buffer
     */
    public StreamResponse(String streamId, long byteCount, ManagedBuffer buffer);
    
    /**
     * Get the stream identifier
     * @return Stream ID string
     */
    public String streamId();
    
    /**
     * Get the total byte count for the stream
     * @return Total bytes in stream
     */
    public long byteCount();
    
    /**
     * Get the initial stream data
     * @return ManagedBuffer containing initial data
     */
    public ManagedBuffer buffer();
}

/**
 * Failed response to a stream request. Identifies the stream by its string ID and
 * carries an error string describing why it could not be served.
 */
public final class StreamFailure extends AbstractResponseMessage {
    /**
     * Create a stream failure response
     * @param streamId Stream identifier that failed
     * @param errorString Error message describing the failure
     */
    public StreamFailure(String streamId, String errorString);
    
    /**
     * Get the stream identifier
     * @return Stream ID string
     */
    public String streamId();
    
    /**
     * Get the error message
     * @return Error description string
     */
    public String errorString();
}

Usage Examples:

// Successful RPC response
ManagedBuffer responseData = new NioManagedBuffer(ByteBuffer.wrap(resultBytes));
RpcResponse rpcResp = new RpcResponse(originalRequestId, responseData);

// RPC failure
RpcFailure rpcFail = new RpcFailure(originalRequestId, "Processing failed: invalid input");

// Successful chunk fetch
ManagedBuffer chunkData = new FileSegmentManagedBuffer(conf, file, offset, length);
ChunkFetchSuccess chunkSuccess = new ChunkFetchSuccess(streamChunkId, chunkData);

// Chunk fetch failure
ChunkFetchFailure chunkFail = new ChunkFetchFailure(streamChunkId, "Chunk not found");

// Stream response
ManagedBuffer streamData = // ... initial stream data
StreamResponse streamResp = new StreamResponse("stream-456", totalBytes, streamData);

// Stream failure
StreamFailure streamFail = new StreamFailure("stream-456", "Stream source unavailable");

Encoding and Decoding

Netty-based encoder and decoder for efficient message serialization and deserialization with frame-based transport.

/**
 * Netty encoder for Message objects, converts messages to ByteBuf for transmission.
 *
 * NOTE(review): sharing one INSTANCE across pipelines requires the class to be
 * annotated {@code @ChannelHandler.Sharable}; the annotation is not shown here.
 * NOTE(review): upstream Spark appears to declare this class as a
 * {@code MessageToMessageEncoder<Message>} whose {@code encode} emits into a
 * {@code List<Object>}. Verify the superclass and signature against the Spark source.
 */
public final class MessageEncoder extends MessageToByteEncoder<Message> {
    /**
     * Singleton instance for reuse
     */
    public static final MessageEncoder INSTANCE = new MessageEncoder();
    
    /**
     * Encode a message to ByteBuf for transmission. Invoked by the Netty pipeline for
     * outbound {@code Message} objects; not intended to be called directly (protected).
     * @param ctx Channel handler context
     * @param msg Message to encode
     * @param out Output ByteBuf to write encoded data
     * @throws Exception if encoding fails
     */
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception;
}

/**
 * Netty decoder for Message objects, converts ByteBuf frames to Message instances.
 *
 * NOTE(review): {@code LengthFieldBasedFrameDecoder} is a {@code ByteToMessageDecoder},
 * which keeps per-channel state and rejects {@code @Sharable} subclasses, so a single
 * INSTANCE could not be installed in more than one pipeline. Upstream Spark appears to
 * declare this class as a sharable {@code MessageToMessageDecoder<ByteBuf>} instead.
 * Verify the superclass and {@code decode} signature against the Spark source.
 */
public final class MessageDecoder extends LengthFieldBasedFrameDecoder {
    /**
     * Singleton instance for reuse
     */
    public static final MessageDecoder INSTANCE = new MessageDecoder();
    
    /**
     * Decode a ByteBuf frame to Message object. Invoked by the Netty pipeline for
     * inbound data; not intended to be called directly (protected).
     * @param ctx Channel handler context
     * @param in Input ByteBuf containing encoded message
     * @return Decoded Message object
     * @throws Exception if decoding fails
     */
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception;
}

Usage Examples:

// Adding to Netty pipeline
// Inbound data flows head -> tail: frameDecoder first, then messageDecoder.
// Outbound data flows tail -> head: messageEncoder first, then frameEncoder.
ChannelPipeline pipeline = channel.pipeline();
// Arguments: maxFrameLength, lengthFieldOffset=0, lengthFieldLength=4,
// lengthAdjustment=0, initialBytesToStrip=4 (drops the 4-byte length prefix).
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("messageDecoder", MessageDecoder.INSTANCE);
// Prepends a 4-byte length field to each outbound frame, matching the decoder above.
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("messageEncoder", MessageEncoder.INSTANCE);
// NOTE(review): MessageDecoder is listed above as a LengthFieldBasedFrameDecoder itself,
// which would make the separate frameDecoder redundant. Spark's own transport setup
// appears to use a dedicated frame decoder and no LengthFieldPrepender. Confirm this
// wiring matches the wire format MessageEncoder actually produces.

// Messages are automatically encoded/decoded by the pipeline

Encoding Utilities

Utility classes for encoding common data types with consistent byte representation.

/**
 * Namespace for static encode/decode helpers covering common data types. Each nested
 * class offers the same trio for one type: {@code encode} writes a value to a ByteBuf,
 * {@code decode} reads one back, and {@code encodedLength} reports how many bytes
 * {@code encode} will need.
 */
public class Encoders {
    
    /**
     * String encoding utilities
     */
    public static class Strings {
        /**
         * Encode string length and content to ByteBuf
         * (NOTE(review): width of the length prefix and the charset are not shown in
         * this listing; confirm before relying on the byte layout)
         * @param buf ByteBuf to write to
         * @param s String to encode
         */
        public static void encode(ByteBuf buf, String s);
        
        /**
         * Decode string from ByteBuf; counterpart of {@code encode(ByteBuf, String)}
         * @param buf ByteBuf to read from
         * @return Decoded string
         */
        public static String decode(ByteBuf buf);
        
        /**
         * Calculate encoded length of string
         * @param s String to measure
         * @return Encoded length in bytes
         */
        public static int encodedLength(String s);
    }
    
    /**
     * Byte array encoding utilities
     */
    public static class ByteArrays {
        /**
         * Encode byte array length and content to ByteBuf
         * @param buf ByteBuf to write to
         * @param arr Byte array to encode
         */
        public static void encode(ByteBuf buf, byte[] arr);
        
        /**
         * Decode byte array from ByteBuf; counterpart of {@code encode(ByteBuf, byte[])}
         * @param buf ByteBuf to read from
         * @return Decoded byte array
         */
        public static byte[] decode(ByteBuf buf);
        
        /**
         * Calculate encoded length of byte array
         * @param arr Byte array to measure
         * @return Encoded length in bytes
         */
        public static int encodedLength(byte[] arr);
    }
    
    /**
     * String array encoding utilities
     */
    public static class StringArrays {
        /**
         * Encode string array to ByteBuf
         * @param buf ByteBuf to write to
         * @param strings String array to encode
         */
        public static void encode(ByteBuf buf, String[] strings);
        
        /**
         * Decode string array from ByteBuf; counterpart of {@code encode(ByteBuf, String[])}
         * @param buf ByteBuf to read from
         * @return Decoded string array
         */
        public static String[] decode(ByteBuf buf);
        
        /**
         * Calculate encoded length of string array
         * @param strings String array to measure
         * @return Encoded length in bytes
         */
        public static int encodedLength(String[] strings);
    }
}

Stream Chunk Identifier

Utility for identifying specific chunks within streams for efficient chunk-based data transfer.

/**
 * Identifies a specific chunk within a stream for chunk-based data transfer.
 * A value pairing a numeric stream ID with a chunk index; it is {@link Encodable}
 * so it can be embedded in chunk fetch requests and responses.
 */
public final class StreamChunkId implements Encodable {
    /**
     * Create a stream chunk identifier
     * @param streamId Numeric stream identifier
     * @param chunkIndex Index of the chunk within the stream
     */
    public StreamChunkId(long streamId, int chunkIndex);
    
    /**
     * Get the stream identifier
     * @return Stream ID
     */
    public long streamId();
    
    /**
     * Get the chunk index
     * @return Chunk index within the stream
     */
    public int chunkIndex();
    
    /**
     * Calculate encoded length of this identifier
     * @return Encoded length in bytes
     */
    public int encodedLength();
    
    /**
     * Encode this identifier to ByteBuf
     * @param buf ByteBuf to write encoded data to
     */
    public void encode(ByteBuf buf);
    
    /**
     * Decode a StreamChunkId from ByteBuffer
     * NOTE(review): {@code encode} writes to a Netty {@code ByteBuf}, but this
     * signature reads from a {@code java.nio.ByteBuffer}. The asymmetry looks like a
     * listing error; verify the parameter type against the Spark source.
     * @param buffer ByteBuffer containing encoded data
     * @return Decoded StreamChunkId
     */
    public static StreamChunkId decode(ByteBuffer buffer);
}

Usage Examples:

// Create chunk identifier
StreamChunkId chunkId = new StreamChunkId(streamId, 5); // chunk at index 5 (the 6th chunk if indices are zero-based)

// Use in requests
ChunkFetchRequest request = new ChunkFetchRequest(chunkId);

// Encoding for transmission: size the buffer from encodedLength(), then write into it
ByteBuf buf = Unpooled.buffer(chunkId.encodedLength());
chunkId.encode(buf);

// Decoding from received data
StreamChunkId decoded = StreamChunkId.decode(receivedBuffer);

Protocol Usage Patterns

Message Creation and Handling

// Build an RPC request tagged with a freshly generated request ID
long requestId = generateRequestId();
RpcRequest request = new RpcRequest(requestId, new NioManagedBuffer(ByteBuffer.wrap(payload)));

// Dispatch on the reply type: failures carry an error string, successes carry a payload
if (response instanceof RpcFailure) {
    handleError(((RpcFailure) response).errorString());
} else if (response instanceof RpcResponse) {
    RpcResponse reply = (RpcResponse) response;
    // Only consume replies whose ID matches the request issued above
    if (reply.requestId() == requestId) {
        processResponse(reply.message());
    }
}

Error Handling

// Routes each failure message to its recovery action; any other message is ignored
public void handleMessage(Message message) {
    if (message instanceof ChunkFetchFailure) {
        // Chunk fetch failed: log it, then retry the same chunk
        ChunkFetchFailure chunkFailure = (ChunkFetchFailure) message;
        logger.error("Chunk fetch failed for {}: {}",
                    chunkFailure.streamChunkId(), chunkFailure.errorString());
        retryChunkFetch(chunkFailure.streamChunkId());
        return;
    }

    if (message instanceof RpcFailure) {
        // RPC failed: log it, then hand the request ID and error to the RPC failure hook
        RpcFailure rpcFailure = (RpcFailure) message;
        logger.error("RPC {} failed: {}", rpcFailure.requestId(), rpcFailure.errorString());
        handleRpcFailure(rpcFailure.requestId(), rpcFailure.errorString());
        return;
    }

    if (message instanceof StreamFailure) {
        // Stream failed: log it, then release whatever is tracked for that stream
        StreamFailure streamFailure = (StreamFailure) message;
        logger.error("Stream {} failed: {}", streamFailure.streamId(), streamFailure.errorString());
        cleanupStream(streamFailure.streamId());
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common

docs

authentication.md

buffers.md

configuration.md

index.md

protocol.md

streaming.md

transport.md

tile.json