Core networking functionality for Apache Spark including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support using Netty for high-performance network I/O.
npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-common-2-12@3.5.0

Apache Spark Network Common provides the core networking infrastructure for Apache Spark. It implements a high-performance transport layer, built on Netty, for efficient inter-node communication in distributed Spark clusters. Its components include transport contexts for managing network connections, buffer management for zero-copy operations, client-server communication protocols, cryptographic support with forward-secure authentication protocols, SASL-based authentication mechanisms, and specialized shuffle database functionality.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-common_2.12</artifactId>
<version>3.5.6</version>
</dependency>

implementation 'org.apache.spark:spark-network-common_2.12:3.5.6'

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
// Configure the transport layer ("test" is the TransportConf module name).
Map<String, String> config = new HashMap<>();
config.put("spark.network.timeout", "120s");
ConfigProvider provider = new MapConfigProvider(config);
TransportConf conf = new TransportConf("test", provider);

// Server-side RPC handler. NoOpRpcHandler rejects every incoming RPC, so it only suits a
// client-only context; this server needs a real handler, here one that echoes each request.
RpcHandler echoHandler = new RpcHandler() {
  private final StreamManager streamManager = new OneForOneStreamManager();

  @Override
  public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
    // Reply with a freshly allocated buffer instead of handing the request buffer back: the
    // request buffer's lifetime is controlled by the transport layer, not by this handler.
    String text = StandardCharsets.UTF_8.decode(message).toString();
    callback.onSuccess(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));
  }

  @Override
  public StreamManager getStreamManager() {
    return streamManager;
  }
};

// Create the transport context (main entry point), start the server and create a client factory.
TransportContext context = new TransportContext(conf, echoHandler);
TransportServer server = context.createServer();
TransportClientFactory clientFactory = context.createClientFactory();
try {
  // Connect to the port the server is bound to.
  TransportClient client = clientFactory.createClient("localhost", server.getPort());
  try {
    ByteBuffer message = ByteBuffer.wrap("Hello, Spark!".getBytes(StandardCharsets.UTF_8));

    // sendRpc is asynchronous and returns before the reply arrives, so wait for the callback
    // before tearing anything down; otherwise the response may never be observed.
    CountDownLatch done = new CountDownLatch(1);
    client.sendRpc(message, new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        // The response may be a direct or read-only buffer, or a view into a larger one, so
        // decode only its remaining bytes instead of calling response.array().
        System.out.println("Response: " + StandardCharsets.UTF_8.decode(response));
        done.countDown();
      }

      @Override
      public void onFailure(Throwable e) {
        System.err.println("RPC failed: " + e.getMessage());
        done.countDown();
      }
    });
    if (!done.await(30, TimeUnit.SECONDS)) {
      System.err.println("RPC timed out");
    }
  } finally {
    client.close();
  }
} finally {
  // Cleanup in reverse order of creation; runs even if connecting or the RPC fails.
  clientFactory.close();
  server.close();
  context.close();
}

Apache Spark Network Common is built around several key components:
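- Transport context: TransportContext is the main entry point for creating clients and servers.
- Client and server: TransportClient handles client operations and TransportServer handles server operations.
- Buffer management: data is exposed through ManagedBuffer implementations.
- Configuration: settings are supplied through the ConfigProvider pattern.

TransportContext: the main entry point for creating transport clients and servers, managing Netty pipeline setup and network configuration.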
/**
 * Main entry point for creating transport clients and servers; manages Netty pipeline setup
 * and network configuration. It is Closeable; the quick-start example closes it last, after
 * the client factory and the server.
 */
public final class TransportContext implements Closeable {
/**
 * @param conf transport configuration
 * @param rpcHandler handler for incoming RPCs (see RpcHandler.receive)
 */
public TransportContext(TransportConf conf, RpcHandler rpcHandler);
/** @param closeIdleConnections presumably whether idle connections are closed -- TODO confirm */
public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);
/** @param isClientOnly presumably marks a context used only for outgoing clients -- TODO confirm */
public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections, boolean isClientOnly);
/** Creates a client factory whose connections use the given bootstraps (see TransportClientBootstrap). */
public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);
/** Creates a client factory; presumably equivalent to passing no bootstraps -- TODO confirm. */
public TransportClientFactory createClientFactory();
/** Creates a server on port with the given server bootstraps (see TransportServerBootstrap). */
public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);
/** Creates a server bound to host and port with the given server bootstraps. */
public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);
/** Creates a server with the given bootstraps; how the port is chosen is not shown -- TODO confirm. */
public TransportServer createServer(List<TransportServerBootstrap> bootstraps);
/** Creates a server; the quick-start example reads its bound port via TransportServer.getPort(). */
public TransportServer createServer();
/** Releases resources held by this context. */
public void close();
}
High-performance client functionality for fetching data chunks, sending RPCs, and streaming data with full thread safety.
/**
 * Client side of a transport connection: fetches data chunks, sends RPCs and streams data.
 * This document describes it as fully thread-safe.
 */
public class TransportClient implements Closeable {
/** Requests chunk chunkIndex of stream streamId; the buffer or the failure is reported to callback. */
public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);
/** Requests stream streamId; data, completion and failure are reported through callback. */
public void stream(String streamId, StreamCallback callback);
/**
 * Sends an RPC without waiting for the reply; the reply or failure is reported to callback.
 * @return a long identifier for the request (presumably the request id -- TODO confirm)
 */
public long sendRpc(ByteBuffer message, RpcResponseCallback callback);
/** Sends an RPC and blocks until the reply is available, waiting up to timeoutMs milliseconds. */
public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);
/** Uploads meta and data; the outcome is reported to callback. Return value presumably as for sendRpc -- TODO confirm. */
public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);
/** Sends a one-way message; there is no callback, so no reply is reported to the caller. */
public void send(ByteBuffer message);
/** Presumably true while the underlying connection is usable -- TODO confirm. */
public boolean isActive();
/** Closes this client. */
public void close();
}
Server-side functionality for handling client connections, processing RPC requests, and managing data streams.
/** Server side of the transport layer, created through TransportContext.createServer(...). */
public class TransportServer implements Closeable {
/** Port the server is bound to; the quick-start example connects a client to it. */
public int getPort();
/** All metrics exposed by this server. */
public MetricSet getAllMetrics();
/** Counter of registered connections (presumably those currently open -- TODO confirm). */
public Counter getRegisteredConnections();
/** Shuts the server down. */
public void close();
}
/** Server-side handler invoked for incoming RPCs and connection events. */
public abstract class RpcHandler {
/**
 * Handles one RPC sent by client. The outcome is delivered through callback
 * (onSuccess with a response buffer, or onFailure).
 */
public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
/** StreamManager used for stream and chunk requests (presumably -- TODO confirm). */
public abstract StreamManager getStreamManager();
/** Presumably called when client's channel becomes active -- TODO confirm. */
public void channelActive(TransportClient client);
/** Presumably called when client's channel becomes inactive -- TODO confirm. */
public void channelInactive(TransportClient client);
/** Presumably called when an exception is raised on client's channel -- TODO confirm. */
public void exceptionCaught(Throwable cause, TransportClient client);
}
Zero-copy buffer management system with different backing implementations for efficient memory usage and data transfer.
/** Abstract view of a block of data, with several backing implementations. */
public abstract class ManagedBuffer {
/** Size of the data (presumably in bytes -- TODO confirm). */
public abstract long size();
/** Exposes the data as a ByteBuffer. */
public abstract ByteBuffer nioByteBuffer() throws IOException;
/** Opens an InputStream over the data. */
public abstract InputStream createInputStream() throws IOException;
/** Presumably increments a reference count -- TODO confirm. */
public abstract ManagedBuffer retain();
/** Presumably decrements a reference count -- TODO confirm. */
public abstract ManagedBuffer release();
/** Converts the data into an object for Netty to write; the concrete type depends on the implementation. */
public abstract Object convertToNetty() throws IOException;
}
Comprehensive message protocol system for different types of network communication including RPC, streaming, and chunk fetching.
/** A message exchanged over the transport: a type tag plus a body; encodable via Encodable. */
public interface Message extends Encodable {
/** Kind of this message. */
Type type();
/** Payload of the message as a ManagedBuffer. */
ManagedBuffer body();
/** Presumably whether body() travels in the same frame as the header -- TODO confirm. */
boolean isBodyInFrame();
/**
 * Message kinds: request/success/failure triples for chunk fetches, RPCs and streams,
 * plus one-way, upload-stream, merged-block-meta and user-defined messages.
 */
enum Type {
ChunkFetchRequest, ChunkFetchSuccess, ChunkFetchFailure,
RpcRequest, RpcResponse, RpcFailure,
StreamRequest, StreamResponse, StreamFailure,
OneWayMessage, UploadStream,
MergedBlockMetaRequest, MergedBlockMetaSuccess,
User
}
}
Comprehensive security layer including SASL authentication and AES encryption for secure network communication.
/** Encryption component of the security layer (this document describes AES encryption). */
public interface TransportCipher {
/** Identifier of the key in use. */
String getKeyId() throws GeneralSecurityException;
/** Installs this cipher on channel (presumably by adding pipeline handlers -- TODO confirm). */
void addToChannel(Channel channel) throws IOException, GeneralSecurityException;
}
/** Supplies SASL credentials for an application. */
public interface SecretKeyHolder {
/** SASL user name to use for appId. */
String getSaslUser(String appId);
/** Secret key to use for appId. */
String getSecretKey(String appId);
}
Specialized database functionality for handling shuffle data storage using LevelDB and RocksDB backends.
/** Key-value store used for shuffle data; backed by LevelDB or RocksDB (see DBBackend). */
public interface DB extends Closeable {
/** Stores value under key. */
void put(byte[] key, byte[] value) throws IOException;
/** Returns the value stored under key (presumably null when absent -- TODO confirm). */
byte[] get(byte[] key) throws IOException;
/** Removes key. */
void delete(byte[] key) throws IOException;
/** Iterator over the stored entries. */
DBIterator iterator();
}
/** Storage backends available for DB. */
public enum DBBackend {
LEVELDB, ROCKSDB;
/** Builds a file name from prefix (presumably appending a backend-specific suffix -- TODO confirm). */
public String fileName(String prefix);
/** Looks up a backend by name; behaviour for unknown names is not shown here. */
public static DBBackend byName(String value);
}
Flexible configuration system with provider pattern for managing transport layer settings and network parameters.
/** Transport-layer settings for one module, read from a ConfigProvider. */
public class TransportConf {
/**
 * @param module module name (the quick-start example uses "test")
 * @param conf source of the raw configuration values
 */
public TransportConf(String module, ConfigProvider conf);
/** I/O mode name (presumably one of the IOMode constants, NIO or EPOLL -- TODO confirm). */
public String ioMode();
/** Whether direct buffers are preferred. */
public boolean preferDirectBufs();
/** Connection timeout, in milliseconds. */
public int connectionTimeoutMs();
/** Number of connections per remote peer. */
public int numConnectionsPerPeer();
/** Number of server threads. */
public int serverThreads();
/** Number of client threads. */
public int clientThreads();
/** Whether transport encryption is enabled. */
public boolean encryptionEnabled();
/** Whether SASL encryption is enabled. */
public boolean saslEncryption();
}
/** Source of raw configuration values, consumed by TransportConf. */
public abstract class ConfigProvider {
/** Returns the value for name (behaviour for a missing key is not shown -- TODO confirm). */
public abstract String get(String name);
/** Returns the value for name, or defaultValue when it is not set. */
public String get(String name, String defaultValue);
/** Returns the value for name as a boolean, or defaultValue when it is not set. */
public boolean getBoolean(String name, boolean defaultValue);
/** Returns the value for name as an int, or defaultValue when it is not set. */
public int getInt(String name, int defaultValue);
/** Returns the value for name as a long, or defaultValue when it is not set. */
public long getLong(String name, long defaultValue);
}
/** Close contract used by the transport classes (same shape as java.io.Closeable; presumably that interface). */
public interface Closeable {
void close() throws IOException;
}
/** An object that can write itself into a ByteBuf. */
public interface Encodable {
/** Number of bytes encode() writes (presumably -- TODO confirm). */
int encodedLength();
/** Writes this object into buf. */
void encode(ByteBuf buf);
}
/** Failure notification shared by the response callbacks. */
public interface BaseResponseCallback {
/** Called when the operation fails with e. */
void onFailure(Throwable e);
}
/** Receives the outcome of an RPC (see TransportClient.sendRpc and RpcHandler.receive). */
public interface RpcResponseCallback extends BaseResponseCallback {
/** Called with the RPC's response payload. */
void onSuccess(ByteBuffer response);
}
/** Receives the outcome of TransportClient.fetchChunk for a given chunk index. */
public interface ChunkReceivedCallback {
/** Called with the data of chunk chunkIndex. */
void onSuccess(int chunkIndex, ManagedBuffer buffer);
/** Called when fetching chunk chunkIndex fails with e. */
void onFailure(int chunkIndex, Throwable e);
}
/** Receives the data of a stream requested through TransportClient.stream. */
public interface StreamCallback {
/** Called with a piece of data belonging to streamId. */
void onData(String streamId, ByteBuffer buf) throws IOException;
/** Called when streamId has completed. */
void onComplete(String streamId) throws IOException;
/** Called when streamId fails with cause. */
void onFailure(String streamId, Throwable cause) throws IOException;
}
/** A StreamCallback that also exposes an identifier. */
public interface StreamCallbackWithID extends StreamCallback {
String getID();
}
/** Setup hook for client connections (passed to TransportContext.createClientFactory). */
public interface TransportClientBootstrap {
/** Performs setup for client on channel; failures are signalled as RuntimeException. */
void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}
/** Setup hook for server connections (passed to TransportContext.createServer). */
public interface TransportServerBootstrap {
/** Performs setup on channel and returns the RpcHandler to use for it (possibly wrapping rpcHandler). */
RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}
/** Selectable I/O modes (see TransportConf.ioMode). */
public enum IOMode {
NIO, EPOLL
}
/** Unchecked exception reported when a chunk fetch fails. */
public class ChunkFetchFailureException extends RuntimeException {
public ChunkFetchFailureException(String errorMsg, Throwable cause);
public ChunkFetchFailureException(String errorMsg);
}
/** Unchecked exception signalling a SASL timeout. */
public class SaslTimeoutException extends RuntimeException {
// Standard RuntimeException constructors (constructor list not reproduced here)
}
/** Unchecked exception for block-push failures that are, per its name, non-fatal. */
public class BlockPushNonFatalFailure extends RuntimeException {
// Constructors not reproduced here.
// NOTE(review): the real class may not expose the standard RuntimeException constructors
// (it appears to carry a response buffer or a return code) -- confirm against the Spark 3.5 source.
}