# Server-Side Block Management

The server-side components handle incoming shuffle requests, manage executor registrations, and resolve shuffle blocks on the local filesystem. These components run as part of the external shuffle service.

## Block Handler

### ExternalShuffleBlockHandler

```java { .api }
public class ExternalShuffleBlockHandler extends RpcHandler {
    public ExternalShuffleBlockHandler(
        TransportConf conf,
        File registeredExecutorFile
    ) throws IOException;

    public ExternalShuffleBlockHandler(
        OneForOneStreamManager streamManager,
        ExternalShuffleBlockResolver blockManager
    );

    public void receive(
        TransportClient client,
        ByteBuffer message,
        RpcResponseCallback callback
    );

    public StreamManager getStreamManager();
}
```

RPC handler for the external shuffle service. It processes client requests to register executors and to open shuffle blocks for fetching.

**Constructor Parameters:**
- `conf` (TransportConf): Network transport configuration
- `registeredExecutorFile` (File): File used to persist executor registrations
- `streamManager` (OneForOneStreamManager): Stream manager for block transfers (testing constructor)
- `blockManager` (ExternalShuffleBlockResolver): Block resolver instance (testing constructor)

**Key Methods:**

#### receive

Processes an incoming RPC message from a shuffle client. Handles executor registration, block open requests, and other protocol messages.

**Parameters:**
- `client` (TransportClient): Client connection that sent the message
- `message` (ByteBuffer): Serialized protocol message
- `callback` (RpcResponseCallback): Callback used to send the response
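
The dispatch step described above can be illustrated with a minimal, self-contained sketch. It does not use Spark's classes or wire format: the `MessageDispatchSketch` class, the tag values, and the string payload are all hypothetical, chosen only to show how a handler might branch on a leading type byte in the serialized message.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: dispatch on a leading type byte. Not Spark's actual protocol.
final class MessageDispatchSketch {
    // Assumed tag values, for illustration only
    static final byte OPEN_BLOCKS = 0;
    static final byte REGISTER_EXECUTOR = 1;

    // Builds a message as [type byte][UTF-8 payload]
    static ByteBuffer encode(byte type, String body) {
        byte[] payload = body.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(1 + payload.length);
        buf.put(type).put(payload);
        buf.flip();
        return buf;
    }

    // Reads the type byte, then routes the remaining payload to the matching branch
    static String handle(ByteBuffer message) {
        byte type = message.get();
        byte[] payload = new byte[message.remaining()];
        message.get(payload);
        String body = new String(payload, StandardCharsets.UTF_8);
        switch (type) {
            case OPEN_BLOCKS:
                return "open:" + body;
            case REGISTER_EXECUTOR:
                return "register:" + body;
            default:
                throw new UnsupportedOperationException("Unexpected message type: " + type);
        }
    }

    public static void main(String[] args) {
        ByteBuffer msg = encode(REGISTER_EXECUTOR, "app-123/executor-1");
        System.out.println(handle(msg));
    }
}
```

In a real handler the two branches would call into the block resolver and reply through the callback instead of returning a string.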

#### getStreamManager

Returns the stream manager used for managing block data streams.

**Returns:**
- `StreamManager`: The stream manager instance

## Block Resolver

### ExternalShuffleBlockResolver

```java { .api }
public class ExternalShuffleBlockResolver {
    public ExternalShuffleBlockResolver(
        TransportConf conf,
        File registeredExecutorFile
    ) throws IOException;

    public void registerExecutor(
        String appId,
        String execId,
        ExecutorShuffleInfo executorInfo
    );

    public void applicationRemoved(String appId, boolean cleanupLocalDirs);

    public ManagedBuffer getBlockData(
        String appId,
        String execId,
        String blockId
    ) throws IOException;

    public void close();
}
```

Manages the mapping between shuffle block IDs and physical file segments on the local filesystem. Handles executor registration and block location resolution.

**Constructor Parameters:**
- `conf` (TransportConf): Transport configuration for the resolver
- `registeredExecutorFile` (File): File for persisting executor registrations across restarts

**Throws:**
- `IOException`: If unable to initialize persistent storage

**Key Methods:**

#### registerExecutor

Registers an executor's shuffle configuration, storing information about where it writes shuffle files.

**Parameters:**
- `appId` (String): Spark application ID
- `execId` (String): Executor ID
- `executorInfo` (ExecutorShuffleInfo): Configuration describing shuffle file locations
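
To show why the registration needs both the local directories and the sub-directory count, here is a minimal sketch of one hash-based file layout. The `ShuffleFileLocator` class is hypothetical, and the formula is an assumption modeled on how Spark's disk layout is commonly described; check it against the Spark version you run before relying on it.

```java
import java.io.File;

// Hypothetical sketch of a hash-based shuffle file layout; not Spark's own class.
final class ShuffleFileLocator {
    // Maps a hash code to a non-negative int; Integer.MIN_VALUE has no positive counterpart
    static int nonNegativeHash(String s) {
        int hash = s.hashCode();
        return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
    }

    // Picks a local dir by hash, then a two-digit hex sub-directory inside it
    static File locate(String[] localDirs, int subDirsPerLocalDir, String filename) {
        int hash = nonNegativeHash(filename);
        String localDir = localDirs[hash % localDirs.length];
        int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
        return new File(new File(localDir, String.format("%02x", subDirId)), filename);
    }

    public static void main(String[] args) {
        String[] dirs = {"/tmp/spark-local-dir1", "/tmp/spark-local-dir2"};
        System.out.println(locate(dirs, 64, "shuffle_1_2_0.data"));
    }
}
```

Because the location is a pure function of the file name and the registered values, a resolver built this way can find a file without asking the executor that wrote it.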

#### applicationRemoved

Cleans up data for a removed Spark application, optionally removing local shuffle directories.

**Parameters:**
- `appId` (String): Application ID to clean up
- `cleanupLocalDirs` (boolean): Whether to delete local shuffle directories

#### getBlockData

Resolves a shuffle block ID to its data on the local filesystem and returns that data as a buffer.

**Parameters:**
- `appId` (String): Application ID that owns the block
- `execId` (String): Executor ID that wrote the block
- `blockId` (String): Block identifier to retrieve

**Returns:**
- `ManagedBuffer`: Buffer containing the block data

**Throws:**
- `IOException`: If the block cannot be found or read
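
Block identifiers such as `shuffle_1_2_0` follow the pattern `shuffle_<shuffleId>_<mapId>_<reduceId>`. The sketch below shows one way to split such an ID into its parts. The `ShuffleBlockId` class is a hypothetical helper for illustration, not part of the API above, and it assumes all three parts fit in an `int`.

```java
// Hypothetical helper: splits "shuffle_<shuffleId>_<mapId>_<reduceId>" into numeric parts.
final class ShuffleBlockId {
    final int shuffleId;
    final int mapId;
    final int reduceId;

    private ShuffleBlockId(int shuffleId, int mapId, int reduceId) {
        this.shuffleId = shuffleId;
        this.mapId = mapId;
        this.reduceId = reduceId;
    }

    // Rejects anything that is not exactly four underscore-separated parts starting with "shuffle"
    static ShuffleBlockId parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length != 4 || !parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockId);
        }
        return new ShuffleBlockId(
            Integer.parseInt(parts[1]),
            Integer.parseInt(parts[2]),
            Integer.parseInt(parts[3]));
    }

    public static void main(String[] args) {
        ShuffleBlockId id = parse("shuffle_1_2_0");
        System.out.println(id.shuffleId + " " + id.mapId + " " + id.reduceId);
    }
}
```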

#### close

Closes the resolver and releases resources including persistent storage connections.

## Internal Components

### AppExecId

```java { .api }
public static class AppExecId {
    public final String appId;
    public final String execId;

    public AppExecId(String appId, String execId);

    public boolean equals(Object other);
    public int hashCode();
    public String toString();
}
```

Internal identifier class, nested in `ExternalShuffleBlockResolver`, that combines an application ID and an executor ID into a single key for tracking registered executors.
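
A composite key like this only works in a hash map if `equals` and `hashCode` agree on both fields. The following sketch illustrates that contract with a hypothetical stand-in class, `ExecutorKey`; it is not the `AppExecId` source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for AppExecId: an immutable (appId, execId) pair usable as a map key.
final class ExecutorKey {
    final String appId;
    final String execId;

    ExecutorKey(String appId, String execId) {
        this.appId = Objects.requireNonNull(appId);
        this.execId = Objects.requireNonNull(execId);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof ExecutorKey)) return false;
        ExecutorKey that = (ExecutorKey) other;
        return appId.equals(that.appId) && execId.equals(that.execId);
    }

    @Override
    public int hashCode() {
        // Combines both fields so that equal keys always land in the same bucket
        return Objects.hash(appId, execId);
    }

    @Override
    public String toString() {
        return "ExecutorKey{appId=" + appId + ", execId=" + execId + "}";
    }
}

final class ExecutorKeyDemo {
    public static void main(String[] args) {
        Map<ExecutorKey, String> registry = new HashMap<>();
        registry.put(new ExecutorKey("app-123", "executor-1"), "/tmp/spark-local-dir1");
        // A fresh but equal key finds the same entry
        System.out.println(registry.get(new ExecutorKey("app-123", "executor-1")));
    }
}
```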

## Usage Examples

### Basic Server Setup

```java
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import java.io.File;
import java.util.Collections;

// Create transport configuration.
// TransportConf expects a ConfigProvider rather than a SparkConf. MapConfigProvider is
// used here as an assumption; the available providers vary by Spark version.
TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(Collections.emptyMap()));

// Create block handler with persistent storage (the constructor may throw IOException)
File registrationFile = new File("/tmp/spark-shuffle-registrations");
ExternalShuffleBlockHandler handler = new ExternalShuffleBlockHandler(
    conf,
    registrationFile
);

// The handler can now be used with a TransportServer
```

### Custom Server Setup

```java
import org.apache.spark.network.server.OneForOneStreamManager;

// Create components separately for testing or custom configuration
OneForOneStreamManager streamManager = new OneForOneStreamManager();
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(
    conf,
    new File("/tmp/registrations")
);

ExternalShuffleBlockHandler handler = new ExternalShuffleBlockHandler(
    streamManager,
    resolver
);
```

### Manual Block Resolution

```java
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import java.io.IOException;

// Register an executor manually
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
    new String[]{"/tmp/spark-local-dir1", "/tmp/spark-local-dir2"},
    64, // subdirs per local dir
    "org.apache.spark.shuffle.sort.SortShuffleManager"
);

resolver.registerExecutor("app-123", "executor-1", executorInfo);

// Later, retrieve a block
try {
    ManagedBuffer blockData = resolver.getBlockData(
        "app-123",
        "executor-1",
        "shuffle_1_2_0"
    );

    System.out.println("Block data size: " + blockData.size());

    // Copy the block into a byte array (assumes the block is smaller than 2 GB)
    byte[] data = new byte[(int) blockData.size()];
    blockData.nioByteBuffer().get(data);

    // Release buffer when done
    blockData.release();

} catch (IOException e) {
    System.err.println("Failed to retrieve block: " + e.getMessage());
}
```

### Application Cleanup

```java
// Clean up after application completion
resolver.applicationRemoved("app-123", true); // true = cleanup local directories

// Or clean up without removing directories (for debugging)
resolver.applicationRemoved("app-123", false);
```

### Integration with Transport Server

```java
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.TransportServer;
import java.util.Collections;

// Create transport context with the block handler
TransportContext context = new TransportContext(conf, handler);

// Create and start server (no bootstraps)
TransportServer server = context.createServer(7337, Collections.emptyList());
System.out.println("Shuffle service started on port 7337");

// Server will now handle incoming shuffle client requests
// Remember to close when done:
// server.close();
// resolver.close();
```

## Persistence and Recovery

The block resolver uses LevelDB to persist executor registrations, so executor metadata can be recovered after the service restarts.

**Persistent Storage:**
- Executor registrations survive service restarts
- Block locations are reconstructed from stored metadata
- Cleanup operations are reflected in persistent state

**Recovery Behavior:**
- On startup, previously registered executors are restored
- Block requests can be served immediately after restart
- No need to re-register executors unless shuffle files have moved
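
The restart behavior listed above can be illustrated with a small sketch that persists registrations to a file and reloads them on construction. It uses `java.util.Properties` as a stand-in for LevelDB purely to stay dependency-free. The `RegistrationStore` class, its method names, and the `appId;execId` key format are all hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Properties;

// Hypothetical sketch: file-backed registrations, with Properties standing in for LevelDB.
final class RegistrationStore {
    private final File file;
    private final Properties registrations = new Properties();

    // On startup, restore whatever a previous instance wrote
    RegistrationStore(File file) throws IOException {
        this.file = file;
        if (file.exists()) {
            try (InputStream in = Files.newInputStream(file.toPath())) {
                registrations.load(in);
            }
        }
    }

    void register(String appId, String execId, String localDirs) throws IOException {
        registrations.setProperty(appId + ";" + execId, localDirs);
        flush();
    }

    String lookup(String appId, String execId) {
        return registrations.getProperty(appId + ";" + execId);
    }

    // Removes every executor of the application and persists the change
    void removeApplication(String appId) throws IOException {
        // stringPropertyNames() returns a snapshot, so removing while iterating is safe
        for (String key : registrations.stringPropertyNames()) {
            if (key.startsWith(appId + ";")) {
                registrations.remove(key);
            }
        }
        flush();
    }

    private void flush() throws IOException {
        try (OutputStream out = Files.newOutputStream(file.toPath())) {
            registrations.store(out, "executor registrations");
        }
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("registrations", ".properties");
        f.deleteOnExit();
        new RegistrationStore(f).register("app-123", "executor-1", "/tmp/spark-local-dir1");
        // A second instance simulates a service restart
        System.out.println(new RegistrationStore(f).lookup("app-123", "executor-1"));
    }
}
```

Writing on every change keeps the file in step with memory at the cost of extra I/O, which is acceptable for a sketch but is one reason a real service would use an embedded key-value store instead.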

## Error Handling

Common error scenarios:
- **Block Not Found**: The requested block does not exist on the filesystem
- **Executor Not Registered**: A client tries to fetch blocks from an executor that has not registered
- **IO Errors**: Filesystem permission issues or disk failures
- **Corruption**: The persistent store is corrupted and must be reconstructed

```java
try {
    ManagedBuffer block = resolver.getBlockData("app-1", "exec-1", "shuffle_1_0_0");
    // Process block...
} catch (IOException e) {
    // getMessage() may be null, and message text is not a stable contract
    String message = e.getMessage();
    if (message != null && message.contains("not found")) {
        // Handle missing block
        System.err.println("Block not found, may have been cleaned up");
    } else {
        // Handle other IO errors
        System.err.println("IO error reading block: " + message);
    }
} catch (RuntimeException e) {
    // Assumption: an unregistered executor or a malformed block ID may surface as an
    // unchecked exception rather than an IOException, depending on the Spark version
    System.err.println("Block request rejected: " + e.getMessage());
}
```