Core networking infrastructure for Apache Spark cluster computing with transport layer, RPC, chunk fetching, and SASL authentication
npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-common-2-10@1.6.0

Apache Spark Network Common provides the core networking infrastructure for Apache Spark cluster computing. It implements a high-performance, Netty-based transport layer that enables efficient communication between Spark components across cluster nodes, with RPC, chunk fetching, SASL authentication, and comprehensive buffer management.
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.util.TransportConf;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
// Create transport configuration
TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
// Create RPC handler
RpcHandler rpcHandler = new NoOpRpcHandler();
// Create transport context
TransportContext context = new TransportContext(conf, rpcHandler);
// Create client factory for outbound connections
TransportClientFactory clientFactory = context.createClientFactory();
// Create client to connect to remote server
TransportClient client = clientFactory.createClient("localhost", 8080);
// Create server for inbound connections
TransportServer server = context.createServer(8080, new ArrayList<>());

Apache Spark Network Common is built around several key architectural components:
- TransportContext serving as the main factory for clients and servers
- TransportClient for outbound connections and TransportServer for inbound connections
- A unified buffer abstraction (ManagedBuffer) supporting memory, file, and Netty ByteBuf backends

Core factory and setup functionality for creating transport clients and servers. The main entry point for all networking operations in Spark.
public class TransportContext {
public TransportContext(TransportConf conf, RpcHandler rpcHandler);
public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);
public TransportClientFactory createClientFactory();
public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);
public TransportServer createServer();
public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);
public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);
}

Client-side networking functionality for establishing connections, sending RPC requests, and fetching data chunks from remote servers.
public class TransportClient implements Closeable {
public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);
public void stream(String streamId, StreamCallback callback);
public long sendRpc(ByteBuffer message, RpcResponseCallback callback);
public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);
public void send(ByteBuffer message);
}
public class TransportClientFactory implements Closeable {
public TransportClient createClient(String remoteHost, int remotePort);
public TransportClient createUnmanagedClient(String remoteHost, int remotePort);
}

Server-side networking functionality for handling inbound connections, processing RPC requests, and managing data streams.
public class TransportServer implements Closeable {
public int getPort();
public void close();
}
public abstract class RpcHandler {
public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
public abstract StreamManager getStreamManager();
}
public abstract class StreamManager {
public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
}

Type-safe message protocol for network communication, including RPC requests/responses, chunk fetching, and streaming operations.
public interface Message {
Type type();
ManagedBuffer body();
boolean isBodyInFrame();
enum Type {
ChunkFetchRequest, ChunkFetchSuccess, ChunkFetchFailure,
RpcRequest, RpcResponse, RpcFailure,
StreamRequest, StreamResponse, StreamFailure,
OneWayMessage, User
}
}
public class RpcRequest extends AbstractMessage implements RequestMessage {
public final long requestId;
public RpcRequest(long requestId, ManagedBuffer message);
}
public class ChunkFetchRequest extends AbstractMessage implements RequestMessage {
public final StreamChunkId streamChunkId;
public ChunkFetchRequest(StreamChunkId streamChunkId);
}

Unified buffer management system providing abstractions over different buffer types, including memory, file segments, and Netty ByteBufs.
public abstract class ManagedBuffer {
public abstract long size();
public abstract ByteBuffer nioByteBuffer() throws IOException;
public abstract InputStream createInputStream() throws IOException;
public abstract ManagedBuffer retain();
public abstract ManagedBuffer release();
public abstract Object convertToNetty() throws IOException;
}
public class NioManagedBuffer extends ManagedBuffer {
public NioManagedBuffer(ByteBuffer buf);
}
public class FileSegmentManagedBuffer extends ManagedBuffer {
public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length);
}

Security framework providing SASL-based authentication and encryption for secure network communication.
public interface SecretKeyHolder {
String getSaslUser(String appId);
String getSecretKey(String appId);
}
public class SaslClientBootstrap implements TransportClientBootstrap {
public SaslClientBootstrap(TransportConf conf, String appId, SecretKeyHolder secretKeyHolder);
}
public class SaslServerBootstrap implements TransportServerBootstrap {
public SaslServerBootstrap(TransportConf conf, SecretKeyHolder secretKeyHolder);
}

Configuration system and utility classes for transport settings, Netty integration, and Java operations.
public class TransportConf {
public TransportConf(String module, ConfigProvider conf);
public String ioMode();
public boolean preferDirectBufs();
public int connectionTimeoutMs();
public int numConnectionsPerPeer();
public int serverThreads();
public int clientThreads();
}
public abstract class ConfigProvider {
public abstract String get(String name);
public String get(String name, String defaultValue);
public int getInt(String name, int defaultValue);
public boolean getBoolean(String name, boolean defaultValue);
}

public interface RequestMessage extends Message {}
public interface ResponseMessage extends Message {}
public interface ChunkReceivedCallback {
void onSuccess(int chunkIndex, ManagedBuffer buffer);
void onFailure(int chunkIndex, Throwable e);
}
public interface RpcResponseCallback {
void onSuccess(ByteBuffer response);
void onFailure(Throwable e);
}
public interface StreamCallback {
void onData(String streamId, ByteBuffer buf);
void onComplete(String streamId);
void onFailure(String streamId, Throwable cause);
}
public interface TransportClientBootstrap {
void doBootstrap(TransportClient client, Channel channel);
}
public interface TransportServerBootstrap {
RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

public class StreamChunkId {
public final long streamId;
public final int chunkIndex;
public StreamChunkId(long streamId, int chunkIndex);
public String toString();
public boolean equals(Object other);
public int hashCode();
}

public enum IOMode {
NIO, EPOLL
}
public enum ByteUnit {
BYTE, KiB, MiB, GiB, TiB, PiB;
public long convertFrom(long d, ByteUnit u);
public long convertTo(long d, ByteUnit u);
public double toBytes(long d);
public long toKiB(long d);
public long toMiB(long d);
public long toGiB(long d);
public long toTiB(long d);
public long toPiB(long d);
}

public class ChunkFetchFailureException extends RuntimeException {
public ChunkFetchFailureException(String errorMsg, Throwable cause);
public ChunkFetchFailureException(String errorMsg);
}