External shuffle service for Apache Spark that enables shuffle operations outside of executor processes
npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-shuffle@2.2.0

Spark Network Shuffle provides an external shuffle service for Apache Spark that enables shuffle operations to be performed outside of executor processes. This improves resource utilization and fault tolerance by allowing executors to store and retrieve shuffle data through a dedicated service.
org.apache.spark:spark-network-shuffle_2.11:2.2.3

import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.sasl.ShuffleSecretManager;

import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.buffer.ManagedBuffer;
// Create and initialize client
TransportConf conf = new TransportConf("shuffle");
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
client.init("myApp");
// Register executor with shuffle service
String[] localDirs = {"/tmp/spark-shuffle"};
ExecutorShuffleInfo info = new ExecutorShuffleInfo(
localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");
client.registerWithShuffleServer("localhost", 7337, "executor-1", info);
// Fetch blocks asynchronously
String[] blockIds = {"shuffle_0_1_0", "shuffle_0_2_0"};
BlockFetchingListener listener = new BlockFetchingListener() {
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
System.out.println("Fetched block: " + blockId);
// Process block data
}
public void onBlockFetchFailure(String blockId, Throwable exception) {
System.err.println("Failed to fetch block: " + blockId);
}
};
client.fetchBlocks("localhost", 7337, "executor-1", blockIds, listener, null);
client.close();

Spark Network Shuffle is built around several key components:
Core client functionality for connecting to external shuffle services and fetching shuffle blocks with retry logic and authentication support.
public abstract class ShuffleClient implements Closeable {
public void init(String appId);
public abstract void fetchBlocks(
String host, int port, String execId, String[] blockIds,
BlockFetchingListener listener, TempShuffleFileManager tempShuffleFileManager);
}
public class ExternalShuffleClient extends ShuffleClient {
public ExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder, boolean authEnabled);
public void registerWithShuffleServer(String host, int port, String execId, ExecutorShuffleInfo executorInfo)
throws IOException, InterruptedException;
}

Server-side components for handling shuffle requests, managing registered executors, and streaming shuffle blocks to clients.
public class ExternalShuffleBlockHandler extends RpcHandler {
public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException;
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
public void applicationRemoved(String appId, boolean cleanupLocalDirs);
public MetricSet getAllMetrics();
}

Block resolver functionality for managing executor metadata, locating shuffle files on disk, and providing shuffle block data access.
public class ExternalShuffleBlockResolver {
public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) throws IOException;
public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
public ManagedBuffer getBlockData(String appId, String execId, String blockId);
public void applicationRemoved(String appId, boolean cleanupLocalDirs);
}

Structured protocol messages for client-server communication, including executor registration, block requests, and streaming handles.
public abstract class BlockTransferMessage implements Encodable {
protected abstract Type type();
public ByteBuffer toByteBuffer();
public static class Decoder {
public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
}
}
public class ExecutorShuffleInfo implements Encodable {
public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);
public final String[] localDirs;
public final int subDirsPerLocalDir;
public final String shuffleManager;
}

SASL-based authentication system for secure shuffle operations, managing application secrets and authenticating client connections.
public class ShuffleSecretManager implements SecretKeyHolder {
public ShuffleSecretManager();
public void registerApp(String appId, String shuffleSecret);
public void unregisterApp(String appId);
public String getSecretKey(String appId);
}

Specialized components for Mesos cluster manager integration, including driver registration and heartbeat mechanisms.
public class MesosExternalShuffleClient extends ExternalShuffleClient {
public MesosExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder,
boolean authEnabled);
public void registerDriverWithShuffleService(String host, int port,
long heartbeatTimeoutMs, long heartbeatIntervalMs) throws IOException, InterruptedException;
}

public interface BlockFetchingListener extends EventListener {
void onBlockFetchSuccess(String blockId, ManagedBuffer data);
void onBlockFetchFailure(String blockId, Throwable exception);
}
public interface TempShuffleFileManager {
File createTempShuffleFile();
boolean registerTempShuffleFileToClean(File file);
}
public class ShuffleIndexRecord {
public ShuffleIndexRecord(long offset, long length);
public long getOffset();
public long getLength();
}

// Common exceptions thrown by shuffle operations
java.io.IOException // File I/O operations, network connectivity
java.lang.InterruptedException // Blocking operations that can be interrupted
java.lang.SecurityException // Authentication failures
java.lang.IllegalArgumentException // Invalid block IDs or parameters
java.lang.UnsupportedOperationException // Unsupported message types
java.lang.RuntimeException // Executor not registered or runtime errors