Advanced block fetching mechanisms with automatic retry capabilities and listener-based asynchronous handling.
Event listener for block fetching operations with success/failure callbacks.
/**
 * Event listener for block fetching operations.
 *
 * <p>Declares one callback for a successfully fetched block and one for a failed fetch.
 *
 * <p>NOTE(review): upstream Apache Spark documents that {@code data} is released
 * automatically once {@code onBlockFetchSuccess} returns, and that implementations which
 * keep the buffer must {@code retain()} it themselves -- confirm for the version in use.
 */
public interface BlockFetchingListener extends EventListener {
    /**
     * Called when a block is successfully fetched.
     *
     * @param blockId ID of the successfully fetched block
     * @param data    ManagedBuffer containing the block data
     */
    void onBlockFetchSuccess(String blockId, ManagedBuffer data);

    /**
     * Called when a block fetch fails.
     *
     * @param blockId   ID of the block that failed to fetch
     * @param exception Exception that caused the failure
     */
    void onBlockFetchFailure(String blockId, Throwable exception);
}
Block fetcher that interprets each chunk as a whole block.
/**
 * Block fetcher that interprets each chunk as a whole block.
 *
 * <p>This listing shows two constructors (without and with a
 * {@code DownloadFileManager}) and a single {@link #start()} method.
 */
public class OneForOneBlockFetcher {
    /**
     * Create a one-for-one block fetcher.
     *
     * @param client        Transport client for network communication
     * @param appId         Application ID
     * @param execId        Executor ID
     * @param blockIds      Array of block IDs to fetch
     * @param listener      Listener for block fetch events
     * @param transportConf Transport configuration
     */
    public OneForOneBlockFetcher(
        TransportClient client, String appId, String execId, String[] blockIds,
        BlockFetchingListener listener, TransportConf transportConf
    );

    /**
     * Create a one-for-one block fetcher with a download file manager.
     *
     * @param client              Transport client for network communication
     * @param appId               Application ID
     * @param execId              Executor ID
     * @param blockIds            Array of block IDs to fetch
     * @param listener            Listener for block fetch events
     * @param transportConf       Transport configuration
     * @param downloadFileManager Manager for temporary download files
     */
    public OneForOneBlockFetcher(
        TransportClient client, String appId, String execId, String[] blockIds,
        BlockFetchingListener listener, TransportConf transportConf,
        DownloadFileManager downloadFileManager
    );

    /**
     * Start the block fetching process.
     */
    public void start();
}
Wraps BlockFetcher with automatic retry capability for IO failures.
/**
 * Wraps a block fetcher with automatic retry capability for IO failures.
 *
 * <p>The actual fetcher is supplied indirectly through a {@link BlockFetchStarter},
 * which is handed the block IDs and the listener to use.
 */
public class RetryingBlockFetcher {
    /**
     * Create a retrying block fetcher.
     *
     * @param conf         Transport configuration containing retry parameters
     * @param fetchStarter Strategy for creating and starting block fetchers
     * @param blockIds     Array of block IDs to fetch
     * @param listener     Listener for block fetch events
     */
    public RetryingBlockFetcher(
        TransportConf conf, BlockFetchStarter fetchStarter,
        String[] blockIds, BlockFetchingListener listener
    );

    /**
     * Start the block fetching process with retry logic.
     */
    public void start();

    /**
     * Strategy interface for creating and starting block fetchers.
     */
    public interface BlockFetchStarter {
        /**
         * Create and start a block fetcher for the given blocks.
         *
         * <p>NOTE(review): upstream Apache Spark declares checked exceptions on this
         * method; this listing shows no {@code throws} clause -- confirm against the
         * version in use.
         *
         * @param blockIds Array of block IDs to fetch
         * @param listener Listener for block fetch events
         */
        void createAndStart(String[] blockIds, BlockFetchingListener listener);
    }
}
Usage Examples:
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.shuffle.*;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
// Example 1: Basic block fetching with listener
BlockFetchingListener basicListener = new BlockFetchingListener() {
    /**
     * Copies the fetched block into a byte array and passes it to the application.
     *
     * <p>The buffer is deliberately NOT released here. Per the Spark
     * BlockFetchingListener contract, the transport layer releases {@code data}
     * itself once this callback returns; releasing it here as well would drop the
     * reference count twice. Code that needs the buffer after this method returns
     * must call {@code data.retain()} and later balance it with its own
     * {@code release()}, or copy the bytes as done below.
     *
     * @param blockId ID of the fetched block
     * @param data    buffer holding the block contents, valid for the duration of this call
     */
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Successfully fetched block: " + blockId +
            ", size: " + data.size() + " bytes");
        // try-with-resources closes the stream; the buffer itself is owned by the caller.
        try (InputStream dataStream = data.createInputStream()) {
            // Copy the bytes out so they remain usable after the buffer is released.
            byte[] blockBytes = ByteStreams.toByteArray(dataStream);
            processBlockData(blockId, blockBytes);
        } catch (IOException e) {
            System.err.println("Error processing block " + blockId + ": " + e.getMessage());
        }
    }

    /**
     * Reports a failed fetch, prints a hint for two exception types, and forwards
     * the failure to the logging helper.
     *
     * @param blockId   ID of the block that failed to fetch
     * @param exception cause of the failure
     */
    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch block: " + blockId +
            ", error: " + exception.getMessage());
        // Handle specific error types
        if (exception instanceof IOException) {
            System.err.println("Network or I/O error occurred");
        } else if (exception instanceof SecurityException) {
            System.err.println("Authentication or authorization error");
        }
        // Log for monitoring and debugging
        logBlockFetchFailure(blockId, exception);
    }
};
// Create transport client and configuration.
// TransportConf is constructed from a module name plus a ConfigProvider;
// MapConfigProvider.EMPTY supplies no overrides, so defaults apply.
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
TransportClient client = createTransportClient("shuffle-server", 7337);

// Fetch blocks using OneForOneBlockFetcher
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_1_0"};
OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(
    client, "app-001", "executor-1", blockIds, basicListener, conf
);
// Start the fetch operation; results arrive through basicListener.
fetcher.start();

// Example 2: Block fetching with file downloads
// Same fetch as above, but using the constructor overload that also takes a
// DownloadFileManager.
SimpleDownloadFileManager fileManager = new SimpleDownloadFileManager();
OneForOneBlockFetcher fetcherWithFiles = new OneForOneBlockFetcher(
    client, "app-001", "executor-1", blockIds, basicListener, conf, fileManager
);
fetcherWithFiles.start();
// Example 3: Retrying block fetcher for reliability
BlockFetchingListener retryListener = new BlockFetchingListener() {
    // Running tallies of the callbacks received so far.
    private final AtomicInteger successCount = new AtomicInteger(0);
    private final AtomicInteger failureCount = new AtomicInteger(0);

    /**
     * Counts the success, prints a numbered line with the block size, then hands
     * the buffer to the processing helper.
     *
     * <p>NOTE(review): if {@code processAndRelease} calls {@code data.release()},
     * confirm it is balanced by a {@code retain()}; upstream Spark documents that
     * the buffer is released automatically after this callback returns.
     */
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        final int ordinal = successCount.incrementAndGet();
        final String sizeSuffix = " (" + data.size() + " bytes)";
        System.out.println("Success #" + ordinal + ": " + blockId + sizeSuffix);
        processAndRelease(blockId, data);
    }

    /**
     * Counts the failure, prints a numbered line with the exception message, then
     * forwards the failure to the metrics helper.
     */
    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        final int ordinal = failureCount.incrementAndGet();
        final String reasonSuffix = " - " + exception.getMessage();
        System.err.println("Failure #" + ordinal + ": " + blockId + reasonSuffix);
        updateFailureMetrics(blockId, exception);
    }
};
// Strategy handed to RetryingBlockFetcher: builds a OneForOneBlockFetcher for the
// block IDs and listener it is given, and starts it immediately.
RetryingBlockFetcher.BlockFetchStarter fetchStarter =
    new RetryingBlockFetcher.BlockFetchStarter() {
        @Override
        public void createAndStart(String[] blockIds, BlockFetchingListener listener) {
            new OneForOneBlockFetcher(client, "app-001", "executor-1", blockIds, listener, conf)
                .start();
        }
    };

// Wire the starter, the block IDs and the counting listener into a retrying fetcher.
RetryingBlockFetcher retryingFetcher =
    new RetryingBlockFetcher(conf, fetchStarter, blockIds, retryListener);

// Kick off the fetch with retry capability.
retryingFetcher.start();
// Example 4: Advanced listener with metrics and monitoring
/**
 * Listener that counts successes and failures and records fetched block sizes.
 *
 * <p>NOTE(review): {@code Counter} and {@code Histogram} are not defined or imported
 * in this example; they are presumably application-provided metric types -- confirm.
 */
public class MetricsBlockFetchingListener implements BlockFetchingListener {
    private final Counter successCounter = new Counter();
    private final Counter failureCounter = new Counter();
    private final Histogram dataSizeHistogram = new Histogram();

    /**
     * Records the success and the block size, then processes the block.
     *
     * <p>The buffer is deliberately NOT released here. Per the Spark
     * BlockFetchingListener contract, the transport layer releases {@code data}
     * itself once this callback returns, so an extra {@code release()} would drop
     * the reference count twice. If {@code processBlockData} needs the buffer
     * beyond this call it must {@code retain()} it.
     *
     * @param blockId ID of the fetched block
     * @param data    buffer holding the block contents, valid for the duration of this call
     */
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        successCounter.inc();
        dataSizeHistogram.update(data.size());
        System.out.println("Block " + blockId + " fetched successfully");
        // Process the block data
        processBlockData(blockId, data);
    }

    /**
     * Records the failure and prints its type, message and whether the helper
     * classifies it as retryable. This method only reports; it does not itself
     * trigger a retry.
     *
     * @param blockId   ID of the block that failed to fetch
     * @param exception cause of the failure
     */
    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        failureCounter.inc();
        // Log detailed error information
        System.err.println("Block fetch failed: " + blockId);
        System.err.println("Error type: " + exception.getClass().getSimpleName());
        System.err.println("Error message: " + exception.getMessage());
        // Determine if retry is appropriate
        if (isRetryableException(exception)) {
            System.out.println("Error is retryable, will attempt retry");
        } else {
            System.err.println("Error is not retryable, marking as permanent failure");
        }
    }

    /**
     * Prints the success count, failure count and mean recorded data size.
     */
    public void printMetrics() {
        System.out.println("Fetch Metrics:");
        System.out.println(" Successes: " + successCounter.getCount());
        System.out.println(" Failures: " + failureCounter.getCount());
        System.out.println(" Avg Data Size: " + dataSizeHistogram.getMean());
    }
}
MetricsBlockFetchingListener metricsListener = new MetricsBlockFetchingListener();
// Use metricsListener with any fetcher...

The RetryingBlockFetcher uses TransportConf parameters for retry behavior:
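- spark.shuffle.io.maxRetries - Maximum number of retry attempts (default: 3)
- spark.shuffle.io.retryWait - Initial wait time between retries in milliseconds (default: 5000)
- spark.shuffle.io.retryWaitTimeUnit - Time unit for retry wait (default: MILLISECONDS)
- spark.shuffle.io.backOffMultiplier - Multiplier for exponential backoff (default: 1.5)

Note: verify these keys against the TransportConf of the Spark version in use; the upstream Apache Spark configuration reference appears to list only spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait for retry behavior.

Block fetch failures can be classified into several categories: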
Retryable Errors:
- IOException - Network connectivity issues
- TimeoutException - Request timeouts
- ConnectException - Connection establishment failures

Non-Retryable Errors:
- SecurityException - Authentication/authorization failures
- IllegalArgumentException - Invalid block IDs or parameters
- FileNotFoundException - Missing shuffle files (permanent)

Application-Specific Errors:
Best practices for optimal block fetching performance:
Listener Implementation:
Batch Operations:
Error Handling:
Memory Management:
Key metrics to monitor for block fetching operations: