tessl/maven-org-apache-spark--spark-network-common-2-12

Core networking functionality for Apache Spark including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support using Netty for high-performance network I/O.

Server Operations

The server operations API provides the foundation for handling client connections, processing RPC requests, and managing data streams in Apache Spark's networking layer. The TransportServer class serves as the main server component, while RpcHandler defines the interface for custom request processing logic.

Capabilities

TransportServer

Main server class for handling client connections and network communication.

/**
 * Create a transport server with specified configuration
 * @param context - TransportContext for server configuration
 * @param hostToBind - Host address to bind the server to
 * @param portToBind - Port number to bind the server to (0 for system-assigned)
 * @param appRpcHandler - RPC handler for processing application messages
 * @param bootstraps - List of server bootstrap configurations
 */
public TransportServer(TransportContext context, String hostToBind, int portToBind, RpcHandler appRpcHandler, List<TransportServerBootstrap> bootstraps);

Server Information

Methods for retrieving server status and configuration information.

/**
 * Get the port number the server is bound to
 * @return int port number (useful when server was created with port 0)
 */
public int getPort();
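
The port-0 pattern follows standard TCP semantics: binding to port 0 lets the OS pick a free ephemeral port, which the caller reads back after binding. A stdlib-only illustration with a plain `ServerSocket` (not the Spark API):

```java
import java.io.IOException;
import java.net.ServerSocket;

public class EphemeralPortDemo {
    public static void main(String[] args) throws IOException {
        // Bind to port 0: the OS assigns a free ephemeral port
        try (ServerSocket socket = new ServerSocket(0)) {
            // As with TransportServer.getPort(), the actual port is
            // only known after the bind completes
            int assignedPort = socket.getLocalPort();
            System.out.println("Bound to port: " + assignedPort);
        }
    }
}
```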

/**
 * Get comprehensive metrics for the server
 * @return MetricSet containing all server metrics including connection counts and performance data
 */
public MetricSet getAllMetrics();

/**
 * Get counter for registered connections
 * @return Counter tracking the number of active registered connections
 */
public Counter getRegisteredConnections();

Resource Management

Proper server shutdown and resource cleanup.

/**
 * Close the server and release all associated resources
 * This includes closing all client connections and shutting down the server socket
 */
public void close();

RPC Handler

RpcHandler (Abstract Class)

Abstract base class for handling RPC messages and defining server behavior.

/**
 * Process an RPC message from a client with callback response
 * @param client - TransportClient representing the sender
 * @param message - ByteBuffer containing the RPC message data
 * @param callback - RpcResponseCallback for sending the response
 */
public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);

/**
 * Get the stream manager for handling streaming operations
 * @return StreamManager instance for managing data streams
 */
public abstract StreamManager getStreamManager();

RPC Handler Lifecycle Methods

Methods for handling client connection lifecycle events.

/**
 * Called when a client connection becomes active
 * @param client - TransportClient that became active
 */
public void channelActive(TransportClient client);

/**
 * Called when a client connection becomes inactive
 * @param client - TransportClient that became inactive
 */
public void channelInactive(TransportClient client);

/**
 * Called when an exception occurs on a client connection
 * @param cause - Throwable representing the exception
 * @param client - TransportClient where the exception occurred
 */
public void exceptionCaught(Throwable cause, TransportClient client);

Stream Processing

Methods for handling streaming operations and upload streams.

/**
 * Handle incoming stream data with header and callback
 * @param client - TransportClient sending the stream
 * @param messageHeader - ByteBuffer containing stream metadata
 * @param callback - RpcResponseCallback for stream response
 * @return StreamCallbackWithID for handling the streaming data, or null if not supported
 */
public StreamCallbackWithID receiveStream(TransportClient client, ByteBuffer messageHeader, RpcResponseCallback callback);

/**
 * Handle one-way messages (no response expected)
 * @param client - TransportClient sending the message
 * @param message - ByteBuffer containing the message data
 */
public void receive(TransportClient client, ByteBuffer message);
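
The `ByteBuffer` handed to either `receive` overload may be direct or read-only, in which case `array()` throws; decoding via `remaining()` and a bulk `get` works for any buffer. A small stdlib sketch of the pattern (the helper name is illustrative, not part of the API):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class MessageDecoding {
    // Hypothetical helper: copy out the readable bytes without touching
    // the backing array, which direct and read-only buffers do not expose
    static String decodeUtf8(ByteBuffer message) {
        byte[] bytes = new byte[message.remaining()];
        message.duplicate().get(bytes); // duplicate() leaves the caller's position untouched
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(16);
        direct.put("ping".getBytes(StandardCharsets.UTF_8)).flip();
        // direct.array() would throw UnsupportedOperationException here
        System.out.println(decodeUtf8(direct)); // prints "ping"
    }
}
```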

Merged Block Support

Support for Spark's merged block functionality used in shuffle optimization.

/**
 * Get the handler for merged block metadata requests
 * @return MergedBlockMetaReqHandler for processing merged block requests, or null if not supported
 */
public MergedBlockMetaReqHandler getMergedBlockMetaReqHandler();

Stream Management

StreamManager (Abstract Class)

Abstract class for managing streams that can be read by TransportClients.

/**
 * Get a specific chunk from a stream
 * @param streamId - Identifier of the stream
 * @param chunkIndex - Index of the chunk within the stream
 * @return ManagedBuffer containing the chunk data
 */
public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);

/**
 * Open a named stream for reading
 * @param streamId - Identifier of the stream to open
 * @return ManagedBuffer containing the stream data
 */
public abstract ManagedBuffer openStream(String streamId);

Built-in Implementations

NoOpRpcHandler

No-operation RPC handler for testing and basic server setups.

public class NoOpRpcHandler extends RpcHandler {
    public NoOpRpcHandler();
    
    @Override
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
    
    @Override
    public StreamManager getStreamManager();
}

OneForOneStreamManager

Simple stream manager implementation that provides one-to-one mapping between stream IDs and buffers.

public class OneForOneStreamManager extends StreamManager {
    /**
     * Create a stream manager with a list of managed buffers
     * @param buffers - List of ManagedBuffer instances to serve as streams
     */
    public OneForOneStreamManager(List<ManagedBuffer> buffers);
    
    @Override
    public ManagedBuffer getChunk(long streamId, int chunkIndex);
    
    @Override
    public ManagedBuffer openStream(String streamId);
    
    /**
     * Register a new stream and return its ID
     * @param buffer - ManagedBuffer to register as a stream
     * @return long stream ID for the registered buffer
     */
    public long registerStream(ManagedBuffer buffer);
}
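
The one-to-one mapping can be pictured as a registry keyed by a monotonically increasing ID. A plain-Java sketch of that bookkeeping (simplified; the real class also tracks buffer ownership and connection teardown):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of one-for-one stream bookkeeping: each registered
// payload gets a unique ID and is looked up by that ID alone
public class StreamRegistrySketch {
    private final AtomicLong nextStreamId = new AtomicLong(0);
    private final Map<Long, byte[]> streams = new ConcurrentHashMap<>();

    public long registerStream(byte[] payload) {
        long id = nextStreamId.getAndIncrement();
        streams.put(id, payload);
        return id;
    }

    public byte[] openStream(long streamId) {
        byte[] payload = streams.get(streamId);
        if (payload == null) {
            throw new IllegalStateException("Unknown stream: " + streamId);
        }
        return payload;
    }

    public static void main(String[] args) {
        StreamRegistrySketch registry = new StreamRegistrySketch();
        long id = registry.registerStream("chunk-data".getBytes());
        System.out.println("Registered stream " + id);
        System.out.println(new String(registry.openStream(id)));
    }
}
```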

Server Bootstrap

TransportServerBootstrap

Interface for customizing server initialization, commonly used for authentication and encryption setup.

public interface TransportServerBootstrap {
    /**
     * Perform bootstrap operations on a new server channel
     * @param channel - Netty Channel for the server connection
     * @param rpcHandler - RpcHandler that will process requests on this channel
     * @return RpcHandler (possibly wrapped or modified) to use for this channel
     */
    RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}
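
Because `doBootstrap` returns an `RpcHandler`, bootstraps compose: each one can wrap the handler returned by the previous, which is how authentication interposes itself in front of application logic. A stdlib-only sketch of that wrapping discipline, using hypothetical `Handler` and bootstrap stand-ins rather than the real types:

```java
import java.util.List;
import java.util.function.UnaryOperator;

public class BootstrapChainSketch {
    // Hypothetical stand-ins for RpcHandler and TransportServerBootstrap
    interface Handler { String handle(String request); }

    // Mirrors how a server applies bootstraps: each doBootstrap call may
    // return a wrapper around the handler produced so far
    static Handler applyBootstraps(Handler base, List<UnaryOperator<Handler>> bootstraps) {
        Handler handler = base;
        for (UnaryOperator<Handler> bootstrap : bootstraps) {
            handler = bootstrap.apply(handler);
        }
        return handler;
    }

    public static void main(String[] args) {
        Handler base = request -> "handled:" + request;
        UnaryOperator<Handler> auth =
            inner -> request -> inner.handle("auth(" + request + ")");
        UnaryOperator<Handler> metrics =
            inner -> request -> inner.handle("metered(" + request + ")");

        Handler handler = applyBootstraps(base, List.of(auth, metrics));
        // The last-applied wrapper is outermost, so it sees each request first
        System.out.println(handler.handle("ping"));
        // -> handled:auth(metered(ping))
    }
}
```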

Exception Classes

BlockPushNonFatalFailure

Exception for non-fatal failures in block push operations.

public class BlockPushNonFatalFailure extends RuntimeException {
    /**
     * Create exception with error message
     * @param message - Description of the non-fatal failure
     */
    public BlockPushNonFatalFailure(String message);
    
    /**
     * Create exception with error message and cause
     * @param message - Description of the non-fatal failure
     * @param cause - Underlying cause of the failure
     */
    public BlockPushNonFatalFailure(String message, Throwable cause);
}

Usage Examples

Basic Server Setup

import java.nio.ByteBuffer;
import java.util.Collections;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;

// Create custom RPC handler
RpcHandler customHandler = new RpcHandler() {
    @Override
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
        // Copy the readable bytes out of the buffer; message.array() is not
        // safe here because the buffer may be direct or read-only
        byte[] bytes = new byte[message.remaining()];
        message.get(bytes);
        String request = new String(bytes);
        System.out.println("Received RPC: " + request);
        
        // Send response
        String response = "Processed: " + request;
        callback.onSuccess(ByteBuffer.wrap(response.getBytes()));
    }
    
    @Override
    public StreamManager getStreamManager() {
        // Return stream manager for handling streaming operations
        return new OneForOneStreamManager(Collections.emptyList());
    }
};

// Create and start the server ("conf" is a TransportConf built beforehand)
TransportContext context = new TransportContext(conf, customHandler);
TransportServer server = context.createServer(8080, Collections.emptyList());

System.out.println("Server started on port: " + server.getPort());

// The server handles client connections on its own event loop
// Clean up when done
server.close();
context.close();

Server with Stream Management

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.TransportServer;

// Create stream manager with some data
List<ManagedBuffer> streamBuffers = Arrays.asList(
    new NioManagedBuffer(ByteBuffer.wrap("Stream data 1".getBytes())),
    new NioManagedBuffer(ByteBuffer.wrap("Stream data 2".getBytes())),
    new NioManagedBuffer(ByteBuffer.wrap("Stream data 3".getBytes()))
);

OneForOneStreamManager streamManager = new OneForOneStreamManager(streamBuffers);

// Custom RPC handler with streaming support
RpcHandler streamingHandler = new RpcHandler() {
    @Override
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
        // Handle RPC requests; copy the readable bytes out of the buffer
        // rather than calling array(), which fails on direct or read-only buffers
        byte[] bytes = new byte[message.remaining()];
        message.get(bytes);
        String request = new String(bytes);
        if (request.startsWith("GET_STREAM:")) {
            // Register a new stream and return the stream ID
            String data = request.substring("GET_STREAM:".length());
            ManagedBuffer buffer = new NioManagedBuffer(ByteBuffer.wrap(data.getBytes()));
            long streamId = streamManager.registerStream(buffer);
            
            String response = "STREAM_ID:" + streamId;
            callback.onSuccess(ByteBuffer.wrap(response.getBytes()));
        } else {
            callback.onSuccess(ByteBuffer.wrap("OK".getBytes()));
        }
    }
    
    @Override
    public StreamManager getStreamManager() {
        return streamManager;
    }
    
    @Override
    public void channelActive(TransportClient client) {
        System.out.println("Client connected: " + client.getSocketAddress());
    }
    
    @Override
    public void channelInactive(TransportClient client) {
        System.out.println("Client disconnected: " + client.getSocketAddress());
    }
};

// Create server with streaming support; the context must be built with the
// handler that owns this stream manager
TransportContext streamingContext = new TransportContext(conf, streamingHandler);
TransportServer server = streamingContext.createServer(9999, Collections.emptyList());

Server with Authentication Bootstrap

import java.util.Arrays;
import java.util.List;

import org.apache.spark.network.sasl.SaslServerBootstrap;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.server.TransportServerBootstrap;

// Create secret key holder for authentication
SecretKeyHolder secretKeyHolder = new SecretKeyHolder() {
    @Override
    public String getSaslUser(String appId) {
        return "spark-user";
    }
    
    @Override
    public String getSecretKey(String appId) {
        return "my-secret-key";
    }
};

// Create server with SASL authentication
List<TransportServerBootstrap> bootstraps = Arrays.asList(
    new SaslServerBootstrap(conf, secretKeyHolder)
);

TransportServer authenticatedServer = context.createServer(8443, bootstraps);
System.out.println("Authenticated server started on port: " + authenticatedServer.getPort());

Advanced RPC Handler with Error Handling

RpcHandler robustHandler = new RpcHandler() {
    @Override
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
        try {
            // Process message (copy the readable bytes; array() may be
            // unsupported for direct or read-only buffers)
            byte[] bytes = new byte[message.remaining()];
            message.get(bytes);
            String request = new String(bytes);
            System.out.println("Processing request from " + client.getSocketAddress() + ": " + request);
            
            // Simulate processing
            if (request.equals("ERROR")) {
                throw new RuntimeException("Simulated processing error");
            }
            
            // Send successful response
            String response = "Processed at " + System.currentTimeMillis();
            callback.onSuccess(ByteBuffer.wrap(response.getBytes()));
            
        } catch (Exception e) {
            System.err.println("Error processing RPC: " + e.getMessage());
            callback.onFailure(e);
        }
    }
    
    @Override
    public StreamManager getStreamManager() {
        return new OneForOneStreamManager(Collections.emptyList());
    }
    
    @Override
    public void exceptionCaught(Throwable cause, TransportClient client) {
        System.err.println("Exception on client " + client.getSocketAddress() + ": " + cause.getMessage());
        // Could implement custom error handling logic here
    }
};

Server Metrics Monitoring

import java.util.Timer;
import java.util.TimerTask;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricSet;

// Monitor server metrics
TransportServer server = context.createServer();

// Get metrics
MetricSet metrics = server.getAllMetrics();
Counter connections = server.getRegisteredConnections();

// Print metrics periodically
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        System.out.println("Active connections: " + connections.getCount());
        // Access other metrics from MetricSet as needed
    }
}, 0, 5000); // Every 5 seconds

// Stop monitoring when server shuts down
server.close();
timer.cancel();

Types

Related Interfaces and Classes

public interface MergedBlockMetaReqHandler {
    void receiveMergedBlockMetaReq(TransportClient client, MergedBlockMetaRequest request, RpcResponseCallback callback);
}

public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
    public TransportClient getClient();
}

public abstract class AbstractAuthRpcHandler extends RpcHandler {
    // Base class for authentication-aware RPC handlers
}
