Spark Project Shuffle Streaming Service - provides network shuffle functionality for Apache Spark's distributed computing engine
Core client functionality for fetching shuffle blocks from external shuffle services, including registration, block fetching, and connection management.
Base abstract class for reading shuffle files from executors or external services.
/**
 * Abstract base class for clients that read shuffle files, either from executors or from an
 * external shuffle service.
 *
 * <p>Subclasses must implement {@link #fetchBlocks} and {@link #close}; {@link #init} and
 * {@link #shuffleMetrics} have default implementations that subclasses may override.
 */
public abstract class ShuffleClient implements Closeable {
  /**
   * Initialize the client for the given application. Must be called before any other method
   * on the client. The default implementation does nothing.
   * @param appId - Application ID to initialize client for
   */
  public void init(String appId) { }

  /**
   * Fetch blocks from a remote shuffle service asynchronously.
   *
   * <p>The method does not return a result: each block's outcome is reported through
   * {@code listener} as soon as it is known, so callers must not assume the blocks have been
   * fetched when this method returns.
   *
   * @param host - Host name of the shuffle service
   * @param port - Port number of the shuffle service
   * @param execId - Executor ID
   * @param blockIds - Array of block IDs to fetch
   * @param listener - Listener for block fetch events
   * @param downloadFileManager - Manager for temporary download files, can be null; when
   *     non-null, remote blocks are streamed into temporary files instead of being kept in memory
   */
  public abstract void fetchBlocks(
      String host,
      int port,
      String execId,
      String[] blockIds,
      BlockFetchingListener listener,
      DownloadFileManager downloadFileManager);

  /**
   * Get shuffle metrics for monitoring.
   * @return MetricSet containing shuffle performance metrics; empty unless a subclass
   *     (such as ExternalShuffleClient) overrides this method
   */
  public MetricSet shuffleMetrics() {
    // No metrics by default.
    return () -> java.util.Collections.emptyMap();
  }

  /**
   * Close the client and clean up resources.
   */
  @Override
  public abstract void close();
}
Client for reading shuffle blocks from external shuffle service.
/**
 * {@link ShuffleClient} implementation that reads shuffle blocks from an external shuffle
 * service.
 */
public class ExternalShuffleClient extends ShuffleClient {
  /**
   * Builds a client that talks to the external shuffle service.
   *
   * @param conf transport-layer configuration
   * @param secretKeyHolder holder of the secret keys used for authentication
   * @param authEnabled {@code true} to enable authentication
   * @param registrationTimeoutMs timeout, in milliseconds, for registration operations
   */
  public ExternalShuffleClient(
      TransportConf conf,
      SecretKeyHolder secretKeyHolder,
      boolean authEnabled,
      long registrationTimeoutMs);

  /**
   * Prepares this client for the application identified by {@code appId}.
   *
   * @param appId identifier of the application to initialize the client for
   */
  @Override
  public void init(String appId);

  /**
   * Requests the given blocks from the external shuffle service.
   *
   * @param host host name of the shuffle service
   * @param port port number of the shuffle service
   * @param execId identifier of the executor that wrote the blocks
   * @param blockIds identifiers of the blocks to fetch
   * @param listener receives block fetch events
   * @param downloadFileManager manager for temporary download files; may be {@code null}
   */
  @Override
  public void fetchBlocks(
      String host,
      int port,
      String execId,
      String[] blockIds,
      BlockFetchingListener listener,
      DownloadFileManager downloadFileManager);

  /**
   * Exposes shuffle performance metrics for monitoring.
   *
   * @return the shuffle metrics of this client
   */
  @Override
  public MetricSet shuffleMetrics();

  /**
   * Registers an executor, described by {@code executorInfo}, with the external shuffle service.
   *
   * @param host host name of the shuffle service
   * @param port port number of the shuffle service
   * @param execId identifier of the executor being registered
   * @param executorInfo the executor's shuffle configuration
   * @throws IOException when network communication fails
   * @throws InterruptedException when the operation is interrupted
   */
  public void registerWithShuffleServer(
      String host,
      int port,
      String execId,
      ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException;

  /**
   * Shuts the client down and releases its resources.
   */
  @Override
  public void close();
}
Usage Examples:
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricSet;
import com.google.common.io.ByteStreams;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// NOTE: the statements below belong inside a method that declares
// `throws IOException, InterruptedException`. processBlockData(...) and
// handleBlockFetchFailure(...) are application-defined placeholders.

// Create transport configuration for the "shuffle" module. TransportConf takes a
// ConfigProvider supplying spark.shuffle.io.* overrides; EMPTY keeps the defaults.
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);

// Create secret manager for authentication
ShuffleSecretManager secretManager = new ShuffleSecretManager();
secretManager.registerApp("app1", "secretKey123");

// Create external shuffle client
ExternalShuffleClient client = new ExternalShuffleClient(conf, secretManager, true, 10000);
try {
  // Initialize for application (must precede registration and fetching)
  client.init("app1");

  // Register executor with shuffle service. The shuffle manager must be given as the
  // fully-qualified class name; the service rejects unknown names such as "sort".
  String[] localDirs = {"/tmp/spark-1", "/tmp/spark-2"};
  ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
      localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");
  client.registerWithShuffleServer("shuffle-node-1", 7337, "executor-1", executorInfo);

  // Blocks to fetch. fetchBlocks() is asynchronous, so count down once per block
  // outcome and wait for all callbacks before closing the client.
  String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};
  CountDownLatch pending = new CountDownLatch(blockIds.length);

  // Create block fetching listener
  BlockFetchingListener listener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
      try {
        System.out.println("Successfully fetched block: " + blockId +
            ", size: " + data.size() + " bytes");
        // The fetcher releases `data` after this callback returns, so do NOT call
        // data.release() here. Copy the bytes now, or data.retain() and release it
        // yourself if the buffer is handed to another thread.
        try (InputStream in = data.createInputStream()) {
          byte[] blockData = ByteStreams.toByteArray(in);
          processBlockData(blockId, blockData);
        } catch (IOException e) {
          System.err.println("Error processing block data: " + e.getMessage());
        }
      } finally {
        pending.countDown();
      }
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
      try {
        System.err.println("Failed to fetch block: " + blockId + ", error: " + exception.getMessage());
        // Handle retry logic or error reporting
        handleBlockFetchFailure(blockId, exception);
      } finally {
        pending.countDown();
      }
    }
  };

  // Fetch specific blocks
  client.fetchBlocks("shuffle-node-1", 7337, "executor-1", blockIds, listener, null);

  // Wait (bounded) until every block has either succeeded or failed
  if (!pending.await(60, TimeUnit.SECONDS)) {
    System.err.println("Timed out waiting for shuffle blocks");
  }

  // Monitor shuffle metrics (MetricSet has no useful toString, so list metric names)
  MetricSet metrics = client.shuffleMetrics();
  System.out.println("Shuffle metrics: " + metrics.getMetrics().keySet());
} finally {
  // Clean up when done
  client.close();
}
The ExternalShuffleClient behavior can be configured through TransportConf:
- spark.shuffle.io.connectionTimeout - Connection timeout for shuffle operations
- spark.shuffle.io.numConnectionsPerPeer - Number of connections per shuffle peer
- spark.shuffle.io.retryWait - Time to wait between retry attempts
- spark.shuffle.io.maxRetries - Maximum number of retry attempts
- spark.shuffle.io.preferDirectBufs - Whether to prefer direct buffers for network I/O

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-shuffle-2-11