# Service Handler

Server-side components for handling shuffle requests, managing registered executors, and streaming shuffle blocks to clients.

## Capabilities

### ExternalShuffleBlockHandler

RPC handler for the external shuffle service. It decodes client requests, registers executors, and serves shuffle blocks from the executors' local directories, so blocks stay available after the executor that wrote them has exited.

```java { .api }
/**
 * RPC Handler for a server which can serve shuffle blocks from outside of an Executor process.
 * Handles registering executors and opening shuffle blocks using the "one-for-one" strategy.
 */
public class ExternalShuffleBlockHandler extends RpcHandler {

  /**
   * Creates a handler with the specified configuration and registered executor file.
   *
   * @param conf transport configuration
   * @param registeredExecutorFile file for persisting executor registrations across restarts
   * @throws IOException if initialization fails
   */
  public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile)
      throws IOException;

  /**
   * Creates a handler with a custom stream manager and block resolver (for testing).
   */
  public ExternalShuffleBlockHandler(OneForOneStreamManager streamManager,
                                     ExternalShuffleBlockResolver blockManager);

  /**
   * Handles incoming RPC messages from clients.
   * Decodes the raw message into a BlockTransferMessage and delegates to handleMessage.
   *
   * @param client the transport client
   * @param message the encoded message
   * @param callback callback for sending the response
   */
  @Override
  public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);

  /**
   * Processes a decoded block transfer message.
   * Handles OpenBlocks and RegisterExecutor messages; any other type results in
   * an UnsupportedOperationException.
   *
   * @param msgObj the decoded message
   * @param client the transport client
   * @param callback callback for sending the response
   */
  protected void handleMessage(BlockTransferMessage msgObj, TransportClient client,
                               RpcResponseCallback callback);

  /**
   * Returns metrics for monitoring shuffle service performance:
   * request latencies, block transfer rate, and the number of registered executors.
   */
  public MetricSet getAllMetrics();

  /**
   * Returns the stream manager that serves block chunks to clients.
   */
  @Override
  public StreamManager getStreamManager();

  /**
   * Removes all executor registrations of an application and optionally deletes
   * its local directories. Called when the application terminates.
   *
   * @param appId the application ID
   * @param cleanupLocalDirs whether to delete the local directories (done on a separate thread)
   */
  public void applicationRemoved(String appId, boolean cleanupLocalDirs);

  /**
   * Re-registers an executor with its shuffle info.
   * Used when the service restarts, to restore executor state from before the restart;
   * normal registration arrives as a RegisterExecutor message via receive().
   *
   * @param appExecId composite application and executor ID
   * @param executorInfo executor shuffle configuration
   */
  public void reregisterExecutor(ExternalShuffleBlockResolver.AppExecId appExecId,
                                 ExecutorShuffleInfo executorInfo);

  /**
   * Closes the handler and releases resources held by the block resolver.
   */
  public void close();
}
```
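
The `receive()`/`handleMessage()` split follows a decode-then-dispatch pattern. The sketch below mimics it with dependency-free stand-ins. `Message`, `OpenBlocksMsg`, `RegisterExecutorMsg`, `UnknownMsg`, `DispatchSketch`, and the "one type byte followed by a UTF-8 app ID" encoding are all hypothetical. They do not reproduce Spark's wire format, which `BlockTransferMessage.Decoder` handles.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for decoded messages; not Spark's BlockTransferMessage. */
abstract class Message {
  // Hypothetical one-byte type tags used only by this sketch
  static final byte OPEN_BLOCKS = 0;
  static final byte REGISTER_EXECUTOR = 1;

  /** Decodes a tag byte followed by a UTF-8 app ID. */
  static Message decode(ByteBuffer buf) {
    byte tag = buf.get();
    byte[] rest = new byte[buf.remaining()];
    buf.get(rest);
    String appId = new String(rest, StandardCharsets.UTF_8);
    switch (tag) {
      case OPEN_BLOCKS: return new OpenBlocksMsg(appId);
      case REGISTER_EXECUTOR: return new RegisterExecutorMsg(appId);
      default: return new UnknownMsg(tag);
    }
  }
}

final class OpenBlocksMsg extends Message {
  final String appId;
  OpenBlocksMsg(String appId) { this.appId = appId; }
}

final class RegisterExecutorMsg extends Message {
  final String appId;
  RegisterExecutorMsg(String appId) { this.appId = appId; }
}

final class UnknownMsg extends Message {
  final byte tag;
  UnknownMsg(byte tag) { this.tag = tag; }
  @Override public String toString() { return "UnknownMsg(tag=" + tag + ")"; }
}

class DispatchSketch {
  /** Mirrors receive(): decode first, then delegate to handleMessage(). */
  static String receive(ByteBuffer message) {
    return handleMessage(Message.decode(message));
  }

  /** Mirrors handleMessage(): branch on the decoded type and reject anything else. */
  static String handleMessage(Message msg) {
    if (msg instanceof OpenBlocksMsg) {
      return "open blocks for " + ((OpenBlocksMsg) msg).appId;
    } else if (msg instanceof RegisterExecutorMsg) {
      return "register executor for " + ((RegisterExecutorMsg) msg).appId;
    } else {
      throw new UnsupportedOperationException("Unexpected message: " + msg);
    }
  }

  /** Builds a buffer in this sketch's hypothetical encoding. */
  static ByteBuffer encode(byte tag, String appId) {
    byte[] id = appId.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(1 + id.length);
    buf.put(tag).put(id);
    buf.flip();
    return buf;
  }

  public static void main(String[] args) {
    System.out.println(receive(encode(Message.OPEN_BLOCKS, "app-1")));
  }
}
```

Because the type switch lives in its own method, it can be exercised with already-decoded messages without building byte buffers.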

**Usage Examples:**

```java
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import com.codahale.metrics.MetricSet;
import java.io.File;
import java.io.IOException;

// Create handler for shuffle service.
// TransportConf takes a module name and a ConfigProvider; MapConfigProvider.EMPTY uses defaults.
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
File executorFile = new File("/var/spark/shuffle/executors.db");
ExternalShuffleBlockHandler handler;

try {
  handler = new ExternalShuffleBlockHandler(conf, executorFile);
  System.out.println("Shuffle service handler created");
} catch (IOException e) {
  System.err.println("Failed to create handler: " + e.getMessage());
  return;
}

// Get metrics for monitoring
MetricSet metrics = handler.getAllMetrics();
System.out.println("Handler metrics: " + metrics.getMetrics().keySet());

// Restore an executor registration after a service restart
String appId = "application_1234567890_0001";
ExternalShuffleBlockResolver.AppExecId appExecId =
    new ExternalShuffleBlockResolver.AppExecId(appId, "executor-1");
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
    new String[]{"/tmp/spark"}, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");
handler.reregisterExecutor(appExecId, executorInfo);

// When the application finishes, drop its registrations and clean up local directories
handler.applicationRemoved(appId, true);

// Clean shutdown
handler.close();
```
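
`applicationRemoved` drops every executor registered under the application and, when `cleanupLocalDirs` is true, deletes their local directories off the calling thread. The following minimal sketch shows that bookkeeping under stated assumptions. `RegistrySketch` and its `Key` class are hypothetical: the registry keys executors by (appId, execId), and a single daemon-thread `ExecutorService` stands in for the cleaner. It is not Spark's resolver code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Stream;

class RegistrySketch {
  /** Hypothetical (appId, execId) key, similar in spirit to AppExecId. */
  static final class Key {
    final String appId;
    final String execId;
    Key(String appId, String execId) { this.appId = appId; this.execId = execId; }
    @Override public boolean equals(Object o) {
      if (!(o instanceof Key)) return false;
      Key k = (Key) o;
      return appId.equals(k.appId) && execId.equals(k.execId);
    }
    @Override public int hashCode() { return Objects.hash(appId, execId); }
  }

  final ConcurrentMap<Key, List<Path>> executors = new ConcurrentHashMap<>();

  // Single daemon thread: directory deletion never blocks the caller
  // and never keeps the JVM alive on its own
  private final ExecutorService cleaner = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "dir-cleaner");
    t.setDaemon(true);
    return t;
  });

  void register(String appId, String execId, List<Path> localDirs) {
    executors.put(new Key(appId, execId), localDirs);
  }

  /** Drops every executor of appId; the returned future completes once cleanup is done. */
  Future<?> applicationRemoved(String appId, boolean cleanupLocalDirs) {
    List<Path> dirs = new ArrayList<>();
    Iterator<Map.Entry<Key, List<Path>>> it = executors.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Key, List<Path>> entry = it.next();
      if (entry.getKey().appId.equals(appId)) {
        dirs.addAll(entry.getValue());
        it.remove();
      }
    }
    if (!cleanupLocalDirs) {
      return CompletableFuture.completedFuture(null);
    }
    return cleaner.submit(() -> dirs.forEach(RegistrySketch::deleteRecursively));
  }

  static void deleteRecursively(Path root) {
    if (!Files.exists(root)) {
      return;
    }
    // Reverse lexicographic order visits children before their parent directories
    try (Stream<Path> paths = Files.walk(root)) {
      paths.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Running the deletion on a background thread keeps `applicationRemoved` fast even when an application leaves many large shuffle files behind.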

### Message Processing

The handler processes two types of client messages. Any other message type is rejected with an `UnsupportedOperationException`.

#### OpenBlocks Messages

Handles requests to open shuffle blocks for streaming to clients.

```java
// When a client sends an OpenBlocks message, the handler:
// 1. Validates client authentication (checkAuth against the message's appId)
// 2. Creates an iterator over the requested blocks
// 3. Registers that iterator as a stream with the StreamManager
// 4. Returns a StreamHandle (stream ID and number of chunks) to the client
// The client then uses the handle to fetch the blocks chunk by chunk,
// one chunk per requested block.
```
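
The OpenBlocks steps above can be sketched with an in-memory stand-in for the stream manager. `StreamRegistrySketch`, its `StreamHandle`, the sequential stream IDs, and the `loadBlock` callback are hypothetical simplifications. The sketch registers a lazy iterator over the requested block IDs and returns a handle carrying the stream ID and chunk count (one chunk per block). It then serves chunks by index.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

class StreamRegistrySketch {
  /** Hypothetical handle returned to the client: which stream, and how many chunks. */
  static final class StreamHandle {
    final long streamId;
    final int numChunks;
    StreamHandle(long streamId, int numChunks) { this.streamId = streamId; this.numChunks = numChunks; }
  }

  private final AtomicLong nextStreamId = new AtomicLong(0);
  private final Map<Long, Iterator<byte[]>> streams = new ConcurrentHashMap<>();
  private final Map<Long, Integer> nextChunk = new ConcurrentHashMap<>();

  /** Steps 2-4: build a lazy iterator over the blocks, register it, and return a handle. */
  StreamHandle openBlocks(String[] blockIds, Function<String, byte[]> loadBlock) {
    // Blocks are loaded only when the corresponding chunk is fetched
    Iterator<byte[]> blocks = Arrays.stream(blockIds).map(loadBlock).iterator();
    long id = nextStreamId.getAndIncrement();
    streams.put(id, blocks);
    nextChunk.put(id, 0);
    return new StreamHandle(id, blockIds.length);
  }

  /** Client side of the protocol: chunk i holds block i, fetched in order. */
  byte[] getChunk(long streamId, int chunkIndex) {
    Iterator<byte[]> it = streams.get(streamId);
    if (it == null) throw new IllegalStateException("Unknown stream " + streamId);
    // The iterator is consumed sequentially, so chunks must be requested in order
    int expected = nextChunk.get(streamId);
    if (chunkIndex != expected) {
      throw new IllegalStateException("Expected chunk " + expected + " but got " + chunkIndex);
    }
    byte[] chunk = it.next();
    if (it.hasNext()) {
      nextChunk.put(streamId, expected + 1);
    } else {
      // Last chunk served: forget the stream
      streams.remove(streamId);
      nextChunk.remove(streamId);
    }
    return chunk;
  }
}
```

Handing out an iterator instead of the block contents keeps memory bounded: nothing is read from disk until the client asks for the corresponding chunk.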

#### RegisterExecutor Messages

Handles executor registration with the shuffle service.

```java
// When a client sends a RegisterExecutor message, the handler:
// 1. Validates client authentication (checkAuth against the message's appId)
// 2. Extracts the executor's shuffle info (local dirs, subdirs per dir, shuffle manager)
// 3. Registers the executor with the block resolver
// 4. Persists the registration to disk, if a registered-executor file is configured
// 5. Returns an empty success response
```
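
Together with `reregisterExecutor`, the registration steps amount to an in-memory map that is mirrored to durable storage and reloaded after a restart. The following minimal sketch shows this under stated assumptions. `ExecutorRegistrySketch` is hypothetical. It uses a plain `java.util.Properties` file as the store, where Spark's resolver uses a key-value database. Its `localDirs|subDirsPerLocalDir|shuffleManager` encoding is a simplification made up for this example.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

class ExecutorRegistrySketch {
  private final File storeFile;
  // "appId/execId" -> encoded executor info (hypothetical encoding)
  private final Map<String, String> executors = new ConcurrentHashMap<>();

  ExecutorRegistrySketch(File storeFile) throws IOException {
    this.storeFile = storeFile;
    // On (re)start, restore the registrations persisted before the restart
    if (storeFile.exists()) {
      Properties props = new Properties();
      try (Reader in = new InputStreamReader(new FileInputStream(storeFile), StandardCharsets.UTF_8)) {
        props.load(in);
      }
      for (String key : props.stringPropertyNames()) {
        executors.put(key, props.getProperty(key));
      }
    }
  }

  /** Steps 3-4: register in memory, then persist the full map to disk. */
  synchronized void registerExecutor(String appId, String execId, String[] localDirs,
                                     int subDirsPerLocalDir, String shuffleManager) throws IOException {
    String info = String.join(",", localDirs) + "|" + subDirsPerLocalDir + "|" + shuffleManager;
    executors.put(appId + "/" + execId, info);
    persist();
  }

  String lookup(String appId, String execId) {
    return executors.get(appId + "/" + execId);
  }

  private void persist() throws IOException {
    Properties props = new Properties();
    props.putAll(executors);
    try (Writer out = new OutputStreamWriter(new FileOutputStream(storeFile), StandardCharsets.UTF_8)) {
      props.store(out, "registered executors");
    }
  }
}
```

Constructing a second instance on the same file plays the role of a service restart. The restored entries are what `reregisterExecutor` would feed back into the handler.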

### Metrics and Monitoring

The handler exposes the following metrics through `getAllMetrics()`:

```java { .api }
/**
 * Metrics provided by the shuffle service handler.
 */
public class ShuffleMetrics implements MetricSet {
  // Latency of open block requests, in milliseconds
  private final Timer openBlockRequestLatencyMillis;

  // Latency of executor registration requests, in milliseconds
  private final Timer registerExecutorRequestLatencyMillis;

  // Block transfer rate, in bytes per second
  private final Meter blockTransferRateBytes;

  // Number of currently registered executors
  private final Gauge<Integer> registeredExecutorsSize;
}
```
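
The two latency timers are meant to cover each request from the start of processing to its end. The sketch below assumes the timing is stopped in a `finally` block, so failed requests are measured as well. It shows that start/stop pattern using a hypothetical `LatencyTimer` in place of the Dropwizard `Timer` and its `Timer.Context`, so it runs without dependencies.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a Dropwizard Timer: counts calls and sums their durations. */
class LatencyTimer {
  private final AtomicLong count = new AtomicLong();
  private final AtomicLong totalNanos = new AtomicLong();

  /** Runs the task and records its duration even if it throws. */
  <T> T time(Callable<T> task) throws Exception {
    long start = System.nanoTime();
    try {
      return task.call();
    } finally {
      // Plays the role of Timer.Context.stop() in a finally block
      totalNanos.addAndGet(System.nanoTime() - start);
      count.incrementAndGet();
    }
  }

  long count() {
    return count.get();
  }

  /** Mean recorded duration in milliseconds, or 0 if nothing was timed. */
  double meanMillis() {
    long n = count.get();
    return n == 0 ? 0.0 : TimeUnit.NANOSECONDS.toMicros(totalNanos.get()) / 1000.0 / n;
  }
}
```

Stopping in `finally` keeps rejected requests, such as authentication failures, from silently vanishing from the latency statistics.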

**Metrics Usage Example:**

```java
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Set up metrics reporting
MetricRegistry registry = new MetricRegistry();
MetricSet shuffleMetrics = handler.getAllMetrics();

// Register each shuffle metric under a "shuffle." prefix
for (Map.Entry<String, Metric> entry : shuffleMetrics.getMetrics().entrySet()) {
  registry.register("shuffle." + entry.getKey(), entry.getValue());
}

// Create console reporter
ConsoleReporter reporter = ConsoleReporter.forRegistry(registry)
    .convertRatesTo(TimeUnit.SECONDS)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .build();

// Report metrics every 30 seconds
reporter.start(30, TimeUnit.SECONDS);
```

### Authentication and Security

When SASL authentication is enabled, the handler checks that the authenticated client belongs to the application named in each request:

```java
/**
 * Validates that the client is authorized for the specified application.
 * The client ID is null when authentication is disabled, in which case the check is skipped.
 * Throws SecurityException if the client ID does not match the app ID.
 */
private void checkAuth(TransportClient client, String appId) {
  if (client.getClientId() != null && !client.getClientId().equals(appId)) {
    throw new SecurityException(String.format(
        "Client for %s not authorized for application %s.",
        client.getClientId(), appId));
  }
}
```
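
The check only bites when authentication is on. With SASL disabled, the client ID is null and every request passes. The following self-contained sketch reproduces the same condition so the three cases can be exercised directly. It runs against a hypothetical `FakeClient` in place of `TransportClient`, and `AuthCheckSketch` is likewise illustrative.

```java
/** Hypothetical stand-in for TransportClient, exposing only the authenticated client ID. */
class FakeClient {
  private final String clientId;
  FakeClient(String clientId) { this.clientId = clientId; }
  /** Null when authentication is disabled. */
  String getClientId() { return clientId; }
}

class AuthCheckSketch {
  /** Same condition as checkAuth: reject only an authenticated client with a different ID. */
  static void checkAuth(FakeClient client, String appId) {
    if (client.getClientId() != null && !client.getClientId().equals(appId)) {
      throw new SecurityException(String.format(
          "Client for %s not authorized for application %s.",
          client.getClientId(), appId));
    }
  }

  /** Returns true if the request would be accepted. */
  static boolean isAuthorized(FakeClient client, String appId) {
    try {
      checkAuth(client, appId);
      return true;
    } catch (SecurityException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(isAuthorized(new FakeClient(null), "app-1"));    // true: auth disabled
    System.out.println(isAuthorized(new FakeClient("app-1"), "app-1")); // true: matching app
    System.out.println(isAuthorized(new FakeClient("app-2"), "app-1")); // false: another app's client
  }
}
```

Tying the authenticated identity to the app ID is what prevents one application from reading another application's shuffle files through a shared service.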

## Error Handling

While processing a request, the handler may raise the following exceptions:

- **SecurityException**: Authenticated client ID does not match the request's application ID
- **UnsupportedOperationException**: Unknown or unsupported message types
- **IOException**: File system errors when accessing shuffle blocks
- **IllegalArgumentException**: Invalid block IDs or malformed requests
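
Shuffle block IDs requested through OpenBlocks follow the `shuffle_<shuffleId>_<mapId>_<reduceId>` naming convention. IDs that do not match it are one source of `IllegalArgumentException`. The following minimal parsing sketch assumes that four-part format. `BlockIdSketch`, its error messages, and the rule that all IDs in one request share a shuffle ID are assumptions of this example, not Spark's exact code.

```java
import java.util.Arrays;

class BlockIdSketch {
  /** Parsed form of a shuffle_<shuffleId>_<mapId>_<reduceId> block ID. */
  static final class ShuffleBlock {
    final int shuffleId, mapId, reduceId;
    ShuffleBlock(int shuffleId, int mapId, int reduceId) {
      this.shuffleId = shuffleId;
      this.mapId = mapId;
      this.reduceId = reduceId;
    }
  }

  static ShuffleBlock parse(String blockId) {
    String[] parts = blockId.split("_");
    if (parts.length != 4 || !parts[0].equals("shuffle")) {
      throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockId);
    }
    try {
      return new ShuffleBlock(Integer.parseInt(parts[1]), Integer.parseInt(parts[2]),
                              Integer.parseInt(parts[3]));
    } catch (NumberFormatException e) {
      // NumberFormatException already is an IllegalArgumentException; rewrap for a clearer message
      throw new IllegalArgumentException("Non-numeric component in block id: " + blockId, e);
    }
  }

  /** Parses a whole request, requiring a single shuffle ID (an assumption of this sketch). */
  static ShuffleBlock[] parseAll(String[] blockIds) {
    ShuffleBlock[] parsed = Arrays.stream(blockIds).map(BlockIdSketch::parse).toArray(ShuffleBlock[]::new);
    for (ShuffleBlock b : parsed) {
      if (b.shuffleId != parsed[0].shuffleId) {
        throw new IllegalArgumentException(
            "Expected shuffleId=" + parsed[0].shuffleId + ", got " + b.shuffleId);
      }
    }
    return parsed;
  }
}
```

Validating every ID before opening a stream means a malformed request fails immediately, instead of partway through a chunk fetch.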
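
**Error Response Example:**

Exceptions thrown while processing a message propagate out of `receive()`. The transport layer catches them and sends an RPC failure response to the client, so the handler does not call `callback.onFailure()` itself for these cases. In effect:

```java
// Simplified view of how the server-side transport layer invokes the handler
// (the real logic lives in TransportRequestHandler, not in ExternalShuffleBlockHandler)
try {
  // Decode and process the message; this may throw SecurityException,
  // UnsupportedOperationException, IllegalArgumentException, ...
  handler.receive(client, message, callback);
} catch (Exception e) {
  // The client receives the error as an RPC failure
  callback.onFailure(e);
}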
```
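
## Integration with Transport Layer

The handler plugs into Spark's network transport layer as the `RpcHandler` of a `TransportContext`:

```java
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import java.util.Collections;

// Create transport context with handler
TransportContext context = new TransportContext(conf, handler);

// Create a server on port 7337 (the default spark.shuffle.service.port).
// No server bootstraps here; a SASL bootstrap would be added when authentication is enabled.
TransportServer server =
    context.createServer(7337, Collections.<TransportServerBootstrap>emptyList());
System.out.println("Shuffle service listening on port " + server.getPort());

// Incoming RPCs are now routed to handler.receive(), and chunk fetches are
// served through handler.getStreamManager(). On shutdown: server.close(); handler.close();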
```