Core networking library for Apache Spark providing transport layer abstractions and utilities
npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-common@2.4.0Apache Spark Network-Common provides the foundational networking layer for Apache Spark's distributed computing engine. It implements a high-performance transport abstraction built on Netty, offering comprehensive client-server communication capabilities including RPC handling, data streaming, authentication (including SASL and custom auth protocols), encryption, and connection management.
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-network-common_2.11</artifactId><version>2.4.8</version></dependency>import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.util.TransportConf;import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.MapConfigProvider;
// Configure transport
TransportConf conf = new TransportConf("myapp", new MapConfigProvider(configMap));
// Create RPC handler
RpcHandler rpcHandler = new MyRpcHandler();
// Set up transport context
TransportContext context = new TransportContext(conf, rpcHandler);
// Create server
TransportServer server = context.createServer(0); // 0 = any available port
int port = server.getPort();
// Create client factory and client
TransportClientFactory clientFactory = context.createClientFactory();
TransportClient client = clientFactory.createClient("localhost", port);
// Send RPC
ByteBuffer message = ByteBuffer.wrap("Hello".getBytes());
client.sendRpc(message, new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
System.out.println("Response received: " + new String(response.array()));
}
@Override
public void onFailure(Throwable e) {
System.err.println("RPC failed: " + e.getMessage());
}
});
// Cleanup
client.close();
server.close();
clientFactory.close();Apache Spark Network-Common is built around several key components:
TransportContext) for creating servers and client factories with consistent configurationTransportClient) and server (TransportServer) implementations with connection poolingManagedBuffer) with multiple backing implementations (file, memory, Netty)Core networking functionality providing client-server communication with connection pooling, automatic reconnection, and resource management.
public class TransportContext {
public TransportContext(TransportConf conf, RpcHandler rpcHandler);
public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);
public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);
public TransportClientFactory createClientFactory();
public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);
public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);
public TransportServer createServer(List<TransportServerBootstrap> bootstraps);
public TransportServer createServer();
}
public class TransportClient {
public Channel getChannel();
public boolean isActive();
public SocketAddress getSocketAddress();
public String getClientId();
public void setClientId(String id);
public long sendRpc(ByteBuffer message, RpcResponseCallback callback);
public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);
public void send(ByteBuffer message);
public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);
public void stream(String streamId, StreamCallback callback);
public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);
public void removeRpcRequest(long requestId);
public void timeOut();
public void close();
}
public class TransportServer {
public int getPort();
public void close();
}Unified buffer abstraction with multiple backing implementations for efficient memory and file-based data handling.
public abstract class ManagedBuffer {
public abstract long size();
public abstract ByteBuffer nioByteBuffer() throws IOException;
public abstract InputStream createInputStream() throws IOException;
public abstract ManagedBuffer retain();
public abstract ManagedBuffer release();
public abstract Object convertToNetty() throws IOException;
}
public final class FileSegmentManagedBuffer extends ManagedBuffer {
public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length);
public File getFile();
public long getOffset();
public long getLength();
}Complete message protocol with encoding/decoding support for RPC, streaming, and one-way communication.
public interface Message extends Encodable {
}
public interface Encodable {
int encodedLength();
void encode(ByteBuf buf);
}
public final class RpcRequest extends AbstractMessage implements RequestMessage {
public RpcRequest(long requestId, ManagedBuffer message);
}
public final class RpcResponse extends AbstractResponseMessage implements ResponseMessage {
public RpcResponse(long requestId, ManagedBuffer message);
}Pluggable authentication system supporting SASL and custom authentication protocols with encryption.
public class SaslClientBootstrap implements TransportClientBootstrap {
public void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}
public class SaslServerBootstrap implements TransportServerBootstrap {
public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}
public interface SecretKeyHolder {
String getSaslUser(String appId);
String getSecretKey(String appId);
}Efficient streaming data transfer with chunk-based fetching, supporting large data transfers with minimal memory overhead.
public abstract class StreamManager {
public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
public abstract ManagedBuffer openStream(String streamId);
public void connectionTerminated(Channel channel);
public void checkAuthorization(TransportClient client, long streamId);
}
public class OneForOneStreamManager extends StreamManager {
public OneForOneStreamManager();
public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel);
}Comprehensive configuration system with performance tuning options for connection management, I/O settings, and security parameters.
public class TransportConf {
public TransportConf(String module, ConfigProvider conf);
public int connectionTimeoutMs();
public int numConnectionsPerPeer();
public int serverThreads();
public int clientThreads();
public int receiveBuf();
public int sendBuf();
public boolean encryptionEnabled();
public String cipherTransformation();
}// Core callback interfaces
public interface RpcResponseCallback {
void onSuccess(ByteBuffer response);
void onFailure(Throwable e);
}
public interface ChunkReceivedCallback {
void onSuccess(int chunkIndex, ManagedBuffer buffer);
void onFailure(int chunkIndex, Throwable e);
}
public interface StreamCallback {
void onData(String streamId, ByteBuffer buf) throws IOException;
void onComplete(String streamId) throws IOException;
void onFailure(String streamId, Throwable cause) throws IOException;
}
// Bootstrap interfaces
public interface TransportClientBootstrap {
void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}
public interface TransportServerBootstrap {
RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}
// Configuration provider
public abstract class ConfigProvider {
public abstract String get(String name);
}
public class MapConfigProvider extends ConfigProvider {
public MapConfigProvider(Map<String, String> props);
}