tessl/maven-org-apache-spark--spark-network-shuffle_2-11

External shuffle service client and server for fault-tolerant data shuffling in Apache Spark

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.spark/spark-network-shuffle_2.11@1.6.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-shuffle_2-11@1.6.0

Spark Network Shuffle

Apache Spark Network Shuffle provides external shuffle service functionality that enables fault-tolerant data shuffling in distributed Spark applications. This library allows Spark executors to read shuffle data from external services rather than directly from other executors, improving fault tolerance by preserving shuffle data even when executors are lost.

Package Information

  • Package Name: org.apache.spark:spark-network-shuffle_2.11
  • Package Type: maven
  • Language: Java
  • Installation: Add to Maven dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-network-shuffle_2.11</artifactId>
    <version>1.6.3</version>
</dependency>

Core Imports

import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;

Basic Usage

import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Create client configuration
TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());

// Initialize external shuffle client without SASL: no SecretKeyHolder, authentication and encryption disabled
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
client.init("my-app-id");

// Set up block fetch listener
BlockFetchingListener listener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Successfully fetched block: " + blockId);
        // Process the block data
    }
    
    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch block: " + blockId);
        exception.printStackTrace();
    }
};

// Fetch shuffle blocks from the external service; results are delivered to the listener callbacks
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};
client.fetchBlocks("shuffle-service-host", 7337, "executor-1", blockIds, listener);

// Clean up once all outstanding fetches have completed
client.close();
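
The block IDs used above ("shuffle_1_0_0", "shuffle_1_0_1") appear to follow a shuffle_<shuffleId>_<mapId>_<reduceId> pattern. The following is a minimal sketch of parsing such IDs, assuming that pattern holds; ShuffleBlockId is a hypothetical helper written for this illustration and is not a class from the library.

```java
/** Hypothetical helper (not part of spark-network-shuffle) for the assumed block ID format. */
final class ShuffleBlockId {
    final int shuffleId;
    final int mapId;
    final int reduceId;

    private ShuffleBlockId(int shuffleId, int mapId, int reduceId) {
        this.shuffleId = shuffleId;
        this.mapId = mapId;
        this.reduceId = reduceId;
    }

    /** Parses "shuffle_<shuffleId>_<mapId>_<reduceId>"; rejects anything else. */
    static ShuffleBlockId parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length != 4 || !parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Unexpected block id format: " + blockId);
        }
        // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) on bad numbers.
        return new ShuffleBlockId(
            Integer.parseInt(parts[1]),
            Integer.parseInt(parts[2]),
            Integer.parseInt(parts[3]));
    }

    @Override
    public String toString() {
        return "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;
    }
}
```

Under this assumption, ShuffleBlockId.parse("shuffle_1_0_1") yields shuffle 1, map 0, reduce 1, and toString() reproduces the original ID.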

Architecture

The Spark Network Shuffle module is built around several key components:

  • Client Layer: ExternalShuffleClient and related classes provide the client-side API for fetching shuffle blocks with retry logic and fault tolerance
  • Server Layer: ExternalShuffleBlockHandler and ExternalShuffleBlockResolver handle incoming requests and resolve block locations on the server side
  • Protocol Layer: Structured message classes for client-server communication using efficient binary serialization
  • Security Layer: SASL authentication support for secure communication between clients and shuffle services
  • Fault Tolerance: Built-in retry mechanisms and proper error handling to ensure reliable data access in distributed environments

Capabilities

Client APIs

Client-side API for fetching shuffle blocks from an external shuffle service, with retry logic for failed fetches and per-block success or failure reporting through BlockFetchingListener.

public abstract class ShuffleClient implements Closeable {
    public void init(String appId);
    public abstract void fetchBlocks(String host, int port, String execId, 
                                   String[] blockIds, BlockFetchingListener listener);
}

public class ExternalShuffleClient extends ShuffleClient {
    public ExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder, 
                               boolean saslEnabled, boolean saslEncryptionEnabled);
    public void registerWithShuffleServer(String host, int port, String execId, 
                                        ExecutorShuffleInfo executorInfo) throws IOException;
}
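
To illustrate how retry logic can sit in front of a listener with the same two callbacks as BlockFetchingListener, here is a simplified, synchronous sketch. It is not the library's implementation: FetchListener, BlockSource, and RetryingFetchSketch are hypothetical names, block data is modeled as a byte[] rather than a ManagedBuffer, and retrying only on IOException is an assumption made for this example.

```java
import java.io.IOException;

/** Hypothetical stand-in for BlockFetchingListener, using byte[] instead of ManagedBuffer. */
interface FetchListener {
    void onBlockFetchSuccess(String blockId, byte[] data);
    void onBlockFetchFailure(String blockId, Throwable exception);
}

/** Hypothetical source of block data that may fail transiently. */
interface BlockSource {
    byte[] fetch(String blockId) throws IOException;
}

final class RetryingFetchSketch {
    /**
     * Tries the fetch once, then up to maxRetries more times on IOException.
     * Exactly one listener callback is invoked per call.
     */
    static void fetchWithRetries(BlockSource source, String blockId,
                                 int maxRetries, FetchListener listener) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        IOException lastFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            byte[] data;
            try {
                data = source.fetch(blockId);
            } catch (IOException e) {
                lastFailure = e; // remember the failure and try again
                continue;
            }
            listener.onBlockFetchSuccess(blockId, data);
            return;
        }
        listener.onBlockFetchFailure(blockId, lastFailure);
    }
}
```

The listener only sees the final outcome, so callers do not need to distinguish a first-attempt success from a success after retries.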

Server APIs

Server-side components for handling shuffle block requests, resolving block locations, and managing executor registrations.

public class ExternalShuffleBlockHandler extends RpcHandler {
    public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException;
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
    public void applicationRemoved(String appId, boolean cleanupLocalDirs);
}

public class ExternalShuffleBlockResolver {
    public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
    public ManagedBuffer getBlockData(String appId, String execId, String blockId);
}
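
Resolving a block location on the server side means mapping a file name onto one of the executor's registered local directories. The sketch below shows one way this can be done from the localDirs and subDirsPerLocalDir values carried by ExecutorShuffleInfo: hash the file name, pick a local directory, then pick a two-hex-digit subdirectory. ShuffleFileLocator is a hypothetical class, and the exact hashing scheme is an assumption that should be checked against the Spark source before relying on it.

```java
import java.io.File;

/** Hypothetical sketch of hashing a shuffle file name onto a local directory layout. */
final class ShuffleFileLocator {

    /** Non-negative hash; Integer.MIN_VALUE has no positive counterpart, so it maps to 0. */
    static int nonNegativeHash(String s) {
        int hash = s.hashCode();
        return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
    }

    /** Returns <localDir>/<two-hex-digit subdir>/<filename> for the given layout. */
    static File locate(String[] localDirs, int subDirsPerLocalDir, String filename) {
        int hash = nonNegativeHash(filename);
        String localDir = localDirs[hash % localDirs.length];
        int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
        return new File(new File(localDir, String.format("%02x", subDirId)), filename);
    }
}
```

Because the mapping depends only on the file name and the directory layout, a separate process that knows the same layout can find a file without asking the executor that wrote it. For example, ShuffleFileLocator.locate(new String[] {"/tmp/a", "/tmp/b"}, 64, "shuffle_1_0_0.data") always returns the same path; the file name here is illustrative.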

Protocol Messages

Structured communication protocol between shuffle clients and servers with efficient binary serialization.

public abstract class BlockTransferMessage implements Encodable {
    public ByteBuffer toByteBuffer();
    
    public enum Type {
        OPEN_BLOCKS, UPLOAD_BLOCK, REGISTER_EXECUTOR, STREAM_HANDLE, REGISTER_DRIVER
    }
}

public class ExecutorShuffleInfo implements Encodable {
    public final String[] localDirs;
    public final int subDirsPerLocalDir; 
    public final String shuffleManager;
}
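
To make "binary serialization" concrete, the following sketch encodes the three ExecutorShuffleInfo fields into a ByteBuffer as a one-byte type tag followed by length-prefixed UTF-8 strings and plain ints, then decodes them again. The layout, the tag value, and the ExecutorInfoSketch class are all assumptions chosen for illustration; the library's actual wire format may differ.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical round-trip encoding of the fields shown in ExecutorShuffleInfo. */
final class ExecutorInfoSketch {
    /** Assumed tag byte identifying this message kind. */
    static final byte TYPE_TAG = 2;

    final String[] localDirs;
    final int subDirsPerLocalDir;
    final String shuffleManager;

    ExecutorInfoSketch(String[] localDirs, int subDirsPerLocalDir, String shuffleManager) {
        this.localDirs = localDirs;
        this.subDirsPerLocalDir = subDirsPerLocalDir;
        this.shuffleManager = shuffleManager;
    }

    /** Layout: tag, dir count, (length, bytes) per dir, subDirsPerLocalDir, (length, bytes). */
    ByteBuffer toByteBuffer() {
        byte[][] dirs = new byte[localDirs.length][];
        int size = 1 + 4; // tag byte + directory count
        for (int i = 0; i < localDirs.length; i++) {
            dirs[i] = localDirs[i].getBytes(StandardCharsets.UTF_8);
            size += 4 + dirs[i].length;
        }
        byte[] manager = shuffleManager.getBytes(StandardCharsets.UTF_8);
        size += 4 + 4 + manager.length;

        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.put(TYPE_TAG);
        buf.putInt(dirs.length);
        for (byte[] dir : dirs) {
            buf.putInt(dir.length);
            buf.put(dir);
        }
        buf.putInt(subDirsPerLocalDir);
        buf.putInt(manager.length);
        buf.put(manager);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    static ExecutorInfoSketch fromByteBuffer(ByteBuffer buf) {
        byte tag = buf.get();
        if (tag != TYPE_TAG) {
            throw new IllegalArgumentException("Unexpected message type: " + tag);
        }
        String[] dirs = new String[buf.getInt()];
        for (int i = 0; i < dirs.length; i++) {
            dirs[i] = readString(buf);
        }
        int subDirs = buf.getInt();
        return new ExecutorInfoSketch(dirs, subDirs, readString(buf));
    }

    private static String readString(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getInt()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Reading the tag byte first lets a receiver decide which decoder to apply before touching the rest of the buffer, which is the role an enum such as Type typically plays in a tagged protocol.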

Security

SASL authentication support for secure communication between shuffle clients and external shuffle services.

public class ShuffleSecretManager implements SecretKeyHolder {
    public void registerApp(String appId, String shuffleSecret);
    public void unregisterApp(String appId);
    public String getSecretKey(String appId);
}
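
The three methods above suggest a per-application secret registry. A minimal sketch of that idea, backed by a concurrent map, is shown here. InMemorySecretRegistry is a hypothetical class, and the "first registration wins" behavior is a choice made for this example rather than documented behavior of ShuffleSecretManager.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory registry mapping application IDs to shuffle secrets. */
final class InMemorySecretRegistry {
    private final ConcurrentMap<String, String> secrets = new ConcurrentHashMap<>();

    /** Registers a secret for the application; an existing secret is left untouched. */
    void registerApp(String appId, String shuffleSecret) {
        secrets.putIfAbsent(appId, shuffleSecret);
    }

    /** Forgets the application's secret, for example when the application finishes. */
    void unregisterApp(String appId) {
        secrets.remove(appId);
    }

    /** Returns the secret for the application, or null if none is registered. */
    String getSecretKey(String appId) {
        return secrets.get(appId);
    }
}
```

Using a concurrent map keeps registration and lookup safe when several connections are authenticated at the same time.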

Types

Core Interfaces

public interface BlockFetchingListener extends EventListener {
    void onBlockFetchSuccess(String blockId, ManagedBuffer data);
    void onBlockFetchFailure(String blockId, Throwable exception);
}

Configuration Types

// Nested in ExternalShuffleBlockResolver; identifies one executor within one application
public static class AppExecId {
    public final String appId;
    public final String execId;
    
    public AppExecId(String appId, String execId);
    public boolean equals(Object o);
    public int hashCode();
    public String toString();
}
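
AppExecId overrides equals and hashCode, which is what makes a type usable as a map key. The sketch below shows a value class of the same shape used as a key in a HashMap. AppExecIdSketch and its toString format are hypothetical, and the map value is a placeholder string rather than a real ExecutorShuffleInfo.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical value class with the same fields as AppExecId. */
final class AppExecIdSketch {
    final String appId;
    final String execId;

    AppExecIdSketch(String appId, String execId) {
        this.appId = appId;
        this.execId = execId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof AppExecIdSketch)) return false;
        AppExecIdSketch other = (AppExecIdSketch) o;
        return Objects.equals(appId, other.appId) && Objects.equals(execId, other.execId);
    }

    @Override
    public int hashCode() {
        // Must agree with equals: equal IDs produce equal hashes.
        return Objects.hash(appId, execId);
    }

    @Override
    public String toString() {
        return "AppExecIdSketch{appId=" + appId + ", execId=" + execId + "}";
    }

    public static void main(String[] args) {
        Map<AppExecIdSketch, String> registered = new HashMap<>();
        registered.put(new AppExecIdSketch("app-1", "exec-1"), "placeholder executor info");
        // A distinct but equal key finds the same entry.
        System.out.println(registered.get(new AppExecIdSketch("app-1", "exec-1")));
    }
}
```

Keying on the pair rather than on execId alone keeps executors from different applications separate even when their executor IDs collide.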