0
# Block Resolution
1
2
The block resolver manages executor metadata, locates shuffle files on disk, and provides access to shuffle block data.
3
4
## Capabilities
5
6
### ExternalShuffleBlockResolver
7
8
Core component that manages executor metadata and resolves shuffle block locations on the file system.
9
10
```java { .api }
11
/**
12
* Manages executor metadata and resolves shuffle block locations on disk.
13
* Handles registration, cleanup, and block data retrieval for the external shuffle service.
14
*/
15
public class ExternalShuffleBlockResolver {
16
/**
17
* Creates a block resolver with specified configuration and executor file.
18
*
19
* @param conf transport configuration
20
* @param registeredExecutorFile file for persisting executor registrations across restarts
21
* @throws IOException if initialization fails
22
*/
23
public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) throws IOException;
24
25
/**
26
* Creates a block resolver with custom executor and cleanup configuration.
27
*
28
* @param conf transport configuration
29
* @param registeredExecutorFile file for persisting executor registrations
30
* @param directoryCleaner executor for cleaning up directories
31
* @throws IOException if initialization fails
32
*/
33
public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile,
34
Executor directoryCleaner) throws IOException;
35
36
/**
37
* Gets the number of currently registered executors.
38
*
39
* @return count of registered executors
40
*/
41
public int getRegisteredExecutorsSize();
42
43
/**
44
* Registers a new executor with all configuration needed to find shuffle files.
45
* Stores executor metadata in memory and persists to disk.
46
*
47
* @param appId application identifier
48
* @param execId executor identifier
49
* @param executorInfo shuffle configuration for this executor
50
*/
51
public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
52
53
/**
54
* Retrieves shuffle block data from disk based on block ID.
55
* Supports both hash-based and sort-based shuffle formats.
56
*
57
* @param appId application identifier
58
* @param execId executor identifier
59
* @param blockId shuffle block identifier (format: shuffle_shuffleId_mapId_reduceId)
60
* @return managed buffer containing block data
61
* @throws IllegalArgumentException if block ID format is invalid
62
*/
63
public ManagedBuffer getBlockData(String appId, String execId, String blockId);
64
65
/**
66
* Removes application metadata and optionally cleans up local directories.
67
* Called when application terminates.
68
*
69
* @param appId application identifier
70
* @param cleanupLocalDirs whether to delete local shuffle directories
71
*/
72
public void applicationRemoved(String appId, boolean cleanupLocalDirs);
73
74
/**
75
* Closes the resolver and releases resources.
76
*/
77
public void close();
78
}
79
```
80
81
**Usage Examples:**
82
83
```java
84
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
85
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
86
import org.apache.spark.network.util.TransportConf;
87
import org.apache.spark.network.buffer.ManagedBuffer;
88
import java.io.File;
89
90
// Create resolver
91
TransportConf conf = new TransportConf("shuffle");
92
File executorFile = new File("/var/spark/shuffle/executors.ldb");
93
ExternalShuffleBlockResolver resolver;
94
95
try {
96
resolver = new ExternalShuffleBlockResolver(conf, executorFile);
97
System.out.println("Block resolver initialized");
98
} catch (IOException e) {
99
System.err.println("Failed to create resolver: " + e.getMessage());
100
return;
101
}
102
103
// Register executor
104
String appId = "app-20230101-1234";
105
String execId = "executor-1";
106
String[] localDirs = {"/tmp/spark-local-1", "/tmp/spark-local-2"};
107
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
108
localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");
109
110
resolver.registerExecutor(appId, execId, executorInfo);
111
System.out.println("Registered executor: " + execId);
112
System.out.println("Total executors: " + resolver.getRegisteredExecutorsSize());
113
114
// Retrieve block data
115
String blockId = "shuffle_0_1_0"; // shuffleId=0, mapId=1, reduceId=0
116
try {
117
ManagedBuffer blockData = resolver.getBlockData(appId, execId, blockId);
118
System.out.println("Retrieved block " + blockId + " (" + blockData.size() + " bytes)");
119
120
// Process block data
121
// Note: buffer will be automatically cleaned up
122
} catch (IllegalArgumentException e) {
123
System.err.println("Invalid block ID: " + e.getMessage());
124
}
125
126
// Clean up application
127
resolver.applicationRemoved(appId, true); // Delete local directories
128
System.out.println("Application removed: " + appId);
129
130
// Shutdown
131
resolver.close();
132
```
133
134
### AppExecId Composite Key
135
136
Composite key class for uniquely identifying executor registrations.
137
138
```java { .api }
139
/**
140
* Composite key combining application ID and executor ID.
141
* Used for uniquely identifying registered executors.
142
*/
143
public static class AppExecId {
144
public final String appId;
145
public final String execId;
146
147
/**
148
* Creates a composite key from application and executor IDs.
149
*/
150
public AppExecId(String appId, String execId);
151
152
@Override
153
public boolean equals(Object o);
154
155
@Override
156
public int hashCode();
157
158
@Override
159
public String toString();
160
}
161
```
162
163
### Shuffle Index Management
164
165
The resolver manages shuffle index information for efficient block lookups.
166
167
```java { .api }
168
/**
169
* Manages shuffle index file information to avoid repeated file operations.
170
*/
171
public class ShuffleIndexInformation {
172
/**
173
* Creates index information from index file.
174
*
175
* @param indexFile the shuffle index file
176
* @throws IOException if file cannot be read
177
*/
178
public ShuffleIndexInformation(File indexFile) throws IOException;
179
180
/**
181
* Gets index record for the specified reduce partition.
182
*
183
* @param reducer the reduce partition ID
184
* @return index record with offset and length
185
*/
186
public ShuffleIndexRecord getIndex(int reducer);
187
188
/**
189
* Gets the total number of partitions in this shuffle.
190
*/
191
public int getNumPartitions();
192
}
193
194
/**
195
* Represents a single shuffle index record.
196
*/
197
public class ShuffleIndexRecord {
198
/**
199
* Creates an index record with offset and length.
200
*/
201
public ShuffleIndexRecord(long offset, int length);
202
203
/**
204
* Gets the byte offset of this partition's data in the shuffle file.
205
*/
206
public long getOffset();
207
208
/**
209
* Gets the length in bytes of this partition's data.
210
*/
211
public int getLength();
212
}
213
```
214
215
**Index Usage Example:**
216
217
```java
218
// Resolver internally uses index information like this:
219
File indexFile = getShuffleIndexFile(appId, execId, shuffleId, mapId);
220
ShuffleIndexInformation indexInfo = new ShuffleIndexInformation(indexFile);
221
222
// Get specific partition data
223
int reduceId = 0;
224
ShuffleIndexRecord record = indexInfo.getIndex(reduceId);
225
long offset = record.getOffset();
226
int length = record.getLength();
227
228
// Create buffer for partition data
229
File dataFile = getShuffleDataFile(appId, execId, shuffleId, mapId);
230
ManagedBuffer buffer = new FileSegmentManagedBuffer(conf, dataFile, offset, length);
231
```
232
233
### Block ID Format and Parsing
234
235
The resolver parses shuffle block IDs, which must have the following format:
236
237
```java
238
/**
239
* Block ID Format: shuffle_shuffleId_mapId_reduceId
240
* Examples:
241
* - shuffle_0_1_0: shuffle 0, map task 1, reduce partition 0
242
* - shuffle_2_5_3: shuffle 2, map task 5, reduce partition 3
243
*/
244
245
// Block ID parsing logic (internal)
246
String[] blockIdParts = blockId.split("_");
247
if (blockIdParts.length < 4) {
248
throw new IllegalArgumentException("Unexpected block id format: " + blockId);
249
}
250
if (!blockIdParts[0].equals("shuffle")) {
251
throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId);
252
}
253
254
int shuffleId = Integer.parseInt(blockIdParts[1]);
255
int mapId = Integer.parseInt(blockIdParts[2]);
256
int reduceId = Integer.parseInt(blockIdParts[3]);
257
```
258
259
### File System Layout
260
261
The resolver expects shuffle files to follow Spark's standard layout:
262
263
```
264
<localDir>/<subDir>/
265
├── shuffle_<shuffleId>_<mapId>_0.data # Shuffle data file
266
├── shuffle_<shuffleId>_<mapId>_0.index # Shuffle index file
267
└── ...
268
269
Where:
270
- <localDir>: One of the directories specified in ExecutorShuffleInfo.localDirs
271
- <subDir>: One of subDirsPerLocalDir subdirectories, named by its index (0 to subDirsPerLocalDir - 1) formatted as two-digit hex (e.g. 00, 0a, 3f)
272
- Files are distributed using a non-negative hash of the file name: hash % localDirs.length selects <localDir>, and (hash / localDirs.length) % subDirsPerLocalDir selects <subDir>
273
```
274
275
**File Resolution Example:**
276
277
```java
278
// How resolver locates shuffle files (simplified):
279
public File getShuffleDataFile(String appId, String execId, int shuffleId, int mapId) {
280
ExecutorShuffleInfo info = executors.get(new AppExecId(appId, execId));
281
if (info == null) {
282
throw new RuntimeException("Executor not registered: " + execId);
283
}
284
285
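// Determine which local directory and subdirectory.
// Simplified: the real resolver uses a non-negative hash of the file name in place of
// mapId, and names the subdirectory with two-digit hex rather than a decimal number.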
286
int dirId = mapId % info.localDirs.length;
287
int subDirId = (mapId / info.localDirs.length) % info.subDirsPerLocalDir;
288
289
String localDir = info.localDirs[dirId];
290
String fileName = "shuffle_" + shuffleId + "_" + mapId + "_0.data";
291
292
return new File(localDir, subDirId + "/" + fileName);
293
}
294
```
295
296
## Error Handling
297
298
The resolver reports error conditions to the caller with the following exceptions:
299
300
- **IllegalArgumentException**: Invalid block IDs, malformed shuffle identifiers
301
- **RuntimeException**: Executor not registered, missing shuffle files
302
- **IOException**: File system errors, corrupt index files
303
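- **NumberFormatException**: Invalid numeric components in block IDs (a subclass of `IllegalArgumentException`, so a `catch (IllegalArgumentException e)` handler also catches it)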
304
305
**Error Handling Example:**
306
307
```java
308
try {
309
ManagedBuffer block = resolver.getBlockData(appId, execId, blockId);
310
// Process block
311
} catch (IllegalArgumentException e) {
312
System.err.println("Invalid block ID format: " + e.getMessage());
313
} catch (RuntimeException e) {
314
System.err.println("Block resolution failed: " + e.getMessage());
315
// May indicate executor not registered or missing files
316
}
317
```
318
319
## Persistence and Recovery
320
321
The resolver persists executor registrations to support service restarts:
322
323
```java
324
// Registrations are stored in LevelDB format
325
// Key: "AppExecShuffleInfo" + ";" + JSON serialized AppExecId (appId and execId)
326
// Value: JSON serialized ExecutorShuffleInfo
327
328
// On startup, resolver automatically recovers registrations:
329
// 1. Open LevelDB database file
330
// 2. Scan for keys with APP_KEY_PREFIX
331
// 3. Deserialize ExecutorShuffleInfo objects
332
// 4. Restore in-memory executor map
333
```