tessl/maven-org-apache-spark--spark-network-shuffle_2-10

External shuffle service client for Apache Spark that enables reading shuffle blocks from external servers instead of executors

Workspace
tessl
Visibility
Public
Created
Last updated
Describes: mavenpkg:maven/org.apache.spark/spark-network-shuffle_2.10@1.6.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-shuffle_2-10@1.6.0


Apache Spark Network Shuffle

Apache Spark Network Shuffle provides external shuffle service functionality that enables reading shuffle blocks from external servers instead of directly from executors. This improves fault tolerance by allowing shuffle data to persist even when executors are lost, making Spark applications more reliable in distributed computing environments.

Package Information

  • Package Name: spark-network-shuffle_2.10
  • Package Type: Maven
  • Language: Java
  • Group ID: org.apache.spark
  • Installation: Add dependency to your Maven pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-network-shuffle_2.10</artifactId>
    <version>1.6.3</version>
</dependency>

Core Imports

import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.ConfigProvider;

For server-side components:

import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;

Basic Usage

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class ShuffleFetchExample {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Minimal configuration provider: answers a few keys and throws
        // NoSuchElementException for the rest, so TransportConf falls back to its defaults
        ConfigProvider configProvider = new ConfigProvider() {
            @Override
            public String get(String name) {
                if (name.equals("spark.shuffle.io.maxRetries")) return "3";
                if (name.equals("spark.shuffle.io.retryWait")) return "5s";
                throw new NoSuchElementException(name);
            }
        };

        // The "shuffle" module makes TransportConf read spark.shuffle.io.* keys
        TransportConf conf = new TransportConf("shuffle", configProvider);

        // Client with SASL disabled for simplicity
        ExternalShuffleClient client = new ExternalShuffleClient(
            conf,
            null,       // secretKeyHolder: not needed when SASL is disabled
            false,      // saslEnabled
            false       // saslEncryptionEnabled
        );

        try {
            // init must be called before registering or fetching
            client.init("my-spark-app");

            // Register the executor's shuffle directories with the shuffle server
            ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
                new String[]{"/tmp/spark-shuffle"},  // local directories
                64,                                  // subdirectories per local directory
                "org.apache.spark.shuffle.sort.SortShuffleManager"
            );
            client.registerWithShuffleServer("localhost", 7337, "executor-1", executorInfo);

            // Block IDs have the form shuffle_<shuffleId>_<mapId>_<reduceId>
            String[] blockIds = {"shuffle_1_2_0", "shuffle_1_2_1"};

            // fetchBlocks is asynchronous: count down once per block, on success or failure
            final CountDownLatch remaining = new CountDownLatch(blockIds.length);

            BlockFetchingListener listener = new BlockFetchingListener() {
                @Override
                public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
                    // The buffer is released when this callback returns;
                    // call data.retain() (and later release()) to keep it longer
                    System.out.println("Fetched block " + blockId + " (" + data.size() + " bytes)");
                    remaining.countDown();
                }

                @Override
                public void onBlockFetchFailure(String blockId, Throwable exception) {
                    System.err.println("Failed to fetch block " + blockId + ": " + exception.getMessage());
                    remaining.countDown();
                }
            };

            client.fetchBlocks("localhost", 7337, "executor-1", blockIds, listener);

            // Wait for every callback before closing the client
            remaining.await(30, TimeUnit.SECONDS);
        } finally {
            client.close();
        }
    }
}

Architecture

The Apache Spark Network Shuffle library is built around several key components:

  • Client Components: ExternalShuffleClient and MesosExternalShuffleClient provide the primary interface for fetching shuffle data from external services
  • Server Components: ExternalShuffleBlockHandler and ExternalShuffleBlockResolver implement the server-side logic for serving shuffle blocks
  • Protocol Layer: Network protocol messages (BlockTransferMessage subclasses) handle communication between clients and servers
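  • Security Layer: ShuffleSecretManager stores per-application secrets that the shuffle service uses for SASL authentication of shuffle clients
  • Fetching and Retry: OneForOneBlockFetcher opens a set of blocks with a single request and then fetches each block as a separate chunk; RetryingBlockFetcher wraps it and re-requests blocks that failed with an I/O error, up to spark.shuffle.io.maxRetries times, waiting spark.shuffle.io.retryWait between attempts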
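The retry behaviour attributed to RetryingBlockFetcher above can be modeled in a few lines of plain Java. This is a hedged sketch, not Spark's class. `BlockFetcher`, `RetrySketch` and `fetchWithRetries` are hypothetical names, and the fetch is synchronous for readability. The policy is our assumption about how the real fetcher behaves:

  • retry only I/O failures
  • re-request only the outstanding blocks
  • share one retry budget across the whole request

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one network round trip: for each requested block id,
// returns either the payload (byte[]) or the Throwable that made it fail.
interface BlockFetcher {
    Map<String, Object> fetch(List<String> blockIds);
}

final class RetrySketch {
    // Retry only I/O failures (or failures caused by one), as the real fetcher is assumed to do
    static boolean isRetryable(Object result) {
        return result instanceof IOException
            || (result instanceof Throwable && ((Throwable) result).getCause() instanceof IOException);
    }

    /**
     * Fetches blockIds, re-requesting only the blocks that failed with a retryable error,
     * for at most maxRetries extra rounds and waiting retryWaitMs between rounds.
     * Returns block id -> byte[] on success or the final Throwable on failure.
     */
    static Map<String, Object> fetchWithRetries(
            BlockFetcher fetcher, List<String> blockIds, int maxRetries, long retryWaitMs) {
        Map<String, Object> results = new LinkedHashMap<>();
        List<String> outstanding = new ArrayList<>(blockIds);
        int retryCount = 0;  // one retry budget for the whole request, not per block
        while (!outstanding.isEmpty()) {
            Map<String, Object> attempt = fetcher.fetch(outstanding);
            List<String> toRetry = new ArrayList<>();
            for (String id : outstanding) {
                Object r = attempt.get(id);
                if (r == null) {
                    r = new IOException("no response for " + id);
                }
                if (isRetryable(r) && retryCount < maxRetries) {
                    toRetry.add(id);
                } else {
                    results.put(id, r);
                }
            }
            if (toRetry.isEmpty()) {
                break;
            }
            retryCount++;
            try {
                Thread.sleep(retryWaitMs);
            } catch (InterruptedException e) {
                // Give up on the remaining blocks and preserve the interrupt flag
                Thread.currentThread().interrupt();
                for (String id : toRetry) {
                    results.put(id, e);
                }
                break;
            }
            outstanding = toRetry;
        }
        return results;
    }
}
```

A synchronous loop keeps the control flow visible. As far as we know, the real fetcher is callback-driven and schedules each retry on a separate thread.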

Capabilities

Shuffle Client Operations

Primary client interface for fetching shuffle blocks from external shuffle services. Supports both basic and Mesos-specific deployments with configurable SASL authentication.

public abstract class ShuffleClient implements Closeable {
    public void init(String appId);
    public abstract void fetchBlocks(
        String host, 
        int port, 
        String execId, 
        String[] blockIds, 
        BlockFetchingListener listener
    );
}

public class ExternalShuffleClient extends ShuffleClient {
    public ExternalShuffleClient(
        TransportConf conf,
        SecretKeyHolder secretKeyHolder,
        boolean saslEnabled,
        boolean saslEncryptionEnabled
    );
    
    public void registerWithShuffleServer(
        String host,
        int port,
        String execId,
        ExecutorShuffleInfo executorInfo
    ) throws IOException;
}
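fetchBlocks reports results through callbacks, so callers usually need a way to wait for every block. The sketch below shows one approach using the JDK's CompletableFuture. It is hypothetical. SimpleBlockListener stands in for BlockFetchingListener, with byte[] in place of ManagedBuffer, so the code compiles without Spark. FutureCollectingListener is not part of the library.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for BlockFetchingListener (byte[] instead of ManagedBuffer)
interface SimpleBlockListener {
    void onBlockFetchSuccess(String blockId, byte[] data);
    void onBlockFetchFailure(String blockId, Throwable exception);
}

// Turns per-block callbacks into one CompletableFuture per requested block
final class FutureCollectingListener implements SimpleBlockListener {
    private final Map<String, CompletableFuture<byte[]>> futures = new LinkedHashMap<>();

    FutureCollectingListener(String[] blockIds) {
        for (String id : blockIds) {
            futures.put(id, new CompletableFuture<>());
        }
    }

    @Override
    public void onBlockFetchSuccess(String blockId, byte[] data) {
        // Copy the bytes: with a real ManagedBuffer the data is released after the callback returns
        CompletableFuture<byte[]> f = futures.get(blockId);
        if (f != null) {
            f.complete(data.clone());
        }
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        CompletableFuture<byte[]> f = futures.get(blockId);
        if (f != null) {
            f.completeExceptionally(exception);
        }
    }

    CompletableFuture<byte[]> future(String blockId) {
        return futures.get(blockId);
    }

    // Completes once every block has either succeeded or failed (failures are not rethrown here)
    CompletableFuture<Void> allSettled() {
        CompletableFuture<?>[] settled = futures.values().stream()
            .map(f -> f.handle((value, error) -> null))
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(settled);
    }
}
```

With the real API, the same class would implement BlockFetchingListener. It would copy the bytes out of the ManagedBuffer (for example via nioByteBuffer()) inside onBlockFetchSuccess before completing the future.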

See the full Shuffle Client reference (shuffle-client.md).

Server-Side Block Management

Server-side components that handle shuffle block requests, manage executor registrations, and resolve block locations on the filesystem.

public class ExternalShuffleBlockHandler extends RpcHandler {
    public ExternalShuffleBlockHandler(
        TransportConf conf, 
        File registeredExecutorFile
    ) throws IOException;
    
    public void receive(
        TransportClient client, 
        ByteBuffer message, 
        RpcResponseCallback callback
    );
}

public class ExternalShuffleBlockResolver {
    public ExternalShuffleBlockResolver(
        TransportConf conf, 
        File registeredExecutorFile
    ) throws IOException;
    
    public void registerExecutor(
        String appId, 
        String execId, 
        ExecutorShuffleInfo executorInfo
    );
    
    public ManagedBuffer getBlockData(
        String appId, 
        String execId, 
        String blockId
    ) throws IOException;
}
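The resolver's job of turning a shuffle block ID into a byte range on disk can be sketched with the JDK alone. The layout below reflects our reading of the Spark 1.6 sort-shuffle resolver and should be treated as an assumption:

  • Block IDs look like shuffle_<shuffleId>_<mapId>_<reduceId>.
  • Each map task writes shuffle_<shuffleId>_<mapId>_0.data plus a .index file of (numPartitions + 1) big-endian longs holding partition offsets.
  • A file lives in localDirs[hash % localDirs.length], under a two-hex-digit subdirectory numbered (hash / localDirs.length) % subDirsPerLocalDir, where hash is the non-negative hash code of the file name.

ShuffleBlockLocator and its methods are hypothetical names.

```java
import java.io.File;
import java.nio.ByteBuffer;

// Hypothetical helper mirroring the assumed lookup steps (sort-based shuffle only)
final class ShuffleBlockLocator {
    // "shuffle_<shuffleId>_<mapId>_<reduceId>" -> {shuffleId, mapId, reduceId}
    static int[] parseBlockId(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length < 4 || !parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Unexpected shuffle block id: " + blockId);
        }
        return new int[] {
            Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Integer.parseInt(parts[3])
        };
    }

    // One data file and one index file per map task; the reduce id in the name is fixed to 0
    static String dataFileName(int shuffleId, int mapId) {
        return "shuffle_" + shuffleId + "_" + mapId + "_0.data";
    }

    static String indexFileName(int shuffleId, int mapId) {
        return "shuffle_" + shuffleId + "_" + mapId + "_0.index";
    }

    // hashCode() made non-negative; Integer.MIN_VALUE has no positive counterpart, so it maps to 0
    static int nonNegativeHash(String s) {
        int h = s.hashCode();
        return h != Integer.MIN_VALUE ? Math.abs(h) : 0;
    }

    // Pick a local dir by hash, then a two-hex-digit subdirectory inside it
    static File locate(String[] localDirs, int subDirsPerLocalDir, String filename) {
        int hash = nonNegativeHash(filename);
        String localDir = localDirs[hash % localDirs.length];
        int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
        return new File(new File(localDir, String.format("%02x", subDirId)), filename);
    }

    // Index file = (numPartitions + 1) big-endian longs; returns {offset, length} for reduceId
    static long[] segmentFor(byte[] indexFile, int reduceId) {
        ByteBuffer buf = ByteBuffer.wrap(indexFile);  // ByteBuffer is big-endian by default
        long offset = buf.getLong(reduceId * 8);
        long nextOffset = buf.getLong(reduceId * 8 + 8);
        return new long[] {offset, nextOffset - offset};
    }
}
```

For shuffle_1_2_0 the chain would be:

  • parse the ID to {1, 2, 0}
  • locate shuffle_1_2_0.index and shuffle_1_2_0.data under the executor's registered localDirs
  • serve reduce partition 0 as the byte range that segmentFor returns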

See the full Server Components reference (server-components.md).

Network Protocol Messages

Protocol message classes for communication between shuffle clients and servers, including executor registration, block requests, and data transfer.

public abstract class BlockTransferMessage implements Encodable {
    public ByteBuffer toByteBuffer();
    
    public static class Decoder {
        public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
    }
}

public class ExecutorShuffleInfo implements Encodable {
    public ExecutorShuffleInfo(
        String[] localDirs,
        int subDirsPerLocalDir,
        String shuffleManager
    );
}
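To make BlockTransferMessage.toByteBuffer() more concrete, here is a JDK-only sketch of how a RegisterExecutor message could be laid out on the wire. It is modeled on our understanding of the Spark 1.6 encoders and should be read as an assumption:

  • one leading type byte (REGISTER_EXECUTOR = 2)
  • appId and execId as strings
  • the ExecutorShuffleInfo fields in order: localDirs, subDirsPerLocalDir, shuffleManager

A string is a 4-byte length followed by UTF-8 bytes. A string array is a 4-byte count followed by each string. WireSketch and its methods are hypothetical; the real classes encode into a Netty ByteBuf.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical encoder/decoder for an assumed RegisterExecutor wire layout
final class WireSketch {
    static final byte REGISTER_EXECUTOR = 2;

    // Encoded size of a string: 4-byte length prefix + UTF-8 bytes
    static int stringLength(String s) {
        return 4 + s.getBytes(StandardCharsets.UTF_8).length;
    }

    static void putString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putInt(bytes.length);
        buf.put(bytes);
    }

    static String getString(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getInt()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    static ByteBuffer encodeRegisterExecutor(String appId, String execId, String[] localDirs,
                                             int subDirsPerLocalDir, String shuffleManager) {
        // type byte + two strings + array count + dirs + subDirs int + shuffleManager string
        int size = 1 + stringLength(appId) + stringLength(execId) + 4 + 4 + stringLength(shuffleManager);
        for (String d : localDirs) {
            size += stringLength(d);
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.put(REGISTER_EXECUTOR);
        putString(buf, appId);
        putString(buf, execId);
        buf.putInt(localDirs.length);
        for (String d : localDirs) {
            putString(buf, d);
        }
        buf.putInt(subDirsPerLocalDir);
        putString(buf, shuffleManager);
        buf.flip();  // ready for reading or sending
        return buf;
    }

    // Decodes the fields back, returned as {appId, execId, localDirs, subDirs, shuffleManager}
    static Object[] decodeRegisterExecutor(ByteBuffer buf) {
        byte type = buf.get();
        if (type != REGISTER_EXECUTOR) {
            throw new IllegalArgumentException("Unexpected message type " + type);
        }
        String appId = getString(buf);
        String execId = getString(buf);
        String[] localDirs = new String[buf.getInt()];
        for (int i = 0; i < localDirs.length; i++) {
            localDirs[i] = getString(buf);
        }
        int subDirs = buf.getInt();
        String shuffleManager = getString(buf);
        return new Object[] {appId, execId, localDirs, subDirs, shuffleManager};
    }
}
```

The leading type byte is what lets a single decoder dispatch on message kind before parsing the body.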

See the full Protocol Messages reference (protocol-messages.md).

Security and Authentication

SASL-based security mechanisms for authenticating shuffle clients with external shuffle services, including secret management and secure communication.

public class ShuffleSecretManager implements SecretKeyHolder {
    public ShuffleSecretManager();
    
    public void registerApp(String appId, String shuffleSecret);
    public void unregisterApp(String appId);
    public String getSaslUser(String appId);
    public String getSecretKey(String appId);
}
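The secret manager is essentially a concurrent map from application ID to shared secret, with a register/unregister lifecycle. On YARN, for instance, the shuffle service registers an application's secret when the application starts and removes it when the application stops. A minimal JDK-only sketch of that structure follows. AppSecretRegistry is a hypothetical name, and the first-registration-wins rule is our assumption rather than documented behaviour.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical per-application secret registry modeled on the ShuffleSecretManager API above
final class AppSecretRegistry {
    private final ConcurrentMap<String, String> secrets = new ConcurrentHashMap<>();

    // Called when an application starts; a repeated registration keeps the first secret
    void registerApp(String appId, String secret) {
        secrets.putIfAbsent(appId, secret);
    }

    // Called when the application finishes, so later lookups for it find no secret
    void unregisterApp(String appId) {
        secrets.remove(appId);
    }

    // Consulted during authentication; null means the application is unknown
    String getSecretKey(String appId) {
        return secrets.get(appId);
    }
}
```

A concurrent map fits because registrations, unregistrations and authentication lookups can arrive from different threads at the same time.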

See the full Security reference (security.md).

Types

public interface BlockFetchingListener extends EventListener {
    void onBlockFetchSuccess(String blockId, ManagedBuffer data);
    void onBlockFetchFailure(String blockId, Throwable exception);
}

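public abstract class BlockTransferMessage implements Encodable {
    // Nested enum: the id is written as the first byte of every encoded message
    public static enum Type {
        OPEN_BLOCKS(0),
        UPLOAD_BLOCK(1),
        REGISTER_EXECUTOR(2),
        STREAM_HANDLE(3),
        REGISTER_DRIVER(4);     // sent by MesosExternalShuffleClient

        public byte id();
    }
}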