External shuffle service client and server for fault-tolerant data shuffling in Apache Spark
Install: `npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-shuffle_2-11@1.6.0`

Apache Spark Network Shuffle provides external shuffle service functionality that enables fault-tolerant data shuffling in distributed Spark applications. This library allows Spark executors to read shuffle data from external services rather than directly from other executors, improving fault tolerance by preserving shuffle data even when executors are lost.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-shuffle_2.11</artifactId>
<version>1.6.3</version>
</dependency>
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;
// Create client configuration
TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());

// Initialize external shuffle client: no secret holder, SASL and SASL encryption disabled
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
client.init("my-app-id");

// Blocks are addressed by (appId, execId, blockId). Presumably "executor-1" must already have
// been registered with the service (see registerWithShuffleServer) -- confirm for your setup.
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};

// fetchBlocks reports each block's outcome only through the listener (it returns void), and
// those callbacks may arrive after the call returns. Count one callback per block so the client
// is not closed while fetches are still in flight.
final CountDownLatch remaining = new CountDownLatch(blockIds.length);

// Set up block fetch listener
BlockFetchingListener listener = new BlockFetchingListener() {
  @Override
  public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
    try {
      System.out.println("Successfully fetched block: " + blockId);
      // Process the block data
    } finally {
      remaining.countDown();
    }
  }

  @Override
  public void onBlockFetchFailure(String blockId, Throwable exception) {
    try {
      System.err.println("Failed to fetch block: " + blockId);
      exception.printStackTrace();
    } finally {
      remaining.countDown();
    }
  }
};

try {
  // Fetch shuffle blocks from external service
  client.fetchBlocks("shuffle-service-host", 7337, "executor-1", blockIds, listener);
  // Bound the wait so a missing callback cannot hang the caller forever.
  if (!remaining.await(60, TimeUnit.SECONDS)) {
    System.err.println("Timed out waiting for " + remaining.getCount() + " block(s)");
  }
} catch (InterruptedException e) {
  // Restore the interrupt flag so callers can observe the interruption.
  Thread.currentThread().interrupt();
} finally {
  // Clean up: always release the client, even if fetching or waiting fails.
  client.close();
}

The Spark Network Shuffle module is built around several key components:
- **Client**: `ExternalShuffleClient` and related classes provide the client-side API for fetching shuffle blocks, with retry logic and fault tolerance.
- **Server**: `ExternalShuffleBlockHandler` and `ExternalShuffleBlockResolver` handle incoming requests and resolve block locations on the server side.

Client-side functionality for fetching shuffle blocks from external shuffle services, with comprehensive error handling and retry logic.
/**
 * Base client API for fetching shuffle blocks; closeable via {@link Closeable}.
 * Signature listing only -- method bodies are omitted.
 */
public abstract class ShuffleClient implements Closeable {
/**
 * Initializes the client for the application {@code appId}.
 * NOTE(review): presumably must be called before fetchBlocks -- confirm.
 */
public void init(String appId);
/**
 * Requests {@code blockIds} written by executor {@code execId} from the shuffle service at
 * {@code host}:{@code port}. Returns nothing; each block's outcome is reported through
 * {@code listener} (onBlockFetchSuccess / onBlockFetchFailure).
 */
public abstract void fetchBlocks(String host, int port, String execId,
String[] blockIds, BlockFetchingListener listener);
}
/**
 * {@link ShuffleClient} that fetches blocks from an external shuffle service, with optional SASL.
 * Signature listing only -- method bodies are omitted.
 */
public class ExternalShuffleClient extends ShuffleClient {
/**
 * @param conf transport configuration
 * @param secretKeyHolder source of SASL secrets; presumably only consulted when
 *     {@code saslEnabled} is true (null is passed when SASL is off) -- confirm
 * @param saslEnabled whether to authenticate with SASL
 * @param saslEncryptionEnabled whether to encrypt the SASL connection
 */
public ExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder,
boolean saslEnabled, boolean saslEncryptionEnabled);
/**
 * Registers executor {@code execId}'s shuffle configuration ({@code executorInfo}) with the
 * service at {@code host}:{@code port}.
 * @throws IOException declared; presumably on communication or registration failure
 */
public void registerWithShuffleServer(String host, int port, String execId,
ExecutorShuffleInfo executorInfo) throws IOException;
}
Server-side components for handling shuffle block requests, resolving block locations, and managing executor registrations.
/**
 * Server-side {@link RpcHandler} that serves shuffle-service requests.
 * Signature listing only -- method bodies are omitted.
 */
public class ExternalShuffleBlockHandler extends RpcHandler {
/**
 * @param conf transport configuration
 * @param registeredExecutorFile NOTE(review): presumably a file used to persist executor
 *     registrations -- confirm its role and whether null is allowed
 * @throws IOException declared; presumably if that file cannot be opened
 */
public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException;
/**
 * Handles one RPC {@code message} from {@code client}, answering through {@code callback}.
 * NOTE(review): presumably decodes the message as a BlockTransferMessage -- confirm.
 */
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
/**
 * Notifies the handler that application {@code appId} was removed; {@code cleanupLocalDirs}
 * presumably controls whether its local shuffle directories are deleted -- confirm.
 */
public void applicationRemoved(String appId, boolean cleanupLocalDirs);
}
/**
 * Maps (appId, execId, blockId) to block data on the server side.
 * Signature listing only -- method bodies are omitted.
 */
public class ExternalShuffleBlockResolver {
/** Records {@code executorInfo} for executor {@code execId} of application {@code appId}. */
public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
/**
 * Returns the data of {@code blockId} belonging to executor {@code execId} of {@code appId}.
 * NOTE(review): behavior for an executor that was never registered is not shown here.
 */
public ManagedBuffer getBlockData(String appId, String execId, String blockId);
}
Structured communication protocol between shuffle clients and servers with efficient binary serialization.
/**
 * Base class for the messages exchanged between shuffle clients and the shuffle service.
 * Signature listing only -- method bodies are omitted.
 */
public abstract class BlockTransferMessage implements Encodable {
/**
 * Serializes this message into a buffer.
 * NOTE(review): presumably the encoding carries the {@link Type} so the receiver can decode it -- confirm.
 */
public ByteBuffer toByteBuffer();
/** The kinds of message defined by the protocol. */
public enum Type {
OPEN_BLOCKS, UPLOAD_BLOCK, REGISTER_EXECUTOR, STREAM_HANDLE, REGISTER_DRIVER
}
}
/**
 * Executor configuration passed to registerWithShuffleServer / registerExecutor; its fields
 * appear to describe the executor's on-disk shuffle layout. Encodable for the wire protocol.
 */
public class ExecutorShuffleInfo implements Encodable {
// Local directories used by the executor (presumably its shuffle output roots -- confirm).
public final String[] localDirs;
// Number of subdirectories under each entry of localDirs.
public final int subDirsPerLocalDir;
// NOTE(review): presumably identifies the shuffle manager implementation that wrote the files -- confirm.
public final String shuffleManager;
}
SASL authentication support for secure communication between shuffle clients and external shuffle services.
/**
 * Per-application store of shuffle secrets, exposed as a {@link SecretKeyHolder} for SASL.
 * Signature listing only -- method bodies are omitted.
 */
public class ShuffleSecretManager implements SecretKeyHolder {
/** Stores {@code shuffleSecret} as the secret for application {@code appId}. */
public void registerApp(String appId, String shuffleSecret);
/** Removes the secret stored for application {@code appId}. */
public void unregisterApp(String appId);
/**
 * Returns the secret stored for {@code appId}.
 * NOTE(review): result for an unknown appId is not shown here.
 */
public String getSecretKey(String appId);
}
/** Callbacks invoked once per requested block by {@code fetchBlocks}. */
public interface BlockFetchingListener extends EventListener {
/**
 * Called when {@code blockId} was fetched; {@code data} holds its contents.
 * NOTE(review): confirm whether {@code data} stays valid after this callback returns
 * (buffer ownership is not visible here).
 */
void onBlockFetchSuccess(String blockId, ManagedBuffer data);
/** Called when {@code blockId} could not be fetched; {@code exception} is the cause. */
void onBlockFetchFailure(String blockId, Throwable exception);
}
/**
 * Key pairing an application ID with an executor ID. Declared {@code static}, so it is a
 * nested class whose enclosing class is not shown here.
 */
public static class AppExecId {
public final String appId;
public final String execId;
public AppExecId(String appId, String execId);
// Value equality; presumably compares both appId and execId -- confirm.
public boolean equals(Object o);
public int hashCode();
public String toString();
}