tessl/maven-org-apache-spark--spark-network-common-2-12

Core networking functionality for Apache Spark including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support using Netty for high-performance network I/O.

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.spark/spark-network-common_2.12@3.5.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-common-2-12@3.5.0


Apache Spark Network Common

Apache Spark Network Common provides the core networking infrastructure for Apache Spark: a high-performance transport layer built on Netty for communication between nodes in a distributed Spark cluster. Its components include transport contexts that manage network connections, buffer management for zero-copy transfers, client-server communication protocols, cryptographic support with a forward-secure authentication protocol, SASL-based authentication, and shuffle database support.

Package Information

  • Package Name: org.apache.spark:spark-network-common_2.12
  • Package Type: maven
  • Language: Java
  • Installation (Maven):
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-network-common_2.12</artifactId>
      <version>3.5.6</version>
    </dependency>
  • Gradle: implementation 'org.apache.spark:spark-network-common_2.12:3.5.6'

Core Imports

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;

Basic Usage

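import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class BasicUsage {
    public static void main(String[] args) throws Exception {
        // Configure transport layer; "test" is the module name used in per-module keys
        Map<String, String> config = new HashMap<>();
        config.put("spark.network.timeout", "120s");
        ConfigProvider provider = new MapConfigProvider(config);
        TransportConf conf = new TransportConf("test", provider);

        // NoOpRpcHandler fails every RPC, so use a handler that actually replies
        RpcHandler handler = new RpcHandler() {
            private final StreamManager streamManager = new OneForOneStreamManager();

            @Override
            public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
                callback.onSuccess(ByteBuffer.wrap("ack".getBytes(StandardCharsets.UTF_8)));
            }

            @Override
            public StreamManager getStreamManager() {
                return streamManager;
            }
        };

        // Create transport context (main entry point)
        TransportContext context = new TransportContext(conf, handler);

        // createServer() with no arguments binds to an ephemeral port
        TransportServer server = context.createServer();
        int port = server.getPort();

        // Create client factory and connect to server
        TransportClientFactory clientFactory = context.createClientFactory();
        TransportClient client = clientFactory.createClient("localhost", port);

        // Send RPC message; the callback runs on a Netty thread, so wait for it
        CountDownLatch done = new CountDownLatch(1);
        ByteBuffer message = ByteBuffer.wrap("Hello, Spark!".getBytes(StandardCharsets.UTF_8));
        client.sendRpc(message, new RpcResponseCallback() {
            @Override
            public void onSuccess(ByteBuffer response) {
                // The response may not be array-backed, so decode it instead of calling array()
                System.out.println("Response: " + JavaUtils.bytesToString(response));
                done.countDown();
            }

            @Override
            public void onFailure(Throwable e) {
                System.err.println("RPC failed: " + e.getMessage());
                done.countDown();
            }
        });
        done.await(10, TimeUnit.SECONDS);

        // Cleanup
        client.close();
        clientFactory.close();
        server.close();
        context.close();
    }
}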

Architecture

Apache Spark Network Common is built around several key components:

  • Transport Layer: Core networking functionality with TransportContext as the main entry point for creating clients and servers
  • Client-Server Model: TransportClient for client operations and TransportServer for server operations
  • Message Protocol: Comprehensive message protocol system for different types of network communication
  • Buffer Management: Zero-copy buffer operations through ManagedBuffer implementations
  • Security Layer: Authentication (SASL) and encryption (AES-CTR/GCM) capabilities for secure communication
  • Configuration System: Flexible configuration management through ConfigProvider pattern
  • Database Support: Specialized shuffle database functionality using LevelDB and RocksDB backends

Capabilities

Transport Context

Main entry point for creating transport clients and servers, managing Netty pipeline setup and network configuration.

public final class TransportContext implements Closeable {
    public TransportContext(TransportConf conf, RpcHandler rpcHandler);
    public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);
    public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections, boolean isClientOnly);
    
    public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);
    public TransportClientFactory createClientFactory();
    public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);
    public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);
    public TransportServer createServer(List<TransportServerBootstrap> bootstraps);
    public TransportServer createServer();
    public void close();
}


Client Operations

Thread-safe client for fetching data chunks, sending RPCs, and streaming data; a single instance can be shared across threads.

public class TransportClient implements Closeable {
    public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);
    public void stream(String streamId, StreamCallback callback);
    public long sendRpc(ByteBuffer message, RpcResponseCallback callback);
    public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);
    public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);
    public void send(ByteBuffer message);
    public boolean isActive();
    public void close();
}
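Because sendRpc and uploadStream return a request ID and complete asynchronously, responses can arrive in any order. The following minimal sketch, using only JDK types, shows one way to correlate responses with pending callbacks by ID. PendingRpcs, register and complete are hypothetical names rather than Spark API, and the sequential AtomicLong ID source is an assumption; Spark's client performs comparable bookkeeping internally, but its details differ.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in for a client-side table of outstanding RPCs.
final class PendingRpcs {
    private final AtomicLong nextId = new AtomicLong(); // assumed ID source
    private final Map<Long, Consumer<ByteBuffer>> outstanding = new ConcurrentHashMap<>();

    // Registers a success callback and returns the ID that would travel with the request.
    long register(Consumer<ByteBuffer> onSuccess) {
        long id = nextId.incrementAndGet();
        outstanding.put(id, onSuccess);
        return id;
    }

    // Called when a response for requestId arrives; unknown or already-completed IDs are ignored.
    boolean complete(long requestId, ByteBuffer response) {
        Consumer<ByteBuffer> callback = outstanding.remove(requestId);
        if (callback == null) {
            return false;
        }
        callback.accept(response);
        return true;
    }

    int pendingCount() {
        return outstanding.size();
    }
}
```

Removing the entry before invoking the callback ensures that a duplicate response for the same ID is dropped rather than delivered twice.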


Server Operations

Server-side functionality for handling client connections, processing RPC requests, and managing data streams.

public class TransportServer implements Closeable {
    public int getPort();
    public MetricSet getAllMetrics();
    public Counter getRegisteredConnections();
    public void close();
}

public abstract class RpcHandler {
    public abstract void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
    public abstract StreamManager getStreamManager();
    public void channelActive(TransportClient client);
    public void channelInactive(TransportClient client);
    public void exceptionCaught(Throwable cause, TransportClient client);
}
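An RpcHandler's receive method should complete its callback with either onSuccess or onFailure, so that the caller's pending request does not wait until it times out. The sketch below illustrates that contract with a hypothetical lookup handler. Reply and LookupHandler are local stand-ins written for this example (Reply mirrors the documented callback shape but is not the Spark type), and treating the message as a UTF-8 key is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Local stand-in mirroring the documented RpcResponseCallback shape (not the Spark type).
interface Reply {
    void onSuccess(ByteBuffer response);
    void onFailure(Throwable e);
}

// Hypothetical handler: treats the message as a UTF-8 key and replies with its value.
final class LookupHandler {
    private final Map<String, String> table;

    LookupHandler(Map<String, String> table) {
        this.table = table;
    }

    // Completes the callback exactly once, with either a value or a failure.
    void receive(ByteBuffer message, Reply callback) {
        String key = StandardCharsets.UTF_8.decode(message).toString();
        String value = table.get(key);
        if (value == null) {
            callback.onFailure(new IllegalArgumentException("unknown key: " + key));
        } else {
            callback.onSuccess(ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8)));
        }
    }
}
```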


Buffer Management

Buffer abstraction with several backing implementations (file segments, NIO ByteBuffers, and Netty ByteBufs) that allows zero-copy transfer where the backing store supports it.

public abstract class ManagedBuffer {
    public abstract long size();
    public abstract ByteBuffer nioByteBuffer() throws IOException;
    public abstract InputStream createInputStream() throws IOException;
    public abstract ManagedBuffer retain();
    public abstract ManagedBuffer release();
    public abstract Object convertToNetty() throws IOException;
}
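ManagedBuffer exposes retain() and release() so a buffer can outlive the call that produced it, for example while it is queued for transmission. The following sketch shows that reference-counting contract on a heap ByteBuffer. RefCountedBuffer is hypothetical; in Spark itself the behavior varies by implementation (Netty-backed buffers delegate to Netty's reference count, while some heap-backed ones treat retain and release as no-ops).

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical heap-backed buffer illustrating a retain/release contract; not a Spark class.
final class RefCountedBuffer {
    private final ByteBuffer data;
    private final AtomicInteger refCnt = new AtomicInteger(1); // the creator holds one reference

    RefCountedBuffer(ByteBuffer data) {
        this.data = data;
    }

    long size() {
        return data.remaining();
    }

    // Returns an independent view so readers cannot move this buffer's position.
    ByteBuffer nioByteBuffer() {
        ensureLive();
        return data.duplicate();
    }

    RefCountedBuffer retain() {
        ensureLive();
        refCnt.incrementAndGet();
        return this;
    }

    RefCountedBuffer release() {
        if (refCnt.decrementAndGet() < 0) {
            throw new IllegalStateException("released too many times");
        }
        return this;
    }

    int refCount() {
        return refCnt.get();
    }

    private void ensureLive() {
        if (refCnt.get() <= 0) {
            throw new IllegalStateException("buffer already released");
        }
    }
}
```

Every retain() must be matched by a release(); once the count reaches zero the sketch refuses further access, which is how use-after-release bugs surface early.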


Message Protocol

Comprehensive message protocol system for different types of network communication including RPC, streaming, and chunk fetching.

public interface Message extends Encodable {
    Type type();
    ManagedBuffer body();
    boolean isBodyInFrame();
    
    enum Type {
        ChunkFetchRequest, ChunkFetchSuccess, ChunkFetchFailure,
        RpcRequest, RpcResponse, RpcFailure,
        StreamRequest, StreamResponse, StreamFailure,
        OneWayMessage, UploadStream,
        MergedBlockMetaRequest, MergedBlockMetaSuccess,
        User
    }
}
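Every Message is Encodable: encodedLength() reports the header size so the encoder can allocate a frame and prefix it with its total length. The sketch below hand-encodes an RpcRequest-style frame with java.nio.ByteBuffer instead of Netty's ByteBuf. The layout (an 8-byte frame length that counts itself, a one-byte type ID, an 8-byte request ID, a 4-byte body size, then the body) and the type ID value 3 are assumptions modeled on Spark's encoder; verify them against the source before relying on them for interoperability.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical encoder for an RpcRequest-style frame (assumed layout):
// [long frameLength][byte type][long requestId][int bodySize][body bytes]
final class FrameSketch {
    static final byte RPC_REQUEST = 3; // assumed wire ID for RpcRequest

    static ByteBuffer encodeRpcRequest(long requestId, byte[] body) {
        int headerLength = 8 + 1 + 8 + 4; // frame length + type + request ID + body size
        long frameLength = headerLength + body.length; // the length field counts itself
        ByteBuffer frame = ByteBuffer.allocate((int) frameLength); // big-endian by default
        frame.putLong(frameLength);
        frame.put(RPC_REQUEST);
        frame.putLong(requestId);
        frame.putInt(body.length);
        frame.put(body);
        frame.flip(); // prepare for reading
        return frame;
    }
}
```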


Security and Authentication

Comprehensive security layer including SASL authentication and AES encryption for secure network communication.

public interface TransportCipher {
    String getKeyId() throws GeneralSecurityException;
    void addToChannel(Channel channel) throws IOException, GeneralSecurityException;
}

public interface SecretKeyHolder {
    String getSaslUser(String appId);
    String getSecretKey(String appId);
}
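SecretKeyHolder lets the SASL layer look up credentials per application ID. A minimal in-memory registry, of the kind a shuffle service needs as applications register and unregister, might look like this. KeyHolder is a local stand-in with the two documented methods, InMemoryKeyHolder is hypothetical, and the fixed user name "sparkSaslUser" is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-in with the same two methods as the documented SecretKeyHolder.
interface KeyHolder {
    String getSaslUser(String appId);
    String getSecretKey(String appId);
}

// Hypothetical per-application secret registry, safe for concurrent use.
final class InMemoryKeyHolder implements KeyHolder {
    private final Map<String, String> secrets = new ConcurrentHashMap<>();

    void register(String appId, String secret) {
        secrets.put(appId, secret);
    }

    void unregister(String appId) {
        secrets.remove(appId);
    }

    @Override
    public String getSaslUser(String appId) {
        return "sparkSaslUser"; // assumed fixed user name shared by all applications
    }

    @Override
    public String getSecretKey(String appId) {
        return secrets.get(appId); // null when the application is unknown
    }
}
```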


Shuffle Database

Key-value store abstraction, backed by LevelDB or RocksDB, that the shuffle service uses to persist state such as executor registrations so it can be recovered after a restart.

public interface DB extends Closeable {
    void put(byte[] key, byte[] value) throws IOException;
    byte[] get(byte[] key) throws IOException;
    void delete(byte[] key) throws IOException;
    DBIterator iterator();
}
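Code written against the DB interface can be exercised without native LevelDB or RocksDB libraries by substituting an in-memory map. The following hypothetical InMemoryDB mirrors put, get, delete and ordered iteration (it returns a plain java.util.Iterator rather than Spark's DBIterator) and orders keys by unsigned byte comparison, which matches the default bytewise ordering of both LevelDB and RocksDB.

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the documented DB interface, e.g. for unit tests.
final class InMemoryDB implements Closeable {
    // Unsigned lexicographic byte order, like the default LevelDB/RocksDB comparator.
    private static final Comparator<byte[]> ORDER = Arrays::compareUnsigned;
    private final TreeMap<byte[], byte[]> store = new TreeMap<>(ORDER);

    void put(byte[] key, byte[] value) {
        store.put(key.clone(), value.clone()); // defensive copies: callers may reuse arrays
    }

    byte[] get(byte[] key) {
        byte[] value = store.get(key);
        return value == null ? null : value.clone();
    }

    void delete(byte[] key) {
        store.remove(key);
    }

    // Iterates entries in key order.
    Iterator<Map.Entry<byte[], byte[]>> iterator() {
        return store.entrySet().iterator();
    }

    @Override
    public void close() {
        store.clear();
    }
}
```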

public enum DBBackend {
    LEVELDB, ROCKSDB;
    
    public String fileName(String prefix);
    public static DBBackend byName(String value);
}
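DBBackend turns a configured backend name into an enum constant and derives a store's file name from a prefix. The sketch below shows that pattern with a local Backend enum; the case-insensitive lookup and the ".ldb" and ".rdb" suffixes are assumptions about Spark's behavior, not quotations of it, and the prefix used in the usage check is hypothetical.

```java
import java.util.Locale;

// Local stand-in mirroring the documented DBBackend methods; suffixes are assumptions.
enum Backend {
    LEVELDB(".ldb"), ROCKSDB(".rdb");

    private final String fileSuffix;

    Backend(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    // Derives the on-disk name for a store from a caller-supplied prefix.
    String fileName(String prefix) {
        return prefix + fileSuffix;
    }

    // Case-insensitive lookup; unknown names throw IllegalArgumentException.
    static Backend byName(String value) {
        return Backend.valueOf(value.toUpperCase(Locale.ROOT));
    }
}
```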


Configuration Management

Flexible configuration system with provider pattern for managing transport layer settings and network parameters.

public class TransportConf {
    public TransportConf(String module, ConfigProvider conf);
    
    public String ioMode();
    public boolean preferDirectBufs();
    public int connectionTimeoutMs();
    public int numConnectionsPerPeer();
    public int serverThreads();
    public int clientThreads();
    public boolean encryptionEnabled();
    public boolean saslEncryption();
}

public abstract class ConfigProvider {
    public abstract String get(String name);
    public String get(String name, String defaultValue);
    public boolean getBoolean(String name, boolean defaultValue);
    public int getInt(String name, int defaultValue);
    public long getLong(String name, long defaultValue);
}
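ConfigProvider needs only one abstract lookup; the typed getters can be derived from it by parsing a string default. The sketch below shows that layering with local Provider and MapProvider classes (stand-ins, not the Spark types). It assumes that a missing key is signaled with NoSuchElementException and that TransportConf builds module-scoped keys of the form spark.<module>.<suffix>, as in spark.shuffle.io.serverThreads; moduleKey is a hypothetical helper.

```java
import java.util.Map;
import java.util.NoSuchElementException;

// Local stand-in mirroring the documented ConfigProvider contract (not the Spark class).
abstract class Provider {
    // Implementations throw NoSuchElementException for missing keys (assumed contract).
    abstract String get(String name);

    String get(String name, String defaultValue) {
        try {
            return get(name);
        } catch (NoSuchElementException e) {
            return defaultValue;
        }
    }

    int getInt(String name, int defaultValue) {
        return Integer.parseInt(get(name, Integer.toString(defaultValue)));
    }

    boolean getBoolean(String name, boolean defaultValue) {
        return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
    }
}

final class MapProvider extends Provider {
    private final Map<String, String> values;

    MapProvider(Map<String, String> values) {
        this.values = values;
    }

    @Override
    String get(String name) {
        String value = values.get(name);
        if (value == null) {
            throw new NoSuchElementException(name);
        }
        return value;
    }

    // Hypothetical helper building module-scoped keys: "spark.<module>.<suffix>".
    static String moduleKey(String module, String suffix) {
        return "spark." + module + "." + suffix;
    }
}
```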


Types

Core Interfaces

public interface Closeable {  // java.io.Closeable from the JDK, shown for reference
    void close() throws IOException;
}

public interface Encodable {
    int encodedLength();
    void encode(ByteBuf buf);
}

Callback Interfaces

public interface BaseResponseCallback {
    void onFailure(Throwable e);
}

public interface RpcResponseCallback extends BaseResponseCallback {
    void onSuccess(ByteBuffer response);
}

public interface ChunkReceivedCallback {
    void onSuccess(int chunkIndex, ManagedBuffer buffer);
    void onFailure(int chunkIndex, Throwable e);
}

public interface StreamCallback {
    void onData(String streamId, ByteBuffer buf) throws IOException;
    void onComplete(String streamId) throws IOException;
    void onFailure(String streamId, Throwable cause) throws IOException;
}

public interface StreamCallbackWithID extends StreamCallback {
    String getID();
}
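A StreamCallback receives a stream as a sequence of onData calls followed by one of onComplete or onFailure. The sketch below collects a small stream into memory. StreamSink is a local stand-in with the documented three methods and CollectingSink is hypothetical; copying each chunk before returning is a defensive assumption, since the caller may reuse the buffer it passes in.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Local stand-in with the same three methods as the documented StreamCallback.
interface StreamSink {
    void onData(String streamId, ByteBuffer buf) throws IOException;
    void onComplete(String streamId) throws IOException;
    void onFailure(String streamId, Throwable cause) throws IOException;
}

// Hypothetical callback that accumulates chunks in memory until the stream ends.
final class CollectingSink implements StreamSink {
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private volatile String result;
    private volatile Throwable error;

    @Override
    public void onData(String streamId, ByteBuffer buf) {
        // Copy the readable bytes now, in case the caller reuses buf afterwards.
        byte[] chunk = new byte[buf.remaining()];
        buf.get(chunk);
        out.write(chunk, 0, chunk.length);
    }

    @Override
    public void onComplete(String streamId) {
        result = new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    @Override
    public void onFailure(String streamId, Throwable cause) {
        error = cause;
    }

    String result() { return result; }

    Throwable error() { return error; }
}
```

For large streams, writing each chunk to a file would be preferable to buffering the whole stream in memory.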

Bootstrap Interfaces

public interface TransportClientBootstrap {
    void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

public interface TransportServerBootstrap {
    RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}
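A TransportServerBootstrap receives the current RpcHandler and returns a possibly wrapped one, which is how authentication can be layered in front of application logic. The sketch below reduces that to plain functions: Handler, ServerBootstrap and BootstrapChain are hypothetical stand-ins, and the Netty Channel argument is omitted. It assumes bootstraps are applied in list order, so the last one becomes the outermost wrapper.

```java
import java.util.List;

// Local stand-in: a handler reduced to a function from request to reply.
interface Handler {
    String handle(String request);
}

// Mirrors doBootstrap(channel, rpcHandler) without the channel: returns a possibly wrapped handler.
interface ServerBootstrap {
    Handler doBootstrap(Handler inner);
}

final class BootstrapChain {
    // Applies bootstraps in list order; each one wraps the handler produced so far.
    static Handler apply(Handler appHandler, List<ServerBootstrap> bootstraps) {
        Handler handler = appHandler;
        for (ServerBootstrap bootstrap : bootstraps) {
            handler = bootstrap.doBootstrap(handler);
        }
        return handler;
    }
}
```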

Enumeration Types

public enum IOMode {
    NIO, EPOLL
}

Exception Classes

public class ChunkFetchFailureException extends RuntimeException {
    public ChunkFetchFailureException(String errorMsg, Throwable cause);
    public ChunkFetchFailureException(String errorMsg);
}

public class SaslTimeoutException extends RuntimeException {
    // Standard RuntimeException constructors
}

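public class BlockPushNonFatalFailure extends RuntimeException {
    // Non-fatal push-based shuffle failure that carries a ReturnCode
    public BlockPushNonFatalFailure(ByteBuffer response, String msg);
    public BlockPushNonFatalFailure(ReturnCode returnCode, String msg);
}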