# Shuffle Server Components

Server-side components for handling shuffle requests, including RPC handlers and block resolution mechanisms.

## Capabilities

### ExternalShuffleBlockHandler

RPC handler for serving shuffle blocks from external shuffle service.

```java { .api }
/**
 * RPC handler for serving shuffle blocks from external shuffle service
 */
public class ExternalShuffleBlockHandler extends RpcHandler {
  /**
   * Create an external shuffle block handler
   * @param conf - Transport configuration
   * @param registeredExecutorFile - File containing registered executor information
   */
  public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile);

  /**
   * Create an external shuffle block handler for testing
   * @param streamManager - Stream manager for handling streams
   * @param blockManager - Block resolver for resolving block requests
   */
  public ExternalShuffleBlockHandler(
    OneForOneStreamManager streamManager, ExternalShuffleBlockResolver blockManager
  );

  /**
   * Handle incoming RPC messages
   * @param client - Transport client that sent the message
   * @param message - The message bytes
   * @param callback - Callback for sending response
   */
  @Override
  public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);

  /**
   * Get all metrics from the handler
   * @return MetricSet containing all shuffle server metrics
   */
  public MetricSet getAllMetrics();

  /**
   * Get the stream manager used by this handler
   * @return StreamManager instance
   */
  @Override
  public StreamManager getStreamManager();

  /**
   * Handle application removal cleanup
   * @param appId - Application ID to clean up
   * @param cleanupLocalDirs - Whether to clean up local directories
   */
  public void applicationRemoved(String appId, boolean cleanupLocalDirs);

  /**
   * Handle executor removal cleanup
   * @param executorId - Executor ID to clean up
   * @param appId - Application ID the executor belongs to
   */
  public void executorRemoved(String executorId, String appId);

  /**
   * Re-register an executor with updated information
   * @param appExecId - Combined application and executor ID
   * @param executorInfo - Updated executor shuffle information
   */
  public void reregisterExecutor(AppExecId appExecId, ExecutorShuffleInfo executorInfo);

  /**
   * Close the handler and clean up resources
   */
  public void close();
}
```

### ExternalShuffleBlockResolver

Manages converting shuffle block IDs to physical file segments.

```java { .api }
/**
 * Manages converting shuffle block IDs to physical file segments
 */
public class ExternalShuffleBlockResolver {
  /**
   * Create an external shuffle block resolver
   * @param conf - Transport configuration
   * @param registeredExecutorFile - File containing registered executor information
   */
  public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile);

  /**
   * Get the number of registered executors
   * @return Number of currently registered executors
   */
  public int getRegisteredExecutorsSize();

  /**
   * Register an executor with its shuffle configuration
   * @param appId - Application ID
   * @param execId - Executor ID
   * @param executorInfo - Executor shuffle configuration information
   */
  public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);

  /**
   * Get block data for a specific shuffle block
   * @param appId - Application ID
   * @param execId - Executor ID
   * @param shuffleId - Shuffle ID
   * @param mapId - Map task ID
   * @param reduceId - Reduce task ID
   * @return ManagedBuffer containing the block data
   */
  public ManagedBuffer getBlockData(
    String appId, String execId, int shuffleId, int mapId, int reduceId
  );

  /**
   * Handle application removal and cleanup
   * @param appId - Application ID to remove
   * @param cleanupLocalDirs - Whether to clean up local directories
   */
  public void applicationRemoved(String appId, boolean cleanupLocalDirs);

  /**
   * Handle executor removal and cleanup
   * @param executorId - Executor ID to remove
   * @param appId - Application ID the executor belongs to
   */
  public void executorRemoved(String executorId, String appId);

  /**
   * Close the resolver and clean up resources
   */
  public void close();

  /**
   * Combined application and executor ID
   */
  public static class AppExecId {
    public final String appId;
    public final String execId;

    public AppExecId(String appId, String execId);
    public boolean equals(Object other);
    public int hashCode();
    public String toString();
  }
}
```
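
The registration and removal methods above amount to bookkeeping keyed by the `(appId, execId)` pair. The following is a minimal, JDK-only sketch of that idea, not Spark's implementation: `ExecutorRegistrySketch` is a hypothetical name, the map value is simplified to the executor's local directories, and persistence to the registered-executor file is left out entirely.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory registry; it does not persist anything to disk. */
final class ExecutorRegistrySketch {
    /** Composite key with the same shape as the AppExecId documented above. */
    static final class AppExecId {
        final String appId;
        final String execId;

        AppExecId(String appId, String execId) {
            this.appId = appId;
            this.execId = execId;
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) return true;
            if (!(other instanceof AppExecId)) return false;
            AppExecId that = (AppExecId) other;
            return appId.equals(that.appId) && execId.equals(that.execId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(appId, execId);
        }

        @Override
        public String toString() {
            return appId + "/" + execId;
        }
    }

    // Simplification: the value holds only the executor's local directories.
    private final Map<AppExecId, String[]> executors = new ConcurrentHashMap<>();

    void registerExecutor(String appId, String execId, String[] localDirs) {
        executors.put(new AppExecId(appId, execId), localDirs);
    }

    /** Drops a single executor of one application. */
    void executorRemoved(String executorId, String appId) {
        executors.remove(new AppExecId(appId, executorId));
    }

    /** Drops every executor that belongs to the application. */
    void applicationRemoved(String appId) {
        executors.keySet().removeIf(id -> id.appId.equals(appId));
    }

    int getRegisteredExecutorsSize() {
        return executors.size();
    }

    public static void main(String[] args) {
        ExecutorRegistrySketch registry = new ExecutorRegistrySketch();
        registry.registerExecutor("app-1", "exec-1", new String[] {"/tmp/a"});
        registry.registerExecutor("app-1", "exec-2", new String[] {"/tmp/b"});
        registry.registerExecutor("app-2", "exec-1", new String[] {"/tmp/c"});
        registry.executorRemoved("exec-2", "app-1");
        registry.applicationRemoved("app-1");
        System.out.println("Remaining executors: " + registry.getRegisteredExecutorsSize());
    }
}
```

Using a value-based `equals`/`hashCode` on the key is what lets `executorRemoved` find an entry from two plain strings; a concurrent map is one reasonable choice here because registration and cleanup may arrive on different threads.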

### ShuffleIndexInformation

Keeps index information for map output as in-memory buffer.

```java { .api }
/**
 * Keeps index information for map output as in-memory buffer
 */
public class ShuffleIndexInformation {
  /**
   * Create shuffle index information from an index file
   * @param indexFile - The shuffle index file to read
   */
  public ShuffleIndexInformation(File indexFile);

  /**
   * Get the number of index entries
   * @return Number of index entries
   */
  public int getSize();

  /**
   * Get index record for a specific reduce ID
   * @param reduceId - Reduce task ID
   * @return ShuffleIndexRecord containing offset and length information
   */
  public ShuffleIndexRecord getIndex(int reduceId);
}
```

### ShuffleIndexRecord

Contains offset and length of shuffle block data.

```java { .api }
/**
 * Contains offset and length of shuffle block data
 */
public class ShuffleIndexRecord {
  /**
   * Create a shuffle index record
   * @param offset - Byte offset in the shuffle data file
   * @param length - Length of the data block in bytes
   */
  public ShuffleIndexRecord(long offset, long length);

  /**
   * Get the byte offset of the block
   * @return Byte offset in the shuffle data file
   */
  public long getOffset();

  /**
   * Get the length of the block
   * @return Length of the data block in bytes
   */
  public long getLength();
}
```
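
To make the relationship between an index file and an offset/length record concrete, here is a small JDK-only sketch. It assumes the index file is a sequence of big-endian 8-byte cumulative offsets, with one more entry than there are reduce partitions; that layout is an assumption about the sort-shuffle format and is not stated in this document. `IndexLookupSketch` and `writeIndex` are hypothetical names, and the record is returned as a plain `long[]` rather than a `ShuffleIndexRecord`.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

/** Hypothetical stand-in for the index classes above; not the Spark implementation. */
final class IndexLookupSketch {
    private final long[] offsets;

    /** Reads every 8-byte big-endian offset from the index file into memory. */
    IndexLookupSketch(File indexFile) throws IOException {
        int count = (int) (indexFile.length() / Long.BYTES);
        offsets = new long[count];
        try (DataInputStream in = new DataInputStream(Files.newInputStream(indexFile.toPath()))) {
            for (int i = 0; i < count; i++) {
                offsets[i] = in.readLong();
            }
        }
    }

    /** Returns {offset, length} for one reduce partition. */
    long[] getIndex(int reduceId) {
        if (reduceId < 0 || reduceId + 1 >= offsets.length) {
            throw new IllegalArgumentException("No index entry for reduceId " + reduceId);
        }
        long offset = offsets[reduceId];
        long length = offsets[reduceId + 1] - offset;
        return new long[] {offset, length};
    }

    /** Writes a small index file for demonstration purposes. */
    static File writeIndex(long... cumulativeOffsets) throws IOException {
        File file = File.createTempFile("shuffle_0_0_0", ".index");
        file.deleteOnExit();
        try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(file.toPath()))) {
            for (long offset : cumulativeOffsets) {
                out.writeLong(offset);
            }
        }
        return file;
    }

    public static void main(String[] args) throws IOException {
        // Three partitions of 10, 0 and 25 bytes.
        IndexLookupSketch index = new IndexLookupSketch(writeIndex(0L, 10L, 10L, 35L));
        long[] record = index.getIndex(2);
        System.out.println("offset=" + record[0] + " length=" + record[1]); // offset=10 length=25
    }
}
```

Storing cumulative offsets means an empty partition costs one repeated value, and any partition's length falls out of a single subtraction.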

**Usage Examples:**

```java
import java.io.File;
import java.io.InputStream;

import com.codahale.metrics.MetricSet;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Create transport configuration for shuffle server.
// TransportConf takes a module name and a ConfigProvider; an empty provider uses defaults.
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);

// Create file for storing registered executor information
File registeredExecutorFile = new File("/tmp/registered-executors.db");

// Create block resolver for handling block requests
ExternalShuffleBlockResolver blockResolver =
    new ExternalShuffleBlockResolver(conf, registeredExecutorFile);

// Create RPC handler around the same resolver, so that only one component
// opens registeredExecutorFile
ExternalShuffleBlockHandler handler =
    new ExternalShuffleBlockHandler(new OneForOneStreamManager(), blockResolver);

// Register an executor
String appId = "app-20231201-001";
String execId = "executor-1";
String[] localDirs = {"/tmp/spark-local-20231201-001/1", "/tmp/spark-local-20231201-001/2"};
// The shuffle manager is identified by its fully qualified class name
ExecutorShuffleInfo executorInfo =
    new ExecutorShuffleInfo(localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");

blockResolver.registerExecutor(appId, execId, executorInfo);
System.out.println("Registered executors: " + blockResolver.getRegisteredExecutorsSize());

// Retrieve block data
try {
  ManagedBuffer blockData = blockResolver.getBlockData(appId, execId, 1, 0, 0);
  System.out.println("Retrieved block data, size: " + blockData.size() + " bytes");

  // Process the block data
  try (InputStream dataStream = blockData.createInputStream()) {
    // processShuffleBlock is an application-defined placeholder
    processShuffleBlock(dataStream);
  } finally {
    // Important: release the buffer, even if processing fails
    blockData.release();
  }
} catch (Exception e) {
  System.err.println("Error retrieving block data: " + e.getMessage());
}

// Monitor server metrics
MetricSet serverMetrics = handler.getAllMetrics();
System.out.println("Server metrics: " + serverMetrics);

// Handle application cleanup; the handler forwards this to the resolver it wraps
handler.applicationRemoved(appId, true);

// Clean up resources; closing the handler also closes the wrapped resolver
handler.close();
```

### Server Configuration and Deployment

The shuffle server components are configured through Spark configuration properties, including:

- `spark.shuffle.service.enabled` - Enable external shuffle service
- `spark.shuffle.service.port` - Port for the external shuffle service
- `spark.shuffle.service.index.cache.size` - Size of index cache
- `spark.shuffle.service.db.backend` - Database backend for executor registration
- `spark.shuffle.maxChunksBeingTransferred` - Maximum chunks being transferred simultaneously
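
As a rough illustration of how these properties might be collected and rendered as `spark-defaults.conf` lines (key, whitespace, value), the sketch below builds them in plain Java. Every value is a placeholder chosen for illustration, not a recommendation, and the class and method names are hypothetical; check the Spark configuration reference for the values valid in your version.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper; the values are illustrative placeholders only. */
final class ShuffleServiceConfigSketch {
    /** Returns the properties listed above with example values, in insertion order. */
    static Map<String, String> exampleSettings() {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("spark.shuffle.service.enabled", "true");
        settings.put("spark.shuffle.service.port", "7337");
        settings.put("spark.shuffle.service.index.cache.size", "100m");
        settings.put("spark.shuffle.service.db.backend", "LEVELDB");
        settings.put("spark.shuffle.maxChunksBeingTransferred", "4096");
        return settings;
    }

    /** Renders one "key value" line per property, as in spark-defaults.conf. */
    static String toDefaultsConf(Map<String, String> settings) {
        return settings.entrySet().stream()
            .map(entry -> entry.getKey() + " " + entry.getValue())
            .collect(Collectors.joining(System.lineSeparator()));
    }

    public static void main(String[] args) {
        System.out.println(toDefaultsConf(exampleSettings()));
    }
}
```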

### Error Handling and Monitoring

The server components expose metrics and cleanup hooks, and cover the following concerns:

1. **Metrics Collection**: Use `getAllMetrics()` to monitor server performance
2. **Resource Cleanup**: Properly handle application and executor removal
3. **File Management**: Automatic cleanup of local directories when configured
4. **Exception Handling**: Robust error handling for corrupt or missing files
5. **Authentication**: Integration with SASL authentication for secure operations

### Block Resolution Process

The block resolution process follows these steps:

1. **Registration**: Executors register their shuffle configuration with the server
2. **Block Request**: Clients request specific blocks using shuffle/map/reduce IDs
3. **File Location**: Server resolves block IDs to physical file locations
4. **Index Lookup**: Use shuffle index files to find byte ranges for blocks
5. **Data Retrieval**: Read the specific byte range from shuffle data files
6. **Buffer Management**: Return data as ManagedBuffer for efficient memory handling
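
Steps 3 to 5 can be sketched with the JDK alone. The example below is an approximation under stated assumptions, not the resolver's code: it assumes files are named `shuffle_<shuffleId>_<mapId>_0.data` and `.index`, that a file is placed in a local directory and a two-digit hex subdirectory chosen by hashing its name, and that the index holds big-endian 8-byte cumulative offsets. `BlockResolutionSketch`, `locate`, `readBlock` and `demo` are hypothetical names, and the block is returned as a `byte[]` instead of a `ManagedBuffer`.

```java
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

/** Hypothetical walk-through of file location, index lookup and data retrieval. */
final class BlockResolutionSketch {
    /** Step 3: map a file name onto one local directory and subdirectory by hashing. */
    static File locate(String[] localDirs, int subDirsPerLocalDir, String filename) {
        int hash = filename.hashCode();
        int nonNegative = hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
        String localDir = localDirs[nonNegative % localDirs.length];
        int subDirId = (nonNegative / localDirs.length) % subDirsPerLocalDir;
        return new File(new File(localDir, String.format("%02x", subDirId)), filename);
    }

    /** Steps 4 and 5: find the byte range in the index, then read it from the data file. */
    static byte[] readBlock(File indexFile, File dataFile, int reduceId) throws IOException {
        long offset;
        long nextOffset;
        try (RandomAccessFile index = new RandomAccessFile(indexFile, "r")) {
            index.seek((long) reduceId * Long.BYTES);
            offset = index.readLong();
            nextOffset = index.readLong();
        }
        byte[] block = new byte[(int) (nextOffset - offset)];
        try (RandomAccessFile data = new RandomAccessFile(dataFile, "r")) {
            data.seek(offset);
            data.readFully(block);
        }
        return block;
    }

    /** Builds a tiny data/index pair in a temp directory and reads partition 1 back. */
    static String demo() throws IOException {
        String[] localDirs = {Files.createTempDirectory("spark-local").toString()};
        File dataFile = locate(localDirs, 64, "shuffle_1_0_0.data");
        File indexFile = locate(localDirs, 64, "shuffle_1_0_0.index");
        Files.createDirectories(dataFile.getParentFile().toPath());
        Files.createDirectories(indexFile.getParentFile().toPath());

        // Three partitions: "aaa" (3 bytes), "bbbbb" (5 bytes), "cc" (2 bytes).
        Files.write(dataFile.toPath(), "aaabbbbbcc".getBytes(StandardCharsets.US_ASCII));
        try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(indexFile.toPath()))) {
            for (long offset : new long[] {0L, 3L, 8L, 10L}) {
                out.writeLong(offset);
            }
        }
        return new String(readBlock(indexFile, dataFile, 1), StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo()); // prints bbbbb
    }
}
```

Reading only the requested range is the point of the index: one map output file serves every reducer without being loaded in full.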