The shuffle client components provide the primary interface for fetching shuffle blocks from an external shuffle service. The service runs outside the executor processes and serves the shuffle files those executors wrote, so shuffle data remains accessible even after the executor that produced it exits or fails.
public abstract class ShuffleClient implements Closeable {
    public void init(String appId);
    public abstract void fetchBlocks(
        String host,
        int port,
        String execId,
        String[] blockIds,
        BlockFetchingListener listener
    );
    public void close() throws IOException;
}
Abstract base class for shuffle clients. It must be initialized with an application ID (via init) before use.
Parameters:
appId (String): Spark application identifier, used for authentication and tracking
host (String): Hostname of the shuffle service
port (int): Port number of the shuffle service
execId (String): ID of the executor that originally wrote the shuffle blocks
blockIds (String[]): Identifiers of the blocks to fetch
listener (BlockFetchingListener): Callback interface for handling fetch results
public class ExternalShuffleClient extends ShuffleClient {
    public ExternalShuffleClient(
        TransportConf conf,
        SecretKeyHolder secretKeyHolder,
        boolean saslEnabled,
        boolean saslEncryptionEnabled
    );
    public void registerWithShuffleServer(
        String host,
        int port,
        String execId,
        ExecutorShuffleInfo executorInfo
    ) throws IOException;
    public void fetchBlocks(
        String host,
        int port,
        String execId,
        String[] blockIds,
        BlockFetchingListener listener
    );
}
Main implementation of the shuffle client for external shuffle services. Supports SASL authentication and automatic retry of failed fetches.
Constructor Parameters:
conf (TransportConf): Network transport configuration
secretKeyHolder (SecretKeyHolder): Supplies SASL secrets; may be null if SASL is disabled
saslEnabled (boolean): Whether to enable SASL authentication
saslEncryptionEnabled (boolean): Whether to enable SASL encryption; requires saslEnabled to be true (the constructor throws IllegalArgumentException otherwise)
Key Methods:
registerWithShuffleServer(host, port, execId, executorInfo)
Registers an executor with the external shuffle server, telling the server where that executor's shuffle files are stored.
Parameters:
host (String): Shuffle server hostname
port (int): Shuffle server port
execId (String): Executor identifier
executorInfo (ExecutorShuffleInfo): Configuration describing the shuffle file locations
Throws:
IOException: If registration fails due to network or server errors
fetchBlocks(host, port, execId, blockIds, listener)
Asynchronously fetches shuffle blocks from the external shuffle service, retrying transient failures automatically. Errors are reported per block through the listener's onBlockFetchFailure callback rather than thrown.
Parameters:
host (String): Shuffle server hostname
port (int): Shuffle server port
execId (String): Executor that wrote the blocks
blockIds (String[]): Block identifiers to fetch
listener (BlockFetchingListener): Callback for success and failure notifications
public class MesosExternalShuffleClient extends ExternalShuffleClient {
    public MesosExternalShuffleClient(
        TransportConf conf,
        SecretKeyHolder secretKeyHolder,
        boolean saslEnabled,
        boolean saslEncryptionEnabled
    );
    public void registerDriverWithShuffleService(String host, int port) throws IOException;
}
Specialized client for Mesos deployments that adds driver registration, which the shuffle service uses to clean up shuffle files.
registerDriverWithShuffleService(host, port)
Registers the Spark driver with the external shuffle service so that the service can clean up the application's shuffle files when the application completes.
Parameters:
host (String): Shuffle service hostname
port (int): Shuffle service port
Throws:
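IOException: If registration fails
Basic client without SASL:
// Build a transport configuration for the "shuffle" module.
// TransportConf does not accept a SparkConf directly; SparkTransportConf adapts one.
// The trailing 0 is numUsableCores, a Scala default argument that Java callers must pass explicitly.
SparkConf sparkConf = new SparkConf();
TransportConf conf = SparkTransportConf.fromSparkConf(sparkConf, "shuffle", 0);
// Create client without SASL (no secret key holder needed)
ExternalShuffleClient client = new ExternalShuffleClient(
    conf, null, false, false
);
// Initialize with the application ID before any other call
client.init("spark-app-123");
Client with SASL authentication and encryption:
// Create secret manager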
ShuffleSecretManager secretManager = new ShuffleSecretManager();
secretManager.registerApp("spark-app-123", "my-secret-key");
// Create secure client
ExternalShuffleClient client = new ExternalShuffleClient(
    conf,
    secretManager, // Secret key holder
    true,          // Enable SASL
    true           // Enable SASL encryption
);
client.init("spark-app-123");
Registering an executor:
// Define executor shuffle configuration
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
    new String[]{"/tmp/spark-shuffle", "/tmp/spark-shuffle2"}, // Local directories
    64,                                                 // Number of subdirectories per local directory
    "org.apache.spark.shuffle.sort.SortShuffleManager"  // Shuffle manager class
);
// Register with shuffle server
client.registerWithShuffleServer("shuffle-server", 7337, "executor-1", executorInfo);
Fetching blocks:
// Implement fetch callback
BlockFetchingListener listener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Fetched block " + blockId + ", size: " + data.size());
        // The transport layer releases the buffer after this callback returns, so do not
        // release it here. To use it after returning (e.g. on another thread), call
        // data.retain() now and data.release() when finished.
        try {
            byte[] blockData = new byte[(int) data.size()];
            data.nioByteBuffer().get(blockData); // nioByteBuffer() may throw IOException
            // Handle the shuffle block data...
        } catch (IOException e) {
            System.err.println("Failed to read block " + blockId + ": " + e.getMessage());
        }
    }
    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch " + blockId + ": " + exception.getMessage());
        // Handle failure, e.g. skip the block or fail the task
    }
};
// Fetch multiple blocks
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};
client.fetchBlocks("shuffle-server", 7337, "executor-1", blockIds, listener);
Mesos deployments:
// Create Mesos client
MesosExternalShuffleClient mesosClient = new MesosExternalShuffleClient(
    conf, secretManager, true, false
);
mesosClient.init("spark-app-123");
// Register driver for cleanup
mesosClient.registerDriverWithShuffleService("shuffle-server", 7337);
// Use normally for block fetching
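mesosClient.fetchBlocks("shuffle-server", 7337, "executor-1", blockIds, listener);
Retry configuration:
The shuffle client automatically retries fetches that fail with an IOException, such as a transient network failure; other errors go to the listener without a retry. TransportConf has no setters. It reads the retry settings from the configuration it wraps, so set them on the SparkConf before building the TransportConf:
// Maximum number of retries after the first attempt (default 3; 0 disables retries)
// and the wait time between retries (default 5s)
SparkConf sparkConf = new SparkConf()
    .set("spark.shuffle.io.maxRetries", "3")
    .set("spark.shuffle.io.retryWait", "5s");
TransportConf conf = SparkTransportConf.fromSparkConf(sparkConf, "shuffle", 0);
Common exceptions:
IOException: Network connectivity problems or an unavailable server, thrown by the registration methods
IllegalArgumentException: Invalid parameters or configuration, such as enabling SASL encryption without SASL
SASL authentication failures: Raised while the connection is being established, for example when the client and server secrets do not match
Fetch errors: Not thrown by fetchBlocks; each failed block is reported through onBlockFetchFailure
Always close the shuffle client to release its network resources: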
try {
    // Use client...
} finally {
    client.close();
}
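The automatic retry behavior controlled by spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait can be illustrated with a small, self-contained model. This is a hypothetical sketch, not Spark's RetryingBlockFetcher, which retries asynchronously and re-requests only the blocks that are still outstanding. The names RetryModel, Attempt, and fetchWithRetry are illustrative. The sketch assumes that only IOException failures (directly or as the cause) are retried, at most maxRetries times after the first attempt with a fixed wait between attempts, and that a final failure goes to a callback instead of being thrown, mirroring onBlockFetchFailure.

```java
import java.io.IOException;
import java.util.function.Consumer;

// Hypothetical, simplified model of the shuffle client's retry policy.
// Not Spark's RetryingBlockFetcher; names here are illustrative only.
final class RetryModel {

    /** One fetch attempt (hypothetical stand-in for a network request). */
    interface Attempt {
        void run() throws Exception;
    }

    /**
     * Runs the attempt, retrying up to maxRetries more times while it fails
     * with an IOException (directly or as the cause), sleeping retryWaitMs
     * between attempts. A terminal failure is passed to onFailure instead of
     * being thrown. Returns the number of attempts made.
     */
    static int fetchWithRetry(Attempt attempt, int maxRetries, long retryWaitMs,
                              Consumer<Throwable> onFailure) {
        int retryCount = 0;
        while (true) {
            try {
                attempt.run();
                return retryCount + 1; // succeeded on this attempt
            } catch (Exception e) {
                boolean isIOException = e instanceof IOException
                        || e.getCause() instanceof IOException;
                if (!isIOException || retryCount >= maxRetries) {
                    onFailure.accept(e); // non-retriable error, or retries exhausted
                    return retryCount + 1;
                }
                retryCount++;
                try {
                    Thread.sleep(retryWaitMs); // wait before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    onFailure.accept(ie);
                    return retryCount; // attempts made before the interruption
                }
            }
        }
    }
}
```

For example, with maxRetries set to 3, an attempt that fails twice with an IOException succeeds on its third try, while an IllegalStateException is reported to the callback after a single attempt.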