External shuffle service client for Apache Spark that enables reading shuffle blocks from external servers instead of executors
npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-shuffle_2-10@1.6.0

Apache Spark Network Shuffle provides external shuffle service functionality that enables reading shuffle blocks from external servers instead of directly from executors. This improves fault tolerance by allowing shuffle data to persist even when executors are lost, making Spark applications more reliable in distributed computing environments.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-shuffle_2.10</artifactId>
<version>1.6.3</version>
</dependency>

import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.ConfigProvider;

For server-side components:
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;

import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.ConfigProvider;
// Create simple configuration provider with defaults
ConfigProvider configProvider = new ConfigProvider() {
    @Override
    public String get(String name) {
        // Return default values for shuffle configuration
        if (name.equals("spark.shuffle.io.maxRetries")) {
            return "3";
        }
        if (name.equals("spark.shuffle.io.retryWait")) {
            return "5s";
        }
        // No value is configured for any other key
        throw new java.util.NoSuchElementException(name);
    }
};

// Create transport configuration
TransportConf conf = new TransportConf("shuffle", configProvider);

// Create client with SASL disabled for simplicity
ExternalShuffleClient client = new ExternalShuffleClient(
    conf,
    null,  // secretKeyHolder - null for no SASL
    false, // saslEnabled
    false  // saslEncryptionEnabled
);

try {
    // Initialize client
    client.init("my-spark-app");

    // Register executor with shuffle server
    ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
        new String[]{"/tmp/spark-shuffle"}, // local directories
        64, // subdirs per local dir
        "org.apache.spark.shuffle.sort.SortShuffleManager"
    );
    client.registerWithShuffleServer("localhost", 7337, "executor-1", executorInfo);

    // Blocks to fetch
    String[] blockIds = {"shuffle_1_2_0", "shuffle_1_2_1"};

    // One count per requested block: results arrive through the listener, not as a
    // return value of fetchBlocks(), so the latch tells us when every block is settled
    final java.util.concurrent.CountDownLatch pendingBlocks =
        new java.util.concurrent.CountDownLatch(blockIds.length);

    // Implement callback for block fetching
    BlockFetchingListener listener = new BlockFetchingListener() {
        @Override
        public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
            System.out.println("Successfully fetched block: " + blockId);
            // Process the data...
            pendingBlocks.countDown();
        }

        @Override
        public void onBlockFetchFailure(String blockId, Throwable exception) {
            System.err.println("Failed to fetch block " + blockId + ": " + exception.getMessage());
            pendingBlocks.countDown();
        }
    };

    // Fetch shuffle blocks
    client.fetchBlocks("localhost", 7337, "executor-1", blockIds, listener);

    // Wait for every block to succeed or fail before closing; closing right after
    // fetchBlocks() could shut the client down while fetches are still in flight
    if (!pendingBlocks.await(60, java.util.concurrent.TimeUnit.SECONDS)) {
        System.err.println("Timed out waiting for shuffle block fetches to finish");
    }
} finally {
    // Close client when done, even if registration or fetching threw
    client.close();
}

The Apache Spark Network Shuffle library is built around several key components:
- ExternalShuffleClient and MesosExternalShuffleClient provide the primary interface for fetching shuffle data from external services.
- ExternalShuffleBlockHandler and ExternalShuffleBlockResolver implement the server-side logic for serving shuffle blocks.
- Protocol messages (BlockTransferMessage subclasses) handle communication between clients and servers.
- ShuffleSecretManager provides SASL-based authentication for secure shuffle data access.
- RetryingBlockFetcher and OneForOneBlockFetcher implement fault-tolerant block fetching with configurable retry logic.

Primary client interface for fetching shuffle blocks from external shuffle services. Supports both basic and Mesos-specific deployments with configurable SASL authentication.
/**
 * Abstract base class for shuffle clients; it is Closeable and is extended by
 * ExternalShuffleClient. Signature listing only: method bodies are omitted.
 */
public abstract class ShuffleClient implements Closeable {
/**
 * Initializes the client with the application id.
 *
 * @param appId id of the application this client belongs to
 */
public void init(String appId);
/**
 * Requests the given blocks from the shuffle server at host:port.
 * Outcomes are reported per block through the listener rather than returned.
 *
 * @param host     host name of the server to fetch from
 * @param port     port of the server to fetch from
 * @param execId   id of the executor the blocks belong to
 * @param blockIds ids of the blocks to fetch
 * @param listener receives onBlockFetchSuccess / onBlockFetchFailure for each block
 */
public abstract void fetchBlocks(
String host,
int port,
String execId,
String[] blockIds,
BlockFetchingListener listener
);
}
public class ExternalShuffleClient extends ShuffleClient {
public ExternalShuffleClient(
TransportConf conf,
SecretKeyHolder secretKeyHolder,
boolean saslEnabled,
boolean saslEncryptionEnabled
);
public void registerWithShuffleServer(
String host,
int port,
String execId,
ExecutorShuffleInfo executorInfo
) throws IOException;
}

Server-side components that handle shuffle block requests, manage executor registrations, and resolve block locations on the filesystem.
/**
 * Server-side RPC handler (extends RpcHandler) that, together with
 * ExternalShuffleBlockResolver, serves shuffle blocks.
 * Signature listing only: method bodies are omitted.
 */
public class ExternalShuffleBlockHandler extends RpcHandler {
/**
 * @param conf                   transport configuration
 * @param registeredExecutorFile NOTE(review): presumably the file in which executor
 *                               registrations are kept — confirm against the library source
 * @throws IOException declared by the constructor; the triggering conditions are not shown here
 */
public ExternalShuffleBlockHandler(
TransportConf conf,
File registeredExecutorFile
) throws IOException;
/**
 * Handles one message received from a client.
 *
 * @param client   the client the message came from
 * @param message  raw message bytes; presumably an encoded BlockTransferMessage — confirm
 * @param callback presumably used to send the response or failure back to the client — confirm
 */
public void receive(
TransportClient client,
ByteBuffer message,
RpcResponseCallback callback
);
}
public class ExternalShuffleBlockResolver {
public ExternalShuffleBlockResolver(
TransportConf conf,
File registeredExecutorFile
) throws IOException;
public void registerExecutor(
String appId,
String execId,
ExecutorShuffleInfo executorInfo
);
public ManagedBuffer getBlockData(
String appId,
String execId,
String blockId
) throws IOException;
}

Protocol message classes for communication between shuffle clients and servers, including executor registration, block requests, and data transfer.
/**
 * Abstract base class of the protocol messages exchanged between shuffle clients
 * and servers. Signature listing only: method bodies are omitted.
 */
public abstract class BlockTransferMessage implements Encodable {
/** Returns this message as a ByteBuffer. */
public ByteBuffer toByteBuffer();
/** Nested helper that exposes the reverse conversion as a static method. */
public static class Decoder {
/**
 * Builds a BlockTransferMessage from a ByteBuffer.
 *
 * @param msg buffer holding the message; presumably one produced by toByteBuffer() — confirm
 */
public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
}
}
public class ExecutorShuffleInfo implements Encodable {
public ExecutorShuffleInfo(
String[] localDirs,
int subDirsPerLocalDir,
String shuffleManager
);
}

SASL-based security mechanisms for authenticating shuffle clients with external shuffle services, including secret management and secure communication.
public class ShuffleSecretManager implements SecretKeyHolder {
public ShuffleSecretManager();
public void registerApp(String appId, String shuffleSecret);
public void unregisterApp(String appId);
public String getSecretKey(String appId);
}

public interface BlockFetchingListener extends EventListener {
void onBlockFetchSuccess(String blockId, ManagedBuffer data);
void onBlockFetchFailure(String blockId, Throwable exception);
}
/**
 * Message types of BlockTransferMessage; each constant is listed with a numeric id
 * in parentheses.
 * NOTE(review): the id is presumably the value that identifies the message type in
 * its encoded form — confirm against the library source.
 */
public enum BlockTransferMessage.Type {
OPEN_BLOCKS(0),
UPLOAD_BLOCK(1),
REGISTER_EXECUTOR(2),
STREAM_HANDLE(3),
REGISTER_DRIVER(4);
}