Core networking functionality for Apache Spark including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support using Netty for high-performance network I/O.
—
The TransportContext class is the main entry point for Apache Spark's networking layer. It manages Netty pipeline setup, creates client factories and servers, and handles network configuration. This class provides the foundation for all network communication in Spark.
Creates a transport context with the specified configuration and RPC handler.
/**
* Create a TransportContext with basic configuration
* @param conf - Transport configuration containing network settings
* @param rpcHandler - Handler for processing RPC messages
*/
public TransportContext(TransportConf conf, RpcHandler rpcHandler);
/**
* Create a TransportContext with idle connection management
* @param conf - Transport configuration containing network settings
* @param rpcHandler - Handler for processing RPC messages
* @param closeIdleConnections - Whether to close idle connections automatically
*/
public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);
/**
* Create a TransportContext with full configuration options
* @param conf - Transport configuration containing network settings
* @param rpcHandler - Handler for processing RPC messages
* @param closeIdleConnections - Whether to close idle connections automatically
* @param isClientOnly - Whether this context is for client operations only
*/
public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections, boolean isClientOnly);

Creates factories for generating transport clients with optional bootstrap customization.
/**
* Create a client factory with custom bootstrap configurations
* @param bootstraps - List of bootstrap configurations for client initialization
* @return TransportClientFactory for creating transport clients
*/
public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);
/**
* Create a client factory with default configuration
* @return TransportClientFactory for creating transport clients
*/
public TransportClientFactory createClientFactory();

Creates transport servers for handling incoming client connections with flexible configuration options.
/**
* Create a server bound to a specific port with bootstrap configurations
* @param port - Port number to bind the server to
* @param bootstraps - List of bootstrap configurations for server initialization
* @return TransportServer instance bound to the specified port
*/
public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);
/**
* Create a server bound to a specific host and port with bootstrap configurations
* @param host - Host address to bind the server to
* @param port - Port number to bind the server to
* @param bootstraps - List of bootstrap configurations for server initialization
* @return TransportServer instance bound to the specified host and port
*/
public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);
/**
* Create a server with bootstrap configurations, using system-assigned port
* @param bootstraps - List of bootstrap configurations for server initialization
* @return TransportServer instance with system-assigned port
*/
public TransportServer createServer(List<TransportServerBootstrap> bootstraps);
/**
* Create a server with default configuration and system-assigned port
* @return TransportServer instance with default configuration
*/
public TransportServer createServer();

Manages Netty channel pipeline initialization for both client and server channels.
/**
* Initialize the Netty pipeline for a socket channel with default RPC handler
* @param channel - SocketChannel to initialize pipeline for
* @return TransportChannelHandler for managing the channel
*/
public TransportChannelHandler initializePipeline(SocketChannel channel);
/**
* Initialize the Netty pipeline for a socket channel with custom RPC handler
* @param channel - SocketChannel to initialize pipeline for
* @param channelRpcHandler - Custom RPC handler for this specific channel
* @return TransportChannelHandler for managing the channel
*/
public TransportChannelHandler initializePipeline(SocketChannel channel, RpcHandler channelRpcHandler);

Provides access to configuration settings and connection monitoring capabilities.
/**
* Get the transport configuration used by this context
* @return TransportConf instance containing all configuration settings
*/
public TransportConf getConf();
/**
* Get metrics counter for registered connections
* @return Counter tracking the number of registered connections
*/
public Counter getRegisteredConnections();

Properly closes and cleans up all resources associated with the transport context.
/**
* Close the transport context and release all associated resources
* This includes closing all client factories, servers, and network resources
*/
public void close();

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.MapConfigProvider;
// Create configuration
Map<String, String> config = new HashMap<>();
config.put("spark.network.timeout", "120s");
config.put("spark.network.io.mode", "NIO");
ConfigProvider provider = new MapConfigProvider(config);
TransportConf conf = new TransportConf("spark", provider);
// Create transport context
TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
// Use context to create clients and servers
TransportServer server = context.createServer();
TransportClientFactory clientFactory = context.createClientFactory();
// Cleanup
clientFactory.close();
server.close();
context.close();

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClientBootstrap;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.sasl.SaslClientBootstrap;
import org.apache.spark.network.sasl.SaslServerBootstrap;
// Create context with SASL authentication
List<TransportClientBootstrap> clientBootstraps = Arrays.asList(
new SaslClientBootstrap(conf, "myapp", secretKeyHolder)
);
List<TransportServerBootstrap> serverBootstraps = Arrays.asList(
new SaslServerBootstrap(conf, secretKeyHolder)
);
TransportContext context = new TransportContext(conf, rpcHandler, true, false);
// Create authenticated client factory and server
TransportClientFactory clientFactory = context.createClientFactory(clientBootstraps);
TransportServer server = context.createServer(9999, serverBootstraps);
System.out.println("Server listening on port: " + server.getPort());

// Create a lightweight context for client-only operations
TransportContext clientContext = new TransportContext(conf, rpcHandler, true, true);
TransportClientFactory factory = clientContext.createClientFactory();
// Connect to existing servers
TransportClient client1 = factory.createClient("server1.example.com", 9999);
TransportClient client2 = factory.createClient("server2.example.com", 9999);
// Use clients for communication
client1.sendRpc(message, callback);
client2.fetchChunk(streamId, chunkIndex, chunkCallback);
// Cleanup
client1.close();
client2.close();
factory.close();
clientContext.close();

public class TransportClientFactory implements Closeable {
public TransportClient createClient(String remoteHost, int remotePort);
public TransportClient createClient(String remoteHost, int remotePort, int clientId);
public void close();
}
public class TransportServer implements Closeable {
public int getPort();
public MetricSet getAllMetrics();
public Counter getRegisteredConnections();
public void close();
}
public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
public TransportClient getClient();
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-12