Core networking infrastructure for Apache Spark cluster computing with transport layer, RPC, chunk fetching, and SASL authentication
Server API
Server-side networking functionality for handling inbound connections, processing RPC requests, and managing data streams. The server API provides extensible handlers for custom application logic.
Main server class for accepting and handling inbound network connections.
/**
 * TransportServer handles inbound network connections and delegates
 * request processing to configured RPC handlers and stream managers.
 */
// NOTE(review): API outline only — method bodies are omitted, so the
// signatures below document the public surface rather than compilable Java.
public class TransportServer implements Closeable {
/**
 * Creates a server bound to the given host and port.
 *
 * @param context transport context supplying configuration and message handling
 * @param hostToBind host name or address to bind to
 * @param portToBind port to bind to; 0 requests an ephemeral port (see getPort())
 * @param appRpcHandler handler invoked for inbound RPC requests
 * @param bootstraps bootstraps applied to each inbound connection
 */
public TransportServer(
TransportContext context,
String hostToBind,
int portToBind,
RpcHandler appRpcHandler,
List<TransportServerBootstrap> bootstraps
);
/**
 * Gets the port the server is bound to.
 * Useful when binding to port 0 to get an available port.
 *
 * @return The actual port number the server is listening on
 */
public int getPort();
/** Closes the server and releases all resources */
public void close();
}Abstract base class for handling RPC requests and managing application-specific logic.
/**
 * RpcHandler defines the interface for processing RPC requests on the server side.
 * Applications must implement this class to provide custom request handling logic.
 * Only {@link #receive(TransportClient, ByteBuffer, RpcResponseCallback)} and
 * {@link #getStreamManager()} are abstract; the remaining methods are no-op
 * defaults that subclasses may override.
 */
public abstract class RpcHandler {
/**
 * Processes an RPC request and provides a response via callback.
 * This is the main method for handling client requests.
 *
 * NOTE(review): implementations presumably must invoke exactly one of
 * callback.onSuccess/onFailure per request — confirm against the
 * RpcResponseCallback contract.
 *
 * @param client The client that sent the request
 * @param message The request message as ByteBuffer
 * @param callback Callback to send the response back to the client
 */
public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
/**
 * Gets the stream manager for handling chunk and stream requests.
 *
 * @return StreamManager instance for this handler
 */
public abstract StreamManager getStreamManager();
/**
 * Handles one-way messages (no response expected).
 * Default implementation does nothing.
 *
 * @param client The client that sent the message
 * @param message The one-way message as ByteBuffer
 */
public void receive(TransportClient client, ByteBuffer message) {
// Default: no-op
}
/**
 * Called when a client connection is terminated.
 * Applications can override this for cleanup logic.
 *
 * @param client The client whose connection was terminated
 */
public void connectionTerminated(TransportClient client) {
// Default: no-op
}
/**
 * Called when an exception occurs during request processing.
 * Applications can override this for custom error handling.
 *
 * @param cause The exception that occurred
 * @param client The client associated with the exception
 */
public void exceptionCaught(Throwable cause, TransportClient client) {
// Default: no-op
}
}Abstract base class for managing data streams and chunk access.
/**
 * StreamManager handles stream-based data access for chunk fetching and streaming operations.
 * Applications implement this to provide access to their data.
 * Only {@link #getChunk(long, int)} is abstract; the remaining methods have
 * permissive or no-op defaults that subclasses may override.
 */
public abstract class StreamManager {
/**
 * Gets a specific chunk from a stream.
 * This is called when clients request chunks via fetchChunk().
 *
 * NOTE(review): ownership/release of the returned buffer appears to be handled
 * by the transport layer after transfer — confirm against the ManagedBuffer docs.
 *
 * @param streamId The stream identifier
 * @param chunkIndex The index of the requested chunk
 * @return ManagedBuffer containing the chunk data
 * @throws IllegalArgumentException if stream or chunk doesn't exist
 */
public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
/**
 * Opens a stream by string identifier.
 * This is called when clients request streams via stream().
 * Default implementation throws UnsupportedOperationException.
 *
 * @param streamId The string stream identifier
 * @return ManagedBuffer for the entire stream
 * @throws UnsupportedOperationException if not implemented
 */
public ManagedBuffer openStream(String streamId) {
throw new UnsupportedOperationException("Stream opening is not supported");
}
/**
 * Registers a channel for a specific stream.
 * Called when a stream is opened to track which channels are using it.
 *
 * @param channel The Netty channel
 * @param streamId The stream identifier
 */
public void registerChannel(Channel channel, long streamId) {
// Default: no-op
}
/**
 * Called when a connection/channel is terminated.
 * Applications can override this for stream cleanup.
 *
 * @param channel The terminated channel
 */
public void connectionTerminated(Channel channel) {
// Default: no-op
}
/**
 * Checks if a client is authorized to access a stream.
 * Applications can override this for access control.
 *
 * @param client The client requesting access
 * @param streamId The stream identifier
 * @throws SecurityException if access is denied
 */
public void checkAuthorization(TransportClient client, long streamId) {
// Default: allow all access
}
}/**
 * No-operation RPC handler that provides basic functionality without custom logic.
 * Useful for testing or when only stream operations are needed.
 */
// NOTE(review): outline only — the comment bodies below describe behavior;
// the real implementations live in the Spark network-common sources.
public class NoOpRpcHandler extends RpcHandler {
public NoOpRpcHandler();
@Override
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
// Responds with empty message
}
@Override
public StreamManager getStreamManager() {
// Returns a basic OneForOneStreamManager
}
}/**
 * Stream manager that maintains a one-to-one mapping between streams and buffers.
 * Useful for simple use cases where each stream corresponds to a single data source.
 */
// NOTE(review): outline only — bodies are summarized by comments, not shown.
public class OneForOneStreamManager extends StreamManager {
public OneForOneStreamManager();
/**
 * Registers a single buffer as a stream.
 *
 * @param appId Application identifier
 * @param buffer The buffer to serve as stream data
 * @return The assigned stream ID
 */
public long registerStream(String appId, ManagedBuffer buffer);
/**
 * Registers multiple buffers as a chunked stream.
 *
 * @param appId Application identifier
 * @param buffers Iterator of buffers to serve as stream chunks
 * @return The assigned stream ID
 */
public long registerStream(String appId, Iterator<ManagedBuffer> buffers);
@Override
public ManagedBuffer getChunk(long streamId, int chunkIndex) {
// Returns the appropriate chunk for the stream
}
}import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.NoOpRpcHandler;
// NOTE(review): 'conf' (presumably a TransportConf) is assumed to be defined
// earlier in the surrounding example — not shown in this snippet.
// Create a basic server with no-op handler
TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
// Create server on specific port
TransportServer server = context.createServer("localhost", 8080, new ArrayList<>());
System.out.println("Server started on port: " + server.getPort());
// Server is now accepting connections
// Don't forget to close when done
Runtime.getRuntime().addShutdownHook(new Thread(server::close));import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.OneForOneStreamManager;

/**
 * Example RpcHandler that answers simple text commands (PING, GET_TIME).
 *
 * Fix over the original example: payloads are decoded via remaining()/get()
 * instead of ByteBuffer.array(). array() throws UnsupportedOperationException
 * for direct or read-only buffers and ignores position/arrayOffset/limit, so
 * it can fail at runtime or decode the wrong bytes. UTF-8 is specified
 * explicitly so decoding does not depend on the platform default charset.
 */
public class CustomRpcHandler extends RpcHandler {
    private final StreamManager streamManager = new OneForOneStreamManager();

    /** Decodes the readable bytes of a buffer as UTF-8 without disturbing its position. */
    private static String decodeUtf8(ByteBuffer message) {
        ByteBuffer copy = message.duplicate(); // independent position/limit, shared content
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Handles a request/response RPC: decodes the request text, dispatches it,
     * and reports the outcome through exactly one callback invocation.
     *
     * @param client The client that sent the request
     * @param message The request message as ByteBuffer
     * @param callback Callback used to return the response or the failure
     */
    @Override
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
        try {
            String request = decodeUtf8(message);
            System.out.println("Received RPC: " + request);
            String response = processRequest(request);
            callback.onSuccess(ByteBuffer.wrap(response.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
        } catch (Exception e) {
            System.err.println("Error processing RPC: " + e.getMessage());
            callback.onFailure(e);
        }
    }

    /** Handles one-way messages: logs the notification; no response is sent. */
    @Override
    public void receive(TransportClient client, ByteBuffer message) {
        String notification = decodeUtf8(message);
        System.out.println("Received notification: " + notification);
    }

    @Override
    public StreamManager getStreamManager() {
        return streamManager;
    }

    /** Logs client disconnects; override point for per-connection cleanup. */
    @Override
    public void connectionTerminated(TransportClient client) {
        System.out.println("Client disconnected: " + client.getSocketAddress());
    }

    /** Logs transport-level exceptions associated with a client. */
    @Override
    public void exceptionCaught(Throwable cause, TransportClient client) {
        System.err.println("Exception from client " + client.getSocketAddress() + ": " + cause.getMessage());
    }

    /**
     * Maps a request string to its response.
     *
     * @return "PONG" for PING, current millis for GET_TIME, otherwise "UNKNOWN_COMMAND"
     */
    private String processRequest(String request) {
        if (request.startsWith("PING")) {
            return "PONG";
        } else if (request.startsWith("GET_TIME")) {
            return String.valueOf(System.currentTimeMillis());
        } else {
            return "UNKNOWN_COMMAND";
        }
    }
}
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Example StreamManager that serves files from a fixed directory as chunked streams.
 *
 * Fixes over the original example:
 * - stream ids come from a monotonic counter; the time+random scheme could
 *   hand out duplicate ids under concurrent registration
 * - getChunk rejects negative chunk indices
 * - openStream canonicalizes the caller-supplied id and confines it to the
 *   data directory, blocking path-traversal ids such as "../../etc/passwd"
 */
public class FileStreamManager extends StreamManager {
    private final ConcurrentHashMap<Long, StreamInfo> streams = new ConcurrentHashMap<>();
    private final AtomicLong nextStreamId = new AtomicLong(1);
    private final String dataDirectory;

    public FileStreamManager(String dataDirectory) {
        this.dataDirectory = dataDirectory;
    }

    /**
     * Returns one chunk of a registered stream as a file segment.
     *
     * @throws IllegalArgumentException if the stream or chunk index is unknown
     */
    @Override
    public ManagedBuffer getChunk(long streamId, int chunkIndex) {
        StreamInfo streamInfo = streams.get(streamId);
        if (streamInfo == null) {
            throw new IllegalArgumentException("Unknown stream: " + streamId);
        }
        // Also reject negative indices; the original only checked the upper bound.
        if (chunkIndex < 0 || chunkIndex >= streamInfo.getChunkCount()) {
            throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);
        }
        // The last chunk may be shorter than chunkSize.
        long chunkSize = streamInfo.getChunkSize();
        long offset = chunkIndex * chunkSize;
        long actualChunkSize = Math.min(chunkSize, streamInfo.getTotalSize() - offset);
        File file = new File(dataDirectory, streamInfo.getFileName());
        // NOTE(review): transportConf is assumed to be in scope (e.g. a field
        // supplied by the surrounding example) — not shown in this snippet.
        return new FileSegmentManagedBuffer(transportConf, file, offset, actualChunkSize);
    }

    /**
     * Opens an entire file as a single stream. The caller-supplied id is used
     * as a file name, so it is validated against the data directory first.
     *
     * @throws SecurityException if the id escapes the data directory
     * @throws IllegalArgumentException if the file does not exist
     */
    @Override
    public ManagedBuffer openStream(String streamId) {
        File file = resolveWithinDataDir(streamId);
        if (!file.exists()) {
            throw new IllegalArgumentException("File not found: " + streamId);
        }
        return new FileSegmentManagedBuffer(transportConf, file, 0, file.length());
    }

    /**
     * Denies access to unknown streams and delegates to isAuthorized otherwise.
     */
    @Override
    public void checkAuthorization(TransportClient client, long streamId) {
        StreamInfo streamInfo = streams.get(streamId);
        if (streamInfo == null) {
            throw new SecurityException("Stream not found: " + streamId);
        }
        // Custom authorization logic
        if (!isAuthorized(client, streamInfo)) {
            throw new SecurityException("Access denied to stream: " + streamId);
        }
    }

    /**
     * Registers a file as a chunked stream.
     *
     * @param fileName file name relative to the data directory
     * @param chunkSize chunk size in bytes; must be positive
     * @return the assigned stream id
     */
    public long registerFileStream(String fileName, long chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("Chunk size must be positive: " + chunkSize);
        }
        File file = new File(dataDirectory, fileName);
        if (!file.exists()) {
            throw new IllegalArgumentException("File not found: " + fileName);
        }
        long streamId = generateStreamId();
        streams.put(streamId, new StreamInfo(fileName, file.length(), chunkSize));
        return streamId;
    }

    /** Resolves an untrusted name against the data directory, rejecting escapes. */
    private File resolveWithinDataDir(String name) {
        try {
            File base = new File(dataDirectory).getCanonicalFile();
            File candidate = new File(base, name).getCanonicalFile();
            if (!candidate.toPath().startsWith(base.toPath())) {
                throw new SecurityException("Illegal stream id: " + name);
            }
            return candidate;
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid stream id: " + name, e);
        }
    }

    private boolean isAuthorized(TransportClient client, StreamInfo streamInfo) {
        // Implement custom authorization logic
        return true; // Allow all for this example
    }

    /** Collision-free id generation (the original time+random scheme could collide). */
    private long generateStreamId() {
        return nextStreamId.getAndIncrement();
    }

    /** Immutable per-stream metadata: backing file name, total size, chunk size. */
    private static class StreamInfo {
        private final String fileName;
        private final long totalSize;
        private final long chunkSize;

        public StreamInfo(String fileName, long totalSize, long chunkSize) {
            this.fileName = fileName;
            this.totalSize = totalSize;
            this.chunkSize = chunkSize;
        }

        public String getFileName() { return fileName; }
        public long getTotalSize() { return totalSize; }
        public long getChunkSize() { return chunkSize; }
        // ceil(totalSize / chunkSize): the last chunk may be partial
        public int getChunkCount() { return (int) Math.ceil((double) totalSize / chunkSize); }
    }
}

/**
 * Example server that wires a FileStreamManager to an anonymous RpcHandler
 * which registers files on REGISTER_FILE: commands.
 *
 * Fix over the original example: the request payload is decoded via
 * remaining()/get() with an explicit UTF-8 charset instead of
 * ByteBuffer.array(), which can throw for direct/read-only buffers and
 * ignores position/arrayOffset/limit.
 */
public class FileServerExample {
    public static void main(String[] args) throws Exception {
        // Create custom handlers
        FileStreamManager streamManager = new FileStreamManager("/data/files");
        RpcHandler rpcHandler = new RpcHandler() {
            @Override
            public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
                try {
                    // Decode readable bytes without consuming the caller's buffer.
                    byte[] bytes = new byte[message.remaining()];
                    message.duplicate().get(bytes);
                    String request = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
                    if (request.startsWith("REGISTER_FILE:")) {
                        String fileName = request.substring("REGISTER_FILE:".length());
                        long streamId = streamManager.registerFileStream(fileName, 64 * 1024); // 64KB chunks
                        String response = "STREAM_ID:" + streamId;
                        callback.onSuccess(ByteBuffer.wrap(response.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
                    } else {
                        callback.onFailure(new IllegalArgumentException("Unknown command: " + request));
                    }
                } catch (Exception e) {
                    callback.onFailure(e);
                }
            }

            @Override
            public StreamManager getStreamManager() {
                return streamManager;
            }
        };
        // NOTE(review): 'conf' (presumably a TransportConf) is assumed to be
        // created earlier — not shown in this snippet.
        TransportContext context = new TransportContext(conf, rpcHandler);
        TransportServer server = context.createServer(8080, new ArrayList<>());
        System.out.println("File server started on port: " + server.getPort());
        // Block the main thread forever so the JVM keeps serving connections.
        Thread.currentThread().join();
    }
}

/*
 * Error-handling pattern for RpcHandler implementations (fragment — assumes
 * MAX_REQUEST_SIZE and processRequest(String) are defined on the enclosing
 * class). Client errors are returned as-is; server errors are logged locally
 * and a generic failure is returned so internal details do not leak.
 *
 * Fix over the original: the payload is decoded via remaining()/get() with an
 * explicit UTF-8 charset instead of ByteBuffer.array(), which can throw for
 * direct/read-only buffers and ignores position/arrayOffset/limit.
 */
@Override
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
    try {
        // Decode readable bytes without consuming the caller's buffer.
        byte[] bytes = new byte[message.remaining()];
        message.duplicate().get(bytes);
        String request = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
        // Validate request
        if (request.length() > MAX_REQUEST_SIZE) {
            callback.onFailure(new IllegalArgumentException("Request too large"));
            return;
        }
        // Process and respond
        String response = processRequest(request);
        callback.onSuccess(ByteBuffer.wrap(response.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
    } catch (IllegalArgumentException e) {
        // Client error - return error response
        callback.onFailure(e);
    } catch (Exception e) {
        // Server error - log the real cause locally, return only a generic
        // message so internals are not exposed to the client.
        System.err.println("Internal server error: " + e.getMessage());
        callback.onFailure(new RuntimeException("Internal server error"));
    }
}

/** Logs transport-level exceptions, distinguishing network I/O failures. */
@Override
public void exceptionCaught(Throwable cause, TransportClient client) {
    System.err.println("Exception from client " + client.getSocketAddress() + ": " + cause.getMessage());
    // Log for debugging
    if (cause instanceof IOException) {
        System.out.println("Network issue with client, connection may be lost");
    } else {
        System.err.println("Unexpected exception type: " + cause.getClass().getSimpleName());
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-10