# Shuffle Server Components

Server-side components for handling shuffle requests, including RPC handlers and block resolution mechanisms.

## Capabilities

### ExternalShuffleBlockHandler

RPC handler that serves shuffle blocks on behalf of the external shuffle service.

```java { .api }
/**
 * RPC handler for serving shuffle blocks from the external shuffle service
 */
public class ExternalShuffleBlockHandler extends RpcHandler {
    /**
     * Create an external shuffle block handler backed by a new block resolver
     * @param conf - Transport configuration
     * @param registeredExecutorFile - File used to persist registered executor information
     * @throws IOException if the registered executor file cannot be opened
     */
    public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile)
        throws IOException;

    /**
     * Create an external shuffle block handler from existing components (used mainly for testing)
     * @param streamManager - Stream manager for handling streams
     * @param blockManager - Block resolver for resolving block requests
     */
    public ExternalShuffleBlockHandler(
        OneForOneStreamManager streamManager, ExternalShuffleBlockResolver blockManager);

    /**
     * Handle an incoming RPC message, such as an executor registration or a request to open blocks
     * @param client - Transport client that sent the message
     * @param message - The serialized message bytes
     * @param callback - Callback for sending the response
     */
    @Override
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);

    /**
     * Get all metrics from the handler
     * @return MetricSet containing all shuffle server metrics
     */
    public MetricSet getAllMetrics();

    /**
     * Get the stream manager used by this handler
     * @return StreamManager instance
     */
    @Override
    public StreamManager getStreamManager();

    /**
     * Remove an application after it has terminated, optionally deleting the
     * local directories of its executors
     * @param appId - Application ID to clean up
     * @param cleanupLocalDirs - Whether to delete the local directories
     */
    public void applicationRemoved(String appId, boolean cleanupLocalDirs);

    /**
     * Clean up non-shuffle files left in the local directories of an executor that has exited
     * @param executorId - Executor ID to clean up
     * @param appId - Application ID the executor belongs to
     */
    public void executorRemoved(String executorId, String appId);

    /**
     * Register an executor again, for example to restore state after the service restarts;
     * normal registration arrives as an RPC message through receive()
     * @param appExecId - Combined application and executor ID
     * @param executorInfo - Executor shuffle information
     */
    public void reregisterExecutor(AppExecId appExecId, ExecutorShuffleInfo executorInfo);

    /**
     * Close the handler and release its resources
     */
    public void close();
}
```

### ExternalShuffleBlockResolver

Converts shuffle block IDs into segments of the shuffle files on local disk.

```java { .api }
/**
 * Manages converting shuffle block IDs to physical file segments
 */
public class ExternalShuffleBlockResolver {
    /**
     * Create an external shuffle block resolver
     * @param conf - Transport configuration
     * @param registeredExecutorFile - File used to persist registered executor information
     * @throws IOException if the registered executor file cannot be opened
     */
    public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
        throws IOException;

    /**
     * Get the number of registered executors
     * @return Number of currently registered executors
     */
    public int getRegisteredExecutorsSize();

    /**
     * Register an executor with its shuffle configuration
     * @param appId - Application ID
     * @param execId - Executor ID
     * @param executorInfo - Local directories, subdirectories per directory and shuffle manager
     */
    public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);

    /**
     * Get the data of one shuffle block
     * @param appId - Application ID
     * @param execId - Executor ID
     * @param shuffleId - Shuffle ID
     * @param mapId - Map task ID
     * @param reduceId - Reduce task ID
     * @return ManagedBuffer referring to the block's segment of the shuffle data file
     */
    public ManagedBuffer getBlockData(
        String appId, String execId, int shuffleId, int mapId, int reduceId);

    /**
     * Remove an application and, optionally, delete its executors' local directories
     * @param appId - Application ID to remove
     * @param cleanupLocalDirs - Whether to delete the local directories
     */
    public void applicationRemoved(String appId, boolean cleanupLocalDirs);

    /**
     * Clean up non-shuffle files in the local directories of an executor that has exited
     * @param executorId - Executor ID to remove
     * @param appId - Application ID the executor belongs to
     */
    public void executorRemoved(String executorId, String appId);

    /**
     * Close the resolver and release its resources
     */
    public void close();

    /**
     * Combined application and executor ID, used as the key for registered executors
     */
    public static class AppExecId {
        public final String appId;
        public final String execId;

        public AppExecId(String appId, String execId);
        public boolean equals(Object other);
        public int hashCode();
        public String toString();
    }
}
```
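
The registry behind `registerExecutor` and `applicationRemoved` can be illustrated with a small, self-contained model. This is a sketch, not Spark's code: `ExecutorRegistrySketch` is a hypothetical class, and a bare `String[]` of local directories stands in for `ExecutorShuffleInfo`. It shows why `AppExecId` overrides `equals` and `hashCode`: a freshly constructed key must find the entry that was registered earlier.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of an executor registry keyed by (appId, execId).
public class ExecutorRegistrySketch {
    /** Value-type key mirroring the AppExecId described above. */
    static final class AppExecId {
        final String appId;
        final String execId;

        AppExecId(String appId, String execId) {
            this.appId = appId;
            this.execId = execId;
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) return true;
            if (!(other instanceof AppExecId)) return false;
            AppExecId o = (AppExecId) other;
            return appId.equals(o.appId) && execId.equals(o.execId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(appId, execId);
        }

        @Override
        public String toString() {
            return "AppExecId{appId=" + appId + ", execId=" + execId + "}";
        }
    }

    // String[] of local dirs is an assumed stand-in for ExecutorShuffleInfo.
    private final Map<AppExecId, String[]> executors = new ConcurrentHashMap<>();

    void registerExecutor(String appId, String execId, String[] localDirs) {
        executors.put(new AppExecId(appId, execId), localDirs);
    }

    /** Looks up with a new key object; works only because equals/hashCode compare values. */
    String[] lookup(String appId, String execId) {
        return executors.get(new AppExecId(appId, execId));
    }

    /** Drops every executor that belongs to the given application. */
    void applicationRemoved(String appId) {
        executors.keySet().removeIf(id -> id.appId.equals(appId));
    }

    int size() {
        return executors.size();
    }

    public static void main(String[] args) {
        ExecutorRegistrySketch registry = new ExecutorRegistrySketch();
        registry.registerExecutor("app-1", "1", new String[] {"/tmp/a"});
        registry.registerExecutor("app-1", "2", new String[] {"/tmp/b"});
        registry.registerExecutor("app-2", "1", new String[] {"/tmp/c"});
        System.out.println(registry.lookup("app-1", "2")[0]); // /tmp/b
        registry.applicationRemoved("app-1");
        System.out.println(registry.size()); // 1
    }
}
```

A concurrent map is used because, in a shuffle server, registrations and lookups can arrive from different connections at the same time.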

### ShuffleIndexInformation

Holds the contents of a map output's shuffle index file as an in-memory buffer.

```java { .api }
/**
 * Keeps index information for map output as an in-memory buffer
 */
public class ShuffleIndexInformation {
    /**
     * Create shuffle index information by reading an index file
     * @param indexFile - The shuffle index file to read
     * @throws IOException if the index file cannot be read
     */
    public ShuffleIndexInformation(File indexFile) throws IOException;

    /**
     * Get the size of the index data
     * @return Size of the index file in bytes
     */
    public int getSize();

    /**
     * Get the index record for a specific reduce ID
     * @param reduceId - Reduce task ID
     * @return ShuffleIndexRecord containing the block's offset and length
     */
    public ShuffleIndexRecord getIndex(int reduceId);
}
```

### ShuffleIndexRecord

Contains the offset and length of a shuffle block within the shuffle data file.

```java { .api }
/**
 * Contains offset and length of shuffle block data
 */
public class ShuffleIndexRecord {
    /**
     * Create a shuffle index record
     * @param offset - Byte offset in the shuffle data file
     * @param length - Length of the data block in bytes
     */
    public ShuffleIndexRecord(long offset, long length);

    /**
     * Get the byte offset of the block
     * @return Byte offset in the shuffle data file
     */
    public long getOffset();

    /**
     * Get the length of the block
     * @return Length of the data block in bytes
     */
    public long getLength();
}
```
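
How an index buffer turns into offset/length records can be sketched with plain JDK types. The sketch assumes the index layout used by Spark's sort-based shuffle: `numPartitions + 1` big-endian longs of cumulative offsets starting at 0, so block `reduceId` spans `[offsets[reduceId], offsets[reduceId + 1])` in the data file. `IndexLookupSketch`, its nested `Record` and the `buildIndex` helper are hypothetical stand-ins, not the library classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;

// Hypothetical stand-in for ShuffleIndexInformation / ShuffleIndexRecord.
public class IndexLookupSketch {
    /** Offset/length pair, analogous to ShuffleIndexRecord. */
    static final class Record {
        final long offset;
        final long length;

        Record(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }
    }

    private final LongBuffer offsets;

    /** Wraps the raw bytes of an index file as a view of big-endian longs. */
    IndexLookupSketch(byte[] indexBytes) {
        this.offsets = ByteBuffer.wrap(indexBytes).asLongBuffer();
    }

    /** The block for reduceId lies between two consecutive cumulative offsets. */
    Record getIndex(int reduceId) {
        long start = offsets.get(reduceId);
        long end = offsets.get(reduceId + 1);
        return new Record(start, end - start);
    }

    /** Writes cumulative offsets (starting at 0) for the given partition lengths. */
    static byte[] buildIndex(long[] partitionLengths) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            long offset = 0;
            out.writeLong(offset);
            for (long len : partitionLengths) {
                offset += len;
                out.writeLong(offset);
            }
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Three reduce partitions of 100, 0 and 250 bytes.
        IndexLookupSketch index = new IndexLookupSketch(buildIndex(new long[] {100, 0, 250}));
        Record r = index.getIndex(2);
        System.out.println(r.offset + " " + r.length); // 100 250
    }
}
```

Because the file stores cumulative offsets rather than lengths, each lookup is two array reads, and an empty partition simply shows up as two equal offsets.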

**Usage Examples:**

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;

import com.codahale.metrics.MetricSet;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Create transport configuration for the shuffle server (empty map = defaults)
TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(Collections.emptyMap()));

// File for persisting registered executor information
File registeredExecutorFile = new File("/tmp/registered-executors.db");

// Create the block resolver (throws IOException if the file cannot be opened)
ExternalShuffleBlockResolver blockResolver =
    new ExternalShuffleBlockResolver(conf, registeredExecutorFile);

// Wrap the same resolver in the RPC handler. Building the handler from
// (conf, registeredExecutorFile) would open a second resolver on the same file.
ExternalShuffleBlockHandler handler =
    new ExternalShuffleBlockHandler(new OneForOneStreamManager(), blockResolver);

// Register an executor; the shuffle manager is identified by its full class name
String appId = "app-20231201-001";
String execId = "executor-1";
String[] localDirs = {"/tmp/spark-local-20231201-001/1", "/tmp/spark-local-20231201-001/2"};
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
    localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");

blockResolver.registerExecutor(appId, execId, executorInfo);
System.out.println("Registered executors: " + blockResolver.getRegisteredExecutorsSize());

// Retrieve block data for shuffleId=1, mapId=0, reduceId=0
ManagedBuffer blockData = null;
try {
    blockData = blockResolver.getBlockData(appId, execId, 1, 0, 0);
    System.out.println("Retrieved block data, size: " + blockData.size() + " bytes");

    try (InputStream dataStream = blockData.createInputStream()) {
        // processShuffleBlock is a placeholder for application-specific processing
        processShuffleBlock(dataStream);
    }
} catch (IOException | RuntimeException e) {
    System.err.println("Error retrieving block data: " + e.getMessage());
} finally {
    // Always release the buffer, even if processing failed
    if (blockData != null) {
        blockData.release();
    }
}

// Monitor server metrics
MetricSet serverMetrics = handler.getAllMetrics();
System.out.println("Server metrics: " + serverMetrics.getMetrics().keySet());

// Application cleanup: the handler delegates to the resolver it wraps
handler.applicationRemoved(appId, true);

// Closing the handler also closes the wrapped resolver
handler.close();
```

### Server Configuration and Deployment

The shuffle server components can be configured through the following properties:

- `spark.shuffle.service.enabled` - Enables the external shuffle service
- `spark.shuffle.service.port` - Port on which the external shuffle service listens (default `7337`)
- `spark.shuffle.service.index.cache.size` - Maximum memory used to cache shuffle index information (default `100m`)
- `spark.shuffle.service.db.backend` - Disk-based store (`LEVELDB` or `ROCKSDB`) used to persist executor registrations
- `spark.shuffle.maxChunksBeingTransferred` - Maximum number of chunks transferred at the same time; new incoming connections are closed once the limit is reached

### Error Handling and Monitoring

The server components provide the following error-handling and monitoring facilities:

1. **Metrics Collection**: `getAllMetrics()` exposes the server's metrics as a Dropwizard `MetricSet` for monitoring
2. **Resource Cleanup**: `applicationRemoved` and `executorRemoved` release per-application and per-executor state when applications or executors finish
3. **File Management**: Local directories are deleted automatically when `applicationRemoved` is called with `cleanupLocalDirs = true`
4. **Exception Handling**: Requests for unregistered executors or missing or unreadable index files fail with an exception instead of returning partial data
5. **Authentication**: The service can be combined with SASL authentication to secure client connections

### Block Resolution Process

The block resolution process follows these steps:

1. **Registration**: Executors register their shuffle configuration (local directories, subdirectories per directory, shuffle manager) with the server
2. **Block Request**: Clients request specific blocks by shuffle, map and reduce ID
3. **File Location**: The server maps each block to the map output's index and data files inside the executor's registered local directories
4. **Index Lookup**: The shuffle index file gives the byte offset and length of the block within the data file
5. **Data Retrieval**: The block is served from that byte range of the shuffle data file
6. **Buffer Management**: The data is returned as a `ManagedBuffer` that refers to the file segment, so the block does not have to be copied into memory up front
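
Steps 3 to 5 can be sketched end to end with plain JDK file I/O. The sketch assumes Spark's local-directory layout as we understand it: a file name's non-negative `String.hashCode()` picks one of the executor's `localDirs` and a two-hex-digit subdirectory below it, and the output of map `mapId` in shuffle `shuffleId` is stored as `shuffle_<shuffleId>_<mapId>_0.index` and `shuffle_<shuffleId>_<mapId>_0.data`. `BlockResolutionSketch` and its methods are hypothetical, and the sketch returns a `byte[]` where the real resolver returns a `ManagedBuffer`.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;

// Hypothetical end-to-end sketch of file location, index lookup and data retrieval.
public class BlockResolutionSketch {
    /** Non-negative hash of a file name (assumed scheme). */
    static int nonNegativeHash(String s) {
        int h = s.hashCode();
        return h != Integer.MIN_VALUE ? Math.abs(h) : 0;
    }

    /** Step 3: map a file name to localDir/<two-hex-digit subdir>/fileName. */
    static File getFile(String[] localDirs, int subDirsPerLocalDir, String fileName) {
        int hash = nonNegativeHash(fileName);
        String localDir = localDirs[hash % localDirs.length];
        int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
        return new File(new File(localDir, String.format("%02x", subDirId)), fileName);
    }

    /** Steps 3-5: locate both files, read two index entries, then read the byte range. */
    static byte[] getBlockData(String[] localDirs, int subDirsPerLocalDir,
                               int shuffleId, int mapId, int reduceId) throws IOException {
        String base = "shuffle_" + shuffleId + "_" + mapId + "_0";
        File indexFile = getFile(localDirs, subDirsPerLocalDir, base + ".index");
        File dataFile = getFile(localDirs, subDirsPerLocalDir, base + ".data");
        long offset;
        long nextOffset;
        try (RandomAccessFile index = new RandomAccessFile(indexFile, "r")) {
            index.seek(reduceId * 8L); // each index entry is one 8-byte long
            offset = index.readLong();
            nextOffset = index.readLong();
        }
        byte[] block = new byte[(int) (nextOffset - offset)];
        try (RandomAccessFile data = new RandomAccessFile(dataFile, "r")) {
            data.seek(offset);
            data.readFully(block);
        }
        return block;
    }

    /** Helper: write one map output's data and index files where getFile expects them. */
    static void writeMapOutput(String[] localDirs, int subDirsPerLocalDir,
                               int shuffleId, int mapId, byte[][] partitions) throws IOException {
        String base = "shuffle_" + shuffleId + "_" + mapId + "_0";
        File indexFile = getFile(localDirs, subDirsPerLocalDir, base + ".index");
        File dataFile = getFile(localDirs, subDirsPerLocalDir, base + ".data");
        indexFile.getParentFile().mkdirs();
        dataFile.getParentFile().mkdirs();
        try (RandomAccessFile data = new RandomAccessFile(dataFile, "rw");
             RandomAccessFile index = new RandomAccessFile(indexFile, "rw")) {
            long offset = 0;
            index.writeLong(offset);
            for (byte[] p : partitions) {
                data.write(p);
                offset += p.length;
                index.writeLong(offset); // cumulative offset after each partition
            }
        }
    }

    public static void main(String[] args) throws IOException {
        File root = Files.createTempDirectory("shuffle-sketch").toFile();
        String[] localDirs = {new File(root, "dir0").getPath(), new File(root, "dir1").getPath()};
        byte[][] partitions = {"aaa".getBytes(), "".getBytes(), "hello".getBytes()};
        writeMapOutput(localDirs, 64, 1, 0, partitions);
        System.out.println(new String(getBlockData(localDirs, 64, 1, 0, 2))); // hello
    }
}
```

Because the location is derived from the file name alone, the server needs only the registered `localDirs` and subdirectory count to find any block, with no per-block bookkeeping.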