or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client.md · handler.md · index.md · mesos.md · protocol.md · resolver.md · security.md

handler.md (in docs/)

0

# Service Handler

1

2

Server-side components for handling shuffle requests, managing registered executors, and streaming shuffle blocks to clients.

3

4

## Capabilities

5

6

### ExternalShuffleBlockHandler

7

8

RPC handler for the external shuffle service that processes client requests and manages shuffle block access.

9

10

```java { .api }

11

/**

12

* RPC Handler for a server which can serve shuffle blocks from outside of an Executor process.

13

* Handles registering executors and opening shuffle blocks using the "one-for-one" strategy.

14

*/

15

public class ExternalShuffleBlockHandler extends RpcHandler {

16

/**

17

* Creates a handler with the specified configuration and registered executor file.

18

*

19

* @param conf transport configuration

20

* @param registeredExecutorFile file for persisting executor registrations

21

* @throws IOException if initialization fails

22

*/

23

public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException;

24

25

/**

26

* Creates a handler with custom stream manager and block resolver (for testing).

27

*/

28

public ExternalShuffleBlockHandler(OneForOneStreamManager streamManager,

29

ExternalShuffleBlockResolver blockManager);

30

31

/**

32

* Handles incoming RPC messages from clients.

33

* Decodes messages and delegates to handleMessage.

34

*

35

* @param client the transport client

36

* @param message the encoded message

37

* @param callback callback for sending response

38

*/

39

@Override

40

public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);

41

42

/**

43

* Processes decoded block transfer messages.

44

* Handles OpenBlocks and RegisterExecutor message types.

45

*

46

* @param msgObj the decoded message

47

* @param client the transport client

48

* @param callback callback for sending response

49

*/

50

protected void handleMessage(BlockTransferMessage msgObj, TransportClient client,

51

RpcResponseCallback callback);

52

53

/**

54

* Returns metrics for monitoring shuffle service performance.

55

* Includes latency, throughput, and registration metrics.

56

*/

57

public MetricSet getAllMetrics();

58

59

/**

60

* Gets the stream manager for handling block streaming.

61

*/

62

@Override

63

public StreamManager getStreamManager();

64

65

/**

66

* Removes an application and optionally cleans up local directories.

67

* Called when application terminates.

68

*

69

* @param appId the application ID

70

* @param cleanupLocalDirs whether to clean up local directories in a separate thread

71

*/

72

public void applicationRemoved(String appId, boolean cleanupLocalDirs);

73

74

/**

75

* Re-registers an executor with shuffle info.

76

* Used when service restarts to restore executor state.

77

*

78

* @param appExecId composite application and executor ID

79

* @param executorInfo executor shuffle configuration

80

*/

81

public void reregisterExecutor(ExternalShuffleBlockResolver.AppExecId appExecId,

82

ExecutorShuffleInfo executorInfo);

83

84

/**

85

* Closes the handler and releases resources.

86

*/

87

public void close();

88

}

89

```

90

91

**Usage Examples:**

92

93

```java

94

import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;

95

import org.apache.spark.network.util.TransportConf;

96

import org.apache.spark.network.server.TransportServer;

97

import java.io.File;

98

99

// Create handler for shuffle service

100

TransportConf conf = new TransportConf("shuffle");

101

File executorFile = new File("/var/spark/shuffle/executors.db");

102

ExternalShuffleBlockHandler handler;

103

104

try {

105

handler = new ExternalShuffleBlockHandler(conf, executorFile);

106

System.out.println("Shuffle service handler created");

107

} catch (IOException e) {

108

System.err.println("Failed to create handler: " + e.getMessage());

109

return;

110

}

111

112

// Get metrics for monitoring

113

MetricSet metrics = handler.getAllMetrics();

114

System.out.println("Handler metrics: " + metrics.getMetrics().keySet());

115

116

// Handle application cleanup

117

String appId = "application_1234567890_0001";

118

handler.applicationRemoved(appId, true); // Clean up local directories

119

120

// Restore executor after service restart

121

ExternalShuffleBlockResolver.AppExecId appExecId =

122

new ExternalShuffleBlockResolver.AppExecId(appId, "executor-1");

123

ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(

124

new String[]{"/tmp/spark"}, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");

125

handler.reregisterExecutor(appExecId, executorInfo);

126

127

// Clean shutdown

128

handler.close();

129

```

130

131

### Message Processing

132

133

The handler processes two main types of client messages:

134

135

#### OpenBlocks Messages

136

137

Handles requests to open shuffle blocks for streaming to clients.

138

139

```java

140

// When client sends OpenBlocks message:

141

// 1. Validate client authentication

142

// 2. Create iterator over requested blocks

143

// 3. Register stream with StreamManager

144

// 4. Return StreamHandle to client

145

// 5. Client uses handle to fetch block chunks

146

```

147

148

#### RegisterExecutor Messages

149

150

Handles executor registration with the shuffle service.

151

152

```java

153

// When client sends RegisterExecutor message:

154

// 1. Validate client authentication

155

// 2. Extract executor shuffle info

156

// 3. Register executor with BlockResolver

157

// 4. Persist registration to disk

158

// 5. Return success response

159

```

160

161

### Metrics and Monitoring

162

163

The handler provides comprehensive metrics for monitoring shuffle service performance:

164

165

```java { .api }

166

/**

167

* Metrics provided by the shuffle service handler.

168

*/

169

public class ShuffleMetrics implements MetricSet {

170

// Latency of open-block requests, in milliseconds

171

private final Timer openBlockRequestLatencyMillis;

172

173

// Latency of executor registration requests, in milliseconds

174

private final Timer registerExecutorRequestLatencyMillis;

175

176

// Block transfer rate in bytes per second

177

private final Meter blockTransferRateBytes;

178

179

// Number of currently registered executors

180

private final Gauge<Integer> registeredExecutorsSize;

181

}

182

```

183

184

**Metrics Usage Example:**

185

186

```java

187

import com.codahale.metrics.MetricRegistry;

188

import com.codahale.metrics.ConsoleReporter;

189

190

// Set up metrics reporting

191

MetricRegistry registry = new MetricRegistry();

192

MetricSet shuffleMetrics = handler.getAllMetrics();

193

194

// Register shuffle metrics

195

for (Map.Entry<String, Metric> entry : shuffleMetrics.getMetrics().entrySet()) {

196

registry.register("shuffle." + entry.getKey(), entry.getValue());

197

}

198

199

// Create console reporter

200

ConsoleReporter reporter = ConsoleReporter.forRegistry(registry)

201

.convertRatesTo(TimeUnit.SECONDS)

202

.convertDurationsTo(TimeUnit.MILLISECONDS)

203

.build();

204

205

// Report metrics every 30 seconds

206

reporter.start(30, TimeUnit.SECONDS);

207

```

208

209

### Authentication and Security

210

211

The handler enforces client authentication when SASL is enabled:

212

213

```java

214

/**

215

* Validates that the client is authorized for the specified application.

216

* Throws SecurityException if the client has a non-null client ID that doesn't match the app ID.

217

*/

218

private void checkAuth(TransportClient client, String appId) {

219

if (client.getClientId() != null && !client.getClientId().equals(appId)) {

220

throw new SecurityException(String.format(

221

"Client for %s not authorized for application %s.",

222

client.getClientId(), appId));

223

}

224

}

225

```

226

227

## Error Handling

228

229

The handler manages various error conditions:

230

231

- **SecurityException**: Client authorization failures (the client's ID does not match the requested application ID)

232

- **UnsupportedOperationException**: Unknown or unsupported message types

233

- **IOException**: File system errors when accessing shuffle blocks

234

- **IllegalArgumentException**: Invalid block IDs or malformed requests

235

236

**Error Response Example:**

237

238

```java

239

// Handler automatically sends error responses to client

240

try {

241

// Process message

242

handleMessage(msgObj, client, callback);

243

} catch (SecurityException e) {

244

// Client receives authentication error

245

callback.onFailure(e);

246

} catch (UnsupportedOperationException e) {

247

// Client receives unsupported operation error

248

callback.onFailure(e);

249

}

250

```

251

252

## Integration with Transport Layer

253

254

The handler integrates with Spark's network transport layer:

255

256

```java

257

import org.apache.spark.network.TransportContext;

258

import org.apache.spark.network.server.TransportServer;

259

260

// Create transport context with handler

261

TransportContext context = new TransportContext(conf, handler);

262

263

// Create server listening on specified port

264

TransportServer server = context.createServer(7337);

265

System.out.println("Shuffle service listening on port 7337");

266

267

// Server automatically routes messages to handler.receive()

268

```