CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-network-common-2-10

Core networking infrastructure for Apache Spark cluster computing with transport layer, RPC, chunk fetching, and SASL authentication

Pending
Overview
Eval results
Files

docs/message-protocol.md

Message Protocol

Type-safe message protocol for network communication in Apache Spark. The protocol defines specific message types for RPC requests/responses, chunk fetching, and streaming operations, ensuring reliable and structured communication between clients and servers.

Capabilities

Core Message Interface

Base interface for all network messages in the Spark transport protocol.

/**
 * A single unit of communication in the Spark transport protocol.
 * Every network message implements this contract, which gives each message
 * a type tag and a payload accessor.
 */
public interface Message {
  /**
   * Reports which protocol message kind this instance is.
   *
   * @return the {@link Type} constant identifying this message
   */
  Type type();

  /**
   * Exposes the payload carried by this message.
   *
   * @return the payload wrapped in a ManagedBuffer
   */
  ManagedBuffer body();

  /**
   * Tells whether the payload travels inside the message frame.
   *
   * @return true when the body is in the frame, false when it is sent separately
   */
  boolean isBodyInFrame();

  /**
   * Every message kind known to the protocol, each tagged with a numeric identifier.
   */
  enum Type {
    ChunkFetchRequest(0),
    ChunkFetchSuccess(1),
    ChunkFetchFailure(2),
    RpcRequest(3),
    RpcResponse(4),
    RpcFailure(5),
    StreamRequest(6),
    StreamResponse(7),
    StreamFailure(8),
    OneWayMessage(9),
    User(-1);

    /** Identifier of this type, stored narrowed to a single byte. */
    private final byte code;

    Type(int code) {
      this.code = (byte) code;
    }

    /**
     * @return the one-byte identifier assigned to this type
     */
    public byte id() {
      return code;
    }
  }
}

Message Category Interfaces

Marker interfaces for categorizing messages as requests or responses.

/**
 * Marker interface for request messages.
 * All messages that initiate communication implement this interface.
 * It declares no members of its own; it only tags a {@link Message} as a request.
 */
public interface RequestMessage extends Message {}

/**
 * Marker interface for response messages.
 * All messages that respond to requests implement this interface.
 * It declares no members of its own; it only tags a {@link Message} as a response.
 */
public interface ResponseMessage extends Message {}

Encoding Interface

Interface for objects that can be encoded to ByteBuf for network transmission.

/**
 * Interface for objects that can be encoded to ByteBuf.
 * Used by the message protocol for efficient serialization.
 *
 * The two methods work as a pair: in the implementations shown in this
 * document (RpcRequest, ChunkFetchRequest), encode writes exactly
 * encodedLength() bytes.
 */
public interface Encodable {
  /**
   * Gets the encoded length of this object in bytes.
   *
   * @return The number of bytes needed to encode this object
   */
  int encodedLength();
  
  /**
   * Encodes this object to the provided ByteBuf.
   *
   * @param buf The ByteBuf to encode to
   */
  void encode(ByteBuf buf);
}

RPC Messages

Messages for Remote Procedure Call operations.

/**
 * An RPC request: a request ID paired with a payload.
 */
public class RpcRequest extends AbstractMessage implements RequestMessage {
  /** ID that identifies this RPC request */
  public final long requestId;

  /**
   * Builds an RPC request message.
   *
   * @param requestId unique identifier for the request
   * @param message the request payload as a ManagedBuffer
   */
  public RpcRequest(long requestId, ManagedBuffer message);

  @Override
  public Type type() {
    return Type.RpcRequest;
  }

  /**
   * @return 8, the size in bytes of the encoded requestId
   */
  @Override
  public int encodedLength() {
    return 8;
  }

  /**
   * Writes the requestId to the buffer as a single long.
   *
   * @param buf the ByteBuf to write to
   */
  @Override
  public void encode(ByteBuf buf) {
    buf.writeLong(requestId);
  }

  /**
   * Reads one long from the buffer and rebuilds the request from it.
   * The returned request has a null body; the payload is handled separately.
   *
   * @param buf the ByteBuf positioned at an encoded request
   * @return a request carrying the decoded requestId
   */
  public static RpcRequest decode(ByteBuf buf) {
    return new RpcRequest(buf.readLong(), null);
  }
}

/**
 * Reply to an earlier RPC request.
 */
public class RpcResponse extends AbstractResponseMessage {
  /** ID of the request being answered */
  public final long requestId;

  /**
   * Builds an RPC response message.
   *
   * @param requestId ID of the request this response answers
   * @param message the response payload as a ManagedBuffer
   */
  public RpcResponse(long requestId, ManagedBuffer message);

  @Override
  public Type type() {
    return Type.RpcResponse;
  }
}

/**
 * Signals that an RPC request failed.
 */
public class RpcFailure extends AbstractResponseMessage {
  /** ID of the request that failed */
  public final long requestId;

  /** Text describing what went wrong */
  public final String errorString;

  /**
   * Builds an RPC failure message.
   *
   * @param requestId ID of the request that failed
   * @param errorString description of the error
   */
  public RpcFailure(long requestId, String errorString);

  @Override
  public Type type() {
    return Type.RpcFailure;
  }
}

Chunk Fetch Messages

Messages for fetching individual chunks of data from streams.

/**
 * Asks for one specific chunk of a stream.
 */
public class ChunkFetchRequest extends AbstractMessage implements RequestMessage {
  /** Stream and chunk being requested */
  public final StreamChunkId streamChunkId;

  /**
   * Builds a chunk fetch request.
   *
   * @param streamChunkId identifies the stream and chunk to fetch
   */
  public ChunkFetchRequest(StreamChunkId streamChunkId);

  @Override
  public Type type() {
    return Type.ChunkFetchRequest;
  }

  /**
   * @return 12: 8 bytes for the stream ID plus 4 bytes for the chunk index
   */
  @Override
  public int encodedLength() {
    return 12;
  }

  /**
   * Writes the stream ID as a long followed by the chunk index as an int.
   *
   * @param buf the ByteBuf to write to
   */
  @Override
  public void encode(ByteBuf buf) {
    buf.writeLong(streamChunkId.streamId);
    buf.writeInt(streamChunkId.chunkIndex);
  }

  /**
   * Reads a long (stream ID) and then an int (chunk index) from the buffer
   * and rebuilds the request from them.
   *
   * @param buf the ByteBuf positioned at an encoded request
   * @return a request for the decoded stream chunk
   */
  public static ChunkFetchRequest decode(ByteBuf buf) {
    // Arguments are evaluated left to right, so the long is read before the int
    return new ChunkFetchRequest(new StreamChunkId(buf.readLong(), buf.readInt()));
  }
}

/**
 * Positive answer to a chunk fetch request; carries the chunk data.
 */
public class ChunkFetchSuccess extends AbstractResponseMessage {
  /** Stream and chunk this response delivers */
  public final StreamChunkId streamChunkId;

  /**
   * Builds a successful chunk fetch response.
   *
   * @param streamChunkId identifies the stream and chunk
   * @param buffer the chunk data
   */
  public ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer);

  @Override
  public Type type() {
    return Type.ChunkFetchSuccess;
  }
}

/**
 * Negative answer to a chunk fetch request; reports why the fetch failed.
 */
public class ChunkFetchFailure extends AbstractResponseMessage {
  /** Stream and chunk whose fetch failed */
  public final StreamChunkId streamChunkId;

  /** Text describing what went wrong */
  public final String errorString;

  /**
   * Builds a chunk fetch failure response.
   *
   * @param streamChunkId identifies the stream and chunk that failed
   * @param errorString description of the error
   */
  public ChunkFetchFailure(StreamChunkId streamChunkId, String errorString);

  @Override
  public Type type() {
    return Type.ChunkFetchFailure;
  }
}

Stream Messages

Messages for stream-based data transfer operations.

/**
 * Asks the peer to open a stream for data transfer.
 */
public class StreamRequest extends AbstractMessage implements RequestMessage {
  /** Name of the stream to open */
  public final String streamId;

  /**
   * Builds a stream request.
   *
   * @param streamId string identifier for the stream
   */
  public StreamRequest(String streamId);

  @Override
  public Type type() {
    return Type.StreamRequest;
  }
}

/**
 * Positive answer to a stream request; the stream is ready.
 */
public class StreamResponse extends AbstractResponseMessage {
  /** Name of the stream that was opened */
  public final String streamId;

  /** Total size of the stream in bytes */
  public final long byteCount;

  /**
   * Builds a stream response.
   *
   * @param streamId string identifier for the stream
   * @param byteCount total number of bytes in the stream
   */
  public StreamResponse(String streamId, long byteCount);

  @Override
  public Type type() {
    return Type.StreamResponse;
  }
}

/**
 * Negative answer to a stream request; the stream could not be opened.
 */
public class StreamFailure extends AbstractResponseMessage {
  /** Name of the stream that failed */
  public final String streamId;

  /** Text describing what went wrong */
  public final String errorString;

  /**
   * Builds a stream failure response.
   *
   * @param streamId string identifier for the stream that failed
   * @param errorString description of the error
   */
  public StreamFailure(String streamId, String errorString);

  @Override
  public Type type() {
    return Type.StreamFailure;
  }
}

One-Way Messages

Messages that don't expect a response.

/**
 * One-way message that doesn't expect a response.
 * Used for notifications, heartbeats, or fire-and-forget operations.
 */
public class OneWayMessage extends AbstractMessage implements RequestMessage {
  /**
   * Creates a one-way message.
   *
   * @param body The message payload as ManagedBuffer
   */
  public OneWayMessage(ManagedBuffer body);

  @Override
  public Type type() { return Type.OneWayMessage; }

  /**
   * The payload of a one-way message is carried inside the message frame
   * rather than being sent separately.
   *
   * @return true, because the body is in the frame
   */
  @Override
  public boolean isBodyInFrame() { return true; }
}

Stream and Chunk Identifiers

Utility classes for identifying streams and chunks.

/**
 * Identifier for a specific chunk within a stream.
 * Combines stream ID and chunk index for unique chunk identification.
 */
public class StreamChunkId {
  /** Numeric identifier for the stream */
  public final long streamId;
  
  /** Index of the chunk within the stream (0-based) */
  public final int chunkIndex;
  
  /**
   * Creates a stream chunk identifier.
   * 
   * @param streamId Numeric identifier for the stream
   * @param chunkIndex Index of the chunk within the stream
   */
  public StreamChunkId(long streamId, int chunkIndex);
  
  @Override
  public String toString() {
    return "StreamChunkId{streamId=" + streamId + ", chunkIndex=" + chunkIndex + "}";
  }
  
  @Override
  public boolean equals(Object other) {
    if (this == other) return true;
    if (other == null || getClass() != other.getClass()) return false;
    
    StreamChunkId that = (StreamChunkId) other;
    return streamId == that.streamId && chunkIndex == that.chunkIndex;
  }
  
  @Override
  public int hashCode() {
    return Objects.hash(streamId, chunkIndex);
  }
}

Message Encoding and Decoding

Utilities for message serialization and deserialization.

/**
 * Utility class providing encoding and decoding functions for protocol messages.
 * Only the method signatures are listed here; the implementations are not shown.
 *
 * NOTE(review): confirm these two static signatures against the Spark source;
 * the upstream Encoders class may instead expose nested helpers for strings
 * and byte arrays.
 */
public class Encoders {
  /**
   * Decodes a message from a ByteBuf based on message type.
   *
   * @param msgType The message type to decode
   * @param in ByteBuf containing the encoded message
   * @return Decoded Message instance
   */
  public static Message decode(Message.Type msgType, ByteBuf in);
  
  /**
   * Encodes a message to a ByteBuf.
   *
   * @param msg The message to encode
   * @param out ByteBuf to write the encoded message to
   */
  public static void encode(Message msg, ByteBuf out);
}

/**
 * Netty decoder for converting ByteBuf to Message objects.
 * Only the overridden signature is listed here; the implementation is not shown.
 */
public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
  /**
   * @param ctx the Netty channel handler context
   * @param msg the inbound buffer to decode
   * @param out list that receives the decoded objects
   */
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out);
}

/**
 * Netty encoder for converting Message objects to ByteBuf.
 * Only the overridden signature is listed here; the implementation is not shown.
 *
 * NOTE(review): confirm the base class against the Spark source; the upstream
 * encoder may be a message-to-message encoder that emits into a List<Object>.
 */
public class MessageEncoder extends MessageToByteEncoder<Message> {
  /**
   * @param ctx the Netty channel handler context
   * @param msg the message to encode
   * @param out buffer that receives the encoded bytes
   */
  @Override
  protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out);
}

Usage Examples

Creating and Sending Messages

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.protocol.*;

// Create an RPC request (encode text explicitly as UTF-8 so the bytes do not
// depend on the platform default charset)
ByteBuffer requestData = ByteBuffer.wrap("Hello Server".getBytes(StandardCharsets.UTF_8));
ManagedBuffer requestBuffer = new NioManagedBuffer(requestData);
RpcRequest rpcRequest = new RpcRequest(12345L, requestBuffer);

// Create a chunk fetch request for chunk 5 of stream 1001
StreamChunkId chunkId = new StreamChunkId(1001L, 5);
ChunkFetchRequest chunkRequest = new ChunkFetchRequest(chunkId);

// Create a stream request
StreamRequest streamRequest = new StreamRequest("data-stream-001");

// Create a one-way message (no response is expected for it)
ByteBuffer notificationData = ByteBuffer.wrap("Status update".getBytes(StandardCharsets.UTF_8));
ManagedBuffer notificationBuffer = new NioManagedBuffer(notificationData);
OneWayMessage notification = new OneWayMessage(notificationBuffer);

Handling Responses

// Dispatch on the response type: successes go to stdout (and on to a
// processing hook where a body is available), failures go to stderr.
public void handleResponse(Message response) {
  switch (response.type()) {
    case RpcResponse: {
      RpcResponse reply = (RpcResponse) response;
      System.out.println("RPC response for request: " + reply.requestId);
      processRpcResponse(reply.body());
      break;
    }

    case RpcFailure: {
      RpcFailure failure = (RpcFailure) response;
      String report = "RPC " + failure.requestId + " failed: " + failure.errorString;
      System.err.println(report);
      break;
    }

    case ChunkFetchSuccess: {
      ChunkFetchSuccess fetched = (ChunkFetchSuccess) response;
      System.out.println("Received chunk: " + fetched.streamChunkId);
      processChunkData(fetched.body());
      break;
    }

    case ChunkFetchFailure: {
      ChunkFetchFailure failure = (ChunkFetchFailure) response;
      String report = "Chunk fetch failed: " + failure.streamChunkId
          + " - " + failure.errorString;
      System.err.println(report);
      break;
    }

    case StreamResponse: {
      StreamResponse opened = (StreamResponse) response;
      String report = "Stream " + opened.streamId
          + " opened with " + opened.byteCount + " bytes";
      System.out.println(report);
      break;
    }

    case StreamFailure: {
      StreamFailure failure = (StreamFailure) response;
      String report = "Stream " + failure.streamId
          + " failed: " + failure.errorString;
      System.err.println(report);
      break;
    }

    default: {
      // Any type not handled above (including request types) is reported as unknown
      System.err.println("Unknown response type: " + response.type());
      break;
    }
  }
}

Custom Message Processing

/**
 * Example dispatcher that routes an incoming message to the handler for its
 * concrete type and releases the message body afterwards.
 */
public class MessageProcessor {
  /**
   * Dispatches a message by its concrete type.
   *
   * RpcRequest and OneWayMessage are handled only when they carry a non-empty
   * body. ChunkFetchRequest and StreamRequest are handled regardless of the
   * body, because their constructors take no payload; gating them on a
   * non-empty body would mean they are never processed.
   *
   * Any exception raised while reading the body or inside a handler is
   * reported on stderr and not rethrown. The body, when present, is released
   * whether or not processing succeeded.
   *
   * @param message the message to process; must not be null
   */
  public void processMessage(Message message) {
    boolean hasPayload = message.body() != null && message.body().size() > 0;
    try {
      if (message instanceof RpcRequest) {
        if (hasPayload) {
          processRpcRequest((RpcRequest) message, message.body().nioByteBuffer());
        }
      } else if (message instanceof ChunkFetchRequest) {
        processChunkRequest((ChunkFetchRequest) message);
      } else if (message instanceof StreamRequest) {
        processStreamRequest((StreamRequest) message);
      } else if (message instanceof OneWayMessage) {
        if (hasPayload) {
          processOneWayMessage(message.body().nioByteBuffer());
        }
      }
    } catch (Exception e) {
      System.err.println("Error processing message: " + e.getMessage());
    } finally {
      // Important: Release buffer when done, including empty bodies
      if (message.body() != null) {
        message.body().release();
      }
    }
  }

  /**
   * Handles an RPC request whose payload is available in {@code body}.
   *
   * @param request the request being processed
   * @param body the request payload; its remaining bytes are consumed
   */
  private void processRpcRequest(RpcRequest request, ByteBuffer body) {
    System.out.println("Processing RPC request " + request.requestId);

    // Extract request data
    String requestString = decodeUtf8(body);

    // Process request and generate response
    // (Response would be sent via callback in actual implementation)
  }

  /**
   * Handles a request for a single chunk of a stream.
   *
   * @param request the chunk fetch request
   */
  private void processChunkRequest(ChunkFetchRequest request) {
    System.out.println("Processing chunk request: " + request.streamChunkId);

    // Look up chunk data and prepare response
    // (Would use StreamManager in actual implementation)
  }

  /**
   * Handles a request to open a stream.
   *
   * @param request the stream request
   */
  private void processStreamRequest(StreamRequest request) {
    System.out.println("Processing stream request: " + request.streamId);

    // Open stream and prepare response
    // (Would use StreamManager in actual implementation)
  }

  /**
   * Handles a one-way message by printing its payload as text.
   *
   * @param body the message payload; its remaining bytes are consumed
   */
  private void processOneWayMessage(ByteBuffer body) {
    String message = decodeUtf8(body);

    System.out.println("Received one-way message: " + message);
    // Process notification without sending response
  }

  /**
   * Copies the remaining bytes of {@code buffer} and decodes them as UTF-8,
   * so the result does not depend on the platform default charset.
   *
   * @param buffer the buffer to drain
   * @return the decoded text
   */
  private static String decodeUtf8(ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }
}

Message Validation

/**
 * Example sanity checks for incoming request messages.
 */
public class MessageValidator {
  /** Largest RPC body accepted by the validator: 16MB */
  private static final long MAX_RPC_SIZE = 16 * 1024 * 1024;

  /**
   * Checks a message against the rules for its type.
   *
   * @param message the message to check; may be null
   * @return false for a null message, a null type, or a request that fails its
   *         type-specific check; true otherwise (types without a specific
   *         check are accepted)
   */
  public static boolean validateMessage(Message message) {
    if (message == null || message.type() == null) {
      return false;
    }

    boolean valid;
    switch (message.type()) {
      case RpcRequest:
        valid = validateRpcRequest((RpcRequest) message);
        break;
      case ChunkFetchRequest:
        valid = validateChunkFetchRequest((ChunkFetchRequest) message);
        break;
      case StreamRequest:
        valid = validateStreamRequest((StreamRequest) message);
        break;
      default:
        // Other types are valid by default
        valid = true;
        break;
    }
    return valid;
  }

  /**
   * An RPC request is valid when its ID is positive and it has a body no
   * larger than MAX_RPC_SIZE.
   */
  private static boolean validateRpcRequest(RpcRequest request) {
    return request.requestId > 0
        && request.body() != null
        && request.body().size() <= MAX_RPC_SIZE;
  }

  /**
   * A chunk fetch request is valid when it names a chunk with a positive
   * stream ID and a non-negative chunk index.
   */
  private static boolean validateChunkFetchRequest(ChunkFetchRequest request) {
    StreamChunkId target = request.streamChunkId;
    if (target == null) {
      return false;
    }
    return target.streamId > 0 && target.chunkIndex >= 0;
  }

  /**
   * A stream request is valid when its stream ID is neither null nor blank
   * after trimming.
   */
  private static boolean validateStreamRequest(StreamRequest request) {
    String id = request.streamId;
    if (id == null) {
      return false;
    }
    return !id.trim().isEmpty();
  }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-10

docs

buffer-management.md

client-operations.md

configuration-utilities.md

index.md

message-protocol.md

sasl-authentication.md

server-operations.md

transport-setup.md

tile.json