Core networking infrastructure for Apache Spark cluster computing with transport layer, RPC, chunk fetching, and SASL authentication
—
Core factory and setup functionality for creating transport clients and servers. TransportContext serves as the main entry point for all networking operations in Apache Spark.
Main factory class for creating transport clients and servers with Netty pipeline management.
/**
* TransportContext manages the lifecycle and creation of transport clients and servers.
* It sets up Netty pipelines with proper handlers for the Spark networking protocol.
*
* @param conf Transport configuration settings
* @param rpcHandler Handler for processing RPC requests
* @param closeIdleConnections Whether to close idle connections automatically
*/
public class TransportContext {
public TransportContext(TransportConf conf, RpcHandler rpcHandler);
public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);
/** Creates a client factory for managing outbound connections */
public TransportClientFactory createClientFactory();
/** Creates a client factory with custom bootstrap handlers */
public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);
/** Creates a server on any available port */
public TransportServer createServer();
/** Creates a server on the specified port */
public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);
/** Creates a server bound to specific host and port */
public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);
/** Creates a server with custom bootstrap handlers */
public TransportServer createServer(List<TransportServerBootstrap> bootstraps);
/** Initializes Netty pipeline for a channel */
public TransportChannelHandler initializePipeline(SocketChannel channel);
/** Initializes Netty pipeline with custom RPC handler */
public TransportChannelHandler initializePipeline(SocketChannel channel, RpcHandler channelRpcHandler);
/** Gets the transport configuration */
public TransportConf getConf();
}

Basic Setup Example:
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
// Create configuration from system properties
TransportConf conf = new TransportConf("spark.shuffle", new SystemPropertyConfigProvider());
// Create a no-op RPC handler for basic transport
RpcHandler handler = new NoOpRpcHandler();
// Create transport context
TransportContext context = new TransportContext(conf, handler);
// Now you can create clients and servers from this context
TransportServer server = context.createServer(8080, new ArrayList<>());
TransportClientFactory clientFactory = context.createClientFactory();

Setup with Custom Configuration:
import org.apache.spark.network.util.MapConfigProvider;
import java.util.HashMap;
import java.util.Map;
// Create custom configuration
Map<String, String> configMap = new HashMap<>();
configMap.put("spark.shuffle.io.mode", "NIO");
configMap.put("spark.shuffle.io.preferDirectBufs", "true");
configMap.put("spark.shuffle.io.connectionTimeout", "120s");
TransportConf conf = new TransportConf("spark.shuffle", new MapConfigProvider(configMap));
TransportContext context = new TransportContext(conf, handler);

Setup with Bootstrap Handlers:
import org.apache.spark.network.sasl.SaslServerBootstrap;
import org.apache.spark.network.sasl.SaslClientBootstrap;
import java.util.Arrays;
// Create SASL authentication bootstraps
SaslServerBootstrap serverBootstrap = new SaslServerBootstrap(conf, secretKeyHolder);
SaslClientBootstrap clientBootstrap = new SaslClientBootstrap(conf, appId, secretKeyHolder);
// Create server with SASL authentication
TransportServer server = context.createServer(8080, Arrays.asList(serverBootstrap));
// Create client factory with SASL authentication
TransportClientFactory clientFactory = context.createClientFactory(Arrays.asList(clientBootstrap));

Interfaces for customizing client and server channel initialization.
/**
 * Interface for customizing client channel initialization.
 * Implementations can add custom handlers to the Netty pipeline;
 * for example, SaslClientBootstrap (shown in the setup examples above)
 * performs SASL authentication on the new connection.
 */
public interface TransportClientBootstrap {
/**
 * Customizes the client channel after it's created but before it's used.
 * Called once per new connection; runs to completion before the client
 * is handed back to the caller.
 *
 * @param client The transport client instance
 * @param channel The Netty channel to customize
 */
void doBootstrap(TransportClient client, Channel channel);
}
/**
 * Interface for customizing server channel initialization.
 * Implementations can modify the RPC handler or add custom pipeline handlers;
 * for example, SaslServerBootstrap (shown in the setup examples above) wraps
 * the handler to require SASL authentication.
 */
public interface TransportServerBootstrap {
/**
 * Customizes the server channel and potentially wraps the RPC handler.
 * Called once for each channel accepted by the server.
 *
 * @param channel The Netty channel to customize
 * @param rpcHandler The current RPC handler
 * @return The RPC handler to use (may be the original or a wrapper)
 */
RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

The transport setup integrates closely with the configuration system to control networking behavior.
Key Configuration Properties:
- io.mode: Network IO mode (NIO or EPOLL)
- io.preferDirectBufs: Whether to use direct ByteBuffers
- io.connectionTimeout: Connection timeout (duration string, e.g. "120s")
- io.numConnectionsPerPeer: Maximum connections per remote peer
- serverThreads: Number of server worker threads
- clientThreads: Number of client worker threads

Example with Custom Configuration:
// Configure for high-throughput scenarios
Map<String, String> config = new HashMap<>();
config.put("spark.network.io.mode", "EPOLL"); // Use EPOLL on Linux
config.put("spark.network.io.preferDirectBufs", "true"); // Use direct buffers
config.put("spark.network.io.numConnectionsPerPeer", "5"); // More connections per peer
config.put("spark.network.serverThreads", "8"); // More server threads
TransportConf conf = new TransportConf("spark.network", new MapConfigProvider(config));
TransportContext context = new TransportContext(conf, rpcHandler, true); // Close idle connections

Transport setup operations can throw various exceptions that should be handled appropriately.
Common Setup Exceptions:
try {
TransportServer server = context.createServer("localhost", 8080, bootstraps);
System.out.println("Server started on port: " + server.getPort());
} catch (Exception e) {
// Handle server creation failure (port in use, binding issues, etc.)
System.err.println("Failed to create server: " + e.getMessage());
}
try {
TransportClient client = clientFactory.createClient("remote-host", 9090);
System.out.println("Connected to: " + client.getSocketAddress());
} catch (Exception e) {
// Handle connection failure (host unreachable, connection refused, etc.)
System.err.println("Failed to connect: " + e.getMessage());
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-10