Apache Spark Network Shuffle is a Java library that provides network-based shuffle functionality for Apache Spark's distributed computing framework. It enables efficient data exchange between Spark executors during shuffle operations through external shuffle services, supporting secure authentication, retry mechanisms, and comprehensive metrics collection.
org.apache.spark:spark-network-shuffle_2.11:2.4.8

import org.apache.spark.network.shuffle.ShuffleClient;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.sasl.ShuffleSecretManager;

import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.buffer.ManagedBuffer;
// Create shuffle secret manager for authentication
ShuffleSecretManager secretManager = new ShuffleSecretManager();
secretManager.registerApp("myApp", "mySecretKey");

// Create external shuffle client.
// TransportConf is constructed from a module name plus a ConfigProvider (there is no
// single-argument constructor); an empty provider leaves every setting at its default.
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
ExternalShuffleClient client = new ExternalShuffleClient(conf, secretManager, true, 5000);

// Initialize client with the same app id that was registered with the secret manager
client.init("myApp");

// Register executor with shuffle service
String[] localDirs = {"/tmp/spark-local-1", "/tmp/spark-local-2"};
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(localDirs, 64, "sort");
client.registerWithShuffleServer("localhost", 7337, "executor-1", executorInfo);

// Fetch blocks with listener
BlockFetchingListener listener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Successfully fetched block: " + blockId);
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch block: " + blockId);
    }
};
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};
// The last argument is the DownloadFileManager; this example passes none.
client.fetchBlocks("localhost", 7337, "executor-1", blockIds, listener, null);

// Close client when done.
// Block fetching is listener-based and asynchronous, so real code should wait for the
// callbacks above to fire before closing; the call is shown inline here only for brevity.
client.close();

Apache Spark Network Shuffle is built around several key components:
Core client functionality for fetching shuffle blocks from external shuffle services, including registration, block fetching, and connection management.
/**
 * Abstract base class for clients that fetch shuffle blocks from a remote host.
 *
 * <p>Signature listing only: method bodies are omitted in this reference.
 */
public abstract class ShuffleClient implements Closeable {
/**
 * Initializes the client for the application identified by {@code appId}.
 */
public void init(String appId);
/**
 * Requests the given blocks from executor {@code execId} at {@code host:port}.
 * Results are delivered through {@code listener} rather than returned.
 *
 * @param host                host to fetch from
 * @param port                port to fetch from
 * @param execId              id of the executor whose blocks are requested
 * @param blockIds            ids of the blocks to fetch
 * @param listener            receives the success/failure callback for each block
 * @param downloadFileManager manager for downloaded files; the usage example passes
 *                            {@code null} (NOTE(review): presumably "no temp files" — confirm)
 */
public abstract void fetchBlocks(
String host, int port, String execId, String[] blockIds,
BlockFetchingListener listener, DownloadFileManager downloadFileManager
);
/** Returns the metrics exposed by this client as a {@code MetricSet}. */
public MetricSet shuffleMetrics();
/** Closes the client, as required by {@code Closeable}. */
public void close();
}
/**
 * {@link ShuffleClient} that talks to an external shuffle service.
 *
 * <p>Signature listing only: method bodies are omitted in this reference.
 */
public class ExternalShuffleClient extends ShuffleClient {
    /**
     * @param conf                  transport configuration for the client
     * @param secretKeyHolder       source of the secret keys used for authentication
     * @param authEnabled           whether authentication is enabled
     * @param registrationTimeoutMs registration timeout, in milliseconds
     */
    public ExternalShuffleClient(
        TransportConf conf, SecretKeyHolder secretKeyHolder,
        boolean authEnabled, long registrationTimeoutMs
    );

    /**
     * Registers executor {@code execId} with the shuffle server at {@code host:port}.
     *
     * @param executorInfo the executor's shuffle layout (local dirs, sub-dir count, shuffle manager)
     * @throws IOException          declared by the signature
     * @throws InterruptedException declared by the signature
     */
    public void registerWithShuffleServer(
        String host, int port, String execId, ExecutorShuffleInfo executorInfo
    ) throws IOException, InterruptedException;
}

Server-side components for handling shuffle requests, including RPC handlers and block resolution mechanisms.
/**
 * Server-side {@code RpcHandler} for shuffle requests.
 *
 * <p>Signature listing only: method bodies are omitted in this reference.
 */
public class ExternalShuffleBlockHandler extends RpcHandler {
/**
 * @param conf                   transport configuration
 * @param registeredExecutorFile NOTE(review): presumably where registered executors are
 *                               persisted — confirm against the implementation
 */
public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile);
/**
 * Handles one incoming RPC.
 *
 * @param client   the client the message came from
 * @param message  the raw request bytes
 * @param callback used to send the response
 */
public void receive(
TransportClient client, ByteBuffer message, RpcResponseCallback callback
);
/**
 * Notifies the handler that application {@code appId} has been removed.
 * NOTE(review): {@code cleanupLocalDirs} presumably controls deletion of the app's
 * local directories — confirm.
 */
public void applicationRemoved(String appId, boolean cleanupLocalDirs);
/** Notifies the handler that executor {@code executorId} of application {@code appId} has been removed. */
public void executorRemoved(String executorId, String appId);
}
/**
 * Server-side block resolution: tracks registered executors and looks up their shuffle blocks.
 *
 * <p>Signature listing only: method bodies are omitted in this reference.
 */
public class ExternalShuffleBlockResolver {
    /**
     * @param conf                   transport configuration
     * @param registeredExecutorFile NOTE(review): presumably where registered executors are
     *                               persisted — confirm against the implementation
     */
    public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile);

    /** Registers executor {@code execId} of application {@code appId} with its shuffle layout. */
    public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);

    /**
     * Returns the data of one shuffle block, addressed by application, executor,
     * shuffle id, map id and reduce id.
     */
    public ManagedBuffer getBlockData(
        String appId, String execId, int shuffleId, int mapId, int reduceId
    );
}

SASL-based authentication system for securing shuffle operations between clients and external shuffle services.
/**
 * {@code SecretKeyHolder} that stores one shuffle secret per application id.
 *
 * <p>Signature listing only: method bodies are omitted in this reference.
 */
public class ShuffleSecretManager implements SecretKeyHolder {
    // NOTE(review): presumably the value returned by getSaslUser — confirm.
    private static final String SPARK_SASL_USER = "sparkSaslUser";

    /** Creates a manager; applications are added afterwards with {@code registerApp}. */
    public ShuffleSecretManager();

    /** Registers {@code shuffleSecret} as the secret for application {@code appId}. */
    public void registerApp(String appId, String shuffleSecret);

    /** Same as the {@code String} overload, with the secret supplied as a {@code ByteBuffer}. */
    public void registerApp(String appId, ByteBuffer shuffleSecret);

    /** Removes the registration for application {@code appId}. */
    public void unregisterApp(String appId);

    /** Returns the SASL user name to use for application {@code appId}. */
    public String getSaslUser(String appId);

    /** Returns the secret key for application {@code appId}. */
    public String getSecretKey(String appId);
}

Serializable message classes for communication between shuffle clients and servers, including registration, block requests, and data transfers.
/**
 * Base class of the messages exchanged between shuffle clients and servers.
 *
 * <p>Signature listing only: method bodies are omitted in this reference.
 */
public abstract class BlockTransferMessage implements Encodable {
/** Serializes this message into a {@code ByteBuffer}. */
public ByteBuffer toByteBuffer();
/** The message kinds listed by this reference. */
public enum Type {
OPEN_BLOCKS, UPLOAD_BLOCK, REGISTER_EXECUTOR, STREAM_HANDLE,
REGISTER_DRIVER, HEARTBEAT, UPLOAD_BLOCK_STREAM
}
/** Decoding counterpart of {@code toByteBuffer()}. */
public static class Decoder {
/** Decodes a {@code BlockTransferMessage} from {@code msg}. */
public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
}
}
/**
 * Describes an executor's shuffle layout. The usage example builds one as
 * {@code new ExecutorShuffleInfo(localDirs, 64, "sort")}.
 */
public class ExecutorShuffleInfo implements Encodable {
    /** The executor's local directories. */
    public final String[] localDirs;
    /** Number of sub-directories per local directory. */
    public final int subDirsPerLocalDir;
    /** Identifier of the shuffle manager in use. */
    public final String shuffleManager;

    /** Creates an instance from the three field values, in declaration order. */
    public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);
}

Advanced block fetching mechanisms with automatic retry capabilities and listener-based asynchronous handling.
/**
 * Callback interface through which block fetch results are reported.
 */
public interface BlockFetchingListener extends EventListener {
/** Called when block {@code blockId} has been fetched; {@code data} carries its contents. */
void onBlockFetchSuccess(String blockId, ManagedBuffer data);
/** Called when fetching block {@code blockId} failed with {@code exception}. */
void onBlockFetchFailure(String blockId, Throwable exception);
}
/**
 * Fetches a set of blocks over an existing {@code TransportClient} and reports the
 * outcome to a {@code BlockFetchingListener}.
 *
 * <p>Signature listing only: method bodies are omitted in this reference.
 * NOTE(review): the name suggests one request per block — confirm against the implementation.
 */
public class OneForOneBlockFetcher {
/**
 * @param client        connection used for the fetch
 * @param appId         id of the requesting application
 * @param execId        id of the executor whose blocks are requested
 * @param blockIds      ids of the blocks to fetch
 * @param listener      receives the success/failure callbacks
 * @param transportConf transport configuration
 */
public OneForOneBlockFetcher(
TransportClient client, String appId, String execId, String[] blockIds,
BlockFetchingListener listener, TransportConf transportConf
);
/** Starts the fetch. */
public void start();
}
/**
 * Block fetcher with automatic retry: the actual fetch is delegated to a
 * {@code BlockFetchStarter}.
 *
 * <p>Signature listing only: method bodies are omitted in this reference.
 */
public class RetryingBlockFetcher {
    /**
     * @param conf         transport configuration
     * @param fetchStarter performs the actual fetch of a set of blocks
     * @param blockIds     ids of the blocks to fetch
     * @param listener     receives the success/failure callbacks
     */
    public RetryingBlockFetcher(
        TransportConf conf, BlockFetchStarter fetchStarter,
        String[] blockIds, BlockFetchingListener listener
    );

    /** Starts the fetch. */
    public void start();

    /** Strategy that creates and starts a fetch for a given set of blocks. */
    public interface BlockFetchStarter {
        /** Creates and starts a fetch of {@code blockIds}, reporting to {@code listener}. */
        void createAndStart(String[] blockIds, BlockFetchingListener listener);
    }
}

Block Fetching and Retry Logic
Temporary file management system for handling downloaded blocks during transfer operations.
/**
 * A file that downloaded block data can be written to.
 */
public interface DownloadFile {
/**
 * Deletes this file.
 * NOTE(review): the boolean presumably reports whether deletion succeeded — confirm.
 */
boolean delete();
/**
 * Opens a channel for writing to this file.
 *
 * @throws IOException declared by the signature
 */
DownloadFileWritableChannel openForWriting() throws IOException;
/** Returns this file's path as a string. */
String path();
}
/**
 * Creates temporary {@code DownloadFile}s and tracks them for cleanup.
 */
public interface DownloadFileManager {
/** Creates a temporary download file using the given transport configuration. */
DownloadFile createTempFile(TransportConf transportConf);
/**
 * Registers {@code file} to be cleaned up.
 * NOTE(review): the meaning of the boolean result is not visible here — confirm.
 */
boolean registerTempFileToClean(DownloadFile file);
}
/**
 * Writable channel for a {@code DownloadFile}.
 */
public interface DownloadFileWritableChannel extends WritableByteChannel {
    /** Closes the channel and returns what was written as a {@code ManagedBuffer}. */
    ManagedBuffer closeAndRead();
}

Specialized components for Mesos deployment scenarios, including driver registration and heartbeat mechanisms.
/**
 * {@code ExternalShuffleClient} variant for Mesos deployments that adds driver registration.
 *
 * <p>Signature listing only: method bodies are omitted in this reference.
 */
public class MesosExternalShuffleClient extends ExternalShuffleClient {
    /**
     * @param conf                  transport configuration for the client
     * @param secretKeyHolder       source of the secret keys used for authentication
     * @param authEnabled           whether authentication is enabled
     * @param registrationTimeoutMs registration timeout, in milliseconds
     */
    public MesosExternalShuffleClient(
        TransportConf conf, SecretKeyHolder secretKeyHolder,
        boolean authEnabled, long registrationTimeoutMs
    );

    /**
     * Registers the driver with the shuffle service at {@code host:port}.
     *
     * @param heartbeatTimeoutMs  heartbeat timeout, in milliseconds
     * @param heartbeatIntervalMs heartbeat interval, in milliseconds
     * @throws IOException          declared by the signature
     * @throws InterruptedException declared by the signature
     */
    public void registerDriverWithShuffleService(
        String host, int port, long heartbeatTimeoutMs, long heartbeatIntervalMs
    ) throws IOException, InterruptedException;
}

/**
 * One index entry: an offset and a length.
 *
 * <p>Signature listing only: method bodies are omitted in this reference.
 */
public class ShuffleIndexRecord {
    /** Creates a record with the given offset and length. */
    public ShuffleIndexRecord(long offset, long length);

    /** Returns the record's offset. */
    public long getOffset();

    /** Returns the record's length. */
    public long getLength();
}
/**
 * Index information built from a shuffle index file.
 *
 * <p>Signature listing only: method bodies are omitted in this reference.
 */
public class ShuffleIndexInformation {
/** Creates the index information from {@code indexFile}. */
public ShuffleIndexInformation(File indexFile);
/**
 * Returns the size of this index information.
 * NOTE(review): the unit (bytes vs. number of entries) is not visible here — confirm.
 */
public int getSize();
/** Returns the {@code ShuffleIndexRecord} for {@code reduceId}. */
public ShuffleIndexRecord getIndex(int reduceId);
}
/**
 * {@code DownloadFile} implementation constructed from a {@code File} and a
 * {@code TransportConf}.
 *
 * <p>Signature listing only: method bodies are omitted in this reference.
 */
public class SimpleDownloadFile implements DownloadFile {
/** Creates a download file for {@code file} using the given transport configuration. */
public SimpleDownloadFile(File file, TransportConf transportConf);
/**
 * Deletes this file.
 * NOTE(review): the boolean presumably reports whether deletion succeeded — confirm.
 */
public boolean delete();
/** Opens a channel for writing to this file. */
public DownloadFileWritableChannel openForWriting();
/** Returns this file's path as a string. */
public String path();
}