# Block Fetching and Retry Logic

Advanced block fetching mechanisms with automatic retry capabilities and listener-based asynchronous handling.

## Capabilities

### BlockFetchingListener Interface

Event listener for block fetching operations with success/failure callbacks.

```java { .api }
/**
 * Event listener for block fetching operations
 */
public interface BlockFetchingListener extends EventListener {
    /**
     * Called when a block is successfully fetched
     * @param blockId - ID of the successfully fetched block
     * @param data - ManagedBuffer containing the block data
     */
    void onBlockFetchSuccess(String blockId, ManagedBuffer data);

    /**
     * Called when a block fetch fails
     * @param blockId - ID of the block that failed to fetch
     * @param exception - Exception that caused the failure
     */
    void onBlockFetchFailure(String blockId, Throwable exception);
}
```

### OneForOneBlockFetcher

Block fetcher that interprets each chunk as a whole block.

```java { .api }
/**
 * Block fetcher that interprets each chunk as a whole block
 */
public class OneForOneBlockFetcher {
    /**
     * Create a one-for-one block fetcher
     * @param client - Transport client for network communication
     * @param appId - Application ID
     * @param execId - Executor ID
     * @param blockIds - Array of block IDs to fetch
     * @param listener - Listener for block fetch events
     * @param transportConf - Transport configuration
     */
    public OneForOneBlockFetcher(
        TransportClient client, String appId, String execId, String[] blockIds,
        BlockFetchingListener listener, TransportConf transportConf
    );

    /**
     * Create a one-for-one block fetcher with download file manager
     * @param client - Transport client for network communication
     * @param appId - Application ID
     * @param execId - Executor ID
     * @param blockIds - Array of block IDs to fetch
     * @param listener - Listener for block fetch events
     * @param transportConf - Transport configuration
     * @param downloadFileManager - Manager for temporary download files
     */
    public OneForOneBlockFetcher(
        TransportClient client, String appId, String execId, String[] blockIds,
        BlockFetchingListener listener, TransportConf transportConf,
        DownloadFileManager downloadFileManager
    );

    /**
     * Start the block fetching process
     */
    public void start();
}
```

### RetryingBlockFetcher

Wraps a block fetcher, created through a `BlockFetchStarter`, with automatic retries on I/O failures.

```java { .api }
/**
 * Wraps block fetcher with automatic retry capability for IO failures
 */
public class RetryingBlockFetcher {
    /**
     * Create a retrying block fetcher
     * @param conf - Transport configuration containing retry parameters
     * @param fetchStarter - Strategy for creating and starting block fetchers
     * @param blockIds - Array of block IDs to fetch
     * @param listener - Listener for block fetch events
     */
    public RetryingBlockFetcher(
        TransportConf conf, BlockFetchStarter fetchStarter,
        String[] blockIds, BlockFetchingListener listener
    );

    /**
     * Start the block fetching process with retry logic
     */
    public void start();

    /**
     * Strategy interface for creating and starting block fetchers
     */
    public interface BlockFetchStarter {
        /**
         * Create and start a block fetcher for the given blocks
         * @param blockIds - Array of block IDs to fetch
         * @param listener - Listener for block fetch events
         */
        void createAndStart(String[] blockIds, BlockFetchingListener listener);
    }
}
```

**Usage Examples:**

```java
import org.apache.spark.network.shuffle.*;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;

// Example 1: Basic block fetching with listener
BlockFetchingListener basicListener = new BlockFetchingListener() {
    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Successfully fetched block: " + blockId +
            ", size: " + data.size() + " bytes");

        // Spark releases `data` automatically once this callback returns, so
        // read it here; retain() it first if another thread needs the buffer.
        try (InputStream dataStream = data.createInputStream()) {
            // Process the block data
            byte[] blockBytes = ByteStreams.toByteArray(dataStream);
            processBlockData(blockId, blockBytes); // application-defined helper
        } catch (IOException e) {
            System.err.println("Error processing block " + blockId + ": " + e.getMessage());
        }
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Failed to fetch block: " + blockId +
            ", error: " + exception.getMessage());

        // Handle specific error types
        if (exception instanceof IOException) {
            System.err.println("Network or I/O error occurred");
        } else if (exception instanceof SecurityException) {
            System.err.println("Authentication or authorization error");
        }

        // Log for monitoring and debugging
        logBlockFetchFailure(blockId, exception); // application-defined helper
    }
};

// Create transport client and configuration.
// TransportConf takes a module name and a ConfigProvider.
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
TransportClient client = createTransportClient("shuffle-server", 7337); // application-defined helper

// Fetch blocks using OneForOneBlockFetcher
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_1_0"};
OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(
    client, "app-001", "executor-1", blockIds, basicListener, conf
);

// Start the fetch operation
fetcher.start();

// Example 2: Block fetching with file downloads
SimpleDownloadFileManager fileManager = new SimpleDownloadFileManager();

OneForOneBlockFetcher fetcherWithFiles = new OneForOneBlockFetcher(
    client, "app-001", "executor-1", blockIds, basicListener, conf, fileManager
);

fetcherWithFiles.start();

// Example 3: Retrying block fetcher for reliability
BlockFetchingListener retryListener = new BlockFetchingListener() {
    private final AtomicInteger successCount = new AtomicInteger(0);
    private final AtomicInteger failureCount = new AtomicInteger(0);

    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        int successNum = successCount.incrementAndGet();
        System.out.println("Success #" + successNum + ": " + blockId +
            " (" + data.size() + " bytes)");

        // Application-defined helper. If it hands the buffer to another thread,
        // it must retain() the buffer first and release() it when finished.
        processAndRelease(blockId, data);
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        int failureNum = failureCount.incrementAndGet();
        System.err.println("Failure #" + failureNum + ": " + blockId +
            " - " + exception.getMessage());

        // Update metrics
        updateFailureMetrics(blockId, exception); // application-defined helper
    }
};

RetryingBlockFetcher.BlockFetchStarter fetchStarter =
    new RetryingBlockFetcher.BlockFetchStarter() {
        @Override
        public void createAndStart(String[] blockIds, BlockFetchingListener listener) {
            // Called once per attempt with the blocks that are still outstanding
            OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(
                client, "app-001", "executor-1", blockIds, listener, conf
            );
            fetcher.start();
        }
    };

// Create retrying fetcher with automatic retry logic
RetryingBlockFetcher retryingFetcher = new RetryingBlockFetcher(
    conf, fetchStarter, blockIds, retryListener
);

// Start with retry capability
retryingFetcher.start();

// Example 4: Advanced listener with metrics and monitoring
// Counter and Histogram are assumed to be Dropwizard Metrics classes
// (com.codahale.metrics); swap in your own metrics library as needed.
public class MetricsBlockFetchingListener implements BlockFetchingListener {
    private final Counter successCounter = new Counter();
    private final Counter failureCounter = new Counter();
    private final Histogram dataSizeHistogram =
        new Histogram(new com.codahale.metrics.ExponentiallyDecayingReservoir());

    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        successCounter.inc();
        dataSizeHistogram.update(data.size());

        System.out.println("Block " + blockId + " fetched successfully");

        // Process the block data; `data` is released automatically
        // once this callback returns.
        processBlockData(blockId, data); // application-defined helper
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        failureCounter.inc();

        // Log detailed error information
        System.err.println("Block fetch failed: " + blockId);
        System.err.println("Error type: " + exception.getClass().getSimpleName());
        System.err.println("Error message: " + exception.getMessage());

        // Determine if retry is appropriate (isRetryableException is application-defined)
        if (isRetryableException(exception)) {
            System.out.println("Error is retryable, will attempt retry");
        } else {
            System.err.println("Error is not retryable, marking as permanent failure");
        }
    }

    public void printMetrics() {
        System.out.println("Fetch Metrics:");
        System.out.println(" Successes: " + successCounter.getCount());
        System.out.println(" Failures: " + failureCounter.getCount());
        System.out.println(" Avg Data Size: " + dataSizeHistogram.getSnapshot().getMean());
    }
}

MetricsBlockFetchingListener metricsListener = new MetricsBlockFetchingListener();
// Use metricsListener with any fetcher...
```

### Retry Configuration

`RetryingBlockFetcher` reads its retry behavior from `TransportConf`. For the `shuffle` module the parameters are:

- `spark.shuffle.io.maxRetries` - Maximum number of retry attempts (default: 3)
- `spark.shuffle.io.retryWait` - Initial wait time between retries (default: 5000 ms; Spark also accepts a time string such as `5s`)
- `spark.shuffle.io.retryWaitTimeUnit` - Time unit for retry wait (default: MILLISECONDS)
- `spark.shuffle.io.backOffMultiplier` - Multiplier for exponential backoff (default: 1.5)

The last two keys are not listed in Spark's published configuration reference, so confirm that your Spark version honors them before relying on them.
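
To make the interaction of these parameters concrete, here is a minimal sketch that computes the wait before each retry. It assumes the parameters behave exactly as listed above; `RetrySchedule` and its method are hypothetical names for illustration and are not part of Spark.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: derives per-attempt retry delays from the listed parameters. */
final class RetrySchedule {
    private RetrySchedule() {}

    /**
     * @param maxRetries        maximum number of retry attempts
     * @param initialWaitMillis wait before the first retry, in milliseconds
     * @param multiplier        factor applied to the wait after each retry (1.0 = fixed wait)
     * @return one delay per retry attempt, in milliseconds
     */
    static List<Long> delays(int maxRetries, long initialWaitMillis, double multiplier) {
        if (maxRetries < 0 || initialWaitMillis < 0 || multiplier < 1.0) {
            throw new IllegalArgumentException("invalid retry parameters");
        }
        List<Long> result = new ArrayList<>(maxRetries);
        double wait = initialWaitMillis;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            result.add(Math.round(wait));
            wait *= multiplier; // grow the wait for the next attempt
        }
        return result;
    }
}
```

With the defaults listed above, `RetrySchedule.delays(3, 5000L, 1.5)` yields waits of 5000, 7500, and 11250 ms. Passing a multiplier of `1.0` gives a fixed wait between attempts.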

### Error Classification

Block fetch failures can be classified into several categories:

1. **Retryable Errors**:
   - `IOException` - Network connectivity issues
   - `TimeoutException` - Request timeouts
   - `ConnectException` - Connection establishment failures

2. **Non-Retryable Errors**:
   - `SecurityException` - Authentication/authorization failures
   - `IllegalArgumentException` - Invalid block IDs or parameters
   - `FileNotFoundException` - Missing shuffle files (permanent)

3. **Application-Specific Errors**:
   - Custom exceptions from shuffle service implementation
   - Data corruption errors
   - Storage subsystem failures
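
The categories above could be encoded in a helper along the following lines. This is a sketch of the classification as listed here, not Spark's internal retry check; `FetchErrorClassifier` and `Category` are hypothetical names. Note that `FileNotFoundException` and `ConnectException` are both subclasses of `IOException`, so the order of the checks matters.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** Illustrative only: maps an exception to one of the categories listed above. */
final class FetchErrorClassifier {
    enum Category { RETRYABLE, NON_RETRYABLE, APPLICATION_SPECIFIC }

    private FetchErrorClassifier() {}

    static Category classify(Throwable error) {
        // FileNotFoundException extends IOException, so it must be tested first.
        if (error instanceof FileNotFoundException
                || error instanceof SecurityException
                || error instanceof IllegalArgumentException) {
            return Category.NON_RETRYABLE;
        }
        // ConnectException is an IOException subclass and is covered here.
        if (error instanceof IOException || error instanceof TimeoutException) {
            return Category.RETRYABLE;
        }
        // Anything else is treated as application-specific.
        return Category.APPLICATION_SPECIFIC;
    }
}
```

A listener might call `FetchErrorClassifier.classify(exception)` inside `onBlockFetchFailure` to decide whether to log, alert, or reschedule the fetch.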

### Performance Optimization

Best practices for optimal block fetching performance:

1. **Listener Implementation**:
   - Keep success/failure handlers lightweight
   - Process data asynchronously when possible
   - Release every `ManagedBuffer` you `retain()`; a buffer passed to `onBlockFetchSuccess` is released by Spark when the callback returns

2. **Batch Operations**:
   - Fetch multiple blocks in a single request
   - Use appropriate batch sizes based on network capacity
   - Balance between throughput and memory usage

3. **Error Handling**:
   - Implement exponential backoff for retries
   - Use circuit breaker patterns for failing services
   - Monitor and alert on high failure rates

4. **Memory Management**:
   - Release retained buffers promptly after processing
   - Monitor memory usage during large transfers
   - Consider streaming processing for large blocks
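
One way to keep a success handler lightweight is to copy the block bytes and hand them to a worker, as in the sketch below. It uses a plain `java.nio.ByteBuffer` as a stand-in for the fetched data so that it runs without Spark on the classpath; `AsyncBlockHandoff` is a hypothetical name, and copying trades extra memory for independence from the buffer's lifetime.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

/** Illustrative only: moves block processing off the callback thread. */
final class AsyncBlockHandoff {
    private final Executor workers;
    private final BiConsumer<String, byte[]> processor;

    AsyncBlockHandoff(Executor workers, BiConsumer<String, byte[]> processor) {
        this.workers = workers;
        this.processor = processor;
    }

    /** Intended to be called from a success callback. */
    void onBlock(String blockId, ByteBuffer data) {
        // Copy the bytes so the worker does not depend on the buffer's lifetime.
        byte[] copy = new byte[data.remaining()];
        data.duplicate().get(copy); // duplicate() leaves the caller's position untouched
        // Hand the copy to the executor; with a pooled executor this returns immediately.
        workers.execute(() -> processor.accept(blockId, copy));
    }
}
```

In a real listener the executor would typically be a bounded thread pool, and for very large blocks retaining the buffer or streaming it may be preferable to copying.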

### Monitoring and Debugging

Key metrics to monitor for block fetching operations:

- **Success/Failure Rates**: Track fetch success percentage
- **Latency Metrics**: Monitor fetch operation timing
- **Data Volume**: Track bytes transferred and rates
- **Retry Patterns**: Monitor retry frequency and success rates
- **Error Distribution**: Analyze failure types and causes
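
The metrics above can be collected with a small thread-safe accumulator. The following is a minimal sketch using only the JDK; `FetchStats` and its methods are hypothetical names, and a production system would more likely publish these values through an existing metrics library.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Illustrative only: thread-safe accumulator for block fetch metrics. */
final class FetchStats {
    private final LongAdder successes = new LongAdder();
    private final LongAdder failures = new LongAdder();
    private final LongAdder retries = new LongAdder();
    private final LongAdder bytes = new LongAdder();
    private final LongAdder latencyMillis = new LongAdder();
    private final Map<String, LongAdder> errorsByType = new ConcurrentHashMap<>();

    void recordSuccess(long sizeBytes, long elapsedMillis) {
        successes.increment();
        bytes.add(sizeBytes);
        latencyMillis.add(elapsedMillis);
    }

    void recordRetry() {
        retries.increment();
    }

    void recordFailure(Throwable error) {
        failures.increment();
        // Group failures by exception type for the error distribution.
        errorsByType.computeIfAbsent(error.getClass().getSimpleName(), k -> new LongAdder())
                .increment();
    }

    /** Fraction of completed fetches that succeeded, or 0.0 if none completed. */
    double successRate() {
        long ok = successes.sum();
        long total = ok + failures.sum();
        return total == 0 ? 0.0 : (double) ok / total;
    }

    /** Mean latency of successful fetches in milliseconds. */
    double meanLatencyMillis() {
        long ok = successes.sum();
        return ok == 0 ? 0.0 : (double) latencyMillis.sum() / ok;
    }

    long bytesTransferred() {
        return bytes.sum();
    }

    long retryCount() {
        return retries.sum();
    }

    long errorCount(String simpleClassName) {
        LongAdder count = errorsByType.get(simpleClassName);
        return count == null ? 0L : count.sum();
    }
}
```

A listener would call `recordSuccess` and `recordFailure` from its two callbacks, while `recordRetry` would be called from wherever a new fetch attempt is started.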