or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client.md handler.md index.md mesos.md protocol.md resolver.md security.md

docs/resolver.md

0

# Block Resolution

1

2

Block resolver functionality for managing executor metadata, locating shuffle files on disk, and providing shuffle block data access.

3

4

## Capabilities

5

6

### ExternalShuffleBlockResolver

7

8

Core component that manages executor metadata and resolves shuffle block locations on the file system.

9

10

```java { .api }

11

/**

12

* Manages executor metadata and resolves shuffle block locations on disk.

13

* Handles registration, cleanup, and block data retrieval for the external shuffle service.

14

*/

15

public class ExternalShuffleBlockResolver {

16

/**

17

* Creates a block resolver with specified configuration and executor file.

18

*

19

* @param conf transport configuration

20

* @param registeredExecutorFile file for persisting executor registrations across restarts

21

* @throws IOException if initialization fails

22

*/

23

public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) throws IOException;

24

25

/**

26

* Creates a block resolver with custom executor and cleanup configuration.

27

*

28

* @param conf transport configuration

29

* @param registeredExecutorFile file for persisting executor registrations

30

* @param directoryCleaner executor for cleaning up directories

31

* @throws IOException if initialization fails

32

*/

33

public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile,

34

Executor directoryCleaner) throws IOException;

35

36

/**

37

* Gets the number of currently registered executors.

38

*

39

* @return count of registered executors

40

*/

41

public int getRegisteredExecutorsSize();

42

43

/**

44

* Registers a new executor with all configuration needed to find shuffle files.

45

* Stores executor metadata in memory and persists to disk.

46

*

47

* @param appId application identifier

48

* @param execId executor identifier

49

* @param executorInfo shuffle configuration for this executor

50

*/

51

public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);

52

53

/**

54

* Retrieves shuffle block data from disk based on block ID.

55

* Supports both hash-based and sort-based shuffle formats.

56

*

57

* @param appId application identifier

58

* @param execId executor identifier

59

* @param blockId shuffle block identifier (format: shuffle_shuffleId_mapId_reduceId)

60

* @return managed buffer containing block data

61

* @throws IllegalArgumentException if block ID format is invalid

62

*/

63

public ManagedBuffer getBlockData(String appId, String execId, String blockId);

64

65

/**

66

* Removes application metadata and optionally cleans up local directories.

67

* Called when application terminates.

68

*

69

* @param appId application identifier

70

* @param cleanupLocalDirs whether to delete local shuffle directories

71

*/

72

public void applicationRemoved(String appId, boolean cleanupLocalDirs);

73

74

/**

75

* Closes the resolver and releases resources.

76

*/

77

public void close();

78

}

79

```

80

81

**Usage Examples:**

82

83

```java

84

import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;

85

import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;

86

import org.apache.spark.network.util.TransportConf;

87

import org.apache.spark.network.buffer.ManagedBuffer;

88

import java.io.File;

89

90

// Create resolver

91

TransportConf conf = new TransportConf("shuffle");

92

File executorFile = new File("/var/spark/shuffle/executors.ldb");

93

ExternalShuffleBlockResolver resolver;

94

95

try {

96

resolver = new ExternalShuffleBlockResolver(conf, executorFile);

97

System.out.println("Block resolver initialized");

98

} catch (IOException e) {

99

System.err.println("Failed to create resolver: " + e.getMessage());

100

return;

101

}

102

103

// Register executor

104

String appId = "app-20230101-1234";

105

String execId = "executor-1";

106

String[] localDirs = {"/tmp/spark-local-1", "/tmp/spark-local-2"};

107

ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(

108

localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");

109

110

resolver.registerExecutor(appId, execId, executorInfo);

111

System.out.println("Registered executor: " + execId);

112

System.out.println("Total executors: " + resolver.getRegisteredExecutorsSize());

113

114

// Retrieve block data

115

String blockId = "shuffle_0_1_0"; // shuffleId=0, mapId=1, reduceId=0

116

try {

117

ManagedBuffer blockData = resolver.getBlockData(appId, execId, blockId);

118

System.out.println("Retrieved block " + blockId + " (" + blockData.size() + " bytes)");

119

120

// Process block data

121

// Note: buffer will be automatically cleaned up

122

} catch (IllegalArgumentException e) {

123

System.err.println("Invalid block ID: " + e.getMessage());

124

}

125

126

// Clean up application

127

resolver.applicationRemoved(appId, true); // Delete local directories

128

System.out.println("Application removed: " + appId);

129

130

// Shutdown

131

resolver.close();

132

```

133

134

### AppExecId Composite Key

135

136

Composite key class for uniquely identifying executor registrations.

137

138

```java { .api }

139

/**

140

* Composite key combining application ID and executor ID.

141

* Used for uniquely identifying registered executors.

142

*/

143

public static class AppExecId {

144

public final String appId;

145

public final String execId;

146

147

/**

148

* Creates a composite key from application and executor IDs.

149

*/

150

public AppExecId(String appId, String execId);

151

152

@Override

153

public boolean equals(Object o);

154

155

@Override

156

public int hashCode();

157

158

@Override

159

public String toString();

160

}

161

```

162

163

### Shuffle Index Management

164

165

The resolver manages shuffle index information for efficient block lookups.

166

167

```java { .api }

168

/**

169

* Manages shuffle index file information to avoid repeated file operations.

170

*/

171

public class ShuffleIndexInformation {

172

/**

173

* Creates index information from index file.

174

*

175

* @param indexFile the shuffle index file

176

* @throws IOException if file cannot be read

177

*/

178

public ShuffleIndexInformation(File indexFile) throws IOException;

179

180

/**

181

* Gets index record for the specified reduce partition.

182

*

183

* @param reducer the reduce partition ID

184

* @return index record with offset and length

185

*/

186

public ShuffleIndexRecord getIndex(int reducer);

187

188

/**

189

* Gets the total number of partitions in this shuffle.

190

*/

191

public int getNumPartitions();

192

}

193

194

/**

195

* Represents a single shuffle index record.

196

*/

197

public class ShuffleIndexRecord {

198

/**

199

* Creates an index record with offset and length.

200

*/

201

public ShuffleIndexRecord(long offset, int length);

202

203

/**

204

* Gets the byte offset of this partition's data in the shuffle file.

205

*/

206

public long getOffset();

207

208

/**

209

* Gets the length in bytes of this partition's data.

210

*/

211

public int getLength();

212

}

213

```

214

215

**Index Usage Example:**

216

217

```java

218

// Resolver internally uses index information like this:

219

File indexFile = getShuffleIndexFile(appId, execId, shuffleId, mapId);

220

ShuffleIndexInformation indexInfo = new ShuffleIndexInformation(indexFile);

221

222

// Get specific partition data

223

int reduceId = 0;

224

ShuffleIndexRecord record = indexInfo.getIndex(reduceId);

225

long offset = record.getOffset();

226

int length = record.getLength();

227

228

// Create buffer for partition data

229

File dataFile = getShuffleDataFile(appId, execId, shuffleId, mapId);

230

ManagedBuffer buffer = new FileSegmentManagedBuffer(conf, dataFile, offset, length);

231

```

232

233

### Block ID Format and Parsing

234

235

The resolver accepts shuffle block IDs in the following format and rejects anything else:

236

237

```java

238

/**

239

* Block ID Format: shuffle_shuffleId_mapId_reduceId

240

* Examples:

241

* - shuffle_0_1_0: shuffle 0, map task 1, reduce partition 0

242

* - shuffle_2_5_3: shuffle 2, map task 5, reduce partition 3

243

*/

244

245

// Block ID parsing logic (internal)

246

String[] blockIdParts = blockId.split("_");

247

if (blockIdParts.length < 4) {

248

throw new IllegalArgumentException("Unexpected block id format: " + blockId);

249

}

250

if (!blockIdParts[0].equals("shuffle")) {

251

throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId);

252

}

253

254

int shuffleId = Integer.parseInt(blockIdParts[1]);

255

int mapId = Integer.parseInt(blockIdParts[2]);

256

int reduceId = Integer.parseInt(blockIdParts[3]);

257

```

258

259

### File System Layout

260

261

The resolver expects shuffle files to follow Spark's standard layout:

262

263

```

264

<localDir>/<subDir>/

265

├── shuffle_<shuffleId>_<mapId>_0.data # Shuffle data file

266

├── shuffle_<shuffleId>_<mapId>_0.index # Shuffle index file

267

└── ...

268

269

Where:

270

- <localDir>: One of the directories specified in ExecutorShuffleInfo.localDirs

271

- <subDir>: Numbered 0 to (subDirsPerLocalDir - 1)

272

- Files are distributed by a non-negative hash of the file name: local directory index = hash % localDirs.length, subdirectory = (hash / localDirs.length) % subDirsPerLocalDir

273

```

274

275

**File Resolution Example:**

276

277

```java

278

// How resolver locates shuffle files (simplified):

279

public File getShuffleDataFile(String appId, String execId, int shuffleId, int mapId) {

280

ExecutorShuffleInfo info = executors.get(new AppExecId(appId, execId));

281

if (info == null) {

282

throw new RuntimeException("Executor not registered: " + execId);

283

}

284

285

// Determine which local directory and subdirectory

286

int dirId = mapId % info.localDirs.length;

287

int subDirId = (mapId / info.localDirs.length) % info.subDirsPerLocalDir;

288

289

String localDir = info.localDirs[dirId];

290

String fileName = "shuffle_" + shuffleId + "_" + mapId + "_0.data";

291

292

return new File(localDir, subDirId + "/" + fileName);

293

}

294

```

295

296

## Error Handling

297

298

The resolver handles various error conditions:

299

300

- **IllegalArgumentException**: Invalid block IDs, malformed shuffle identifiers

301

- **RuntimeException**: Executor not registered, missing shuffle files

302

- **IOException**: File system errors, corrupt index files

303

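- **NumberFormatException**: Invalid numeric components in block IDs (a subclass of IllegalArgumentException, so a `catch (IllegalArgumentException e)` clause also handles it)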

304

305

**Error Handling Example:**

306

307

```java

308

try {

309

ManagedBuffer block = resolver.getBlockData(appId, execId, blockId);

310

// Process block

311

} catch (IllegalArgumentException e) {

312

System.err.println("Invalid block ID format: " + e.getMessage());

313

} catch (RuntimeException e) {

314

System.err.println("Block resolution failed: " + e.getMessage());

315

// May indicate executor not registered or missing files

316

}

317

```

318

319

## Persistence and Recovery

320

321

The resolver persists executor registrations to support service restarts:

322

323

```java

324

// Registrations are stored in LevelDB format

325

// Key: "AppExecShuffleInfo" + ";" + JSON serialized AppExecId (appId and execId)

326

// Value: JSON serialized ExecutorShuffleInfo

327

328

// On startup, resolver automatically recovers registrations:

329

// 1. Open LevelDB database file

330

// 2. Scan for keys with APP_KEY_PREFIX

331

// 3. Deserialize ExecutorShuffleInfo objects

332

// 4. Restore in-memory executor map

333

```