Block Fetching and Retry Logic

Block fetching over the Spark network transport, with listener-based asynchronous callbacks and automatic retry on I/O failures.

Capabilities

BlockFetchingListener Interface

Event listener for block fetching operations with success/failure callbacks.

/**
 * Event listener for block fetching operations
 */
public interface BlockFetchingListener extends EventListener {
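    /**
     * Called once for each successfully fetched block. The buffer is released
     * automatically after this call returns; call retain() (and later release())
     * or copy the data if it must outlive the callback.
     * @param blockId - ID of the successfully fetched block
     * @param data - ManagedBuffer containing the block data
     */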
    void onBlockFetchSuccess(String blockId, ManagedBuffer data);
    
    /**
     * Called when a block fetch fails
     * @param blockId - ID of the block that failed to fetch
     * @param exception - Exception that caused the failure
     */
    void onBlockFetchFailure(String blockId, Throwable exception);
}
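
The callback pair can be bridged to futures when the caller prefers to wait on individual blocks. The following is a minimal sketch under stated assumptions, not part of the library: SketchBlockListener is a hypothetical stand-in for BlockFetchingListener, and java.nio.ByteBuffer stands in for ManagedBuffer so that the code compiles without Spark on the classpath. The success callback copies the bytes before completing the future, because the data is only guaranteed to be valid while the callback runs.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for BlockFetchingListener; ByteBuffer replaces ManagedBuffer.
interface SketchBlockListener {
    void onBlockFetchSuccess(String blockId, ByteBuffer data);
    void onBlockFetchFailure(String blockId, Throwable exception);
}

// Exposes each requested block as a CompletableFuture completed by the callbacks.
final class FutureBlockListener implements SketchBlockListener {
    private final Map<String, CompletableFuture<byte[]>> futures = new ConcurrentHashMap<>();

    FutureBlockListener(String... blockIds) {
        for (String id : blockIds) {
            futures.put(id, new CompletableFuture<>());
        }
    }

    // Returns the future for a requested block, or null for an unknown ID.
    CompletableFuture<byte[]> futureFor(String blockId) {
        return futures.get(blockId);
    }

    @Override
    public void onBlockFetchSuccess(String blockId, ByteBuffer data) {
        CompletableFuture<byte[]> future = futures.get(blockId);
        if (future != null) {
            // Copy the bytes so the result stays valid after the callback returns.
            byte[] copy = new byte[data.remaining()];
            data.duplicate().get(copy);
            future.complete(copy);
        }
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        CompletableFuture<byte[]> future = futures.get(blockId);
        if (future != null) {
            future.completeExceptionally(exception);
        }
    }
}
```

A caller would pass the listener to a fetcher and then call futureFor(blockId) for each block it needs.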

OneForOneBlockFetcher

Block fetcher that interprets each chunk as a whole block.

/**
 * Block fetcher that interprets each chunk as a whole block
 */
public class OneForOneBlockFetcher {
    /**
     * Create a one-for-one block fetcher
     * @param client - Transport client for network communication
     * @param appId - Application ID
     * @param execId - Executor ID
     * @param blockIds - Array of block IDs to fetch
     * @param listener - Listener for block fetch events
     * @param transportConf - Transport configuration
     */
    public OneForOneBlockFetcher(
        TransportClient client, String appId, String execId, String[] blockIds,
        BlockFetchingListener listener, TransportConf transportConf
    );
    
    /**
     * Create a one-for-one block fetcher with download file manager
     * @param client - Transport client for network communication
     * @param appId - Application ID
     * @param execId - Executor ID
     * @param blockIds - Array of block IDs to fetch
     * @param listener - Listener for block fetch events
     * @param transportConf - Transport configuration
     * @param downloadFileManager - Manager for temporary download files
     */
    public OneForOneBlockFetcher(
        TransportClient client, String appId, String execId, String[] blockIds,
        BlockFetchingListener listener, TransportConf transportConf,
        DownloadFileManager downloadFileManager
    );
    
    /**
     * Start the block fetching process
     */
    public void start();
}

RetryingBlockFetcher

Wraps another block fetcher (for example, OneForOneBlockFetcher) and automatically retries fetches that fail with an I/O error.

/**
 * Wraps block fetcher with automatic retry capability for IO failures
 */
public class RetryingBlockFetcher {
    /**
     * Create a retrying block fetcher
     * @param conf - Transport configuration containing retry parameters
     * @param fetchStarter - Strategy for creating and starting block fetchers
     * @param blockIds - Array of block IDs to fetch
     * @param listener - Listener for block fetch events
     */
    public RetryingBlockFetcher(
        TransportConf conf, BlockFetchStarter fetchStarter,
        String[] blockIds, BlockFetchingListener listener
    );
    
    /**
     * Start the block fetching process with retry logic
     */
    public void start();
    
    /**
     * Strategy interface for creating and starting block fetchers
     */
    public interface BlockFetchStarter {
        /**
         * Create and start a block fetcher for the given blocks
         * @param blockIds - Array of block IDs to fetch
         * @param listener - Listener for block fetch events
         */
        void createAndStart(String[] blockIds, BlockFetchingListener listener)
            throws IOException, InterruptedException;
    }
}
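
The retry idea can be illustrated with a simplified, synchronous sketch. It is not the library's implementation: SketchFetchListener, SketchFetchStarter, and RetrySketch are hypothetical names, a failed attempt is modelled only as the starter throwing IOException, and no wait is inserted between attempts. What it does show is the bookkeeping: blocks that already succeeded are removed from the outstanding set, so a retry requests only the remaining ones.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, simplified stand-ins for the listener and starter interfaces.
interface SketchFetchListener {
    void onSuccess(String blockId);
    void onFailure(String blockId, Throwable error);
}

interface SketchFetchStarter {
    void createAndStart(String[] blockIds, SketchFetchListener listener) throws IOException;
}

final class RetrySketch {
    private final int maxRetries;
    private final SketchFetchStarter starter;
    private final SketchFetchListener outer;
    private final Set<String> outstanding;  // blocks not yet reported to the caller
    private int retryCount = 0;

    RetrySketch(int maxRetries, SketchFetchStarter starter,
                String[] blockIds, SketchFetchListener outer) {
        this.maxRetries = maxRetries;
        this.starter = starter;
        this.outer = outer;
        this.outstanding = new LinkedHashSet<>(Arrays.asList(blockIds));
    }

    void start() {
        // Inner listener: forwards each outcome once and tracks what is still missing.
        SketchFetchListener inner = new SketchFetchListener() {
            @Override
            public void onSuccess(String blockId) {
                if (outstanding.remove(blockId)) outer.onSuccess(blockId);
            }

            @Override
            public void onFailure(String blockId, Throwable error) {
                if (outstanding.remove(blockId)) outer.onFailure(blockId, error);
            }
        };
        while (!outstanding.isEmpty()) {
            String[] toFetch = outstanding.toArray(new String[0]);
            try {
                starter.createAndStart(toFetch, inner);
                return;  // attempt ran to completion
            } catch (IOException e) {
                if (retryCount < maxRetries) {
                    retryCount++;  // a real fetcher would wait here before retrying
                } else {
                    // Retries exhausted: report every remaining block as failed.
                    for (String id : new ArrayList<>(outstanding)) {
                        outer.onFailure(id, e);
                    }
                    outstanding.clear();
                }
            }
        }
    }
}
```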

Usage Examples:

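import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.io.ByteStreams;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.shuffle.*;
import org.apache.spark.network.util.TransportConf;

// Helpers such as processBlockData, logBlockFetchFailure, createTransportClient,
// processAndRelease, updateFailureMetrics, and isRetryableException are
// application-defined placeholders, not part of the Spark API.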

// Example 1: Basic block fetching with listener
BlockFetchingListener basicListener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Successfully fetched block: " + blockId + 
                          ", size: " + data.size() + " bytes");
        
        try (InputStream dataStream = data.createInputStream()) {
            // Process the block data
            byte[] blockBytes = ByteStreams.toByteArray(dataStream);
            processBlockData(blockId, blockBytes);
        } catch (IOException e) {
            System.err.println("Error processing block " + blockId + ": " + e.getMessage());
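        }
        // No data.release() here: the buffer is released automatically after this
        // callback returns. Call data.retain() first if it must outlive the callback,
        // and release() it once when done.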
    }
    
    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch block: " + blockId + 
                          ", error: " + exception.getMessage());
        
        // Handle specific error types
        if (exception instanceof IOException) {
            System.err.println("Network or I/O error occurred");
        } else if (exception instanceof SecurityException) {
            System.err.println("Authentication or authorization error");
        }
        
        // Log for monitoring and debugging
        logBlockFetchFailure(blockId, exception);
    }
};

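// Create transport configuration and client.
// TransportConf takes a module name and a ConfigProvider; MapConfigProvider.EMPTY
// (org.apache.spark.network.util) supplies no overrides, so defaults apply.
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
// createTransportClient is an application-defined helper
TransportClient client = createTransportClient("shuffle-server", 7337);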

// Fetch blocks using OneForOneBlockFetcher
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_1_0"};
OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(
    client, "app-001", "executor-1", blockIds, basicListener, conf
);

// Start the fetch operation
fetcher.start();

// Example 2: Block fetching with file downloads
SimpleDownloadFileManager fileManager = new SimpleDownloadFileManager();

OneForOneBlockFetcher fetcherWithFiles = new OneForOneBlockFetcher(
    client, "app-001", "executor-1", blockIds, basicListener, conf, fileManager
);

fetcherWithFiles.start();

// Example 3: Retrying block fetcher for reliability
BlockFetchingListener retryListener = new BlockFetchingListener() {
    private final AtomicInteger successCount = new AtomicInteger(0);
    private final AtomicInteger failureCount = new AtomicInteger(0);
    
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        int successNum = successCount.incrementAndGet();
        System.out.println("Success #" + successNum + ": " + blockId + 
                          " (" + data.size() + " bytes)");
        
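        // Process the data before returning. processAndRelease is an application-defined
        // helper; it should call release() only for buffers it retained itself.
        processAndRelease(blockId, data);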
    }
    
    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        int failureNum = failureCount.incrementAndGet();
        System.err.println("Failure #" + failureNum + ": " + blockId + 
                          " - " + exception.getMessage());
        
        // Update metrics
        updateFailureMetrics(blockId, exception);
    }
};

RetryingBlockFetcher.BlockFetchStarter fetchStarter = 
    new RetryingBlockFetcher.BlockFetchStarter() {
        @Override
        public void createAndStart(String[] blockIds, BlockFetchingListener listener) {
            OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(
                client, "app-001", "executor-1", blockIds, listener, conf
            );
            fetcher.start();
        }
    };

// Create retrying fetcher with automatic retry logic
RetryingBlockFetcher retryingFetcher = new RetryingBlockFetcher(
    conf, fetchStarter, blockIds, retryListener
);

// Start with retry capability
retryingFetcher.start();

// Example 4: Advanced listener with metrics and monitoring
// Counter, Histogram, and ExponentiallyDecayingReservoir are assumed to be
// Dropwizard Metrics types (com.codahale.metrics); processBlockData and
// isRetryableException are application-defined helpers.
public class MetricsBlockFetchingListener implements BlockFetchingListener {
    private final Counter successCounter = new Counter();
    private final Counter failureCounter = new Counter();
    private final Histogram dataSizeHistogram =
        new Histogram(new ExponentiallyDecayingReservoir());
    
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        successCounter.inc();
        dataSizeHistogram.update(data.size());
        
        System.out.println("Block " + blockId + " fetched successfully");
        
        // Process the block data before returning; the buffer is released
        // automatically once this callback returns.
        processBlockData(blockId, data);
    }
    
    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        failureCounter.inc();
        
        // Log detailed error information
        System.err.println("Block fetch failed: " + blockId);
        System.err.println("Error type: " + exception.getClass().getSimpleName());
        System.err.println("Error message: " + exception.getMessage());
        
        // Report whether the error looks transient
        if (isRetryableException(exception)) {
            System.out.println("Error is retryable; a later attempt may succeed");
        } else {
            System.err.println("Error is not retryable, marking as permanent failure");
        }
    }
    
    public void printMetrics() {
        System.out.println("Fetch Metrics:");
        System.out.println("  Successes: " + successCounter.getCount());
        System.out.println("  Failures: " + failureCounter.getCount());
        System.out.println("  Avg Data Size: " + dataSizeHistogram.getSnapshot().getMean());
    }
}

MetricsBlockFetchingListener metricsListener = new MetricsBlockFetchingListener();
// Use metricsListener with any fetcher...

Retry Configuration

The RetryingBlockFetcher uses TransportConf parameters for retry behavior:

  • spark.shuffle.io.maxRetries - Maximum number of retry attempts (default: 3)
  • spark.shuffle.io.retryWait - Wait time between retry attempts (default: 5s)

The "shuffle" segment of each key is the module name passed to TransportConf. The wait between attempts is fixed; the fetcher does not apply exponential backoff.
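
To reason about how long a fetch can stall before it fails for good, the two core settings can be combined into an upper bound on time spent waiting. The sketch below is illustrative only: RetrySettings is a hypothetical class, it reads from a plain Map rather than a TransportConf, it assumes the wait has already been normalised to milliseconds, and it assumes a fixed wait between attempts.

```java
import java.util.Map;

// Hypothetical holder for the two core retry settings of one transport module.
final class RetrySettings {
    final int maxRetries;
    final long retryWaitMs;

    RetrySettings(Map<String, String> conf, String module) {
        String prefix = "spark." + module + ".io.";
        // Defaults mirror the documented ones: 3 retries, 5 seconds between attempts.
        this.maxRetries = Integer.parseInt(conf.getOrDefault(prefix + "maxRetries", "3"));
        // Assumption: the value is given in plain milliseconds in this sketch.
        this.retryWaitMs = Long.parseLong(conf.getOrDefault(prefix + "retryWait", "5000"));
    }

    // Upper bound on time spent waiting between attempts, excluding the fetches themselves.
    long maxTotalWaitMs() {
        return (long) maxRetries * retryWaitMs;
    }
}
```

With the defaults, the bound is 3 × 5000 ms = 15000 ms of waiting, on top of the time taken by the fetch attempts themselves.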

Error Classification

Block fetch failures can be classified into several categories:

  1. Retryable Errors:

    • IOException - Network connectivity issues
    • TimeoutException - Request timeouts
    • ConnectException - Connection establishment failures
  2. Non-Retryable Errors:

    • SecurityException - Authentication/authorization failures
    • IllegalArgumentException - Invalid block IDs or parameters
    • FileNotFoundException - Missing shuffle files (permanent); it is a subclass of IOException, so check for it before any generic IOException test
  3. Application-Specific Errors:

    • Custom exceptions from shuffle service implementation
    • Data corruption errors
    • Storage subsystem failures
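
The categories above can be turned into a small classifier, such as the isRetryableException helper that the usage examples call but do not define. This is one possible sketch that follows the lists in this section, not the rule the library applies internally; RetryClassifier is a hypothetical name. The order of the checks matters because FileNotFoundException is itself an IOException.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Hypothetical helper that mirrors the retryable / non-retryable lists above.
final class RetryClassifier {
    private RetryClassifier() { }

    static boolean isRetryableException(Throwable error) {
        // Permanent: must be tested first because it extends IOException.
        if (error instanceof FileNotFoundException) {
            return false;
        }
        // Covers IOException and its subclasses, including ConnectException.
        if (error instanceof IOException || error instanceof TimeoutException) {
            return true;
        }
        // A wrapped I/O failure is treated like a direct one.
        Throwable cause = error.getCause();
        return cause instanceof IOException && !(cause instanceof FileNotFoundException);
    }
}
```

SecurityException and IllegalArgumentException fall through to the final check and are reported as non-retryable unless they wrap an I/O failure.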

Performance Optimization

Best practices for optimal block fetching performance:

  1. Listener Implementation:

    • Keep success/failure handlers lightweight
    • Process data asynchronously when possible
    • Release every ManagedBuffer you retain(); buffers passed to the listener are released automatically once the callback returns
  2. Batch Operations:

    • Fetch multiple blocks in single requests
    • Use appropriate batch sizes based on network capacity
    • Balance between throughput and memory usage
  3. Error Handling:

    • Implement exponential backoff for retries
    • Use circuit breaker patterns for failing services
    • Monitor and alert on high failure rates
  4. Memory Management:

    • Release retained buffers promptly after processing
    • Monitor memory usage during large transfers
    • Consider streaming processing for large blocks
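
One way to keep the success handler lightweight is to copy the payload and hand the processing to a worker pool. The sketch below is illustrative only: AsyncHandoff is a hypothetical class, java.nio.ByteBuffer stands in for ManagedBuffer, and the processor callback represents whatever the application does with a block.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

// Hypothetical handler that moves block processing off the callback thread.
final class AsyncHandoff {
    private final Executor workers;
    private final BiConsumer<String, byte[]> processor;

    AsyncHandoff(Executor workers, BiConsumer<String, byte[]> processor) {
        this.workers = workers;
        this.processor = processor;
    }

    // Shaped like onBlockFetchSuccess, with ByteBuffer in place of ManagedBuffer.
    void onBlockFetchSuccess(String blockId, ByteBuffer data) {
        // Copy while the callback still owns the data, then return quickly.
        byte[] copy = new byte[data.remaining()];
        data.duplicate().get(copy);
        workers.execute(() -> processor.accept(blockId, copy));
    }
}
```

Copying trades memory for simplicity. For large blocks, retaining the buffer and releasing it in the worker avoids the copy, at the cost of having to guarantee that release runs exactly once.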

Monitoring and Debugging

Key metrics to monitor for block fetching operations:

  • Success/Failure Rates: Track fetch success percentage
  • Latency Metrics: Monitor fetch operation timing
  • Data Volume: Track bytes transferred and rates
  • Retry Patterns: Monitor retry frequency and success rates
  • Error Distribution: Analyze failure types and causes
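
The metrics listed above can be collected with JDK classes alone. The following sketch is one possible shape, not a prescribed design: FetchMetrics and its method names are hypothetical, and a listener would call recordSuccess and recordFailure from its two callbacks.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical thread-safe collector for block fetch metrics.
final class FetchMetrics {
    private final LongAdder successes = new LongAdder();
    private final LongAdder failures = new LongAdder();
    private final LongAdder bytes = new LongAdder();
    private final Map<String, LongAdder> errorsByType = new ConcurrentHashMap<>();

    void recordSuccess(long sizeBytes) {
        successes.increment();
        bytes.add(sizeBytes);
    }

    void recordFailure(Throwable error) {
        failures.increment();
        // Group failures by exception class to expose the error distribution.
        errorsByType
            .computeIfAbsent(error.getClass().getSimpleName(), k -> new LongAdder())
            .increment();
    }

    // Fraction of recorded fetches that succeeded; 1.0 when nothing was recorded.
    double successRate() {
        long ok = successes.sum();
        long total = ok + failures.sum();
        return total == 0 ? 1.0 : (double) ok / total;
    }

    long bytesTransferred() {
        return bytes.sum();
    }

    long errorCount(String exceptionSimpleName) {
        LongAdder count = errorsByType.get(exceptionSimpleName);
        return count == null ? 0L : count.sum();
    }
}
```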