
# Server-Side Block Management


The server-side components handle incoming shuffle requests, manage executor registrations, and resolve shuffle blocks on the local filesystem. These components run as part of the external shuffle service.


## Block Handler


### ExternalShuffleBlockHandler


```java { .api }
public class ExternalShuffleBlockHandler extends RpcHandler {
  public ExternalShuffleBlockHandler(
      TransportConf conf,
      File registeredExecutorFile
  ) throws IOException;

  public ExternalShuffleBlockHandler(
      OneForOneStreamManager streamManager,
      ExternalShuffleBlockResolver blockManager
  );

  public void receive(
      TransportClient client,
      ByteBuffer message,
      RpcResponseCallback callback
  );

  public StreamManager getStreamManager();
}
```


RPC handler for the external shuffle service that processes client requests for shuffle blocks.


**Constructor Parameters:**

- `conf` (TransportConf): Network transport configuration
- `registeredExecutorFile` (File): File used for persistent executor registration storage
- `streamManager` (OneForOneStreamManager): Stream manager for block transfers (testing constructor)
- `blockManager` (ExternalShuffleBlockResolver): Block resolver instance (testing constructor)

**Key Methods:**


#### receive


Processes incoming RPC messages from shuffle clients. Handles executor registration, block open requests, and other protocol messages.


**Parameters:**

- `client` (TransportClient): Client connection that sent the message
- `message` (ByteBuffer): Serialized protocol message
- `callback` (RpcResponseCallback): Callback for sending the response
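Conceptually, `receive` decodes the incoming buffer and branches on the concrete message type. The sketch below illustrates that dispatch shape; the `RegisterExecutor` and `OpenBlocks` classes here are simplified hypothetical stand-ins for illustration, not the real `org.apache.spark.network.shuffle.protocol` types.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the real protocol message types (illustrative only).
interface Message {}

class RegisterExecutor implements Message {
  final String appId, execId;
  RegisterExecutor(String appId, String execId) { this.appId = appId; this.execId = execId; }
}

class OpenBlocks implements Message {
  final String[] blockIds;
  OpenBlocks(String[] blockIds) { this.blockIds = blockIds; }
}

public class ReceiveSketch {
  static final List<String> registered = new ArrayList<>();

  // Mirrors the shape of receive(): branch on the decoded message type,
  // then either register the executor or prepare blocks for streaming.
  static String handle(Message msg) {
    if (msg instanceof RegisterExecutor) {
      RegisterExecutor m = (RegisterExecutor) msg;
      registered.add(m.appId + "/" + m.execId);
      return "registered " + m.appId + "/" + m.execId;
    } else if (msg instanceof OpenBlocks) {
      OpenBlocks m = (OpenBlocks) msg;
      return "opening " + m.blockIds.length + " blocks";
    }
    throw new UnsupportedOperationException("Unexpected message: " + msg);
  }

  public static void main(String[] args) {
    System.out.println(handle(new RegisterExecutor("app-1", "exec-1")));
    System.out.println(handle(new OpenBlocks(new String[]{"shuffle_1_0_0", "shuffle_1_1_0"})));
  }
}
```

In the real handler, an unrecognized message type similarly results in an exception delivered through the `RpcResponseCallback` rather than a silent drop.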


#### getStreamManager


Returns the stream manager used for managing block data streams.


**Returns:**

- `StreamManager`: The stream manager instance


## Block Resolver


### ExternalShuffleBlockResolver


```java { .api }
public class ExternalShuffleBlockResolver {
  public ExternalShuffleBlockResolver(
      TransportConf conf,
      File registeredExecutorFile
  ) throws IOException;

  public void registerExecutor(
      String appId,
      String execId,
      ExecutorShuffleInfo executorInfo
  );

  public void applicationRemoved(String appId, boolean cleanupLocalDirs);

  public ManagedBuffer getBlockData(
      String appId,
      String execId,
      String blockId
  ) throws IOException;

  public void close();
}
```


Manages the mapping between shuffle block IDs and physical file segments on the local filesystem. Handles executor registration and block location resolution.


**Constructor Parameters:**

- `conf` (TransportConf): Transport configuration for the resolver
- `registeredExecutorFile` (File): File for persisting executor registrations across restarts


**Throws:**

- `IOException`: If unable to initialize persistent storage


**Key Methods:**


#### registerExecutor


Registers an executor's shuffle configuration, storing information about where it writes shuffle files.


**Parameters:**

- `appId` (String): Spark application ID
- `execId` (String): Executor ID
- `executorInfo` (ExecutorShuffleInfo): Configuration describing shuffle file locations


#### applicationRemoved


Cleans up data for a removed Spark application, optionally removing local shuffle directories.


**Parameters:**

- `appId` (String): Application ID to clean up
- `cleanupLocalDirs` (boolean): Whether to delete local shuffle directories


#### getBlockData


Retrieves shuffle block data from the local filesystem.


**Parameters:**

- `appId` (String): Application ID that owns the block
- `execId` (String): Executor ID that wrote the block
- `blockId` (String): Block identifier to retrieve


**Returns:**

- `ManagedBuffer`: Buffer containing the block data


**Throws:**

- `IOException`: If the block cannot be found or read
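The lookup behind `getBlockData` can be pictured as hashing the shuffle filename across the executor's registered local directories. The sketch below follows the general hash-and-subdirectory placement scheme Spark uses for shuffle files, but the exact hash function and formatting here are simplified assumptions, not the resolver's precise implementation; `localDirs` and `subDirsPerLocalDir` come from the executor's registered `ExecutorShuffleInfo`.

```java
public class ShuffleFileLayout {
  // Hypothetical sketch: a non-negative hash of the filename picks the local
  // directory, and the quotient picks the subdirectory within it. The same
  // filename always resolves to the same path, which is what lets the
  // shuffle service find files written earlier by the executor.
  static String resolve(String[] localDirs, int subDirsPerLocalDir, String filename) {
    int hash = filename.hashCode() & Integer.MAX_VALUE; // force non-negative
    String localDir = localDirs[hash % localDirs.length];
    int subDir = (hash / localDirs.length) % subDirsPerLocalDir;
    return String.format("%s/%02x/%s", localDir, subDir, filename);
  }

  public static void main(String[] args) {
    String[] dirs = {"/tmp/spark-local-dir1", "/tmp/spark-local-dir2"};
    // A block id like "shuffle_1_2_0" is served from the map task's
    // data/index file pair on disk.
    System.out.println(resolve(dirs, 64, "shuffle_1_2.data"));
  }
}
```

The key property is determinism: the executor writing the file and the shuffle service reading it must compute the same path from the same inputs.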


#### close


Closes the resolver and releases resources including persistent storage connections.


## Internal Components


### AppExecId


```java { .api }
public static class AppExecId {
  public final String appId;
  public final String execId;

  public AppExecId(String appId, String execId);

  public boolean equals(Object other);
  public int hashCode();
  public String toString();
}
```


Internal identifier class combining application and executor IDs for tracking registered executors.
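The value-equality contract is what makes this class useful: `equals` and `hashCode` let the resolver key its executor registry by the (appId, execId) pair rather than by object identity. A self-contained sketch using a simplified version of the class (not the real nested class):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class AppExecIdSketch {
  // Simplified version of AppExecId with the same value-equality behavior.
  static final class AppExecId {
    final String appId;
    final String execId;

    AppExecId(String appId, String execId) {
      this.appId = appId;
      this.execId = execId;
    }

    @Override public boolean equals(Object other) {
      if (!(other instanceof AppExecId)) return false;
      AppExecId o = (AppExecId) other;
      return appId.equals(o.appId) && execId.equals(o.execId);
    }

    @Override public int hashCode() { return Objects.hash(appId, execId); }

    @Override public String toString() { return appId + "/" + execId; }
  }

  public static void main(String[] args) {
    Map<AppExecId, String> executors = new HashMap<>();
    executors.put(new AppExecId("app-123", "executor-1"), "/tmp/spark-local-dir1");
    // Value equality means a freshly constructed key finds the entry.
    System.out.println(executors.get(new AppExecId("app-123", "executor-1")));
    // prints "/tmp/spark-local-dir1"
  }
}
```

Without `equals`/`hashCode`, a lookup with a newly constructed key would miss the stored registration.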


## Usage Examples


### Basic Server Setup


```java
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;
import java.io.File;

// Create transport configuration. TransportConf takes a module name and a
// ConfigProvider; SystemPropertyConfigProvider reads settings from JVM
// system properties.
TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());

// Create block handler with persistent storage
File registrationFile = new File("/tmp/spark-shuffle-registrations");
ExternalShuffleBlockHandler handler = new ExternalShuffleBlockHandler(
    conf,
    registrationFile
);

// The handler can now be used with a TransportServer
```


### Custom Server Setup


```java
import org.apache.spark.network.server.OneForOneStreamManager;

// Create components separately for testing or custom configuration
OneForOneStreamManager streamManager = new OneForOneStreamManager();
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(
    conf,
    new File("/tmp/registrations")
);

ExternalShuffleBlockHandler handler = new ExternalShuffleBlockHandler(
    streamManager,
    resolver
);
```


### Manual Block Resolution


```java
// Register an executor manually
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
    new String[]{"/tmp/spark-local-dir1", "/tmp/spark-local-dir2"},
    64, // subdirs per local dir
    "org.apache.spark.shuffle.sort.SortShuffleManager"
);

resolver.registerExecutor("app-123", "executor-1", executorInfo);

// Later, retrieve a block
try {
  ManagedBuffer blockData = resolver.getBlockData(
      "app-123",
      "executor-1",
      "shuffle_1_2_0"
  );

  System.out.println("Block data size: " + blockData.size());

  // Process block data
  byte[] data = new byte[(int) blockData.size()];
  blockData.nioByteBuffer().get(data);

  // Release buffer when done
  blockData.release();

} catch (IOException e) {
  System.err.println("Failed to retrieve block: " + e.getMessage());
}
```


### Application Cleanup


```java
// Clean up after application completion
resolver.applicationRemoved("app-123", true); // true = cleanup local directories

// Or clean up without removing directories (for debugging)
resolver.applicationRemoved("app-123", false);
```


### Integration with Transport Server


```java
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.TransportServer;
import java.util.Collections;

// Create transport context with the block handler
TransportContext context = new TransportContext(conf, handler);

// Create and start server
TransportServer server = context.createServer(7337, Collections.emptyList());
System.out.println("Shuffle service started on port 7337");

// Server will now handle incoming shuffle client requests
// Remember to close when done:
// server.close();
// resolver.close();
```


## Persistence and Recovery


The block resolver uses LevelDB for persistent storage of executor registrations. This enables recovery of executor metadata across service restarts.


**Persistent Storage:**

- Executor registrations survive service restarts
- Block locations are reconstructed from stored metadata
- Cleanup operations are reflected in persistent state


**Recovery Behavior:**

- On startup, previously registered executors are restored
- Block requests can be served immediately after restart
- No need to re-register executors unless shuffle files have moved
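Recovery works because each registration is written under a stable, parseable key that can be replayed on startup. The key format and JSON value below are illustrative assumptions (the real resolver serializes its own `AppExecId`/`ExecutorShuffleInfo` representations into LevelDB); a sorted in-memory map stands in for the LevelDB store:

```java
import java.util.Map;
import java.util.TreeMap;

public class RegistrationStoreSketch {
  // Hypothetical key format: a fixed prefix plus the app and executor IDs.
  // Any stable, parseable encoding works for recovery.
  static String key(String appId, String execId) {
    return "AppExecShuffleInfo;" + appId + ";" + execId;
  }

  public static void main(String[] args) {
    // A sorted in-memory map stands in for LevelDB in this sketch.
    Map<String, String> db = new TreeMap<>();
    db.put(key("app-123", "executor-1"),
        "{\"localDirs\":[\"/tmp/d1\"],\"subDirsPerLocalDir\":64}");

    // On restart, iterate over stored entries and rebuild the in-memory
    // executor registry from the persisted metadata.
    for (Map.Entry<String, String> e : db.entrySet()) {
      String[] parts = e.getKey().split(";");
      System.out.println("restored " + parts[1] + "/" + parts[2]);
    }
    // prints "restored app-123/executor-1"
  }
}
```

Because writes happen at registration and cleanup time, the store always reflects the current set of live executors, which is exactly what startup replay needs.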


## Error Handling


Common error scenarios:

- **Block Not Found**: Requested block doesn't exist on the filesystem
- **Executor Not Registered**: Attempt to fetch blocks from an unregistered executor
- **IO Errors**: Filesystem permission issues or disk failures
- **Corruption**: Persistent storage corruption requiring reconstruction


```java
try {
  ManagedBuffer block = resolver.getBlockData("app-1", "exec-1", "shuffle_1_0_0");
  // Process block...
} catch (IOException e) {
  if (e.getMessage().contains("not found")) {
    // Handle missing block
    System.err.println("Block not found, may have been cleaned up");
  } else {
    // Handle other IO errors
    System.err.println("IO error reading block: " + e.getMessage());
  }
}
```