Server-Side Block Management

The server-side components handle incoming shuffle requests, manage executor registrations, and resolve shuffle blocks on the local filesystem. These components run as part of the external shuffle service.

Block Handler

ExternalShuffleBlockHandler

public class ExternalShuffleBlockHandler extends RpcHandler {
    public ExternalShuffleBlockHandler(
        TransportConf conf, 
        File registeredExecutorFile
    ) throws IOException;
    
    public ExternalShuffleBlockHandler(
        OneForOneStreamManager streamManager,
        ExternalShuffleBlockResolver blockManager
    );
    
    public void receive(
        TransportClient client, 
        ByteBuffer message, 
        RpcResponseCallback callback
    );
    
    public StreamManager getStreamManager();
}

RPC handler for the external shuffle service that processes client requests for shuffle blocks.

Constructor Parameters:

  • conf (TransportConf): Network transport configuration
  • registeredExecutorFile (File): File used for persistent executor registration storage
  • streamManager (OneForOneStreamManager): Stream manager for block transfers (testing constructor)
  • blockManager (ExternalShuffleBlockResolver): Block resolver instance (testing constructor)

Key Methods:

receive

Processes incoming RPC messages from shuffle clients. Handles executor registration, block open requests, and other protocol messages.

Parameters:

  • client (TransportClient): Client connection that sent the message
  • message (ByteBuffer): Serialized protocol message
  • callback (RpcResponseCallback): Callback for sending response
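The dispatch that receive performs can be pictured with a simplified, JDK-only sketch. The message classes below (OpenBlocks, RegisterExecutor) are hypothetical stand-ins that only borrow the names of the protocol messages. The real handler decodes a ByteBuffer into protocol objects and replies through an RpcResponseCallback, which this sketch models as two plain Consumer callbacks.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified message types; the real protocol classes are
// decoded from a ByteBuffer and carry more fields.
interface ShuffleMessage {}

final class OpenBlocks implements ShuffleMessage {
    final String appId;
    final String execId;
    final List<String> blockIds;

    OpenBlocks(String appId, String execId, List<String> blockIds) {
        this.appId = appId;
        this.execId = execId;
        this.blockIds = blockIds;
    }
}

final class RegisterExecutor implements ShuffleMessage {
    final String appId;
    final String execId;

    RegisterExecutor(String appId, String execId) {
        this.appId = appId;
        this.execId = execId;
    }
}

final class DispatchSketch {
    // Routes a decoded message to the matching action and reports the outcome
    // through the callbacks (stand-ins for RpcResponseCallback).
    static void receive(ShuffleMessage msg, Consumer<String> onSuccess, Consumer<Throwable> onFailure) {
        if (msg instanceof OpenBlocks) {
            OpenBlocks open = (OpenBlocks) msg;
            // A real handler would register a stream over the requested blocks
            // and reply with a handle describing that stream.
            onSuccess.accept("stream of " + open.blockIds.size() + " chunks for " + open.execId);
        } else if (msg instanceof RegisterExecutor) {
            RegisterExecutor reg = (RegisterExecutor) msg;
            onSuccess.accept("registered " + reg.appId + "/" + reg.execId);
        } else {
            onFailure.accept(new UnsupportedOperationException("Unexpected message: " + msg));
        }
    }
}
```

Unknown message types are reported through the failure callback rather than thrown, so a single bad request does not disturb the connection's other traffic.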

getStreamManager

Returns the stream manager that tracks the block streams opened for clients and serves their chunks.

Returns:

  • StreamManager: The stream manager instance
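The testing constructor takes a OneForOneStreamManager, in which each chunk of a registered stream corresponds to one buffer. A minimal JDK-only sketch of that idea follows. ChunkStreams is a hypothetical name, chunks are modelled as byte arrays, and the strict in-order check is an assumption about the behaviour rather than a copy of Spark's code.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical one-for-one stream registry: chunk index i maps to the i-th
// buffer, and chunks must be requested in order.
final class ChunkStreams {
    private static final class State {
        final Iterator<byte[]> buffers;
        int nextChunk = 0;

        State(Iterator<byte[]> buffers) {
            this.buffers = buffers;
        }
    }

    private final AtomicLong nextStreamId = new AtomicLong(0);
    private final Map<Long, State> streams = new ConcurrentHashMap<>();

    // Registers the buffers for one request and returns the new stream ID.
    long registerStream(Iterator<byte[]> buffers) {
        long id = nextStreamId.getAndIncrement();
        streams.put(id, new State(buffers));
        return id;
    }

    // Returns chunk chunkIndex, which must be the next unread chunk.
    synchronized byte[] getChunk(long streamId, int chunkIndex) {
        State state = streams.get(streamId);
        if (state == null) {
            throw new IllegalStateException("Unknown stream " + streamId);
        }
        if (chunkIndex != state.nextChunk) {
            throw new IllegalStateException(
                "Out-of-order chunk " + chunkIndex + ", expected " + state.nextChunk);
        }
        if (!state.buffers.hasNext()) {
            throw new IllegalStateException("Chunk " + chunkIndex + " is beyond the end of the stream");
        }
        state.nextChunk++;
        byte[] chunk = state.buffers.next();
        // Drop the stream once its last buffer has been handed out.
        if (!state.buffers.hasNext()) {
            streams.remove(streamId);
        }
        return chunk;
    }
}
```

Because buffers are pulled lazily from the iterator, a server built this way only needs to open each block's file segment when its chunk is actually requested.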

Block Resolver

ExternalShuffleBlockResolver

public class ExternalShuffleBlockResolver {
    public ExternalShuffleBlockResolver(
        TransportConf conf, 
        File registeredExecutorFile
    ) throws IOException;
    
    public void registerExecutor(
        String appId, 
        String execId, 
        ExecutorShuffleInfo executorInfo
    );
    
    public void applicationRemoved(String appId, boolean cleanupLocalDirs);
    
    public ManagedBuffer getBlockData(
        String appId, 
        String execId, 
        String blockId
    ) throws IOException;
    
    public void close();
}

Manages the mapping between shuffle block IDs and physical file segments on the local filesystem. Handles executor registration and block location resolution.
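For sort-based shuffle, each map task writes one data file plus an index file of offsets, and a reduce partition's segment is the range between two consecutive offsets. The sketch below reads that range from an index file, assuming the layout used by Spark 2.x sort-based shuffle (big-endian long offsets, entry i being the start of partition i). IndexLookup and Segment are hypothetical names.

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

// Hypothetical helper: resolves the (offset, length) of one reduce partition
// inside a map task's data file, given the matching index file.
final class IndexLookup {
    static final class Segment {
        final long offset;
        final long length;

        Segment(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }
    }

    static Segment segmentFor(File indexFile, int reduceId) throws IOException {
        try (DataInputStream in = new DataInputStream(new FileInputStream(indexFile))) {
            // Entry i (an 8-byte long) is the start offset of partition i, so
            // partition reduceId spans [entry reduceId, entry reduceId + 1).
            // skipBytes stops early at end of file; readLong then throws EOFException.
            in.skipBytes(reduceId * 8);
            long offset = in.readLong();
            long nextOffset = in.readLong();
            return new Segment(offset, nextOffset - offset);
        }
    }
}
```

Reading two adjacent entries is enough to locate any partition, so the index stays small (one long per partition boundary) and a lookup touches only 16 bytes of it.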

Constructor Parameters:

  • conf (TransportConf): Transport configuration for the resolver
  • registeredExecutorFile (File): File for persisting executor registrations across restarts

Throws:

  • IOException: If unable to initialize persistent storage

Key Methods:

registerExecutor

Registers an executor's shuffle configuration, storing information about where it writes shuffle files.

Parameters:

  • appId (String): Spark application ID
  • execId (String): Executor ID
  • executorInfo (ExecutorShuffleInfo): Configuration describing shuffle file locations
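Conceptually, registration stores each executor's shuffle configuration under its (application, executor) pair so that later block requests can find the right directories. A minimal in-memory sketch follows, assuming a concurrent map keyed by a two-element list. ExecutorRegistry and ShuffleInfo are hypothetical simplifications that keep only the three values passed to the ExecutorShuffleInfo constructor in the examples on this page.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplification of ExecutorShuffleInfo.
final class ShuffleInfo {
    final String[] localDirs;
    final int subDirsPerLocalDir;
    final String shuffleManager;

    ShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager) {
        this.localDirs = localDirs;
        this.subDirsPerLocalDir = subDirsPerLocalDir;
        this.shuffleManager = shuffleManager;
    }
}

final class ExecutorRegistry {
    // Keyed by [appId, execId]; List equality compares element by element.
    private final Map<List<String>, ShuffleInfo> executors = new ConcurrentHashMap<>();

    void registerExecutor(String appId, String execId, ShuffleInfo info) {
        // In this sketch, re-registering replaces the previous entry.
        executors.put(Arrays.asList(appId, execId), info);
    }

    ShuffleInfo lookup(String appId, String execId) {
        ShuffleInfo info = executors.get(Arrays.asList(appId, execId));
        if (info == null) {
            throw new IllegalStateException(
                "Executor is not registered (appId=" + appId + ", execId=" + execId + ")");
        }
        return info;
    }
}
```

A concurrent map fits here because registrations and block lookups arrive on different network threads.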

applicationRemoved

Cleans up data for a removed Spark application, optionally removing local shuffle directories.

Parameters:

  • appId (String): Application ID to clean up
  • cleanupLocalDirs (boolean): Whether to delete local shuffle directories
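Removal has two parts: dropping every registration whose application ID matches and, when cleanupLocalDirs is true, deleting those executors' local directories. The sketch below assumes registrations live in a map keyed by [appId, execId] with the executor's local directories as values, and deletes directory trees with java.nio.file.Files. AppCleanup is a hypothetical name, and the real resolver may perform the deletion asynchronously.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class AppCleanup {
    // Removes all entries for appId and returns the removed executors' local dirs.
    static List<String> removeApp(Map<List<String>, String[]> executors, String appId) {
        List<String> dirs = new ArrayList<>();
        Iterator<Map.Entry<List<String>, String[]>> it = executors.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<List<String>, String[]> entry = it.next();
            if (entry.getKey().get(0).equals(appId)) {
                Collections.addAll(dirs, entry.getValue());
                it.remove();
            }
        }
        return dirs;
    }

    // Deletes a directory tree; reverse path order visits children before parents.
    static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        List<Path> ordered;
        try (Stream<Path> paths = Files.walk(root)) {
            ordered = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : ordered) {
            Files.delete(p);
        }
    }

    static void applicationRemoved(Map<List<String>, String[]> executors, String appId,
                                   boolean cleanupLocalDirs) throws IOException {
        List<String> dirs = removeApp(executors, appId);
        if (cleanupLocalDirs) {
            for (String dir : dirs) {
                deleteRecursively(Paths.get(dir));
            }
        }
    }
}
```

Collecting the directories before deleting them keeps the bookkeeping step fast, so the slow filesystem work could be handed to a background thread without holding up the caller.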

getBlockData

Retrieves shuffle block data from the local filesystem.

Parameters:

  • appId (String): Application ID that owns the block
  • execId (String): Executor ID that wrote the block
  • blockId (String): Block identifier to retrieve

Returns:

  • ManagedBuffer: Buffer containing the block data

Throws:

  • IOException: If block cannot be found or read
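Resolving a block ID starts by parsing shuffle_<shuffleId>_<mapId>_<reduceId> and locating the map task's files among the executor's local directories. The sketch below follows the hashing scheme Spark 2.x appears to use (a non-negative String.hashCode() of the file name picks the local directory, then a two-digit hex subdirectory). Treat the exact formula, the file naming, and the BlockPaths name as assumptions rather than a guaranteed contract.

```java
import java.io.File;

final class BlockPaths {
    // Parses "shuffle_<shuffleId>_<mapId>_<reduceId>" into three integers.
    static int[] parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length != 4 || !parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Unexpected block id format: " + blockId);
        }
        return new int[]{
            Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Integer.parseInt(parts[3])};
    }

    // Non-negative hash of the file name (Integer.MIN_VALUE maps to 0).
    static int nonNegativeHash(String s) {
        int h = s.hashCode();
        return h != Integer.MIN_VALUE ? Math.abs(h) : 0;
    }

    // Picks a local dir and a hex-named subdirectory from the file name's hash.
    static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
        int hash = nonNegativeHash(filename);
        String localDir = localDirs[hash % localDirs.length];
        int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
        return new File(new File(localDir, String.format("%02x", subDirId)), filename);
    }

    // Data and index files are named after the map task; the reduce ID only
    // selects a segment inside them.
    static File dataFile(String[] localDirs, int subDirs, int shuffleId, int mapId) {
        return getFile(localDirs, subDirs, "shuffle_" + shuffleId + "_" + mapId + "_0.data");
    }

    static File indexFile(String[] localDirs, int subDirs, int shuffleId, int mapId) {
        return getFile(localDirs, subDirs, "shuffle_" + shuffleId + "_" + mapId + "_0.index");
    }
}
```

Because the path is a pure function of the file name and the registered directory layout, the service can find files written by an executor without any per-block metadata, which is why registration only needs localDirs and subDirsPerLocalDir.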

close

Closes the resolver and releases resources including persistent storage connections.

Internal Components

AppExecId

public static class AppExecId {
    public final String appId;
    public final String execId;
    
    public AppExecId(String appId, String execId);
    
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
}

Internal identifier class combining application and executor IDs for tracking registered executors.
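Because AppExecId serves as a map key, its equals and hashCode must agree on both fields. A JDK-only sketch of a value class with that contract follows; AppExecKey is a hypothetical name chosen so as not to imply it is Spark's own source.

```java
import java.util.Objects;

// Hypothetical value class with the same two fields as AppExecId.
final class AppExecKey {
    final String appId;
    final String execId;

    AppExecKey(String appId, String execId) {
        this.appId = Objects.requireNonNull(appId, "appId");
        this.execId = Objects.requireNonNull(execId, "execId");
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof AppExecKey)) return false;
        AppExecKey o = (AppExecKey) other;
        return appId.equals(o.appId) && execId.equals(o.execId);
    }

    @Override
    public int hashCode() {
        // Combine both fields so that equal keys always hash identically.
        return Objects.hash(appId, execId);
    }

    @Override
    public String toString() {
        return "AppExecKey[appId=" + appId + ", execId=" + execId + "]";
    }
}
```

Two keys built independently from the same IDs find the same map entry, which is what lets a block request locate the registration made earlier by a different connection.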

Usage Examples

Basic Server Setup

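import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import java.io.File;

// Create transport configuration. TransportConf takes a ConfigProvider rather than
// a SparkConf; MapConfigProvider.EMPTY falls back to the built-in defaults.
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);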

// Create block handler with persistent storage
File registrationFile = new File("/tmp/spark-shuffle-registrations");
ExternalShuffleBlockHandler handler = new ExternalShuffleBlockHandler(
    conf, 
    registrationFile
);

// The handler can now be used with a TransportServer

Custom Server Setup

import org.apache.spark.network.server.OneForOneStreamManager;

// Create components separately for testing or custom configuration
OneForOneStreamManager streamManager = new OneForOneStreamManager();
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(
    conf, 
    new File("/tmp/registrations")
);

ExternalShuffleBlockHandler handler = new ExternalShuffleBlockHandler(
    streamManager, 
    resolver
);

Manual Block Resolution

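import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import java.io.IOException;
import java.nio.ByteBuffer;

// Register an executor manually
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
    new String[]{"/tmp/spark-local-dir1", "/tmp/spark-local-dir2"},
    64,  // subdirectories per local dir
    "org.apache.spark.shuffle.sort.SortShuffleManager"
);

resolver.registerExecutor("app-123", "executor-1", executorInfo);

// Later, retrieve a block (ID format: shuffle_<shuffleId>_<mapId>_<reduceId>)
ManagedBuffer blockData = null;
try {
    blockData = resolver.getBlockData("app-123", "executor-1", "shuffle_1_2_0");
    System.out.println("Block data size: " + blockData.size());

    // Copy the block into a byte array (only suitable for blocks that fit in memory)
    ByteBuffer buf = blockData.nioByteBuffer();
    byte[] data = new byte[buf.remaining()];
    buf.get(data);
} catch (IOException e) {
    System.err.println("Failed to retrieve block: " + e.getMessage());
} finally {
    // Release the buffer even if reading failed
    if (blockData != null) {
        blockData.release();
    }
}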

Application Cleanup

// Clean up after application completion
resolver.applicationRemoved("app-123", true); // true = cleanup local directories

// Or clean up without removing directories (for debugging)
resolver.applicationRemoved("app-123", false);

Integration with Transport Server

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.TransportServer;
import java.util.Collections;

// Create transport context with the block handler
TransportContext context = new TransportContext(conf, handler);

// Create and start server
TransportServer server = context.createServer(7337, Collections.emptyList());
System.out.println("Shuffle service started on port 7337");

// Server will now handle incoming shuffle client requests
// Remember to close when done:
// server.close();
// resolver.close();

Persistence and Recovery

The block resolver uses LevelDB for persistent storage of executor registrations. This enables recovery of executor metadata across service restarts.

Persistent Storage:

  • Executor registrations survive service restarts
  • Block locations are reconstructed from stored metadata
  • Cleanup operations are reflected in persistent state

Recovery Behavior:

  • On startup, previously registered executors are restored
  • Block requests can be served immediately after restart
  • No need to re-register executors unless shuffle files have moved
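Recovery relies on a sorted key-value store: each registration is written under a key with a shared prefix, and startup scans that prefix to rebuild the in-memory map. The sketch below uses a TreeMap as a stand-in for LevelDB. The key format (exec;<appId>;<execId>), the RegistrationStore class, and the assumption that IDs never contain ';' are all hypothetical; Spark's actual encoding may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical persistence sketch: a TreeMap stands in for LevelDB's sorted keys.
// Assumes appId and execId never contain ';'.
final class RegistrationStore {
    static final String PREFIX = "exec;";
    final NavigableMap<String, String> db = new TreeMap<>();

    // Write-through on registration: the durable copy is updated alongside memory.
    void put(String appId, String execId, String shuffleInfo) {
        db.put(PREFIX + appId + ";" + execId, shuffleInfo);
    }

    // Removing an application deletes every key under its per-app prefix.
    // The trailing ';' keeps "app-1" from matching "app-12".
    void removeApp(String appId) {
        String appPrefix = PREFIX + appId + ";";
        db.subMap(appPrefix, true, appPrefix + Character.MAX_VALUE, true).clear();
    }

    // On startup, scan the shared prefix and rebuild the registry ("appId/execId" -> info).
    Map<String, String> reload() {
        Map<String, String> executors = new HashMap<>();
        SortedMap<String, String> view = db.subMap(PREFIX, PREFIX + Character.MAX_VALUE);
        for (Map.Entry<String, String> e : view.entrySet()) {
            String[] parts = e.getKey().substring(PREFIX.length()).split(";", 2);
            executors.put(parts[0] + "/" + parts[1], e.getValue());
        }
        return executors;
    }
}
```

A sorted store makes both recovery and application cleanup range operations over a key prefix, rather than full scans of every stored registration.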

Error Handling

Common error scenarios:

  • Block Not Found: Requested block doesn't exist on filesystem
  • Executor Not Registered: Attempt to fetch blocks from unregistered executor
  • IO Errors: Filesystem permission issues or disk failures
  • Corruption: Persistent storage corruption requiring reconstruction
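
try {
    ManagedBuffer block = resolver.getBlockData("app-1", "exec-1", "shuffle_1_0_0");
    // Process block, then call block.release()
} catch (IOException e) {
    // Exception messages are not a stable API; guard against null before matching
    String msg = e.getMessage();
    if (msg != null && msg.contains("not found")) {
        // Handle missing block
        System.err.println("Block not found, may have been cleaned up");
    } else {
        // Handle other IO errors
        System.err.println("IO error reading block: " + msg);
    }
} catch (RuntimeException e) {
    // Depending on the Spark version, failures such as an unregistered executor
    // may surface as unchecked exceptions instead of IOException
    System.err.println("Block resolution failed: " + e.getMessage());
}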