0
# Server APIs
1
2
Server-side components for handling shuffle block requests, resolving block locations, managing executor registrations, and serving shuffle data to clients.
3
4
## Capabilities
5
6
### External Shuffle Block Handler
7
8
RPC handler for serving shuffle blocks from outside the executor process, managing client requests and coordinating with block resolver.
9
10
```java { .api }
11
/**
12
* RPC Handler for serving shuffle blocks from outside executor process
13
*/
14
public class ExternalShuffleBlockHandler extends RpcHandler {
15
/**
16
* Create block handler with configuration and executor registry
17
* @param conf - Transport configuration
18
* @param registeredExecutorFile - File for persisting executor registrations
19
* @throws IOException if handler initialization fails
20
*/
21
public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException;
22
23
/**
24
* Create block handler with custom stream manager and block resolver (for testing)
25
* @param streamManager - Stream manager for handling block streaming
26
* @param blockManager - Block resolver for locating blocks
27
*/
28
@VisibleForTesting
29
public ExternalShuffleBlockHandler(OneForOneStreamManager streamManager,
30
ExternalShuffleBlockResolver blockManager);
31
32
/**
33
* Handle incoming RPC requests from shuffle clients
34
* @param client - Transport client that sent the request
35
* @param message - Request message as byte buffer
36
* @param callback - Callback for sending response
37
*/
38
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);
39
40
/**
41
* Get stream manager for block streaming operations
42
* @return StreamManager instance
43
*/
44
public StreamManager getStreamManager();
45
46
/**
47
* Handle application removal and cleanup
48
* @param appId - Application identifier to remove
49
* @param cleanupLocalDirs - Whether to cleanup local directories
50
*/
51
public void applicationRemoved(String appId, boolean cleanupLocalDirs);
52
53
/**
54
* Re-register executor with updated configuration
55
* @param appExecId - Application and executor identifier
56
* @param executorInfo - Updated executor configuration
57
*/
58
public void reregisterExecutor(AppExecId appExecId, ExecutorShuffleInfo executorInfo);
59
60
/**
61
* Close handler and cleanup resources
62
*/
63
public void close();
64
}
65
```
66
67
**Usage Example:**
68
69
```java
70
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
71
import org.apache.spark.network.util.TransportConf;
72
73
// Create transport configuration
74
TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
75
76
// Create registered executor file for persistence
77
File registeredExecutorFile = new File("/tmp/spark-shuffle/registered-executors.db");
78
79
// Initialize block handler
80
ExternalShuffleBlockHandler handler = new ExternalShuffleBlockHandler(conf, registeredExecutorFile);
81
82
// Use handler in transport server
83
TransportContext context = new TransportContext(conf, handler);
84
TransportServer server = context.createServer(7337, Collections.emptyList());
85
```
86
87
### External Shuffle Block Resolver
88
89
Manages the conversion of shuffle block IDs into physical segments of local files, handling executor registration and block location resolution.
90
91
```java { .api }
92
/**
93
* Manages converting shuffle BlockIds into physical segments of local files
94
*/
95
public class ExternalShuffleBlockResolver {
96
/**
97
* Create block resolver with configuration and executor registry
98
* @param conf - Transport configuration
99
* @param registeredExecutorFile - File for persisting executor registrations
100
* @throws IOException if resolver initialization fails
101
*/
102
public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) throws IOException;
103
104
/**
105
* Register executor configuration for block resolution
106
* @param appId - Application identifier
107
* @param execId - Executor identifier
108
* @param executorInfo - Executor configuration including local directories
109
*/
110
public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
111
112
/**
113
* Get block data for specified block identifier
114
* @param appId - Application identifier
115
* @param execId - Executor identifier
116
* @param blockId - Block identifier (e.g., "shuffle_0_1_2")
117
* @return Managed buffer containing block data
118
*/
119
public ManagedBuffer getBlockData(String appId, String execId, String blockId);
120
121
/**
122
* Handle application removal and cleanup
123
* @param appId - Application identifier to remove
124
* @param cleanupLocalDirs - Whether to cleanup local directories
125
*/
126
public void applicationRemoved(String appId, boolean cleanupLocalDirs);
127
}
128
```
129
130
**Usage Example:**
131
132
```java
133
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
134
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
135
import org.apache.spark.network.buffer.ManagedBuffer;
136
137
// Create block resolver
138
File registeredExecutorFile = new File("/tmp/spark-shuffle/registered-executors.db");
139
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, registeredExecutorFile);
140
141
// Register executor configuration
142
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
143
new String[]{"/tmp/spark-local-1", "/tmp/spark-local-2"},
144
64,
145
"org.apache.spark.shuffle.sort.SortShuffleManager"
146
);
147
resolver.registerExecutor("app-123", "executor-1", executorInfo);
148
149
// Retrieve block data
150
ManagedBuffer blockData = resolver.getBlockData("app-123", "executor-1", "shuffle_0_1_2");
151
try {
152
// Process block data
153
ByteBuffer buffer = blockData.nioByteBuffer();
154
// ... handle shuffle data
155
} finally {
156
blockData.release(); // Important: release the buffer
157
}
158
```
159
160
## Types
161
162
### Application Executor Identifier
163
164
Identifier pair for application and executor used throughout the block resolution system.
165
166
```java { .api }
167
/**
168
* Application and executor identifier pair
169
*/
170
public static class AppExecId {
171
/** Application identifier */
172
public final String appId;
173
/** Executor identifier */
174
public final String execId;
175
176
/**
177
* Create application executor identifier
178
* @param appId - Application identifier
179
* @param execId - Executor identifier
180
*/
181
public AppExecId(String appId, String execId);
182
183
/**
184
* Check equality with another AppExecId
185
* @param o - Object to compare
186
* @return true if equal
187
*/
188
public boolean equals(Object o);
189
190
/**
191
* Generate hash code for this identifier
192
* @return hash code
193
*/
194
public int hashCode();
195
196
/**
197
* String representation of this identifier
198
* @return string representation
199
*/
200
public String toString();
201
}
202
```
203
204
### Store Version
205
206
Version information for persistent storage format used by the block resolver.
207
208
```java { .api }
209
/**
210
* Version information for persistent storage
211
*/
212
public static class StoreVersion {
213
/** Major version number */
214
public final int major;
215
/** Minor version number */
216
public final int minor;
217
218
/**
219
* Create store version
220
* @param major - Major version number
221
* @param minor - Minor version number
222
*/
223
public StoreVersion(int major, int minor);
224
225
/**
226
* Check equality with another StoreVersion
227
* @param o - Object to compare
228
* @return true if equal
229
*/
230
public boolean equals(Object o);
231
232
/**
233
* Generate hash code for this version
234
* @return hash code
235
*/
236
public int hashCode();
237
}
238
```
239
240
**Usage Example:**
241
242
```java
243
// Create application executor identifier
244
AppExecId appExecId = new AppExecId("spark-app-123", "executor-1");
245
246
// Use in executor registration
247
resolver.reregisterExecutor(appExecId, updatedExecutorInfo);
248
249
// Store version for compatibility checking
250
StoreVersion currentVersion = new StoreVersion(1, 0);
251
```