or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

authentication.md block-fetching.md file-management.md index.md mesos.md protocol.md shuffle-client.md shuffle-server.md
tile.json

docs/shuffle-client.md

Shuffle Client Operations

Core client functionality for fetching shuffle blocks from external shuffle services, including registration, block fetching, and connection management.

Capabilities

ShuffleClient Abstract Base Class

Abstract base class for reading shuffle files from executors or external services.

/**
 * Abstract base class for clients that read shuffle files from executors or external services.
 *
 * <p>This listing is an API summary: method bodies are omitted. Of the methods shown,
 * only {@code fetchBlocks} is declared {@code abstract}. The class implements
 * {@link Closeable}, so callers are expected to {@code close()} it when finished.
 */
public abstract class ShuffleClient implements Closeable {
    /**
     * Initialize the client for the given application.
     * NOTE(review): presumably this must be called before {@code fetchBlocks} — confirm
     * against the concrete implementation.
     * @param appId - Application ID to initialize client for
     */
    public void init(String appId);
    
    /**
     * Fetch blocks from a remote shuffle service.
     * The method returns {@code void}; outcomes for the requested blocks are delivered
     * through {@code listener}.
     * @param host - Host name of the shuffle service
     * @param port - Port number of the shuffle service
     * @param execId - Executor ID
     * @param blockIds - Array of block IDs to fetch
     * @param listener - Listener for block fetch events
     * @param downloadFileManager - Manager for temporary download files, can be null
     */
    public abstract void fetchBlocks(
        String host, int port, String execId, String[] blockIds,
        BlockFetchingListener listener, DownloadFileManager downloadFileManager
    );
    
    /**
     * Get shuffle metrics for monitoring.
     * @return MetricSet containing shuffle performance metrics
     */
    public MetricSet shuffleMetrics();
    
    /**
     * Close the client and clean up resources.
     * Declared without {@code throws}, so callers need not handle a checked exception here.
     */
    public void close();
}

ExternalShuffleClient Implementation

Client for reading shuffle blocks from external shuffle service.

/**
 * Client implementation for reading shuffle blocks from external shuffle service.
 *
 * <p>This listing is an API summary: method bodies are omitted. In addition to the
 * methods overridden from {@code ShuffleClient}, this class declares
 * {@code registerWithShuffleServer} for registering an executor with the service.
 */
public class ExternalShuffleClient extends ShuffleClient {
    /**
     * Create an external shuffle client.
     * @param conf - Transport configuration
     * @param secretKeyHolder - Secret key holder for authentication
     * @param authEnabled - Whether authentication is enabled
     * @param registrationTimeoutMs - Timeout for registration operations in milliseconds
     */
    public ExternalShuffleClient(
        TransportConf conf, SecretKeyHolder secretKeyHolder,
        boolean authEnabled, long registrationTimeoutMs
    );
    
    /**
     * Initialize the client for the given application.
     * NOTE(review): presumably this must be called before registering or fetching —
     * confirm against the implementation.
     * @param appId - Application ID to initialize client for
     */
    @Override
    public void init(String appId);
    
    /**
     * Fetch blocks from the external shuffle service.
     * The method returns {@code void}; outcomes for the requested blocks are delivered
     * through {@code listener}.
     * @param host - Host name of the shuffle service
     * @param port - Port number of the shuffle service
     * @param execId - Executor ID
     * @param blockIds - Array of block IDs to fetch
     * @param listener - Listener for block fetch events
     * @param downloadFileManager - Manager for temporary download files, can be null
     */
    @Override
    public void fetchBlocks(
        String host, int port, String execId, String[] blockIds,
        BlockFetchingListener listener, DownloadFileManager downloadFileManager
    );
    
    /**
     * Get shuffle metrics for monitoring.
     * @return MetricSet containing shuffle performance metrics
     */
    @Override
    public MetricSet shuffleMetrics();
    
    /**
     * Register an executor with the external shuffle service.
     * Declares checked exceptions, so callers must handle or propagate both
     * {@code IOException} and {@code InterruptedException}.
     * @param host - Host name of the shuffle service
     * @param port - Port number of the shuffle service
     * @param execId - Executor ID to register
     * @param executorInfo - Information about the executor's shuffle configuration
     * @throws IOException if network communication fails
     * @throws InterruptedException if the operation is interrupted
     */
    public void registerWithShuffleServer(
        String host, int port, String execId, ExecutorShuffleInfo executorInfo
    ) throws IOException, InterruptedException;
    
    /**
     * Close the client and clean up resources.
     */
    @Override
    public void close();
}

Usage Examples:

import java.io.IOException;
import java.io.InputStream;

import com.codahale.metrics.MetricSet;
import com.google.common.io.ByteStreams;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Create transport configuration for the "shuffle" module.
// TransportConf takes (module, ConfigProvider); MapConfigProvider.EMPTY supplies no
// overrides, so every spark.shuffle.io.* setting falls back to its default.
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);

// Create secret manager for authentication
ShuffleSecretManager secretManager = new ShuffleSecretManager();
secretManager.registerApp("app1", "secretKey123");

// Create external shuffle client (authentication enabled, 10 s registration timeout)
ExternalShuffleClient client = new ExternalShuffleClient(conf, secretManager, true, 10000);

// Initialize for application
client.init("app1");

// Register executor with shuffle service.
// The shuffle manager must be given as its fully qualified class name; the external
// shuffle service rejects short aliases such as "sort" as an unsupported manager.
String[] localDirs = {"/tmp/spark-1", "/tmp/spark-2"};
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
    localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");
client.registerWithShuffleServer("shuffle-node-1", 7337, "executor-1", executorInfo);

// Create block fetching listener
BlockFetchingListener listener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Successfully fetched block: " + blockId + 
                          ", size: " + data.size() + " bytes");
        // The framework releases `data` automatically once this callback returns, so it
        // must NOT be released here (that would be a double release). Copy the bytes out,
        // as below, or retain()/release() the buffer yourself if it is handed to another
        // thread. try-with-resources closes the stream opened over the buffer.
        try (InputStream in = data.createInputStream()) {
            byte[] blockData = ByteStreams.toByteArray(in);
            processBlockData(blockId, blockData);
        } catch (IOException e) {
            System.err.println("Error processing block data: " + e.getMessage());
        }
    }
    
    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch block: " + blockId + ", error: " + exception.getMessage());
        // Handle retry logic or error reporting
        handleBlockFetchFailure(blockId, exception);
    }
};

// Fetch specific blocks; results are delivered through the listener callbacks above
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};
client.fetchBlocks("shuffle-node-1", 7337, "executor-1", blockIds, listener, null);

// Monitor shuffle metrics
MetricSet metrics = client.shuffleMetrics();
System.out.println("Shuffle metrics: " + metrics);

// Clean up when done. fetchBlocks reports results via the listener, so in real code
// wait until every requested block has produced a success or failure callback before
// closing the client; closing earlier can abort fetches that are still in flight.
client.close();

Block Fetching Best Practices

  1. Listener Implementation: Always implement both success and failure callbacks in BlockFetchingListener
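  2. Buffer Management: The ManagedBuffer passed to onBlockFetchSuccess is released by the framework when the callback returns; do not release it yourself. If the data must outlive the callback, copy it or call retain() and later release() to avoid memory leaks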
  3. Error Handling: Implement proper retry logic for transient failures
  4. Resource Cleanup: Always call close() on the client when finished
  5. Authentication: Use ShuffleSecretManager for secure deployments
  6. Configuration: Tune TransportConf parameters for optimal performance

Common Configuration Parameters

The ExternalShuffleClient behavior can be configured through TransportConf:

  • spark.shuffle.io.connectionTimeout - Connection timeout for shuffle operations
  • spark.shuffle.io.numConnectionsPerPeer - Number of connections per shuffle peer
  • spark.shuffle.io.retryWait - Time to wait between retry attempts
  • spark.shuffle.io.maxRetries - Maximum number of retry attempts
  • spark.shuffle.io.preferDirectBufs - Whether to prefer direct buffers for network I/O