CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-network-common-2-10

Core networking infrastructure for Apache Spark cluster computing with transport layer, RPC, chunk fetching, and SASL authentication

Pending
Overview
Eval results
Files

docs/server-operations.md

Server Operations

Server-side networking functionality for handling inbound connections, processing RPC requests, and managing data streams. The server API provides extensible handlers for custom application logic.

Capabilities

TransportServer

Main server class for accepting and handling inbound network connections.

/**
 * TransportServer handles inbound network connections and delegates
 * request processing to configured RPC handlers and stream managers.
 */
public class TransportServer implements Closeable {
  /**
   * Creates a server bound to the given host and port.
   *
   * @param context Transport context supplying configuration and handlers
   * @param hostToBind Host name or address to bind to
   * @param portToBind Port to bind to; pass 0 to let the OS pick a free port
   *                   (retrieve it afterwards via {@link #getPort()})
   * @param appRpcHandler Handler invoked for inbound RPC requests
   * @param bootstraps Bootstraps applied to each connection — NOTE(review):
   *                   execution order/timing not shown here, confirm in upstream javadoc
   */
  public TransportServer(
    TransportContext context,
    String hostToBind,
    int portToBind,
    RpcHandler appRpcHandler,
    List<TransportServerBootstrap> bootstraps
  );
  
  /**
   * Gets the port the server is bound to.
   * Useful when binding to port 0 to get an available port.
   * 
   * @return The actual port number the server is listening on
   */
  public int getPort();
  
  /** Closes the server and releases all resources */
  public void close();
}

RpcHandler

Abstract base class for handling RPC requests and managing application-specific logic.

/**
 * RpcHandler defines the interface for processing RPC requests on the server side.
 * Applications must implement this class to provide custom request handling logic.
 */
public abstract class RpcHandler {
  /**
   * Processes an RPC request and provides a response via callback.
   * This is the main method for handling client requests.
   * 
   * @param client The client that sent the request
   * @param message The request message as ByteBuffer
   * @param callback Callback to send the response back to the client
   */
  public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
  
  /**
   * Gets the stream manager for handling chunk and stream requests.
   * 
   * @return StreamManager instance for this handler
   */
  public abstract StreamManager getStreamManager();
  
  /**
   * Handles one-way messages (no response expected).
   * Default implementation does nothing.
   * NOTE(review): presumably invoked for fire-and-forget sends from the
   * client side — confirm against client-operations.md.
   * 
   * @param client The client that sent the message
   * @param message The one-way message as ByteBuffer
   */
  public void receive(TransportClient client, ByteBuffer message) {
    // Default: no-op
  }
  
  /**
   * Called when a client connection is terminated.
   * Applications can override this for cleanup logic.
   * 
   * @param client The client whose connection was terminated
   */
  public void connectionTerminated(TransportClient client) {
    // Default: no-op
  }
  
  /**
   * Called when an exception occurs during request processing.
   * Applications can override this for custom error handling.
   * 
   * @param cause The exception that occurred
   * @param client The client associated with the exception
   */
  public void exceptionCaught(Throwable cause, TransportClient client) {
    // Default: no-op
  }
}

StreamManager

Abstract base class for managing data streams and chunk access.

/**
 * StreamManager handles stream-based data access for chunk fetching and streaming operations.
 * Applications implement this to provide access to their data.
 */
public abstract class StreamManager {
  // NOTE(review): callbacks are likely invoked from multiple Netty event-loop
  // threads, so implementations should be thread-safe — confirm in upstream javadoc.

  /**
   * Gets a specific chunk from a stream.
   * This is called when clients request chunks via fetchChunk().
   * 
   * @param streamId The stream identifier
   * @param chunkIndex The index of the requested chunk
   * @return ManagedBuffer containing the chunk data
   * @throws IllegalArgumentException if stream or chunk doesn't exist
   */
  public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
  
  /**
   * Opens a stream by string identifier.
   * This is called when clients request streams via stream().
   * Default implementation throws UnsupportedOperationException, so only
   * override this if string-addressed streaming is actually needed.
   * 
   * @param streamId The string stream identifier
   * @return ManagedBuffer for the entire stream
   * @throws UnsupportedOperationException if not implemented
   */
  public ManagedBuffer openStream(String streamId) {
    throw new UnsupportedOperationException("Stream opening is not supported");
  }
  
  /**
   * Registers a channel for a specific stream.
   * Called when a stream is opened to track which channels are using it.
   * 
   * @param channel The Netty channel
   * @param streamId The stream identifier
   */
  public void registerChannel(Channel channel, long streamId) {
    // Default: no-op
  }
  
  /**
   * Called when a connection/channel is terminated.
   * Applications can override this for stream cleanup.
   * 
   * @param channel The terminated channel
   */
  public void connectionTerminated(Channel channel) {
    // Default: no-op
  }
  
  /**
   * Checks if a client is authorized to access a stream.
   * Applications can override this for access control.
   * 
   * @param client The client requesting access
   * @param streamId The stream identifier
   * @throws SecurityException if access is denied
   */
  public void checkAuthorization(TransportClient client, long streamId) {
    // Default: allow all access
  }
}

Built-in Implementations

NoOpRpcHandler

/**
 * No-operation RPC handler that provides basic functionality without custom logic.
 * Useful for testing or when only stream operations are needed.
 */
// NOTE(doc): the method bodies below are prose summaries of the built-in
// behavior, not literal source code.
public class NoOpRpcHandler extends RpcHandler {
  public NoOpRpcHandler();
  
  @Override
  public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
    // Responds with empty message
  }
  
  @Override
  public StreamManager getStreamManager() {
    // Returns a basic OneForOneStreamManager
  }
}

OneForOneStreamManager

/**
 * Stream manager that maintains a one-to-one mapping between streams and buffers.
 * Useful for simple use cases where each stream corresponds to a single data source.
 */
// NOTE(doc): the getChunk body below is a prose summary of the built-in
// behavior, not literal source code.
public class OneForOneStreamManager extends StreamManager {
  public OneForOneStreamManager();
  
  /**
   * Registers a single buffer as a stream.
   * 
   * @param appId Application identifier
   * @param buffer The buffer to serve as stream data
   * @return The assigned stream ID, to be handed to clients for fetching
   */
  public long registerStream(String appId, ManagedBuffer buffer);
  
  /**
   * Registers multiple buffers as a chunked stream.
   * 
   * @param appId Application identifier  
   * @param buffers Iterator of buffers to serve as stream chunks
   * @return The assigned stream ID, to be handed to clients for fetching
   */
  public long registerStream(String appId, Iterator<ManagedBuffer> buffers);
  
  @Override
  public ManagedBuffer getChunk(long streamId, int chunkIndex) {
    // Returns the appropriate chunk for the stream
  }
}

Usage Examples

Basic Server Setup

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.NoOpRpcHandler;

// Create a basic server with no-op handler
// NOTE(review): `conf` is assumed to be a TransportConf built elsewhere
// (see transport-setup.md) — confirm before copying this snippet.
TransportContext context = new TransportContext(conf, new NoOpRpcHandler());

// Create server on specific port
// Passing 0 instead of 8080 would let the OS pick a free port,
// retrievable via server.getPort().
TransportServer server = context.createServer("localhost", 8080, new ArrayList<>());

System.out.println("Server started on port: " + server.getPort());

// Server is now accepting connections
// Don't forget to close when done
Runtime.getRuntime().addShutdownHook(new Thread(server::close));

Custom RPC Handler

import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.OneForOneStreamManager;

/**
 * Example RPC handler that echoes simple text commands.
 * Demonstrates both request/response and one-way message handling.
 */
public class CustomRpcHandler extends RpcHandler {
  private final StreamManager streamManager = new OneForOneStreamManager();
  
  @Override
  public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
    try {
      // Parse the request. Decode only the readable region of the buffer:
      // ByteBuffer.array() exposes the whole backing array (ignoring
      // position/limit/arrayOffset) and throws for direct buffers.
      String request = decodeUtf8(message);
      System.out.println("Received RPC: " + request);
      
      // Process the request
      String response = processRequest(request);
      
      // Send response back (charset made explicit instead of platform default)
      ByteBuffer responseBuffer =
        ByteBuffer.wrap(response.getBytes(java.nio.charset.StandardCharsets.UTF_8));
      callback.onSuccess(responseBuffer);
      
    } catch (Exception e) {
      System.err.println("Error processing RPC: " + e.getMessage());
      callback.onFailure(e);
    }
  }
  
  @Override
  public void receive(TransportClient client, ByteBuffer message) {
    // Handle one-way messages (no response expected)
    String notification = decodeUtf8(message);
    System.out.println("Received notification: " + notification);
  }
  
  @Override
  public StreamManager getStreamManager() {
    return streamManager;
  }
  
  @Override
  public void connectionTerminated(TransportClient client) {
    System.out.println("Client disconnected: " + client.getSocketAddress());
  }
  
  @Override
  public void exceptionCaught(Throwable cause, TransportClient client) {
    System.err.println("Exception from client " + client.getSocketAddress() + ": " + cause.getMessage());
  }
  
  /**
   * Routes a decoded request to a canned response.
   *
   * @param request The decoded request text
   * @return "PONG" for PING, current epoch millis for GET_TIME,
   *         "UNKNOWN_COMMAND" otherwise
   */
  private String processRequest(String request) {
    if (request.startsWith("PING")) {
      return "PONG";
    } else if (request.startsWith("GET_TIME")) {
      return String.valueOf(System.currentTimeMillis());
    } else {
      return "UNKNOWN_COMMAND";
    }
  }
  
  /**
   * Decodes the remaining bytes of {@code buffer} as UTF-8 without moving
   * the buffer's position, so callers may still read it afterwards.
   *
   * @param buffer The incoming message buffer (heap or direct)
   * @return The decoded string
   */
  private static String decodeUtf8(ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.duplicate().get(bytes); // duplicate() keeps the original position intact
    return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
  }
}

Custom Stream Manager

import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Example stream manager that serves files from a fixed data directory
 * as chunked streams. Thread-safe for registration and lookup via
 * ConcurrentHashMap; file names supplied by clients are validated so they
 * cannot escape the data directory.
 */
public class FileStreamManager extends StreamManager {
  private final ConcurrentHashMap<Long, StreamInfo> streams = new ConcurrentHashMap<>();
  private final String dataDirectory;
  // The original snippet referenced an undeclared `transportConf`; it is now
  // an explicit dependency needed to build FileSegmentManagedBuffer instances.
  private final TransportConf transportConf;
  // Monotonic counter: ids cannot collide, unlike the previous
  // time-plus-random scheme.
  private final AtomicLong nextStreamId = new AtomicLong();
  
  /**
   * @param dataDirectory Directory all served files must live under
   * @param transportConf Transport configuration passed to file-segment buffers
   */
  public FileStreamManager(String dataDirectory, TransportConf transportConf) {
    this.dataDirectory = dataDirectory;
    this.transportConf = transportConf;
  }
  
  /**
   * Backward-compatible constructor.
   * NOTE(review): leaves transportConf null; confirm FileSegmentManagedBuffer
   * tolerates a null conf — prefer the two-argument constructor.
   */
  public FileStreamManager(String dataDirectory) {
    this(dataDirectory, null);
  }
  
  /**
   * Returns one chunk of a registered stream.
   *
   * @param streamId Id previously returned by registerFileStream
   * @param chunkIndex Zero-based chunk index
   * @return Buffer over the requested file segment; the last chunk may be shorter
   * @throws IllegalArgumentException if the stream or chunk index is invalid
   */
  @Override
  public ManagedBuffer getChunk(long streamId, int chunkIndex) {
    StreamInfo streamInfo = streams.get(streamId);
    if (streamInfo == null) {
      throw new IllegalArgumentException("Unknown stream: " + streamId);
    }
    
    // Reject negative indices as well as out-of-range ones.
    if (chunkIndex < 0 || chunkIndex >= streamInfo.getChunkCount()) {
      throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);
    }
    
    // Calculate chunk offset and size (int * long promotes to long, no overflow
    // for realistic file sizes).
    long chunkSize = streamInfo.getChunkSize();
    long offset = chunkIndex * chunkSize;
    long actualChunkSize = Math.min(chunkSize, streamInfo.getTotalSize() - offset);
    
    File file = resolveInDataDir(streamInfo.getFileName());
    return new FileSegmentManagedBuffer(transportConf, file, offset, actualChunkSize);
  }
  
  /**
   * Opens an entire file as one stream; {@code streamId} is the file name
   * relative to the data directory.
   *
   * @throws IllegalArgumentException if the file does not exist
   * @throws SecurityException if the name escapes the data directory
   */
  @Override
  public ManagedBuffer openStream(String streamId) {
    File file = resolveInDataDir(streamId);
    if (!file.exists()) {
      throw new IllegalArgumentException("File not found: " + streamId);
    }
    
    return new FileSegmentManagedBuffer(transportConf, file, 0, file.length());
  }
  
  @Override
  public void checkAuthorization(TransportClient client, long streamId) {
    StreamInfo streamInfo = streams.get(streamId);
    if (streamInfo == null) {
      throw new SecurityException("Stream not found: " + streamId);
    }
    
    // Custom authorization logic
    if (!isAuthorized(client, streamInfo)) {
      throw new SecurityException("Access denied to stream: " + streamId);
    }
  }
  
  /**
   * Registers a file as a chunked stream.
   *
   * @param fileName File name relative to the data directory (validated)
   * @param chunkSize Chunk size in bytes; must be positive
   * @return The assigned stream id
   */
  public long registerFileStream(String fileName, long chunkSize) {
    File file = resolveInDataDir(fileName);
    if (!file.exists()) {
      throw new IllegalArgumentException("File not found: " + fileName);
    }
    // Guards against division-by-zero in getChunkCount().
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("Chunk size must be positive: " + chunkSize);
    }
    
    long streamId = nextStreamId.incrementAndGet();
    streams.put(streamId, new StreamInfo(fileName, file.length(), chunkSize));
    return streamId;
  }
  
  private boolean isAuthorized(TransportClient client, StreamInfo streamInfo) {
    // Implement custom authorization logic
    return true; // Allow all for this example
  }
  
  /**
   * Resolves {@code name} inside the data directory and rejects names such as
   * "../secret" that would escape it. Stream ids and file names arrive from
   * clients, so they are untrusted input.
   */
  private File resolveInDataDir(String name) {
    try {
      File base = new File(dataDirectory).getCanonicalFile();
      File file = new File(base, name).getCanonicalFile();
      if (!file.toPath().startsWith(base.toPath())) {
        throw new SecurityException("Path escapes data directory: " + name);
      }
      return file;
    } catch (IOException e) {
      throw new IllegalArgumentException("Cannot resolve path: " + name, e);
    }
  }
  
  /** Immutable per-stream metadata: file name, total size, chunk size. */
  private static class StreamInfo {
    private final String fileName;
    private final long totalSize;
    private final long chunkSize;
    
    public StreamInfo(String fileName, long totalSize, long chunkSize) {
      this.fileName = fileName;
      this.totalSize = totalSize;
      this.chunkSize = chunkSize;
    }
    
    public String getFileName() { return fileName; }
    public long getTotalSize() { return totalSize; }
    public long getChunkSize() { return chunkSize; }
    // NOTE(review): the int cast assumes fewer than 2^31 chunks per file.
    public int getChunkCount() { return (int) Math.ceil((double) totalSize / chunkSize); }
  }
}

Server with Custom Stream Manager

/**
 * Example server wiring a FileStreamManager behind an anonymous RpcHandler
 * that understands a single "REGISTER_FILE:&lt;name&gt;" command.
 */
public class FileServerExample {
  public static void main(String[] args) throws Exception {
    // Create custom handlers
    FileStreamManager streamManager = new FileStreamManager("/data/files");
    
    RpcHandler rpcHandler = new RpcHandler() {
      @Override
      public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
        try {
          // Decode only the readable bytes; ByteBuffer.array() exposes the
          // whole backing array (ignoring position/limit) and throws for
          // direct buffers.
          byte[] bytes = new byte[message.remaining()];
          message.duplicate().get(bytes);
          String request = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
          
          if (request.startsWith("REGISTER_FILE:")) {
            String fileName = request.substring("REGISTER_FILE:".length());
            long streamId = streamManager.registerFileStream(fileName, 64 * 1024); // 64KB chunks
            
            String response = "STREAM_ID:" + streamId;
            callback.onSuccess(
              ByteBuffer.wrap(response.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
          } else {
            callback.onFailure(new IllegalArgumentException("Unknown command: " + request));
          }
        } catch (Exception e) {
          callback.onFailure(e);
        }
      }
      
      @Override
      public StreamManager getStreamManager() {
        return streamManager;
      }
    };
    
    // Create and start server.
    // NOTE(review): `conf` is assumed to be a TransportConf built elsewhere
    // (see transport-setup.md) — confirm before copying this snippet.
    TransportContext context = new TransportContext(conf, rpcHandler);
    TransportServer server = context.createServer(8080, new ArrayList<>());
    
    System.out.println("File server started on port: " + server.getPort());
    
    // Block the main thread so the server keeps running; stop it with
    // server.close() or by killing the process.
    Thread.currentThread().join();
  }
}

Error Handling in Handlers

@Override
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
  try {
    // Enforce the size limit BEFORE decoding, so an oversized payload is
    // rejected without first allocating a full copy of it.
    // NOTE(review): the limit is now measured in bytes rather than decoded
    // characters — adjust MAX_REQUEST_SIZE if it was tuned for char counts.
    if (message.remaining() > MAX_REQUEST_SIZE) {
      callback.onFailure(new IllegalArgumentException("Request too large"));
      return;
    }
    
    // Decode only the readable region; ByteBuffer.array() exposes the whole
    // backing array (ignoring position/limit) and throws for direct buffers.
    byte[] bytes = new byte[message.remaining()];
    message.duplicate().get(bytes);
    String request = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
    
    // Process and respond
    String response = processRequest(request);
    callback.onSuccess(
      ByteBuffer.wrap(response.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
    
  } catch (IllegalArgumentException e) {
    // Client error - return error response
    callback.onFailure(e);
  } catch (Exception e) {
    // Server error - log and return a generic error, preserving the cause
    // so the failure can still be diagnosed from the callback side.
    System.err.println("Internal server error: " + e.getMessage());
    callback.onFailure(new RuntimeException("Internal server error", e));
  }
}

@Override
public void exceptionCaught(Throwable cause, TransportClient client) {
  // Surface the failure with the offending client's address attached.
  String origin = "Exception from client " + client.getSocketAddress() + ": " + cause.getMessage();
  System.err.println(origin);
  
  // Separate transport-level failures from everything else when debugging.
  if (!(cause instanceof IOException)) {
    System.err.println("Unexpected exception type: " + cause.getClass().getSimpleName());
  } else {
    System.out.println("Network issue with client, connection may be lost");
  }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-10

docs

buffer-management.md

client-operations.md

configuration-utilities.md

index.md

message-protocol.md

sasl-authentication.md

server-operations.md

transport-setup.md

tile.json