Core client functionality for fetching shuffle blocks from external shuffle services, including registration, block fetching, and connection management.
Abstract base class for reading shuffle files from executors or external services.
/**
* Abstract base class for clients that read shuffle files from executors or external services
*/
public abstract class ShuffleClient implements Closeable {
/**
* Initialize the client for the given application.
* This listing shows the signature only; no method body is given here.
* @param appId - ID of the application the client is being initialized for
*/
public void init(String appId);
/**
* Fetch a set of blocks from a remote shuffle service.
* Declared abstract, so each concrete client supplies the implementation.
* Nothing is returned; fetch events are delivered to {@code listener}.
* @param host - Host name of the shuffle service
* @param port - Port number of the shuffle service
* @param execId - Executor ID
* @param blockIds - Array of block IDs to fetch
* @param listener - Listener for block fetch events
* @param downloadFileManager - Manager for temporary download files, can be null
*/
public abstract void fetchBlocks(
String host, int port, String execId, String[] blockIds,
BlockFetchingListener listener, DownloadFileManager downloadFileManager
);
/**
* Get shuffle metrics for monitoring.
* This listing shows the signature only; no method body is given here.
* @return MetricSet containing shuffle performance metrics
*/
public MetricSet shuffleMetrics();
/**
* Close the client and clean up resources.
* The enclosing class implements Closeable; this is presumably that
* interface's close() method (signature only, no body shown here).
*/
public void close();
}
Client for reading shuffle blocks from an external shuffle service.
/**
* Client implementation for reading shuffle blocks from external shuffle service
*/
public class ExternalShuffleClient extends ShuffleClient {
/**
* Create an external shuffle client.
* This listing shows the signature only; no constructor body is given here.
* @param conf - Transport configuration
* @param secretKeyHolder - Secret key holder for authentication
* @param authEnabled - Whether authentication is enabled
* @param registrationTimeoutMs - Timeout for registration operations in milliseconds
*/
public ExternalShuffleClient(
TransportConf conf, SecretKeyHolder secretKeyHolder,
boolean authEnabled, long registrationTimeoutMs
);
/**
* Initialize the client for the given application.
* Overrides the init declaration of the ShuffleClient base class.
* @param appId - ID of the application the client is being initialized for
*/
@Override
public void init(String appId);
/**
* Fetch a set of blocks from the external shuffle service.
* Provides the concrete version of the abstract fetchBlocks declared on ShuffleClient.
* Nothing is returned; fetch events are delivered to {@code listener}.
* @param host - Host name of the shuffle service
* @param port - Port number of the shuffle service
* @param execId - Executor ID
* @param blockIds - Array of block IDs to fetch
* @param listener - Listener for block fetch events
* @param downloadFileManager - Manager for temporary download files, can be null
*/
@Override
public void fetchBlocks(
String host, int port, String execId, String[] blockIds,
BlockFetchingListener listener, DownloadFileManager downloadFileManager
);
/**
* Get shuffle metrics for monitoring.
* Overrides the shuffleMetrics declaration of the ShuffleClient base class.
* @return MetricSet containing shuffle performance metrics
*/
@Override
public MetricSet shuffleMetrics();
/**
* Register an executor with the external shuffle service.
* Specific to ExternalShuffleClient: the ShuffleClient base class listing has no such method.
* Both exceptions are checked, so callers must handle or propagate them.
* @param host - Host name of the shuffle service
* @param port - Port number of the shuffle service
* @param execId - Executor ID to register
* @param executorInfo - Information about the executor's shuffle configuration
* @throws IOException if network communication fails
* @throws InterruptedException if the operation is interrupted
*/
public void registerWithShuffleServer(
String host, int port, String execId, ExecutorShuffleInfo executorInfo
) throws IOException, InterruptedException;
/**
* Close the client and clean up resources.
* Overrides the close declaration of the ShuffleClient base class.
*/
@Override
public void close();
}
Usage Examples:
import java.io.IOException;
import java.io.InputStream;

import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
// Create transport configuration for the "shuffle" module.
// TransportConf takes a module name plus a ConfigProvider; an empty provider
// means no settings are overridden.
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
// Create secret manager for authentication
ShuffleSecretManager secretManager = new ShuffleSecretManager();
secretManager.registerApp("app1", "secretKey123");
// Create external shuffle client (authentication enabled, 10000 ms registration timeout)
ExternalShuffleClient client = new ExternalShuffleClient(conf, secretManager, true, 10000);
// Initialize for application
client.init("app1");
// Register executor with shuffle service
String[] localDirs = {"/tmp/spark-1", "/tmp/spark-2"};
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(localDirs, 64, "sort");
client.registerWithShuffleServer("shuffle-node-1", 7337, "executor-1", executorInfo);
// Create block fetching listener
BlockFetchingListener listener = new BlockFetchingListener() {
  @Override
  public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
    System.out.println("Successfully fetched block: " + blockId +
        ", size: " + data.size() + " bytes");
    // Copy the bytes out while still inside the callback. The buffer is released
    // automatically once this method returns, so it must NOT be released here;
    // call data.retain() (and later release()) only if the buffer has to outlive
    // the callback, e.g. when handing it to another thread.
    try (InputStream in = data.createInputStream()) {
      // try-with-resources closes the stream even if processing throws
      byte[] blockData = ByteStreams.toByteArray(in);
      processBlockData(blockId, blockData);
    } catch (IOException e) {
      System.err.println("Error processing block data: " + e.getMessage());
    }
  }

  @Override
  public void onBlockFetchFailure(String blockId, Throwable exception) {
    System.err.println("Failed to fetch block: " + blockId + ", error: " + exception.getMessage());
    // Handle retry logic or error reporting
    handleBlockFetchFailure(blockId, exception);
  }
};
// Fetch specific blocks (null: no download file manager is supplied)
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};
client.fetchBlocks("shuffle-node-1", 7337, "executor-1", blockIds, listener, null);
// Monitor shuffle metrics
MetricSet metrics = client.shuffleMetrics();
System.out.println("Shuffle metrics: " + metrics);
// Clean up when done
client.close();
The ExternalShuffleClient behavior can be configured through TransportConf:
- spark.shuffle.io.connectionTimeout - Connection timeout for shuffle operations
- spark.shuffle.io.numConnectionsPerPeer - Number of connections per shuffle peer
- spark.shuffle.io.retryWait - Time to wait between retry attempts
- spark.shuffle.io.maxRetries - Maximum number of retry attempts
- spark.shuffle.io.preferDirectBufs - Whether to prefer direct buffers for network I/O