or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-apis.mdindex.mdprotocol-messages.mdsecurity.mdserver-apis.md

server-apis.mddocs/

0

# Server APIs

1

2

Server-side components for handling shuffle block requests, resolving block locations, managing executor registrations, and serving shuffle data to clients.

3

4

## Capabilities

5

6

### External Shuffle Block Handler

7

8

RPC handler for serving shuffle blocks from outside the executor process, managing client requests and coordinating with block resolver.

9

10

```java { .api }

11

/**

12

* RPC Handler for serving shuffle blocks from outside executor process

13

*/

14

public class ExternalShuffleBlockHandler extends RpcHandler {

15

/**

16

* Create block handler with configuration and executor registry

17

* @param conf - Transport configuration

18

* @param registeredExecutorFile - File for persisting executor registrations

19

* @throws IOException if handler initialization fails

20

*/

21

public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException;

22

23

/**

24

* Create block handler with custom stream manager and block resolver (for testing)

25

* @param streamManager - Stream manager for handling block streaming

26

* @param blockManager - Block resolver for locating blocks

27

*/

28

@VisibleForTesting

29

public ExternalShuffleBlockHandler(OneForOneStreamManager streamManager,

30

ExternalShuffleBlockResolver blockManager);

31

32

/**

33

* Handle incoming RPC requests from shuffle clients

34

* @param client - Transport client that sent the request

35

* @param message - Request message as byte buffer

36

* @param callback - Callback for sending response

37

*/

38

public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback);

39

40

/**

41

* Get stream manager for block streaming operations

42

* @return StreamManager instance

43

*/

44

public StreamManager getStreamManager();

45

46

/**

47

* Handle application removal and cleanup

48

* @param appId - Application identifier to remove

49

* @param cleanupLocalDirs - Whether to cleanup local directories

50

*/

51

public void applicationRemoved(String appId, boolean cleanupLocalDirs);

52

53

/**

54

* Re-register executor with updated configuration

55

* @param appExecId - Application and executor identifier

56

* @param executorInfo - Updated executor configuration

57

*/

58

public void reregisterExecutor(AppExecId appExecId, ExecutorShuffleInfo executorInfo);

59

60

/**

61

* Close handler and cleanup resources

62

*/

63

public void close();

64

}

65

```

66

67

**Usage Example:**

68

69

```java

70

import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;

71

import org.apache.spark.network.util.TransportConf;

72

73

// Create transport configuration

74

TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());

75

76

// Create registered executor file for persistence

77

File registeredExecutorFile = new File("/tmp/spark-shuffle/registered-executors.db");

78

79

// Initialize block handler

80

ExternalShuffleBlockHandler handler = new ExternalShuffleBlockHandler(conf, registeredExecutorFile);

81

82

// Use handler in transport server

83

TransportContext context = new TransportContext(conf, handler);

84

TransportServer server = context.createServer(7337, Collections.emptyList());

85

```

86

87

### External Shuffle Block Resolver

88

89

Manages the conversion of shuffle block IDs into physical segments of local files, handling executor registration and block location resolution.

90

91

```java { .api }

92

/**

93

* Manages converting shuffle BlockIds into physical segments of local files

94

*/

95

public class ExternalShuffleBlockResolver {

96

/**

97

* Create block resolver with configuration and executor registry

98

* @param conf - Transport configuration

99

* @param registeredExecutorFile - File for persisting executor registrations

100

* @throws IOException if resolver initialization fails

101

*/

102

public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) throws IOException;

103

104

/**

105

* Register executor configuration for block resolution

106

* @param appId - Application identifier

107

* @param execId - Executor identifier

108

* @param executorInfo - Executor configuration including local directories

109

*/

110

public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);

111

112

/**

113

* Get block data for specified block identifier

114

* @param appId - Application identifier

115

* @param execId - Executor identifier

116

* @param blockId - Block identifier (e.g., "shuffle_0_1_2")

117

* @return Managed buffer containing block data

118

*/

119

public ManagedBuffer getBlockData(String appId, String execId, String blockId);

120

121

/**

122

* Handle application removal and cleanup

123

* @param appId - Application identifier to remove

124

* @param cleanupLocalDirs - Whether to cleanup local directories

125

*/

126

public void applicationRemoved(String appId, boolean cleanupLocalDirs);

127

}

128

```

129

130

**Usage Example:**

131

132

```java

133

import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;

134

import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;

135

import org.apache.spark.network.buffer.ManagedBuffer;

136

137

// Create block resolver

138

File registeredExecutorFile = new File("/tmp/spark-shuffle/registered-executors.db");

139

ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, registeredExecutorFile);

140

141

// Register executor configuration

142

ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(

143

new String[]{"/tmp/spark-local-1", "/tmp/spark-local-2"},

144

64,

145

"org.apache.spark.shuffle.sort.SortShuffleManager"

146

);

147

resolver.registerExecutor("app-123", "executor-1", executorInfo);

148

149

// Retrieve block data

150

ManagedBuffer blockData = resolver.getBlockData("app-123", "executor-1", "shuffle_0_1_2");

151

try {

152

// Process block data

153

ByteBuffer buffer = blockData.nioByteBuffer();

154

// ... handle shuffle data

155

} finally {

156

blockData.release(); // Important: release the buffer

157

}

158

```

159

160

## Types

161

162

### Application Executor Identifier

163

164

Identifier pair for application and executor used throughout the block resolution system.

165

166

```java { .api }

167

/**

168

* Application and executor identifier pair

169

*/

170

public static class AppExecId {

171

/** Application identifier */

172

public final String appId;

173

/** Executor identifier */

174

public final String execId;

175

176

/**

177

* Create application executor identifier

178

* @param appId - Application identifier

179

* @param execId - Executor identifier

180

*/

181

public AppExecId(String appId, String execId);

182

183

/**

184

* Check equality with another AppExecId

185

* @param o - Object to compare

186

* @return true if equal

187

*/

188

public boolean equals(Object o);

189

190

/**

191

* Generate hash code for this identifier

192

* @return hash code

193

*/

194

public int hashCode();

195

196

/**

197

* String representation of this identifier

198

* @return string representation

199

*/

200

public String toString();

201

}

202

```

203

204

### Store Version

205

206

Version information for persistent storage format used by the block resolver.

207

208

```java { .api }

209

/**

210

* Version information for persistent storage

211

*/

212

public static class StoreVersion {

213

/** Major version number */

214

public final int major;

215

/** Minor version number */

216

public final int minor;

217

218

/**

219

* Create store version

220

* @param major - Major version number

221

* @param minor - Minor version number

222

*/

223

public StoreVersion(int major, int minor);

224

225

/**

226

* Check equality with another StoreVersion

227

* @param o - Object to compare

228

* @return true if equal

229

*/

230

public boolean equals(Object o);

231

232

/**

233

* Generate hash code for this version

234

* @return hash code

235

*/

236

public int hashCode();

237

}

238

```

239

240

**Usage Example:**

241

242

```java

243

// Create application executor identifier

244

AppExecId appExecId = new AppExecId("spark-app-123", "executor-1");

245

246

// Use in executor registration

247

resolver.reregisterExecutor(appExecId, updatedExecutorInfo);

248

249

// Store version for compatibility checking

250

StoreVersion currentVersion = new StoreVersion(1, 0);

251

```