tessl/maven-org-apache-spark--spark-network-shuffle

External shuffle service for Apache Spark that enables shuffle operations outside of executor processes

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.spark/spark-network-shuffle_2.11@2.2.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-shuffle@2.2.0

Spark Network Shuffle

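Spark Network Shuffle provides an external shuffle service for Apache Spark that serves shuffle data outside of executor processes. Executors write shuffle files to local disk and register their location with the service, and other executors then fetch blocks from the service rather than from the executor that wrote them. Shuffle output therefore stays available when an executor exits or is removed, which improves fault tolerance and lets resources be reclaimed (for example, under dynamic allocation).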

Package Information

  • Package Name: spark-network-shuffle_2.11
  • Package Type: maven
  • Language: Java
  • Installation: Add Maven dependency: org.apache.spark:spark-network-shuffle_2.11:2.2.3

Core Imports

import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.sasl.ShuffleSecretManager;

Basic Usage

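import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class ShuffleClientExample {
    public static void main(String[] args) throws Exception {
        // TransportConf takes a module name and a ConfigProvider; an empty map keeps all defaults
        TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(Collections.emptyMap()));

        // No SecretKeyHolder, authentication disabled
        ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
        client.init("myApp");

        try {
            // Register executor with shuffle service (7337 is the default service port)
            String[] localDirs = {"/tmp/spark-shuffle"};
            ExecutorShuffleInfo info = new ExecutorShuffleInfo(
                localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");
            client.registerWithShuffleServer("localhost", 7337, "executor-1", info);

            // Fetch blocks asynchronously; the latch counts down once per block outcome
            String[] blockIds = {"shuffle_0_1_0", "shuffle_0_2_0"};
            CountDownLatch remaining = new CountDownLatch(blockIds.length);
            BlockFetchingListener listener = new BlockFetchingListener() {
                @Override
                public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
                    // data is released when this method returns; call data.retain() to keep it longer
                    System.out.println("Fetched block: " + blockId + " (" + data.size() + " bytes)");
                    remaining.countDown();
                }

                @Override
                public void onBlockFetchFailure(String blockId, Throwable exception) {
                    System.err.println("Failed to fetch block: " + blockId + ": " + exception);
                    remaining.countDown();
                }
            };

            client.fetchBlocks("localhost", 7337, "executor-1", blockIds, listener, null);
            // Wait for the callbacks before closing; closing early can abort in-flight fetches
            remaining.await(30, TimeUnit.SECONDS);
        } finally {
            client.close();
        }
    }
}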

Architecture

Spark Network Shuffle is built around several key components:

  • Shuffle Client: Client-side interface for fetching shuffle blocks from external services
  • Block Handler: Server-side RPC handler that processes shuffle requests and manages block access
  • Block Resolver: Manages executor metadata and resolves shuffle block locations on disk
  • Protocol Messages: Structured messages for client-server communication using Netty encoding
  • SASL Security: Authentication system for secure shuffle operations
  • Mesos Integration: Specialized components for Mesos cluster manager integration

Capabilities

Shuffle Client Operations

Core client functionality for connecting to external shuffle services and fetching shuffle blocks with retry logic and authentication support.

public abstract class ShuffleClient implements Closeable {
    public void init(String appId);
    public abstract void fetchBlocks(
        String host, int port, String execId, String[] blockIds,
        BlockFetchingListener listener, TempShuffleFileManager tempShuffleFileManager);
}

public class ExternalShuffleClient extends ShuffleClient {
    public ExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder, boolean authEnabled);
    public void registerWithShuffleServer(String host, int port, String execId, ExecutorShuffleInfo executorInfo) 
        throws IOException, InterruptedException;
}

See Shuffle Client (client.md) for the full API.
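The retry logic mentioned above is configured in Spark through settings such as spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait. The sketch below shows the general idea under our own assumptions: a hypothetical RetrySketch helper (not part of spark-network-shuffle) re-runs a fetch attempt only when it fails with an IOException, sleeps between attempts, and gives up after a fixed number of retries.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical helper, not part of spark-network-shuffle: retries an attempt
// only on IOException, up to maxRetries additional tries.
final class RetrySketch {
    static <T> T fetchWithRetry(Callable<T> attempt, int maxRetries, long retryWaitMs)
            throws Exception {
        int retryCount = 0;
        while (true) {
            try {
                return attempt.call();
            } catch (IOException e) {
                if (retryCount >= maxRetries) {
                    throw e; // out of retries: surface the last I/O failure
                }
                retryCount++;
                Thread.sleep(retryWaitMs); // wait before the next attempt
            }
            // Any other exception type propagates immediately without a retry.
        }
    }
}
```

Restricting retries to I/O failures keeps the helper from repeating requests that can never succeed, such as a malformed block id.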

Shuffle Service Handler

Server-side components for handling shuffle requests, managing registered executors, and streaming shuffle blocks to clients.

public class ExternalShuffleBlockHandler extends RpcHandler {
    public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException;
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
    public void applicationRemoved(String appId, boolean cleanupLocalDirs);
    public MetricSet getAllMetrics();
}

See Service Handler (handler.md) for the full API.
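When a client asks for several blocks at once, the handler does not put the block data in the RPC reply. It registers the blocks as a stream and replies with a handle (a stream id plus the number of chunks) that the client then uses to fetch each block as one chunk. The hypothetical StreamRegistrySketch below models that bookkeeping in memory, with block ids standing in for buffers; it is a simplified, single-threaded illustration, not the handler's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of OpenBlocks -> stream handle -> chunk fetches.
// Single-threaded sketch: no synchronization.
final class StreamRegistrySketch {
    private long nextStreamId = 0;
    private final Map<Long, List<String>> streams = new HashMap<>();
    private final Map<Long, Integer> nextChunk = new HashMap<>();

    /** Registers the blocks and returns {streamId, numChunks}, like a stream handle. */
    long[] openBlocks(List<String> blockIds) {
        long streamId = nextStreamId++;
        streams.put(streamId, new ArrayList<>(blockIds));
        nextChunk.put(streamId, 0);
        return new long[] {streamId, blockIds.size()};
    }

    /** Returns chunk chunkIndex of a stream; chunks must be requested in order. */
    String getChunk(long streamId, int chunkIndex) {
        List<String> blocks = streams.get(streamId);
        if (blocks == null) {
            throw new IllegalStateException("Unknown stream " + streamId);
        }
        int expected = nextChunk.get(streamId);
        if (chunkIndex != expected) {
            throw new IllegalStateException(
                "Out-of-order chunk " + chunkIndex + ", expected " + expected);
        }
        String block = blocks.get(chunkIndex);
        if (chunkIndex + 1 == blocks.size()) {
            // Last chunk served: release the stream's state
            streams.remove(streamId);
            nextChunk.remove(streamId);
        } else {
            nextChunk.put(streamId, chunkIndex + 1);
        }
        return block;
    }
}
```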

Block Resolution

Block resolver functionality for managing executor metadata, locating shuffle files on disk, and providing shuffle block data access.

public class ExternalShuffleBlockResolver {
    public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) throws IOException;
    public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
    public ManagedBuffer getBlockData(String appId, String execId, String blockId);
    public void applicationRemoved(String appId, boolean cleanupLocalDirs);
}

See Block Resolution (resolver.md) for the full API.
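To serve a block, the resolver parses the block id and locates the files the executor wrote under the localDirs and subDirsPerLocalDir it registered. The sketch below is our own re-implementation of that mapping for the sort-based shuffle, assuming block ids of the form shuffle_<shuffleId>_<mapId>_<reduceId>, one index file per map task named shuffle_<shuffleId>_<mapId>_0.index, and hash-based placement into two-hex-digit sub-directories. BlockLocationSketch and its methods are hypothetical names.

```java
import java.io.File;

// Hypothetical re-implementation (not the library's code) of block id parsing
// and on-disk file placement for a registered executor.
final class BlockLocationSketch {
    /** Parses "shuffle_<shuffleId>_<mapId>_<reduceId>" into its three ids. */
    static int[] parseShuffleBlockId(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length != 4 || !parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Unexpected block id format: " + blockId);
        }
        // NumberFormatException is itself an IllegalArgumentException
        return new int[] {
            Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Integer.parseInt(parts[3])};
    }

    /** Non-negative hash of a file name (Integer.MIN_VALUE maps to 0). */
    static int nonNegativeHash(String s) {
        int h = s.hashCode();
        return h != Integer.MIN_VALUE ? Math.abs(h) : 0;
    }

    /** Picks a local dir and a hex-named sub-directory from the file name's hash. */
    static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
        int hash = nonNegativeHash(filename);
        String localDir = localDirs[hash % localDirs.length];
        int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
        return new File(new File(localDir, String.format("%02x", subDirId)), filename);
    }

    /** Index file assumed to be written by the sort-based shuffle for one map task. */
    static File indexFile(String[] localDirs, int subDirsPerLocalDir, int shuffleId, int mapId) {
        return getFile(localDirs, subDirsPerLocalDir,
            "shuffle_" + shuffleId + "_" + mapId + "_0.index");
    }
}
```

Because placement depends only on the file name and the registered directory layout, the service can find a file without asking the executor that wrote it.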

Protocol Messages

Structured protocol messages for client-server communication, including executor registration, block requests, and streaming handles.

public abstract class BlockTransferMessage implements Encodable {
    protected abstract Type type();
    public ByteBuffer toByteBuffer();
    
    public static class Decoder {
        public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
    }
}

public class ExecutorShuffleInfo implements Encodable {
    public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);
    public final String[] localDirs;
    public final int subDirsPerLocalDir;
    public final String shuffleManager;
}

See Protocol Messages (protocol.md) for the full API.
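BlockTransferMessage.toByteBuffer writes a one-byte type tag followed by the message body. As an illustration of how the ExecutorShuffleInfo fields can be serialized inside such a body, the hypothetical ShuffleInfoCodecSketch below writes strings as a 4-byte length plus UTF-8 bytes and string arrays as a 4-byte count followed by the strings. It is assumed to resemble the library's encoding but is not guaranteed to be byte-compatible with it, and it omits the type tag.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical length-prefixed codec for the three ExecutorShuffleInfo fields.
final class ShuffleInfoCodecSketch {
    static final class Info {
        final String[] localDirs;
        final int subDirsPerLocalDir;
        final String shuffleManager;

        Info(String[] localDirs, int subDirsPerLocalDir, String shuffleManager) {
            this.localDirs = localDirs;
            this.subDirsPerLocalDir = subDirsPerLocalDir;
            this.shuffleManager = shuffleManager;
        }
    }

    /** Bytes needed for one length-prefixed UTF-8 string. */
    private static int encodedLength(String s) {
        return 4 + s.getBytes(StandardCharsets.UTF_8).length;
    }

    static ByteBuffer encode(Info info) {
        int size = 4; // string-array count
        for (String dir : info.localDirs) {
            size += encodedLength(dir);
        }
        size += 4 + encodedLength(info.shuffleManager); // int field + manager string
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(info.localDirs.length);
        for (String dir : info.localDirs) {
            putString(buf, dir);
        }
        buf.putInt(info.subDirsPerLocalDir);
        putString(buf, info.shuffleManager);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    static Info decode(ByteBuffer buf) {
        String[] dirs = new String[buf.getInt()];
        for (int i = 0; i < dirs.length; i++) {
            dirs[i] = getString(buf);
        }
        int subDirs = buf.getInt();
        String manager = getString(buf);
        return new Info(dirs, subDirs, manager);
    }

    private static void putString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putInt(bytes.length);
        buf.put(bytes);
    }

    private static String getString(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getInt()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```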

Security and Authentication

SASL-based authentication system for secure shuffle operations, managing application secrets and authenticating client connections.

public class ShuffleSecretManager implements SecretKeyHolder {
    public ShuffleSecretManager();
    public void registerApp(String appId, String shuffleSecret);
    public void unregisterApp(String appId);
    public String getSecretKey(String appId);
}

See Security (security.md) for the full API.
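ShuffleSecretManager keeps one secret per application so the service can authenticate each application's clients independently. A minimal stand-in, assuming only the registerApp/unregisterApp/getSecretKey behavior listed above (returning null for an unknown application is an assumption of this sketch), could look like this. SecretRegistrySketch is a hypothetical class and does not implement the library's SecretKeyHolder interface.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-application secret registry, modeled on the methods
// listed for ShuffleSecretManager. Thread-safe via ConcurrentHashMap.
final class SecretRegistrySketch {
    private final Map<String, String> secrets = new ConcurrentHashMap<>();

    /** Stores (or replaces) the shared secret for an application. */
    void registerApp(String appId, String shuffleSecret) {
        secrets.put(appId, shuffleSecret);
    }

    /** Forgets the secret once the application has finished. */
    void unregisterApp(String appId) {
        secrets.remove(appId);
    }

    /** Returns the application's secret, or null if it is not registered. */
    String getSecretKey(String appId) {
        return secrets.get(appId);
    }
}
```

Keying secrets by application id means unregistering one application cannot affect the credentials of another.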

Mesos Integration

Specialized components for Mesos cluster manager integration, including driver registration and heartbeat mechanisms.

public class MesosExternalShuffleClient extends ExternalShuffleClient {
    public MesosExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder, 
        boolean authEnabled);
    public void registerDriverWithShuffleService(String host, int port, 
        long heartbeatTimeoutMs, long heartbeatIntervalMs) throws IOException, InterruptedException;
}

See Mesos Integration (mesos.md) for the full API.
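On Mesos the driver registers with the shuffle service and then keeps sending heartbeats every heartbeatIntervalMs. If none arrive within heartbeatTimeoutMs, the service can treat the application as finished and clean up its shuffle files. The hypothetical HeartbeatTrackerSketch below models that service-side bookkeeping, taking the current time as an explicit argument so the logic is deterministic; the class and method names are our own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical service-side tracker for driver heartbeats.
final class HeartbeatTrackerSketch {
    private final long timeoutMs;
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    HeartbeatTrackerSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Driver registration counts as the first heartbeat. */
    void registerDriver(String appId, long nowMs) {
        lastSeen.put(appId, nowMs);
    }

    /** Heartbeats from unknown (or already expired) apps are ignored. */
    void heartbeat(String appId, long nowMs) {
        lastSeen.computeIfPresent(appId, (id, previous) -> nowMs);
    }

    /** Removes and returns apps whose last heartbeat is older than the timeout. */
    List<String> expireStaleApps(long nowMs) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            if (nowMs - entry.getValue() > timeoutMs) {
                expired.add(entry.getKey());
            }
        }
        for (String appId : expired) {
            lastSeen.remove(appId);
        }
        return expired;
    }
}
```

A real service would call expireStaleApps periodically (for example from a scheduled task) and remove each returned application's data.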

Types

Core Types

public interface BlockFetchingListener extends EventListener {
    void onBlockFetchSuccess(String blockId, ManagedBuffer data);
    void onBlockFetchFailure(String blockId, Throwable exception);
}

public interface TempShuffleFileManager {
    File createTempShuffleFile();
    boolean registerTempShuffleFileToClean(File file);
}
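TempShuffleFileManager lets a fetch spill large blocks to temporary files that are deleted when the fetch finishes. As we understand the contract, registerTempShuffleFileToClean returns false when the manager can no longer clean the file, leaving deletion to the caller. The hypothetical TempFileManagerSketch below follows that reading; it mirrors the two method names but does not implement the library interface (its createTempShuffleFile also declares IOException, unlike the interface).

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical temp-file manager: hands out temp files, remembers the ones
// registered for cleanup, and deletes them all when the fetch is cleaned up.
final class TempFileManagerSketch {
    private final File dir;
    private final List<File> toClean = new ArrayList<>();
    private boolean cleanedUp = false;

    TempFileManagerSketch(File dir) {
        this.dir = dir;
    }

    /** Creates a new, uniquely named temp file for one fetched block. */
    File createTempShuffleFile() throws IOException {
        return File.createTempFile("shuffle-fetch-", ".tmp", dir);
    }

    /** Returns true if this manager will delete the file, false if the caller must. */
    synchronized boolean registerTempShuffleFileToClean(File file) {
        if (cleanedUp) {
            return false;
        }
        toClean.add(file);
        return true;
    }

    /** Deletes every registered file; later registrations are refused. */
    synchronized void cleanup() {
        cleanedUp = true;
        for (File f : toClean) {
            if (!f.delete() && f.exists()) {
                System.err.println("Failed to delete " + f);
            }
        }
        toClean.clear();
    }
}
```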

public class ShuffleIndexRecord {
    public ShuffleIndexRecord(long offset, long length);
    public long getOffset();
    public long getLength();
}
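ShuffleIndexRecord describes where one reduce partition's bytes sit inside a map task's data file. For the sort-based shuffle, the index file holds numPartitions + 1 cumulative offsets stored as longs, so partition r spans offsets[r] up to offsets[r + 1]. The hypothetical IndexLookupSketch below builds such an index in memory and derives the (offset, length) pair; it illustrates the layout and is not the resolver's code.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical sketch of the index-file layout: numPartitions + 1 cumulative
// offsets, stored as big-endian longs.
final class IndexLookupSketch {
    /** Builds index bytes from per-partition lengths: 0, len0, len0 + len1, ... */
    static byte[] writeIndex(long[] partitionLengths) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            long offset = 0L;
            out.writeLong(offset);
            for (long length : partitionLengths) {
                offset += length;
                out.writeLong(offset);
            }
        }
        return bytes.toByteArray();
    }

    /** Returns {offset, length} for reduceId, the pair a ShuffleIndexRecord holds. */
    static long[] lookup(byte[] index, int reduceId) {
        int numPartitions = index.length / 8 - 1;
        if (reduceId < 0 || reduceId >= numPartitions) {
            throw new IllegalArgumentException("No partition " + reduceId);
        }
        ByteBuffer buf = ByteBuffer.wrap(index); // big-endian, like DataOutputStream
        long offset = buf.getLong(reduceId * 8);
        long nextOffset = buf.getLong((reduceId + 1) * 8);
        return new long[] {offset, nextOffset - offset};
    }
}
```

Storing cumulative offsets lets a lookup read just two longs, whatever the number of partitions; an empty partition simply has length 0.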

Exception Types

Common exceptions thrown by shuffle operations:

  • java.io.IOException: file I/O operations, network connectivity
  • java.lang.InterruptedException: blocking operations that can be interrupted
  • java.lang.SecurityException: authentication failures
  • java.lang.IllegalArgumentException: invalid block IDs or parameters
  • java.lang.UnsupportedOperationException: unsupported message types
  • java.lang.RuntimeException: executor not registered, or other runtime errors