tessl/maven-org-apache-spark--spark-network-shuffle-2-11

Spark Project Shuffle Streaming Service - provides network shuffle functionality for Apache Spark's distributed computing engine

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.spark/spark-network-shuffle_2.11@2.4.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-shuffle-2-11@2.4.0

Apache Spark Network Shuffle

Apache Spark Network Shuffle is a Java library that implements network-based shuffle for Apache Spark's distributed computing framework. It lets Spark executors exchange shuffle data through an external shuffle service, and it supports SASL authentication, automatic retries of failed fetches, and metrics collection.

Package Information

  • Package Name: spark-network-shuffle_2.11
  • Package Type: maven
  • Language: Java
  • Installation: Maven dependency org.apache.spark:spark-network-shuffle_2.11:2.4.8

Core Imports

import org.apache.spark.network.shuffle.ShuffleClient;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.sasl.ShuffleSecretManager;

Basic Usage

import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.sasl.ShuffleSecretManager;
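import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.buffer.ManagedBuffer;
import java.util.Collections;

// Create shuffle secret manager for authentication
ShuffleSecretManager secretManager = new ShuffleSecretManager();
secretManager.registerApp("myApp", "mySecretKey");

// Create external shuffle client.
// TransportConf takes a module name and a ConfigProvider; an empty provider falls back to defaults.
TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(Collections.emptyMap()));
// Arguments: conf, secret key holder, authEnabled = true, registrationTimeoutMs = 5000
ExternalShuffleClient client = new ExternalShuffleClient(conf, secretManager, true, 5000);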

// Initialize client
client.init("myApp");

// Register executor with shuffle service (may throw IOException or InterruptedException)
String[] localDirs = {"/tmp/spark-local-1", "/tmp/spark-local-2"};
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(localDirs, 64, "sort");
client.registerWithShuffleServer("localhost", 7337, "executor-1", executorInfo);

// Fetch blocks with listener
BlockFetchingListener listener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Successfully fetched block: " + blockId);
    }
    
    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch block " + blockId + ": " + exception);
    }
};

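// Block IDs follow the pattern shuffle_<shuffleId>_<mapId>_<reduceId>
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};
// The last argument is an optional DownloadFileManager; passing null keeps fetched blocks in memory
client.fetchBlocks("localhost", 7337, "executor-1", blockIds, listener, null);

// fetchBlocks is asynchronous: wait for the listener callbacks before closing the client
client.close();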
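
Because results arrive through the listener rather than as a return value, a caller usually needs some way to wait until every requested block has reported. The following is a minimal sketch of one such pattern using a CountDownLatch. FetchListener is a hypothetical stand-in for BlockFetchingListener (with byte[] in place of ManagedBuffer) so that the example runs without Spark on the classpath; CollectingListener and awaitAll are illustrative names, not part of the library.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for BlockFetchingListener; byte[] replaces ManagedBuffer.
interface FetchListener {
    void onBlockFetchSuccess(String blockId, byte[] data);
    void onBlockFetchFailure(String blockId, Throwable exception);
}

// Collects results and lets the caller block until every requested block has reported.
final class CollectingListener implements FetchListener {
    final Map<String, byte[]> fetched = new ConcurrentHashMap<>();
    final Map<String, Throwable> failed = new ConcurrentHashMap<>();
    private final CountDownLatch remaining;

    CollectingListener(int expectedBlocks) {
        this.remaining = new CountDownLatch(expectedBlocks);
    }

    @Override
    public void onBlockFetchSuccess(String blockId, byte[] data) {
        fetched.put(blockId, data);
        remaining.countDown();
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        failed.put(blockId, exception);
        remaining.countDown();
    }

    // Returns true if all blocks reported (success or failure) within the timeout.
    boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
        return remaining.await(timeout, unit);
    }
}
```

With the real API, the same idea would apply by implementing BlockFetchingListener directly and calling awaitAll before client.close().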

Architecture

Apache Spark Network Shuffle is built around several key components:

  • Shuffle Clients: Client-side components for fetching shuffle blocks from external services
  • Shuffle Handlers: Server-side RPC handlers that serve shuffle blocks to clients
  • Block Resolution: Components that convert shuffle block IDs to physical file segments
  • Security Management: SASL-based authentication system for secure shuffle operations
  • Protocol Messages: Serializable message classes for client-server communication
  • File Management: Temporary file handling during block transfer operations
  • Retry Mechanisms: Automatic retry functionality for handling transient failures

Capabilities

Shuffle Client Operations

Core client functionality for fetching shuffle blocks from external shuffle services, including registration, block fetching, and connection management.

public abstract class ShuffleClient implements Closeable {
    public void init(String appId);
    public abstract void fetchBlocks(
        String host, int port, String execId, String[] blockIds,
        BlockFetchingListener listener, DownloadFileManager downloadFileManager
    );
    public MetricSet shuffleMetrics();
    public void close();
}

public class ExternalShuffleClient extends ShuffleClient {
    public ExternalShuffleClient(
        TransportConf conf, SecretKeyHolder secretKeyHolder,
        boolean authEnabled, long registrationTimeoutMs
    );
    public void registerWithShuffleServer(
        String host, int port, String execId, ExecutorShuffleInfo executorInfo
    ) throws IOException, InterruptedException;
}

Shuffle Server Components

Server-side components for handling shuffle requests, including RPC handlers and block resolution mechanisms.

public class ExternalShuffleBlockHandler extends RpcHandler {
    public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile);
    public void receive(
        TransportClient client, ByteBuffer message, RpcResponseCallback callback
    );
    public void applicationRemoved(String appId, boolean cleanupLocalDirs);
    public void executorRemoved(String executorId, String appId);
}

public class ExternalShuffleBlockResolver {
    public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile);
    public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
    public ManagedBuffer getBlockData(
        String appId, String execId, int shuffleId, int mapId, int reduceId
    );
}
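
getBlockData takes a shuffleId, mapId, and reduceId, while clients request blocks by string IDs such as shuffle_1_0_0. A minimal sketch of the conversion between the two follows, assuming the shuffle_<shuffleId>_<mapId>_<reduceId> pattern seen in the usage example. ShuffleBlockId is a hypothetical helper written for illustration, not a class from this package.

```java
// Illustrative parser for block IDs of the form shuffle_<shuffleId>_<mapId>_<reduceId>.
final class ShuffleBlockId {
    final int shuffleId;
    final int mapId;
    final int reduceId;

    private ShuffleBlockId(int shuffleId, int mapId, int reduceId) {
        this.shuffleId = shuffleId;
        this.mapId = mapId;
        this.reduceId = reduceId;
    }

    // Throws IllegalArgumentException if the ID does not match the expected pattern.
    static ShuffleBlockId parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length != 4 || !parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockId);
        }
        // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) on bad digits.
        return new ShuffleBlockId(
            Integer.parseInt(parts[1]),
            Integer.parseInt(parts[2]),
            Integer.parseInt(parts[3]));
    }
}
```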

Authentication and Security

SASL-based authentication system for securing shuffle operations between clients and external shuffle services.

public class ShuffleSecretManager implements SecretKeyHolder {
    private static final String SPARK_SASL_USER = "sparkSaslUser";
    
    public ShuffleSecretManager();
    public void registerApp(String appId, String shuffleSecret);
    public void registerApp(String appId, ByteBuffer shuffleSecret);
    public void unregisterApp(String appId);
    public String getSaslUser(String appId);
    public String getSecretKey(String appId);
}
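
The method list above suggests a per-application secret store with one shared SASL user name. A minimal sketch of that shape follows, assuming an in-memory map keyed by application ID and UTF-8 decoding for the ByteBuffer overload. InMemorySecretHolder is a hypothetical name, and the behavior shown is an illustration rather than the library's implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative per-application secret store; not the ShuffleSecretManager source.
final class InMemorySecretHolder {
    private static final String SASL_USER = "sparkSaslUser";
    private final ConcurrentHashMap<String, String> secrets = new ConcurrentHashMap<>();

    void registerApp(String appId, String secret) {
        secrets.put(appId, secret);
    }

    void registerApp(String appId, ByteBuffer secret) {
        // Decode a duplicate so the caller's buffer position is left untouched (UTF-8 is assumed).
        registerApp(appId, StandardCharsets.UTF_8.decode(secret.duplicate()).toString());
    }

    void unregisterApp(String appId) {
        secrets.remove(appId);
    }

    // Every application shares the same SASL user name in this sketch.
    String getSaslUser(String appId) {
        return SASL_USER;
    }

    // Returns null when the application was never registered or has been unregistered.
    String getSecretKey(String appId) {
        return secrets.get(appId);
    }
}
```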

Protocol Messages

Serializable message classes for communication between shuffle clients and servers, including registration, block requests, and data transfers.

public abstract class BlockTransferMessage implements Encodable {
    public ByteBuffer toByteBuffer();
    
    public enum Type {
        OPEN_BLOCKS, UPLOAD_BLOCK, REGISTER_EXECUTOR, STREAM_HANDLE,
        REGISTER_DRIVER, HEARTBEAT, UPLOAD_BLOCK_STREAM
    }
    
    public static class Decoder {
        public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
    }
}

public class ExecutorShuffleInfo implements Encodable {
    public final String[] localDirs;
    public final int subDirsPerLocalDir;
    public final String shuffleManager;
    
    public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);
}
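
The Type enum and the toByteBuffer/fromByteBuffer pair point to a type-tagged binary framing. The round trip below sketches that idea under stated assumptions: a one-byte type tag followed by length-prefixed UTF-8 strings. The layout, the tag value 0 (chosen because OPEN_BLOCKS is listed first), and the class name OpenBlocksSketch are all illustrative and should not be read as Spark's wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative type-tagged message: [tag][appId][execId][count][blockId...].
final class OpenBlocksSketch {
    static final byte TYPE_TAG = 0; // assumed tag value for this sketch

    final String appId;
    final String execId;
    final String[] blockIds;

    OpenBlocksSketch(String appId, String execId, String[] blockIds) {
        this.appId = appId;
        this.execId = execId;
        this.blockIds = blockIds;
    }

    ByteBuffer toByteBuffer() {
        byte[] app = appId.getBytes(StandardCharsets.UTF_8);
        byte[] exec = execId.getBytes(StandardCharsets.UTF_8);
        byte[][] blocks = new byte[blockIds.length][];
        int size = 1 + 4 + app.length + 4 + exec.length + 4;
        for (int i = 0; i < blockIds.length; i++) {
            blocks[i] = blockIds[i].getBytes(StandardCharsets.UTF_8);
            size += 4 + blocks[i].length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.put(TYPE_TAG);
        putBytes(buf, app);
        putBytes(buf, exec);
        buf.putInt(blocks.length);
        for (byte[] block : blocks) {
            putBytes(buf, block);
        }
        buf.flip(); // switch from writing to reading
        return buf;
    }

    static OpenBlocksSketch fromByteBuffer(ByteBuffer buf) {
        byte tag = buf.get();
        if (tag != TYPE_TAG) {
            throw new IllegalArgumentException("Unknown message type: " + tag);
        }
        String appId = getString(buf);
        String execId = getString(buf);
        String[] blockIds = new String[buf.getInt()];
        for (int i = 0; i < blockIds.length; i++) {
            blockIds[i] = getString(buf);
        }
        return new OpenBlocksSketch(appId, execId, blockIds);
    }

    private static void putBytes(ByteBuffer buf, byte[] bytes) {
        buf.putInt(bytes.length);
        buf.put(bytes);
    }

    private static String getString(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getInt()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Reading the tag first is what lets a single decoder entry point dispatch to the right message class.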

Block Fetching and Retry Logic

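Block fetching with automatic retries and listener-based asynchronous callbacks. OneForOneBlockFetcher requests a set of blocks and reports each one to the listener; RetryingBlockFetcher wraps a fetch operation and retries the blocks that failed.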

public interface BlockFetchingListener extends EventListener {
    void onBlockFetchSuccess(String blockId, ManagedBuffer data);
    void onBlockFetchFailure(String blockId, Throwable exception);
}

public class OneForOneBlockFetcher {
    public OneForOneBlockFetcher(
        TransportClient client, String appId, String execId, String[] blockIds,
        BlockFetchingListener listener, TransportConf transportConf
    );
    public void start();
}

public class RetryingBlockFetcher {
    public RetryingBlockFetcher(
        TransportConf conf, BlockFetchStarter fetchStarter,
        String[] blockIds, BlockFetchingListener listener
    );
    public void start();
    
    public interface BlockFetchStarter {
        void createAndStart(String[] blockIds, BlockFetchingListener listener);
    }
}
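
The core idea behind retrying a fetch is to re-request only the blocks that are still outstanding, up to a bounded number of attempts. The sketch below shows that idea in synchronous form. It is a simplification under assumptions: the real RetryingBlockFetcher is asynchronous and reads its limits from TransportConf, whereas RetrySketch, Fetcher, maxRetries, and retryWaitMs here are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RetrySketch {
    // Hypothetical synchronous stand-in for a single fetch attempt.
    interface Fetcher {
        byte[] fetch(String blockId) throws IOException;
    }

    // Fetches every block, re-requesting only the failed ones for up to maxRetries extra rounds.
    // Blocks missing from the returned map were still failing when retries ran out.
    static Map<String, byte[]> fetchAll(
            String[] blockIds, Fetcher fetcher, int maxRetries, long retryWaitMs)
            throws InterruptedException {
        Map<String, byte[]> fetched = new LinkedHashMap<>();
        List<String> outstanding = new ArrayList<>(Arrays.asList(blockIds));
        for (int attempt = 0; attempt <= maxRetries && !outstanding.isEmpty(); attempt++) {
            if (attempt > 0) {
                Thread.sleep(retryWaitMs); // back off before the next round
            }
            List<String> stillFailing = new ArrayList<>();
            for (String id : outstanding) {
                try {
                    fetched.put(id, fetcher.fetch(id));
                } catch (IOException e) {
                    // Treated as transient: keep the block for the next round.
                    stillFailing.add(id);
                }
            }
            outstanding = stillFailing;
        }
        return fetched;
    }
}
```

Only IOException is retried in this sketch; any other exception propagates to the caller immediately.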

File Management

Temporary file management system for handling downloaded blocks during transfer operations.

public interface DownloadFile {
    boolean delete();
    DownloadFileWritableChannel openForWriting() throws IOException;
    String path();
}

public interface DownloadFileManager {
    DownloadFile createTempFile(TransportConf transportConf);
    boolean registerTempFileToClean(DownloadFile file);
}

public interface DownloadFileWritableChannel extends WritableByteChannel {
    ManagedBuffer closeAndRead();
}
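
The three interfaces above describe a write-then-read lifecycle: create a temporary file, write incoming chunks to it, close it and read the result, then delete it. A minimal sketch of that lifecycle with plain java.nio follows. TempDownload and writeThenRead are hypothetical names, and returning byte[] is a stand-in for the ManagedBuffer that closeAndRead() returns.

```java
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

// Illustrative temp-file download; byte[] stands in for ManagedBuffer.
final class TempDownload {
    private final File file;

    TempDownload(File file) {
        this.file = file;
    }

    String path() {
        return file.getAbsolutePath();
    }

    boolean delete() {
        return file.delete();
    }

    // Writes the chunks in order, closes the channel, then reads the whole file back.
    byte[] writeThenRead(ByteBuffer... chunks) throws IOException {
        try (FileChannel channel = FileChannel.open(
                file.toPath(),
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            for (ByteBuffer chunk : chunks) {
                // A single write may be partial, so loop until the chunk is drained.
                while (chunk.hasRemaining()) {
                    channel.write(chunk);
                }
            }
        }
        return Files.readAllBytes(file.toPath());
    }
}
```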

Mesos Integration

Specialized components for Mesos deployment scenarios, including driver registration and heartbeat mechanisms.

public class MesosExternalShuffleClient extends ExternalShuffleClient {
    public MesosExternalShuffleClient(
        TransportConf conf, SecretKeyHolder secretKeyHolder,
        boolean authEnabled, long registrationTimeoutMs
    );
    public void registerDriverWithShuffleService(
        String host, int port, long heartbeatTimeoutMs, long heartbeatIntervalMs
    ) throws IOException, InterruptedException;
}
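
The heartbeatIntervalMs parameter implies a periodic signal sent from the driver to the shuffle service after registration. A minimal sketch of such a loop with a ScheduledExecutorService follows. HeartbeatSketch and the sendHeartbeat callback are hypothetical; in the real client the callback would send a heartbeat message over the transport connection.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative fixed-rate heartbeat loop; not the MesosExternalShuffleClient source.
final class HeartbeatSketch implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "heartbeat-sketch");
            thread.setDaemon(true); // do not keep the JVM alive just for heartbeats
            return thread;
        });

    // Sends the first heartbeat immediately, then one every intervalMs milliseconds.
    // If sendHeartbeat throws, the executor suppresses all later runs.
    void start(Runnable sendHeartbeat, long intervalMs) {
        scheduler.scheduleAtFixedRate(sendHeartbeat, 0, intervalMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```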

Types

public class ShuffleIndexRecord {
    public ShuffleIndexRecord(long offset, long length);
    public long getOffset();
    public long getLength();
}

public class ShuffleIndexInformation {
    public ShuffleIndexInformation(File indexFile);
    public int getSize();
    public ShuffleIndexRecord getIndex(int reduceId);
}
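
getIndex(reduceId) returns an offset and a length, which fits an index made of consecutive 8-byte offsets where each partition's length is the difference between neighbouring entries. The sketch below illustrates that lookup on an in-memory byte array. The layout is an assumption stated for illustration, and IndexSketch and encode are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.LongBuffer;

// Illustrative offset index: entry i is the start of partition i, entry i + 1 is its end.
final class IndexSketch {
    private final LongBuffer offsets;

    IndexSketch(byte[] indexBytes) {
        this.offsets = ByteBuffer.wrap(indexBytes).asLongBuffer();
    }

    // Helper for the example: packs offsets as big-endian 8-byte values.
    static byte[] encode(long... values) {
        ByteBuffer buf = ByteBuffer.allocate(values.length * Long.BYTES);
        for (long value : values) {
            buf.putLong(value);
        }
        return buf.array();
    }

    // Returns {offset, length} for the given reduce partition.
    // Throws IndexOutOfBoundsException if reduceId + 1 is past the last entry.
    long[] getIndex(int reduceId) {
        long offset = offsets.get(reduceId);
        long nextOffset = offsets.get(reduceId + 1);
        return new long[] {offset, nextOffset - offset};
    }
}
```

An index with N partitions therefore holds N + 1 offsets, and an empty partition shows up as two equal neighbouring entries.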

public class SimpleDownloadFile implements DownloadFile {
    public SimpleDownloadFile(File file, TransportConf transportConf);
    public boolean delete();
    public DownloadFileWritableChannel openForWriting();
    public String path();
}