
tessl/maven-org-apache-spark--spark-network-common

Core networking library for Apache Spark providing transport layer abstractions and utilities


Apache Spark Network-Common

Apache Spark Network-Common is the networking layer underneath Apache Spark's distributed computing engine. It implements a transport abstraction on top of Netty and covers client-server communication: RPC handling, data streaming, authentication (SASL and custom protocols), encryption, and connection management.

Package Information

  • Package Name: spark-network-common_2.11
  • Package Type: maven
  • Language: Java
  • Group ID: org.apache.spark
  • Artifact ID: spark-network-common_2.11
  • Installation:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-network-common_2.11</artifactId>
      <version>2.4.8</version>
    </dependency>

Core Imports

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.util.TransportConf;

Basic Usage

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Configure transport; an empty map keeps the default settings
Map<String, String> configMap = new HashMap<>();
TransportConf conf = new TransportConf("myapp", new MapConfigProvider(configMap));

// Create RPC handler (MyRpcHandler is your own RpcHandler subclass)
RpcHandler rpcHandler = new MyRpcHandler();

// Set up transport context
TransportContext context = new TransportContext(conf, rpcHandler);

// Create server; the no-argument overload binds to any available port
TransportServer server = context.createServer();
int port = server.getPort();

// Create client factory and client (createClient can throw checked exceptions)
TransportClientFactory clientFactory = context.createClientFactory();
TransportClient client = clientFactory.createClient("localhost", port);

// Send RPC; the callback runs asynchronously when the reply arrives
ByteBuffer message = ByteBuffer.wrap("Hello".getBytes(StandardCharsets.UTF_8));
client.sendRpc(message, new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
        // Decode via the charset API: response.array() fails on direct buffers
        String text = StandardCharsets.UTF_8.decode(response).toString();
        System.out.println("Response received: " + text);
    }

    @Override
    public void onFailure(Throwable e) {
        System.err.println("RPC failed: " + e.getMessage());
    }
});

// Cleanup, once the callback has fired (closing earlier fails the pending RPC)
client.close();
server.close();
clientFactory.close();

Architecture

Apache Spark Network-Common is built around several key components:

  • Transport Context: Central factory (TransportContext) for creating servers and client factories with consistent configuration
  • Client-Server Model: Asynchronous client (TransportClient) and server (TransportServer) implementations with connection pooling
  • RPC Framework: Pluggable RPC handlers with support for bidirectional communication and one-way messages
  • Stream Management: Efficient streaming data transfer with chunk-based fetching and zero-copy I/O
  • Authentication Layer: Pluggable authentication via SASL and custom protocols with encryption support
  • Buffer Abstraction: Unified buffer interface (ManagedBuffer) with multiple backing implementations (file, memory, Netty)
  • Protocol Stack: Complete message protocol with encoding/decoding and frame-based transport

Capabilities

Transport Layer

Core networking functionality providing client-server communication with connection pooling, automatic reconnection, and resource management.

public class TransportContext {
    public TransportContext(TransportConf conf, RpcHandler rpcHandler);
    public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);
    public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);
    public TransportClientFactory createClientFactory();
    public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);
    public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);
    public TransportServer createServer(List<TransportServerBootstrap> bootstraps);
    public TransportServer createServer();
}

public class TransportClient {
    public Channel getChannel();
    public boolean isActive();
    public SocketAddress getSocketAddress();
    public String getClientId();
    public void setClientId(String id);
    public long sendRpc(ByteBuffer message, RpcResponseCallback callback);
    public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);
    public void send(ByteBuffer message);
    public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);
    public void stream(String streamId, StreamCallback callback);
    public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);
    public void removeRpcRequest(long requestId);
    public void timeOut();
    public void close();
}

public class TransportServer {
    public int getPort();
    public void close();
}
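The asynchronous `sendRpc` and the blocking `sendRpcSync` listed above differ mainly in who waits for the reply. As a rough, JDK-only sketch of that relationship (not the library's implementation), a callback can complete a `CompletableFuture` that the caller blocks on with a timeout. `Callback`, `AsyncSender`, `sendSync`, and `echo` are hypothetical stand-ins introduced here so the snippet compiles without Spark on the classpath.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SyncRpcSketch {
    /** Hypothetical stand-in mirroring the shape of RpcResponseCallback. */
    interface Callback {
        void onSuccess(ByteBuffer response);
        void onFailure(Throwable e);
    }

    /** Hypothetical stand-in for the asynchronous half of TransportClient. */
    interface AsyncSender {
        void sendRpc(ByteBuffer message, Callback callback);
    }

    /** Blocks until the callback fires or timeoutMs elapses. */
    static ByteBuffer sendSync(AsyncSender sender, ByteBuffer message, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
        sender.sendRpc(message, new Callback() {
            @Override
            public void onSuccess(ByteBuffer response) {
                // Copy defensively in case the underlying buffer is recycled
                // after the callback returns.
                ByteBuffer copy = ByteBuffer.allocate(response.remaining());
                copy.put(response.duplicate());
                copy.flip();
                result.complete(copy);
            }

            @Override
            public void onFailure(Throwable e) {
                result.completeExceptionally(e);
            }
        });
        return result.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Round-trips text through an in-process echo sender to exercise the sketch. */
    static String echo(String text) {
        AsyncSender echoSender = (msg, cb) -> cb.onSuccess(msg);
        try {
            ByteBuffer request = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
            ByteBuffer reply = sendSync(echoSender, request, 1000);
            return StandardCharsets.UTF_8.decode(reply).toString();
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(echo("Hello"));
    }
}
```

With the real client, the asynchronous form is usually preferable on I/O threads; a blocking wrapper like this only makes sense on a thread that is allowed to wait.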


Buffer Management

Unified buffer abstraction with multiple backing implementations for efficient memory and file-based data handling.

public abstract class ManagedBuffer {
    public abstract long size();
    public abstract ByteBuffer nioByteBuffer() throws IOException;
    public abstract InputStream createInputStream() throws IOException;
    public abstract ManagedBuffer retain();
    public abstract ManagedBuffer release();
    public abstract Object convertToNetty() throws IOException;
}

public final class FileSegmentManagedBuffer extends ManagedBuffer {
    public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length);
    public File getFile();
    public long getOffset();
    public long getLength();
}
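`FileSegmentManagedBuffer` describes a region of a file by `offset` and `length`. To illustrate what reading such a segment involves, here is a JDK-only sketch built on `FileChannel`. The names `FileSegmentSketch`, `readSegment`, and `demo` are hypothetical, and the real class's internals may differ.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class FileSegmentSketch {
    /** Reads exactly `length` bytes starting at `offset` (hypothetical helper). */
    static ByteBuffer readSegment(Path file, long offset, int length) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buf = ByteBuffer.allocate(length);
            long position = offset;
            // Positional reads may return fewer bytes than requested, so loop.
            while (buf.hasRemaining()) {
                int n = channel.read(buf, position);
                if (n < 0) {
                    throw new EOFException("Segment extends past end of file: " + file);
                }
                position += n;
            }
            buf.flip();
            return buf;
        }
    }

    /** Writes a temporary file and reads one segment back, to exercise the sketch. */
    static String demo() {
        try {
            Path tmp = Files.createTempFile("segment", ".bin");
            try {
                Files.write(tmp, "0123456789".getBytes(StandardCharsets.US_ASCII));
                return StandardCharsets.US_ASCII.decode(readSegment(tmp, 3, 4)).toString();
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 3456
    }
}
```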


Protocol Handling

Complete message protocol with encoding/decoding support for RPC, streaming, and one-way communication.

public interface Message extends Encodable {
}

public interface Encodable {
    int encodedLength();
    void encode(ByteBuf buf);
}

public final class RpcRequest extends AbstractMessage implements RequestMessage {
    public RpcRequest(long requestId, ManagedBuffer message);
}

public final class RpcResponse extends AbstractResponseMessage implements ResponseMessage {
    public RpcResponse(long requestId, ManagedBuffer message);
}
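`Encodable` pairs `encodedLength()` with `encode(buf)` so that a sender can size a frame before writing into it. The sketch below shows that pattern with `java.nio.ByteBuffer` standing in for Netty's `ByteBuf`. The `ToyRequest` type and the wire layout used here (4-byte frame length, 8-byte id, 4-byte payload length, payload) are assumptions made for illustration, not the library's actual format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class EncodableSketch {
    /** Hypothetical stand-in for Encodable, using ByteBuffer instead of ByteBuf. */
    interface Encodable {
        int encodedLength();
        void encode(ByteBuffer buf);
    }

    /** Toy message: an 8-byte id, a 4-byte payload length, then the payload. */
    static final class ToyRequest implements Encodable {
        final long requestId;
        final byte[] payload;

        ToyRequest(long requestId, byte[] payload) {
            this.requestId = requestId;
            this.payload = payload;
        }

        @Override
        public int encodedLength() {
            return Long.BYTES + Integer.BYTES + payload.length;
        }

        @Override
        public void encode(ByteBuffer buf) {
            buf.putLong(requestId).putInt(payload.length).put(payload);
        }

        /** Inverse of encode: reads the fields back in the same order. */
        static ToyRequest decode(ByteBuffer buf) {
            long id = buf.getLong();
            byte[] body = new byte[buf.getInt()];
            buf.get(body);
            return new ToyRequest(id, body);
        }
    }

    /** Prefixes the message with its length so a reader can split frames. */
    static ByteBuffer frame(Encodable message) {
        ByteBuffer out = ByteBuffer.allocate(Integer.BYTES + message.encodedLength());
        out.putInt(message.encodedLength());
        message.encode(out);
        out.flip();
        return out;
    }

    public static void main(String[] args) {
        byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer framed = frame(new ToyRequest(42L, hello));
        int bodyLength = framed.getInt();
        ToyRequest decoded = ToyRequest.decode(framed);
        System.out.println(bodyLength + " bytes, request " + decoded.requestId);
    }
}
```

Because the length is known up front, the output buffer can be allocated once at the exact size, which is the main reason to expose `encodedLength()` separately from `encode`.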


Authentication

Pluggable authentication system supporting SASL and custom authentication protocols with encryption.

public class SaslClientBootstrap implements TransportClientBootstrap {
    public void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

public class SaslServerBootstrap implements TransportServerBootstrap {
    public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

public interface SecretKeyHolder {
    String getSaslUser(String appId);
    String getSecretKey(String appId);
}
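The `SecretKeyHolder` interface above is the piece an application supplies to the SASL bootstraps. A minimal in-memory sketch might look like the following. The interface is copied locally so the snippet compiles on its own; `MapSecretKeyHolder` and `register` are hypothetical names, and both the choice to reuse the app id as the SASL user and the choice to throw on unknown apps are assumptions of this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SecretKeySketch {
    /** Local copy of the SecretKeyHolder shape, so the sketch is standalone. */
    interface SecretKeyHolder {
        String getSaslUser(String appId);
        String getSecretKey(String appId);
    }

    /** Hypothetical in-memory holder: one secret per application id. */
    static final class MapSecretKeyHolder implements SecretKeyHolder {
        private final Map<String, String> secretsByApp = new ConcurrentHashMap<>();

        void register(String appId, String secret) {
            secretsByApp.put(appId, secret);
        }

        @Override
        public String getSaslUser(String appId) {
            // Assumption: the application id doubles as the SASL user name.
            return appId;
        }

        @Override
        public String getSecretKey(String appId) {
            String secret = secretsByApp.get(appId);
            if (secret == null) {
                // Assumption: fail loudly rather than return null for unknown apps.
                throw new IllegalArgumentException("No secret registered for app " + appId);
            }
            return secret;
        }
    }

    public static void main(String[] args) {
        MapSecretKeyHolder holder = new MapSecretKeyHolder();
        holder.register("app-1", "s3cret");
        System.out.println(holder.getSaslUser("app-1"));
    }
}
```

A production holder would normally load secrets from a protected store rather than keep them in a plain map.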


Stream Management

Efficient streaming data transfer with chunk-based fetching, supporting large data transfers with minimal memory overhead.

public abstract class StreamManager {
    public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
    public ManagedBuffer openStream(String streamId); // optional; the default throws UnsupportedOperationException
    public void connectionTerminated(Channel channel);
    public void checkAuthorization(TransportClient client, long streamId);
}

public class OneForOneStreamManager extends StreamManager {
    public OneForOneStreamManager();
    public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel);
}
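Chunk-based fetching can be pictured as a registry that hands out one buffer per request, keyed by stream id and chunk index. The sketch below is a JDK-only illustration of that idea with plain `ByteBuffer` chunks. `ChunkStreamSketch` is a hypothetical class; its rules (chunks must be requested in order, and a stream is dropped after its last chunk) are assumptions of the sketch rather than documented behavior of `OneForOneStreamManager`.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class ChunkStreamSketch {
    /** Per-stream cursor over the registered chunks. */
    private static final class StreamState {
        final Iterator<ByteBuffer> chunks;
        int nextIndex = 0;

        StreamState(Iterator<ByteBuffer> chunks) {
            this.chunks = chunks;
        }
    }

    private final AtomicLong nextStreamId = new AtomicLong(0);
    private final Map<Long, StreamState> streams = new ConcurrentHashMap<>();

    /** Registers a sequence of chunks and returns the id clients fetch with. */
    long registerStream(Iterator<ByteBuffer> chunks) {
        long id = nextStreamId.getAndIncrement();
        streams.put(id, new StreamState(chunks));
        return id;
    }

    /** Returns the requested chunk; only the next unread index is accepted. */
    synchronized ByteBuffer getChunk(long streamId, int chunkIndex) {
        StreamState state = streams.get(streamId);
        if (state == null) {
            throw new IllegalStateException("Unknown or finished stream " + streamId);
        }
        if (chunkIndex != state.nextIndex || !state.chunks.hasNext()) {
            throw new IllegalStateException(
                "Expected chunk " + state.nextIndex + " but got " + chunkIndex);
        }
        state.nextIndex++;
        ByteBuffer chunk = state.chunks.next();
        if (!state.chunks.hasNext()) {
            streams.remove(streamId); // last chunk served: free the stream
        }
        return chunk;
    }

    public static void main(String[] args) {
        ChunkStreamSketch manager = new ChunkStreamSketch();
        long id = manager.registerStream(Arrays.asList(
            ByteBuffer.wrap(new byte[] {1, 2}),
            ByteBuffer.wrap(new byte[] {3})).iterator());
        System.out.println(manager.getChunk(id, 0).remaining());
        System.out.println(manager.getChunk(id, 1).remaining());
    }
}
```

Serving from an iterator means only the chunk currently being sent has to be materialized, which keeps memory use low for large transfers.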


Configuration

Comprehensive configuration system with performance tuning options for connection management, I/O settings, and security parameters.

public class TransportConf {
    public TransportConf(String module, ConfigProvider conf);
    public int connectionTimeoutMs();
    public int numConnectionsPerPeer();
    public int serverThreads();
    public int clientThreads();
    public int receiveBuf();
    public int sendBuf();
    public boolean encryptionEnabled();
    public String cipherTransformation();
}
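`TransportConf` reads its values through a `ConfigProvider`, whose only required method is `get(String name)`. The following JDK-only sketch shows one way typed getters with defaults can be layered on such a provider. The classes are local stand-ins, and the two-argument `get`, `getInt`, the `NoSuchElementException` convention, and the `example.*` keys are all assumptions made for illustration, not the library's confirmed API or key names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

final class ConfigSketch {
    /** Stand-in mirroring the ConfigProvider shape: one abstract lookup. */
    abstract static class ConfigProvider {
        abstract String get(String name);

        /** Hypothetical convenience: fall back to a default when the key is absent. */
        String get(String name, String defaultValue) {
            try {
                return get(name);
            } catch (NoSuchElementException e) {
                return defaultValue;
            }
        }

        /** Hypothetical typed getter built on the string lookup. */
        int getInt(String name, int defaultValue) {
            return Integer.parseInt(get(name, Integer.toString(defaultValue)));
        }
    }

    /** Map-backed provider, analogous in spirit to MapConfigProvider. */
    static final class MapProvider extends ConfigProvider {
        private final Map<String, String> props;

        MapProvider(Map<String, String> props) {
            this.props = new HashMap<>(props); // copy so later edits do not leak in
        }

        @Override
        String get(String name) {
            String value = props.get(name);
            if (value == null) {
                throw new NoSuchElementException(name);
            }
            return value;
        }
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put("example.timeoutMs", "5000"); // hypothetical key
        MapProvider provider = new MapProvider(settings);
        System.out.println(provider.getInt("example.timeoutMs", 100));
        System.out.println(provider.getInt("example.threads", 8));
    }
}
```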


Types

// Core callback interfaces
public interface RpcResponseCallback {
    void onSuccess(ByteBuffer response);
    void onFailure(Throwable e);
}

public interface ChunkReceivedCallback {
    void onSuccess(int chunkIndex, ManagedBuffer buffer);
    void onFailure(int chunkIndex, Throwable e);
}

public interface StreamCallback {
    void onData(String streamId, ByteBuffer buf) throws IOException;
    void onComplete(String streamId) throws IOException;
    void onFailure(String streamId, Throwable cause) throws IOException;
}

// Bootstrap interfaces
public interface TransportClientBootstrap {
    void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

public interface TransportServerBootstrap {
    RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

// Configuration provider
public abstract class ConfigProvider {
    public abstract String get(String name);
}

public class MapConfigProvider extends ConfigProvider {
    public MapConfigProvider(Map<String, String> props);
}
