Shuffle Client Operations

The shuffle client components are the primary interface for fetching shuffle blocks from an external shuffle service. Because the service serves shuffle files independently of the executors that wrote them, the data stays readable even after an executor exits or is lost.

Core Client Classes

ShuffleClient

public abstract class ShuffleClient implements Closeable {
    public void init(String appId);
    
    public abstract void fetchBlocks(
        String host,
        int port, 
        String execId,
        String[] blockIds,
        BlockFetchingListener listener
    );
    
    public void close() throws IOException;
}

Abstract base class for shuffle clients. Must be initialized with an application ID before use.

Parameters:

  • appId (String): Spark application identifier used for authentication and tracking
  • host (String): Hostname of the shuffle service
  • port (int): Port number of the shuffle service
  • execId (String): Executor ID that originally wrote the shuffle blocks
  • blockIds (String[]): Array of block identifiers to fetch
  • listener (BlockFetchingListener): Callback interface for handling fetch results

ExternalShuffleClient

public class ExternalShuffleClient extends ShuffleClient {
    public ExternalShuffleClient(
        TransportConf conf,
        SecretKeyHolder secretKeyHolder,
        boolean saslEnabled,
        boolean saslEncryptionEnabled
    );
    
    public void registerWithShuffleServer(
        String host,
        int port,
        String execId, 
        ExecutorShuffleInfo executorInfo
    ) throws IOException;
    
    public void fetchBlocks(
        String host,
        int port,
        String execId,
        String[] blockIds,
        BlockFetchingListener listener
    );
}

Main implementation of the shuffle client for external shuffle services. Supports SASL authentication and automatic retry logic.

Constructor Parameters:

  • conf (TransportConf): Network transport configuration
  • secretKeyHolder (SecretKeyHolder): Source of SASL secrets; may be null if SASL is disabled
  • saslEnabled (boolean): Whether to enable SASL authentication
  • saslEncryptionEnabled (boolean): Whether to enable SASL encryption (requires SASL to be enabled)

Key Methods:

registerWithShuffleServer

Registers an executor with the external shuffle server, providing information about where shuffle files are stored.

Parameters:

  • host (String): Shuffle server hostname
  • port (int): Shuffle server port
  • execId (String): Executor identifier
  • executorInfo (ExecutorShuffleInfo): Configuration describing shuffle file locations

Throws:

  • IOException: If registration fails due to network or server errors

fetchBlocks

Asynchronously fetches shuffle blocks from the external service with automatic retry support.

Parameters:

  • host (String): Shuffle server hostname
  • port (int): Shuffle server port
  • execId (String): Executor that wrote the blocks
  • blockIds (String[]): Block identifiers to fetch
  • listener (BlockFetchingListener): Callback for success/failure notifications
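
Because fetchBlocks is asynchronous, callers usually need a way to wait until every requested block has either arrived or failed. The sketch below shows one way to do that with a CountDownLatch. To keep it runnable without Spark on the classpath, FetchListener is a hypothetical stand-in for BlockFetchingListener that receives a ByteBuffer instead of a ManagedBuffer, and CollectingListener is an illustrative name, not a library class.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for BlockFetchingListener (ByteBuffer instead of ManagedBuffer).
interface FetchListener {
    void onBlockFetchSuccess(String blockId, ByteBuffer data);
    void onBlockFetchFailure(String blockId, Throwable exception);
}

// Records the outcome of each block and lets the caller wait for all of them.
final class CollectingListener implements FetchListener {
    private final Map<String, byte[]> blocks = new ConcurrentHashMap<>();
    private final Map<String, Throwable> failures = new ConcurrentHashMap<>();
    private final CountDownLatch remaining;

    CollectingListener(int expectedBlocks) {
        this.remaining = new CountDownLatch(expectedBlocks);
    }

    @Override
    public void onBlockFetchSuccess(String blockId, ByteBuffer data) {
        // Copy the bytes during the callback; do not hold on to the buffer itself.
        byte[] copy = new byte[data.remaining()];
        data.duplicate().get(copy);
        blocks.put(blockId, copy);
        remaining.countDown();
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        failures.put(blockId, exception);
        remaining.countDown();
    }

    // Blocks until every expected block has reported, or the timeout elapses.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return remaining.await(timeout, unit);
    }

    // Non-blocking check: true once every expected block has reported.
    boolean isDone() { return remaining.getCount() == 0; }

    Map<String, byte[]> blocks() { return blocks; }
    Map<String, Throwable> failures() { return failures; }
}
```

With the real API, the same pattern would be passed as the listener argument to fetchBlocks, followed by a call to await with a timeout that suits the job.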

Mesos Integration

MesosExternalShuffleClient

public class MesosExternalShuffleClient extends ExternalShuffleClient {
    public MesosExternalShuffleClient(
        TransportConf conf,
        SecretKeyHolder secretKeyHolder,
        boolean saslEnabled,
        boolean saslEncryptionEnabled
    );
    
    public void registerDriverWithShuffleService(String host, int port) throws IOException;
}

Specialized client for Mesos deployments that adds driver registration functionality for cleanup purposes.

registerDriverWithShuffleService

Registers the Spark driver with the external shuffle service for proper cleanup of shuffle files when the application completes.

Parameters:

  • host (String): Shuffle service hostname
  • port (int): Shuffle service port

Throws:

  • IOException: If registration fails

Usage Examples

Basic Client Setup

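// Create configuration
// TransportConf reads its settings from a ConfigProvider, not from a SparkConf
// directly; SparkTransportConf (in spark-core) adapts one to the other.
SparkConf sparkConf = new SparkConf();
// Third argument is numUsableCores; 0 leaves the thread count uncapped.
TransportConf conf = SparkTransportConf.fromSparkConf(sparkConf, "shuffle", 0);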

// Create client without SASL
ExternalShuffleClient client = new ExternalShuffleClient(
    conf, null, false, false
);

// Initialize with app ID
client.init("spark-app-123");

Client with SASL Authentication

// Create secret manager
ShuffleSecretManager secretManager = new ShuffleSecretManager();
secretManager.registerApp("spark-app-123", "my-secret-key");

// Create secure client
ExternalShuffleClient client = new ExternalShuffleClient(
    conf, 
    secretManager,  // Secret key holder
    true,          // Enable SASL
    true           // Enable SASL encryption
);

client.init("spark-app-123");

Executor Registration

// Define executor shuffle configuration
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
    new String[]{"/tmp/spark-shuffle", "/tmp/spark-shuffle2"}, // Local directories
    64,  // Number of subdirectories per local directory
    "org.apache.spark.shuffle.sort.SortShuffleManager"  // Shuffle manager class
);

// Register with shuffle server
client.registerWithShuffleServer("shuffle-server", 7337, "executor-1", executorInfo);
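
The role of the directory list and the subdirectory count can be illustrated with a hash-based lookup. The sketch below assumes the service finds a shuffle file by hashing its name, choosing a local directory from the hash, and then choosing a two-hex-digit subdirectory. That layout is an assumption made for illustration, not a statement of the exact implementation in any given Spark version. ShuffleFileLayout and locate are hypothetical names.

```java
import java.io.File;

// Illustrative only: assumes a name-hash layout over local dirs and hex-named subdirs.
final class ShuffleFileLayout {
    static File locate(String[] localDirs, int subDirsPerLocalDir, String filename) {
        // Non-negative hash of the file name (Math.abs alone fails for Integer.MIN_VALUE).
        int h = filename.hashCode();
        int hash = (h == Integer.MIN_VALUE) ? 0 : Math.abs(h);

        // First pick the local directory, then the subdirectory inside it.
        String localDir = localDirs[hash % localDirs.length];
        int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;

        return new File(new File(localDir, String.format("%02x", subDirId)), filename);
    }

    public static void main(String[] args) {
        String[] dirs = {"/tmp/spark-shuffle", "/tmp/spark-shuffle2"};
        System.out.println(locate(dirs, 64, "shuffle_1_0_0.data"));
    }
}
```

Whatever the exact scheme, the values passed in ExecutorShuffleInfo must match what the executor actually used when writing its files, otherwise the service cannot find them.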

Block Fetching with Callback

// Implement fetch callback
BlockFetchingListener listener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
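        System.out.println("Fetched block " + blockId + ", size: " + data.size());
        // The buffer is released automatically once this callback returns, so copy
        // the bytes out here (or retain() the buffer) if they are needed later.
        try {
            ByteBuffer buf = data.nioByteBuffer(); // may throw IOException
            byte[] blockData = new byte[buf.remaining()];
            buf.get(blockData);
            // Handle the shuffle block data...
        } catch (IOException e) {
            onBlockFetchFailure(blockId, e);
        }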
    }
    
    @Override 
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch " + blockId + ": " + exception.getMessage());
        // Handle failure, perhaps retry or skip
    }
};

// Fetch multiple blocks
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};
client.fetchBlocks("shuffle-server", 7337, "executor-1", blockIds, listener);
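
Writing block IDs out by hand is error-prone once there are more than a few partitions. The helper below is a small sketch, assuming the IDs in the example above follow Spark's shuffle_<shuffleId>_<mapId>_<reduceId> naming. ShuffleBlockIds and its methods are hypothetical and not part of the shuffle client API.

```java
import java.util.Arrays;

// Illustrative helper, not part of the shuffle client API.
final class ShuffleBlockIds {
    // Builds an ID of the form shuffle_<shuffleId>_<mapId>_<reduceId>.
    static String blockId(int shuffleId, int mapId, int reduceId) {
        return "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;
    }

    // IDs for reduce partitions 0..numReducers-1 of a single map output.
    static String[] forMapOutput(int shuffleId, int mapId, int numReducers) {
        String[] ids = new String[numReducers];
        for (int r = 0; r < numReducers; r++) {
            ids[r] = blockId(shuffleId, mapId, r);
        }
        return ids;
    }

    // Splits an ID back into {shuffleId, mapId, reduceId}; rejects anything else.
    static int[] parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length != 4 || !parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Not a shuffle block ID: " + blockId);
        }
        return new int[] {
            Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Integer.parseInt(parts[3])
        };
    }

    public static void main(String[] args) {
        // prints [shuffle_1_0_0, shuffle_1_0_1, shuffle_1_0_2]
        System.out.println(Arrays.toString(forMapOutput(1, 0, 3)));
    }
}
```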

Mesos-Specific Usage

// Create Mesos client
MesosExternalShuffleClient mesosClient = new MesosExternalShuffleClient(
    conf, secretManager, true, false
);

mesosClient.init("spark-app-123");

// Register driver for cleanup
mesosClient.registerDriverWithShuffleService("shuffle-server", 7337);

// Use normally for block fetching
mesosClient.fetchBlocks("shuffle-server", 7337, "executor-1", blockIds, listener);

Error Handling

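The shuffle client implements automatic retry logic for transient network failures. TransportConf has no setters; it reads its values from the configuration it was built from, so set the retry options on the underlying Spark configuration before constructing the TransportConf:

// Set on the SparkConf used to build the TransportConf
SparkConf sparkConf = new SparkConf()
    .set("spark.shuffle.io.maxRetries", "3")   // maximum retry attempts
    .set("spark.shuffle.io.retryWait", "5s");  // wait time between attempts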
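
To make the two settings concrete, the sketch below shows how a retry limit and a fixed wait interact. It is a simplified, synchronous stand-in written for illustration, not the client's actual retry code. RetrySketch and Attempt are hypothetical names, and the sketch assumes that only I/O failures are retried.

```java
import java.io.IOException;

// Simplified illustration of "retry up to maxRetries times, waiting retryWait between tries".
final class RetrySketch {
    // One unit of work that may fail with a transient I/O error.
    interface Attempt<T> {
        T run() throws IOException;
    }

    static <T> T withRetries(Attempt<T> attempt, int maxRetries, long retryWaitMs)
            throws IOException, InterruptedException {
        int retries = 0;
        while (true) {
            try {
                return attempt.run();
            } catch (IOException e) {
                // Give up once the retry budget is spent; otherwise wait and try again.
                if (retries >= maxRetries) {
                    throw e;
                }
                retries++;
                Thread.sleep(retryWaitMs);
            }
        }
    }
}
```

Under this model, maxRetries of 3 with a retryWait of 5s means up to four attempts in total and at most 15 seconds spent waiting before the failure is reported to the caller.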

Common exceptions:

  • IOException: Network connectivity issues, server unavailable
  • IllegalArgumentException: Invalid parameters or configuration
  • SecurityException: SASL authentication failures

Resource Management

Always close the shuffle client to free network resources:

try {
    // Use client...
} finally {
    client.close();
}
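
Since ShuffleClient implements Closeable, try-with-resources is a more compact alternative to an explicit finally block. The sketch below demonstrates the pattern with DemoClient, a hypothetical stand-in used so the example runs without Spark; with the real library the resource would be an ExternalShuffleClient.

```java
import java.io.Closeable;

// Hypothetical stand-in for a shuffle client; only the Closeable contract matters here.
final class DemoClient implements Closeable {
    private boolean closed = false;

    void use() {
        if (closed) {
            throw new IllegalStateException("client already closed");
        }
    }

    boolean isClosed() { return closed; }

    @Override
    public void close() { closed = true; }
}

final class TryWithResourcesDemo {
    // Uses the client inside try-with-resources and returns it so callers can inspect it.
    static DemoClient run() {
        DemoClient client = new DemoClient();
        try (DemoClient c = client) {
            c.use();
        } // close() is invoked here, even if use() throws
        return client;
    }

    public static void main(String[] args) {
        System.out.println("closed after block: " + run().isClosed());
    }
}
```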