# Block Fetching and Retry Logic

Block fetching for the Spark network shuffle client: asynchronous, listener-based delivery of fetched blocks, with automatic retries for I/O failures.

## Capabilities

### BlockFetchingListener Interface

Callback interface for block fetching: the fetcher reports each requested block to either a success or a failure callback.

```java { .api }
/**
 * Event listener for block fetching operations
 */
public interface BlockFetchingListener extends EventListener {
  /**
   * Called when a block is successfully fetched. The buffer is released
   * automatically after this method returns; to use the data later (for
   * example on another thread), retain() the buffer and release() it when
   * done, or copy its contents before returning.
   * @param blockId - ID of the successfully fetched block
   * @param data - ManagedBuffer containing the block data
   */
  void onBlockFetchSuccess(String blockId, ManagedBuffer data);

  /**
   * Called when a block fetch fails
   * @param blockId - ID of the block that failed to fetch
   * @param exception - Exception that caused the failure
   */
  void onBlockFetchFailure(String blockId, Throwable exception);
}
```
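
Callbacks arrive asynchronously, often on network threads, so a caller that must wait for every block commonly bridges them into futures. The sketch below assumes a simplified stand-in interface, `SimpleListener`, that passes a `byte[]` where the real interface passes a `ManagedBuffer`, so it runs without Spark on the classpath; `FutureCollectingListener` is likewise a hypothetical name. With the real interface, the copy in `onBlockFetchSuccess` would read the buffer (for example via `nioByteBuffer()` or `createInputStream()`) before the callback returns.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for BlockFetchingListener: byte[] replaces ManagedBuffer
// so that this sketch compiles without Spark on the classpath.
interface SimpleListener {
  void onBlockFetchSuccess(String blockId, byte[] data);
  void onBlockFetchFailure(String blockId, Throwable exception);
}

// Completes one future per requested block; callbacks may come from any thread.
class FutureCollectingListener implements SimpleListener {
  private final Map<String, CompletableFuture<byte[]>> futures = new ConcurrentHashMap<>();

  FutureCollectingListener(String[] blockIds) {
    for (String id : blockIds) {
      futures.put(id, new CompletableFuture<>());
    }
  }

  @Override
  public void onBlockFetchSuccess(String blockId, byte[] data) {
    // Copy before returning: a real ManagedBuffer is released once the callback ends.
    futures.get(blockId).complete(Arrays.copyOf(data, data.length));
  }

  @Override
  public void onBlockFetchFailure(String blockId, Throwable exception) {
    // No effect if the future is already done, so a repeated failure
    // callback for the same block is harmless.
    futures.get(blockId).completeExceptionally(exception);
  }

  // Completes once every block has succeeded or failed; completes
  // exceptionally if any block failed.
  CompletableFuture<Void> allDone() {
    return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]));
  }

  CompletableFuture<byte[]> future(String blockId) {
    return futures.get(blockId);
  }
}
```

A caller would create the listener with the same block IDs it passes to the fetcher, call `start()`, and then block on `allDone()` or compose per-block futures.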
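
### OneForOneBlockFetcher

Block fetcher that interprets each chunk as a whole block: after the server opens a stream for the requested blocks, chunk *i* of that stream is delivered to the listener as `blockIds[i]`.
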
```java { .api }
/**
 * Block fetcher that interprets each chunk as a whole block
 */
public class OneForOneBlockFetcher {
  /**
   * Create a one-for-one block fetcher
   * @param client - Transport client for network communication
   * @param appId - Application ID
   * @param execId - Executor ID
   * @param blockIds - Array of block IDs to fetch
   * @param listener - Listener for block fetch events
   * @param transportConf - Transport configuration
   */
  public OneForOneBlockFetcher(
      TransportClient client, String appId, String execId, String[] blockIds,
      BlockFetchingListener listener, TransportConf transportConf
  );

  /**
   * Create a one-for-one block fetcher with download file manager
   * @param client - Transport client for network communication
   * @param appId - Application ID
   * @param execId - Executor ID
   * @param blockIds - Array of block IDs to fetch
   * @param listener - Listener for block fetch events
   * @param transportConf - Transport configuration
   * @param downloadFileManager - Manager for temporary download files; when
   *        non-null, each block is streamed to a temporary file instead of
   *        being held in memory
   */
  public OneForOneBlockFetcher(
      TransportClient client, String appId, String execId, String[] blockIds,
      BlockFetchingListener listener, TransportConf transportConf,
      DownloadFileManager downloadFileManager
  );

  /**
   * Start the block fetching process; results are delivered asynchronously
   * to the listener
   */
  public void start();
}
```
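
Because each chunk is one block, chunk callbacks translate directly into block callbacks. In Spark's implementation, when chunk *i* fails, every block from *i* onward is reported as failed. The sketch below simulates that bookkeeping over a hypothetical in-memory list of chunks (a `null` entry marks a chunk whose fetch fails); `ChunkListener`, `ChunkToBlockAdapter`, and `RecordingChunkListener` are illustrative names, not Spark APIs, and `byte[]` stands in for `ManagedBuffer`.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Illustrative listener with the same callback shape as BlockFetchingListener.
interface ChunkListener {
  void onBlockFetchSuccess(String blockId, byte[] data);
  void onBlockFetchFailure(String blockId, Throwable exception);
}

class ChunkToBlockAdapter {
  // Delivers chunk i as block blockIds[i]; a null chunk simulates a failed fetch.
  static void deliver(String[] blockIds, List<byte[]> chunks, ChunkListener listener) {
    for (int i = 0; i < blockIds.length; i++) {
      byte[] chunk = chunks.get(i);
      if (chunk != null) {
        listener.onBlockFetchSuccess(blockIds[i], chunk);
      } else {
        IOException e = new IOException("Failed to fetch chunk " + i);
        // Fail this block and every block after it.
        for (int j = i; j < blockIds.length; j++) {
          listener.onBlockFetchFailure(blockIds[j], e);
        }
        return;
      }
    }
  }
}

// Records which blocks succeeded and which failed.
class RecordingChunkListener implements ChunkListener {
  final List<String> succeeded = new ArrayList<>();
  final List<String> failed = new ArrayList<>();

  @Override
  public void onBlockFetchSuccess(String blockId, byte[] data) { succeeded.add(blockId); }

  @Override
  public void onBlockFetchFailure(String blockId, Throwable exception) { failed.add(blockId); }
}
```

This sequential model stops at the first failure. The real fetcher requests all chunks up front, so later chunks can still complete after a failure and a block may receive more than one callback; listeners should tolerate that.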
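
### RetryingBlockFetcher

Wraps block fetching with automatic retries for I/O failures. When a retryable failure occurs, it waits and then asks its `BlockFetchStarter` to start a new fetch for only the blocks that are still outstanding.
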
```java { .api }
/**
 * Wraps block fetcher with automatic retry capability for IO failures
 */
public class RetryingBlockFetcher {
  /**
   * Create a retrying block fetcher
   * @param conf - Transport configuration containing retry parameters
   * @param fetchStarter - Strategy for creating and starting block fetchers
   * @param blockIds - Array of block IDs to fetch
   * @param listener - Listener for block fetch events
   */
  public RetryingBlockFetcher(
      TransportConf conf, BlockFetchStarter fetchStarter,
      String[] blockIds, BlockFetchingListener listener
  );

  /**
   * Start the block fetching process with retry logic
   */
  public void start();

  /**
   * Strategy interface for creating and starting block fetchers
   */
  public interface BlockFetchStarter {
    /**
     * Create and start a block fetcher for the given blocks. Called for the
     * initial attempt and again on each retry with only the outstanding
     * block IDs. Implementations should obtain a fresh TransportClient on
     * each call so that a retry can recover from connection failures, and
     * must eventually notify the listener for every block ID or throw.
     * @param blockIds - Array of block IDs to fetch
     * @param listener - Listener for block fetch events
     */
    void createAndStart(String[] blockIds, BlockFetchingListener listener)
        throws IOException, InterruptedException;
  }
}
```
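
The retry flow can be modeled with a small single-threaded sketch. It assumes Spark's rule that only an `IOException` (or an exception whose cause is one) is retried while retries remain, and that each retry re-requests only the outstanding blocks after a fixed wait. `FetchAttempt` and `RetrySketch` are hypothetical names; the real class schedules retries on a background thread and reports through a `BlockFetchingListener` rather than filling result collections.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical fetch attempt: returns the failure for each block that failed;
// blocks missing from the returned map are treated as fetched successfully.
interface FetchAttempt {
  Map<String, Throwable> fetch(List<String> blockIds);
}

// Simplified model of RetryingBlockFetcher's bookkeeping.
class RetrySketch {
  final List<String> succeeded = new ArrayList<>();
  final Map<String, Throwable> failed = new LinkedHashMap<>();
  int retriesUsed = 0;

  // Retry only I/O failures, directly or as the immediate cause.
  static boolean isIoFailure(Throwable e) {
    return e instanceof IOException || e.getCause() instanceof IOException;
  }

  void run(String[] blockIds, int maxRetries, long retryWaitMs, FetchAttempt attempt) {
    Set<String> outstanding = new LinkedHashSet<>(Arrays.asList(blockIds));
    while (!outstanding.isEmpty()) {
      // Each attempt requests only the blocks still outstanding.
      Map<String, Throwable> errors = attempt.fetch(new ArrayList<>(outstanding));
      boolean retryNeeded = false;
      for (String id : new ArrayList<>(outstanding)) {
        Throwable e = errors.get(id);
        if (e == null) {
          succeeded.add(id);
          outstanding.remove(id);
        } else if (isIoFailure(e) && retriesUsed < maxRetries) {
          retryNeeded = true;           // stays outstanding for the next attempt
        } else {
          failed.put(id, e);            // non-I/O failure, or retries exhausted
          outstanding.remove(id);
        }
      }
      if (retryNeeded) {
        retriesUsed++;
        try {
          Thread.sleep(retryWaitMs);    // same wait before every retry, no backoff
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          for (String id : outstanding) {
            failed.put(id, ie);
          }
          return;
        }
      }
    }
  }
}
```

One difference from the real class: once Spark schedules a retry, it ignores further callbacks from the abandoned attempt, so a block that succeeds late in a failed attempt may be fetched again.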

**Usage Examples:**

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;

import com.codahale.metrics.Counter;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Histogram;
import com.google.common.io.ByteStreams;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.shuffle.*;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// processBlockData, logBlockFetchFailure, updateFailureMetrics,
// isRetryableException and createTransportClient are application-defined
// placeholders, not Spark APIs.

// Example 1: Basic block fetching with listener
BlockFetchingListener basicListener = new BlockFetchingListener() {
  @Override
  public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
    System.out.println("Successfully fetched block: " + blockId +
        ", size: " + data.size() + " bytes");

    // The fetcher releases `data` after this method returns, so read it
    // (or retain() it) here and do not call release() yourself.
    try (InputStream dataStream = data.createInputStream()) {
      byte[] blockBytes = ByteStreams.toByteArray(dataStream);
      processBlockData(blockId, blockBytes);
    } catch (IOException e) {
      System.err.println("Error processing block " + blockId + ": " + e.getMessage());
    }
  }

  @Override
  public void onBlockFetchFailure(String blockId, Throwable exception) {
    System.err.println("Failed to fetch block: " + blockId +
        ", error: " + exception.getMessage());

    // Handle specific error types
    if (exception instanceof IOException) {
      System.err.println("Network or I/O error occurred");
    } else if (exception instanceof SecurityException) {
      System.err.println("Authentication or authorization error");
    }

    // Log for monitoring and debugging
    logBlockFetchFailure(blockId, exception);
  }
};

// Transport configuration for the "shuffle" module (reads spark.shuffle.io.*
// settings; MapConfigProvider.EMPTY means all defaults)
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
// Placeholder: in Spark, clients come from TransportClientFactory.createClient(host, port)
TransportClient client = createTransportClient("shuffle-server", 7337);

// Fetch blocks using OneForOneBlockFetcher
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_1_0"};
OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(
    client, "app-001", "executor-1", blockIds, basicListener, conf);

// Start the fetch operation; callbacks arrive asynchronously
fetcher.start();

// Example 2: Block fetching with file downloads
// With a DownloadFileManager, each block is streamed to a temporary file
// instead of being held in memory.
SimpleDownloadFileManager fileManager = new SimpleDownloadFileManager();

OneForOneBlockFetcher fetcherWithFiles = new OneForOneBlockFetcher(
    client, "app-001", "executor-1", blockIds, basicListener, conf, fileManager);

fetcherWithFiles.start();

// Example 3: Retrying block fetcher for reliability
BlockFetchingListener retryListener = new BlockFetchingListener() {
  private final AtomicInteger successCount = new AtomicInteger(0);
  private final AtomicInteger failureCount = new AtomicInteger(0);

  @Override
  public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
    int successNum = successCount.incrementAndGet();
    System.out.println("Success #" + successNum + ": " + blockId +
        " (" + data.size() + " bytes)");

    // Process the data; the fetcher releases the buffer after this returns
    processBlockData(blockId, data);
  }

  @Override
  public void onBlockFetchFailure(String blockId, Throwable exception) {
    // Reached for non-I/O failures, or after I/O retries are exhausted
    int failureNum = failureCount.incrementAndGet();
    System.err.println("Failure #" + failureNum + ": " + blockId +
        " - " + exception.getMessage());

    // Update metrics
    updateFailureMetrics(blockId, exception);
  }
};

RetryingBlockFetcher.BlockFetchStarter fetchStarter =
    new RetryingBlockFetcher.BlockFetchStarter() {
      @Override
      public void createAndStart(String[] blockIds, BlockFetchingListener listener) {
        // Get a fresh client on every attempt so that a retry can recover
        // from a broken connection (createTransportClient is a placeholder)
        TransportClient attemptClient = createTransportClient("shuffle-server", 7337);
        OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(
            attemptClient, "app-001", "executor-1", blockIds, listener, conf);
        fetcher.start();
      }
    };

// Create retrying fetcher; retry limit and wait come from `conf`
RetryingBlockFetcher retryingFetcher = new RetryingBlockFetcher(
    conf, fetchStarter, blockIds, retryListener);

// Start with retry capability
retryingFetcher.start();

// Example 4: Listener with metrics and monitoring (Dropwizard Metrics types)
class MetricsBlockFetchingListener implements BlockFetchingListener {
  private final Counter successCounter = new Counter();
  private final Counter failureCounter = new Counter();
  private final Histogram dataSizeHistogram =
      new Histogram(new ExponentiallyDecayingReservoir());

  @Override
  public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
    successCounter.inc();
    dataSizeHistogram.update(data.size());

    System.out.println("Block " + blockId + " fetched successfully");

    // Process the block data before returning; the fetcher releases the
    // buffer afterwards, so there is no release() here
    processBlockData(blockId, data);
  }

  @Override
  public void onBlockFetchFailure(String blockId, Throwable exception) {
    failureCounter.inc();

    // Log detailed error information
    System.err.println("Block fetch failed: " + blockId);
    System.err.println("Error type: " + exception.getClass().getSimpleName());
    System.err.println("Error message: " + exception.getMessage());

    // Classify the failure for reporting; if a RetryingBlockFetcher wraps
    // this listener, retryable errors have already been retried
    if (isRetryableException(exception)) {
      System.err.println("Retryable error type (retries, if any, exhausted)");
    } else {
      System.err.println("Non-retryable error, marking as permanent failure");
    }
  }

  public void printMetrics() {
    System.out.println("Fetch Metrics:");
    System.out.println("  Successes: " + successCounter.getCount());
    System.out.println("  Failures: " + failureCounter.getCount());
    System.out.println("  Avg Data Size: " + dataSizeHistogram.getSnapshot().getMean());
  }
}

MetricsBlockFetchingListener metricsListener = new MetricsBlockFetchingListener();
// Use metricsListener with any fetcher...
```

### Retry Configuration

RetryingBlockFetcher reads its retry behavior from the TransportConf passed to it. For the `shuffle` module the relevant settings are:

- `spark.shuffle.io.maxRetries` - Maximum number of retries after the first attempt (default: 3); 0 disables retrying
- `spark.shuffle.io.retryWait` - Fixed time to wait before each retry (default: `5s`, no exponential backoff); TransportConf exposes it in milliseconds via `ioRetryWaitTimeMs()`
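
Because every retry waits the same `retryWait`, the extra delay retries can add before a block is finally reported as failed is bounded by `maxRetries × retryWait`; Spark's configuration documentation gives 15 seconds for the defaults. The sketch below does that arithmetic. `RetryBudget` is a hypothetical helper, and its parser handles only `ms`, `s`, and `m` suffixes (a bare number is treated as seconds), so it is not a substitute for Spark's own time-string parsing.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: bounds the extra delay retries add to one fetch.
class RetryBudget {
  // Parses a small subset of time strings: "500ms", "5s", "1m", or a bare
  // number treated as seconds (an assumption of this sketch).
  static long toMillis(String time) {
    String t = time.trim().toLowerCase();
    if (t.endsWith("ms")) {
      return Long.parseLong(t.substring(0, t.length() - 2));
    }
    if (t.endsWith("s")) {
      return TimeUnit.SECONDS.toMillis(Long.parseLong(t.substring(0, t.length() - 1)));
    }
    if (t.endsWith("m")) {
      return TimeUnit.MINUTES.toMillis(Long.parseLong(t.substring(0, t.length() - 1)));
    }
    return TimeUnit.SECONDS.toMillis(Long.parseLong(t));
  }

  // Each retry sleeps retryWait once, so the total wait is maxRetries * retryWait.
  static long maxRetryDelayMs(int maxRetries, String retryWait) {
    return maxRetries * toMillis(retryWait);
  }
}
```

For example, raising the settings to `maxRetries=6` and `retryWait=10s` allows up to 60 seconds of waiting before a lost block is reported, a trade-off between tolerating flaky networks and failing fast.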
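
### Error Classification

RetryingBlockFetcher decides whether to retry from the exception alone: a failure is retried only if it is an `IOException` (or its cause is one) and retries remain; any other failure is passed straight to the listener. Block fetch failures fall into several categories:

1. **Retryable Errors**:
   - `IOException` - Network connectivity issues, such as a connection closed while requests are outstanding
   - `ConnectException` - Connection establishment failures (a subclass of `IOException`)
   - Request timeouts, when they surface as an `IOException`; a `java.util.concurrent.TimeoutException` is not an `IOException` and is retried only if wrapped in one

2. **Non-Retryable Errors**:
   - `SecurityException` - Authentication/authorization failures
   - `IllegalArgumentException` - Invalid block IDs or parameters
   - `FileNotFoundException` - Missing shuffle files (permanent); because it extends `IOException`, RetryingBlockFetcher still retries it until the retry limit is reached

3. **Application-Specific Errors**:
   - Custom exceptions from the shuffle service implementation
   - Data corruption errors
   - Storage subsystem failures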
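
The retry decision for the categories above can be written as a single predicate. This sketch mirrors the check Spark's RetryingBlockFetcher performs (exception type plus remaining retries); `RetryPolicy` and its parameters are illustrative names, and only the immediate cause is inspected, not the whole cause chain.

```java
import java.io.IOException;

// Illustrative retry decision for block fetch failures.
final class RetryPolicy {
  private RetryPolicy() {}

  // Retry only I/O failures (directly or as the immediate cause) while retries remain.
  static boolean shouldRetry(Throwable e, int retryCount, int maxRetries) {
    boolean isIoException = e instanceof IOException
        || (e.getCause() != null && e.getCause() instanceof IOException);
    boolean hasRemainingRetries = retryCount < maxRetries;
    return isIoException && hasRemainingRetries;
  }
}
```

Because the check is type-based, a `FileNotFoundException` passes it even though retrying cannot help, so a missing file can consume the whole retry budget before it reaches the listener.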

### Performance Optimization

Best practices for block fetching performance:

1. **Listener Implementation**:
   - Keep success/failure handlers lightweight; callbacks typically run on network I/O threads
   - To process data asynchronously, `retain()` the buffer (or copy it) before handing it to another thread, and `release()` it there when done
   - Do not `release()` a buffer you did not `retain()`; the fetcher releases it after `onBlockFetchSuccess` returns

2. **Batch Operations**:
   - Fetch multiple blocks in a single request
   - Choose batch sizes based on network capacity
   - Balance throughput against memory usage

3. **Error Handling**:
   - RetryingBlockFetcher waits a fixed `retryWait` between attempts; add exponential backoff in your own retry layer if you need it
   - Use circuit breaker patterns for failing services
   - Monitor and alert on high failure rates

4. **Memory Management**:
   - Release retained buffers promptly after processing
   - Monitor memory usage during large transfers
   - For large blocks, consider a `DownloadFileManager` so data is streamed to disk, or process the data as a stream
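
RetryingBlockFetcher itself neither backs off nor trips a breaker, so both would live in application code around it. The sketch below is a generic, single-threaded illustration under that assumption: capped exponential backoff (`baseMs × 2^attempt`, limited to `maxMs`) and a minimal consecutive-failure circuit breaker. `Backoff` and `CircuitBreaker` are hypothetical names; a production breaker would also need thread safety and a limit on trial requests while half-open.

```java
// Hypothetical helpers for an application-level retry layer.
final class Backoff {
  private Backoff() {}

  // Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at maxMs.
  static long delayMs(int attempt, long baseMs, long maxMs) {
    long delay = baseMs;
    for (int i = 0; i < attempt && delay < maxMs; i++) {
      delay *= 2;   // stop doubling once the cap is reached
    }
    return Math.min(delay, maxMs);
  }
}

// Opens after `threshold` consecutive failures. Once `openMs` has elapsed it
// allows requests again (half-open): one more failure reopens it, and a
// success closes it.
final class CircuitBreaker {
  private final int threshold;
  private final long openMs;
  private int consecutiveFailures = 0;
  private long openedAt = -1;   // -1 means the breaker is closed

  CircuitBreaker(int threshold, long openMs) {
    this.threshold = threshold;
    this.openMs = openMs;
  }

  // Returns true if a fetch may be attempted at time nowMs.
  boolean allowRequest(long nowMs) {
    return openedAt < 0 || nowMs - openedAt >= openMs;
  }

  void recordSuccess() {
    consecutiveFailures = 0;
    openedAt = -1;
  }

  void recordFailure(long nowMs) {
    consecutiveFailures++;
    if (consecutiveFailures >= threshold) {
      openedAt = nowMs;   // (re)open and restart the cool-down
    }
  }
}
```

Passing the current time as a parameter, instead of reading the clock inside the breaker, keeps its behavior deterministic and easy to test.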

### Monitoring and Debugging

Key metrics to monitor for block fetching operations:

- **Success/Failure Rates**: Track the percentage of blocks fetched successfully
- **Latency Metrics**: Measure the time from starting a fetch to each block's callback
- **Data Volume**: Track bytes transferred and transfer rates
- **Retry Patterns**: Monitor how often retries occur and how often they succeed
- **Error Distribution**: Analyze failure types and causes
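
A listener can feed metrics like these from its callbacks. The sketch below is a hypothetical, dependency-free aggregator, `FetchStats`; it assumes the caller records `System.nanoTime()` before calling `start()` and passes the elapsed time on success. Deployments that already use Dropwizard Metrics, as in Example 4, could record the same values there instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical aggregator for block fetch metrics; every method is safe to
// call concurrently from fetch callback threads.
class FetchStats {
  private final LongAdder successes = new LongAdder();
  private final LongAdder failures = new LongAdder();
  private final LongAdder bytes = new LongAdder();
  private final LongAdder totalLatencyNanos = new LongAdder();
  private final Map<String, LongAdder> errorsByType = new ConcurrentHashMap<>();

  void recordSuccess(long sizeBytes, long latencyNanos) {
    successes.increment();
    bytes.add(sizeBytes);
    totalLatencyNanos.add(latencyNanos);
  }

  // Groups failures by exception class for the error distribution.
  void recordFailure(Throwable e) {
    failures.increment();
    errorsByType.computeIfAbsent(e.getClass().getSimpleName(), k -> new LongAdder()).increment();
  }

  double successRate() {
    long ok = successes.sum();
    long total = ok + failures.sum();
    return total == 0 ? 0.0 : (double) ok / total;
  }

  long totalBytes() {
    return bytes.sum();
  }

  double meanLatencyMillis() {
    long ok = successes.sum();
    return ok == 0 ? 0.0 : totalLatencyNanos.sum() / 1e6 / ok;
  }

  long errorsOfType(String simpleName) {
    LongAdder count = errorsByType.get(simpleName);
    return count == null ? 0 : count.sum();
  }
}
```

Retries are not visible to the listener, since RetryingBlockFetcher handles them internally. One way to count them is in the `BlockFetchStarter`: every `createAndStart` call after the first is a retry.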