# Network Protocol Messages

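The protocol message classes define the communication format between shuffle clients and servers. All messages extend `BlockTransferMessage` and support serialization to and from byte buffers for network transmission. `ExecutorShuffleInfo` is the one exception: it implements `Encodable` directly and travels inside `RegisterExecutor` rather than as a message of its own.

## Base Protocol Class

### BlockTransferMessage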

```java { .api }
public abstract class BlockTransferMessage implements Encodable {
    protected abstract Type type();

    public ByteBuffer toByteBuffer();

    public static class Decoder {
        public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
    }

    public static enum Type {
        OPEN_BLOCKS(0),
        UPLOAD_BLOCK(1),
        REGISTER_EXECUTOR(2),
        STREAM_HANDLE(3),
        REGISTER_DRIVER(4);

        public byte id();
    }
}
```

Abstract base class for all network protocol messages. Provides serialization and deserialization capabilities.

**Key Methods:**

#### toByteBuffer

Serializes the message to a byte buffer for network transmission.

**Returns:**
- `ByteBuffer`: Serialized message, prefixed with the one-byte type ID

#### fromByteBuffer (static)

Static method on the nested `Decoder` class. Deserializes a message from a byte buffer received from the network.

**Parameters:**
- `msg` (ByteBuffer): Serialized message data

**Returns:**
- `BlockTransferMessage`: Deserialized message instance

**Throws:**
- `IllegalArgumentException`: If the message type is unknown
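
To make the type-byte prefix concrete, here is a minimal, self-contained sketch of the idea using only `java.nio.ByteBuffer`. The class `TypePrefixSketch`, its method names, and the length-prefixed UTF-8 string layout are assumptions made for illustration and are not the library's implementation. Only the type ID `4` for `REGISTER_DRIVER` and the `IllegalArgumentException` for unknown types are taken from the API listed above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in; not part of the documented API.
final class TypePrefixSketch {
    // Mirrors REGISTER_DRIVER(4) from the Type enum above.
    static final byte REGISTER_DRIVER = 4;

    /** Assumed layout: [type byte][int length][UTF-8 appId bytes]. */
    static ByteBuffer encodeRegisterDriver(String appId) {
        byte[] id = appId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + id.length);
        buf.put(REGISTER_DRIVER).putInt(id.length).put(id);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    /** Reads the type byte first, then dispatches on it. */
    static String decode(ByteBuffer msg) {
        byte type = msg.get();
        switch (type) {
            case REGISTER_DRIVER: {
                byte[] id = new byte[msg.getInt()];
                msg.get(id);
                return "RegisterDriver(" + new String(id, StandardCharsets.UTF_8) + ")";
            }
            default:
                // Unknown type IDs are rejected, as documented for fromByteBuffer.
                throw new IllegalArgumentException("Unknown message type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode(encodeRegisterDriver("app-123")));
        try {
            decode(ByteBuffer.wrap(new byte[] {42}));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Putting the type ID first lets a decoder choose the right message class before it reads any message-specific fields.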

## Configuration Messages

### ExecutorShuffleInfo

```java { .api }
public class ExecutorShuffleInfo implements Encodable {
    public final String[] localDirs;
    public final int subDirsPerLocalDir;
    public final String shuffleManager;

    public ExecutorShuffleInfo(
        String[] localDirs,
        int subDirsPerLocalDir,
        String shuffleManager
    );

    public boolean equals(Object other);
    public int hashCode();
    public String toString();

    public int encodedLength();
    public void encode(ByteBuf buf);
    public static ExecutorShuffleInfo decode(ByteBuf buf);
}
```

Configuration data describing where an executor stores its shuffle files and how they are organized.

**Fields:**
- `localDirs` (String[]): Base local directories where shuffle files are stored
- `subDirsPerLocalDir` (int): Number of subdirectories created within each local directory
- `shuffleManager` (String): Fully qualified class name of the shuffle manager (e.g., "org.apache.spark.shuffle.sort.SortShuffleManager")
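
As an illustration of how `localDirs` and `subDirsPerLocalDir` could together pin down a file's location, the sketch below hashes a file name to pick a base directory and a two-digit hex subdirectory. The class `ShuffleFileLocatorSketch`, the hashing scheme, and the subdirectory naming are assumptions for this example, loosely modeled on the way Spark lays out its local block directories. They are not taken from this API.

```java
import java.io.File;

// Hypothetical helper; the layout scheme is an assumption, not documented behavior.
final class ShuffleFileLocatorSketch {
    /** Non-negative hash of a string; Integer.MIN_VALUE maps to 0. */
    static int nonNegativeHash(String s) {
        int h = s.hashCode();
        return h == Integer.MIN_VALUE ? 0 : Math.abs(h);
    }

    /** Picks localDirs[hash % n], then a hex-named subdirectory inside it. */
    static File locate(String[] localDirs, int subDirsPerLocalDir, String fileName) {
        int hash = nonNegativeHash(fileName);
        String localDir = localDirs[hash % localDirs.length];
        int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
        File subDir = new File(localDir, String.format("%02x", subDirId));
        return new File(subDir, fileName);
    }

    public static void main(String[] args) {
        String[] dirs = {"/tmp/spark-shuffle-a", "/tmp/spark-shuffle-b"};
        System.out.println(locate(dirs, 64, "shuffle_1_0_0.data"));
    }
}
```

Because the location is a pure function of the file name and the two fields, a server that knows an executor's `ExecutorShuffleInfo` can find its files without asking the executor.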

## Registration Messages

### RegisterExecutor

```java { .api }
public class RegisterExecutor extends BlockTransferMessage {
    public final String appId;
    public final String execId;
    public final ExecutorShuffleInfo executorInfo;

    public RegisterExecutor(
        String appId,
        String execId,
        ExecutorShuffleInfo executorInfo
    );

    protected Type type(); // Returns REGISTER_EXECUTOR

    public boolean equals(Object other);
    public int hashCode();
    public String toString();

    public int encodedLength();
    public void encode(ByteBuf buf);
    public static RegisterExecutor decode(ByteBuf buf);
}
```

Initial registration message sent from an executor to the shuffle server. Contains the information needed to locate the executor's shuffle files.

**Fields:**
- `appId` (String): Spark application identifier
- `execId` (String): Executor identifier within the application
- `executorInfo` (ExecutorShuffleInfo): Configuration describing shuffle file locations

### RegisterDriver (Mesos)

```java { .api }
public class RegisterDriver extends BlockTransferMessage {
    public final String appId;

    public RegisterDriver(String appId);

    protected Type type(); // Returns REGISTER_DRIVER

    public boolean equals(Object other);
    public int hashCode();
    public String toString();

    public int encodedLength();
    public void encode(ByteBuf buf);
    public static RegisterDriver decode(ByteBuf buf);
}
```

Mesos-specific message that registers the Spark driver with the shuffle service so that the service can clean up the application's shuffle data.

**Fields:**
- `appId` (String): Spark application identifier

## Block Access Messages

### OpenBlocks

```java { .api }
public class OpenBlocks extends BlockTransferMessage {
    public final String appId;
    public final String execId;
    public final String[] blockIds;

    public OpenBlocks(String appId, String execId, String[] blockIds);

    protected Type type(); // Returns OPEN_BLOCKS

    public boolean equals(Object other);
    public int hashCode();
    public String toString();

    public int encodedLength();
    public void encode(ByteBuf buf);
    public static OpenBlocks decode(ByteBuf buf);
}
```

Request to open and read a set of shuffle blocks. The server responds with a `StreamHandle` containing stream information.

**Fields:**
- `appId` (String): Application that owns the blocks
- `execId` (String): Executor that wrote the blocks
- `blockIds` (String[]): Array of block identifiers to read

### StreamHandle

```java { .api }
public class StreamHandle extends BlockTransferMessage {
    public final long streamId;
    public final int numChunks;

    public StreamHandle(long streamId, int numChunks);

    protected Type type(); // Returns STREAM_HANDLE

    public boolean equals(Object other);
    public int hashCode();
    public String toString();

    public int encodedLength();
    public void encode(ByteBuf buf);
    public static StreamHandle decode(ByteBuf buf);
}
```

Response to an `OpenBlocks` request. Describes the stream that was opened for reading the requested block data.

**Fields:**
- `streamId` (long): Unique identifier for the data stream
- `numChunks` (int): Number of chunks (blocks) available in the stream
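
A client typically needs to relate each chunk in the stream back to a block it asked for. The sketch below assumes one chunk per requested block, delivered in request order. That ordering, the class `ChunkMappingSketch`, and its method are assumptions for illustration. The field description above only says that `numChunks` counts the blocks in the stream.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; assumes chunk i carries the data for blockIds[i].
final class ChunkMappingSketch {
    /** Maps chunk indexes 0..numChunks-1 to the block IDs from the request. */
    static Map<Integer, String> chunkToBlock(String[] blockIds, int numChunks) {
        if (numChunks != blockIds.length) {
            throw new IllegalStateException(
                "Expected " + blockIds.length + " chunks but handle reports " + numChunks);
        }
        Map<Integer, String> mapping = new LinkedHashMap<>();
        for (int i = 0; i < numChunks; i++) {
            mapping.put(i, blockIds[i]);
        }
        return mapping;
    }

    public static void main(String[] args) {
        String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};
        System.out.println(chunkToBlock(blockIds, 3));
    }
}
```

Checking `numChunks` against the request length up front surfaces a mismatched response before any chunk is read.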

### UploadBlock

```java { .api }
public class UploadBlock extends BlockTransferMessage {
    public final String appId;
    public final String execId;
    public final String blockId;
    public final byte[] metadata;
    public final byte[] blockData;

    public UploadBlock(
        String appId,
        String execId,
        String blockId,
        byte[] metadata,
        byte[] blockData
    );

    protected Type type(); // Returns UPLOAD_BLOCK

    public boolean equals(Object other);
    public int hashCode();
    public String toString();

    public int encodedLength();
    public void encode(ByteBuf buf);
    public static UploadBlock decode(ByteBuf buf);
}
```

Request to upload a block with associated metadata. Used by `NettyBlockTransferService`, but not typically by the external shuffle service.

**Fields:**
- `appId` (String): Application identifier
- `execId` (String): Executor identifier
- `blockId` (String): Block identifier
- `metadata` (byte[]): Serialized block metadata (typically StorageLevel)
- `blockData` (byte[]): Raw block data bytes

## Usage Examples

### Creating Registration Message

```java
// Create executor shuffle info
ExecutorShuffleInfo info = new ExecutorShuffleInfo(
    new String[]{"/tmp/spark-shuffle"},
    64,
    "org.apache.spark.shuffle.sort.SortShuffleManager"
);

// Create registration message
RegisterExecutor registerMsg = new RegisterExecutor("app-123", "executor-1", info);

// Serialize for network transmission
ByteBuffer serialized = registerMsg.toByteBuffer();
```

### Creating Block Request

```java
// Request multiple blocks
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};
OpenBlocks openMsg = new OpenBlocks("app-123", "executor-1", blockIds);

// Serialize message
ByteBuffer requestData = openMsg.toByteBuffer();
```

### Deserializing Messages

```java
// Received message from network
ByteBuffer receivedData = ...; // from network

// Deserialize
BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(receivedData);

// Handle based on type
if (message instanceof OpenBlocks) {
    OpenBlocks openBlocks = (OpenBlocks) message;
    System.out.println("Request for blocks: " + Arrays.toString(openBlocks.blockIds));
} else if (message instanceof RegisterExecutor) {
    RegisterExecutor register = (RegisterExecutor) message;
    System.out.println("Executor registration: " + register.execId);
} else if (message instanceof StreamHandle) {
    StreamHandle handle = (StreamHandle) message;
    System.out.println("Stream " + handle.streamId + " with " + handle.numChunks + " chunks");
}
```

### Mesos Driver Registration

```java
// Mesos-specific driver registration
RegisterDriver driverMsg = new RegisterDriver("spark-app-123");
ByteBuffer driverRegistration = driverMsg.toByteBuffer();

// Send to shuffle service for cleanup coordination
```

## Message Flow

**Normal Block Fetch Flow:**
1. Client sends `RegisterExecutor` (one-time setup)
2. Client sends `OpenBlocks` with block IDs to fetch
3. Server responds with `StreamHandle` containing stream info
4. Client reads block data from the stream using the handle
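
The ordering of the fetch steps can be sketched with a tiny in-memory stand-in for the server. Everything here is hypothetical: the class `FetchFlowSketch`, its methods, the choice to refuse requests from unregistered executors, and the `{streamId, numChunks}` array that stands in for a `StreamHandle`. It is a sketch of the sequence under those assumptions, not the shuffle service's behavior.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory model of the fetch flow; not the real server.
final class FetchFlowSketch {
    private final Set<String> registered = new HashSet<>();
    private final AtomicLong nextStreamId = new AtomicLong();

    /** Step 1: one-time registration, keyed by application and executor. */
    void registerExecutor(String appId, String execId) {
        registered.add(appId + "/" + execId);
    }

    /** Steps 2-3: open blocks and return {streamId, numChunks}. */
    long[] openBlocks(String appId, String execId, String[] blockIds) {
        if (!registered.contains(appId + "/" + execId)) {
            // Assumed policy: the server cannot locate files for unknown executors.
            throw new IllegalStateException("Executor not registered: " + appId + "/" + execId);
        }
        return new long[] {nextStreamId.getAndIncrement(), blockIds.length};
    }

    public static void main(String[] args) {
        FetchFlowSketch server = new FetchFlowSketch();
        server.registerExecutor("app-123", "executor-1");
        long[] handle = server.openBlocks(
            "app-123", "executor-1", new String[] {"shuffle_1_0_0", "shuffle_1_0_1"});
        // Step 4: the client would now read chunks 0..numChunks-1 from the stream.
        for (int chunk = 0; chunk < handle[1]; chunk++) {
            System.out.println("fetch stream " + handle[0] + " chunk " + chunk);
        }
    }
}
```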

**Mesos Cleanup Flow:**
1. Driver sends `RegisterDriver` for cleanup coordination
2. Shuffle service tracks driver registration
3. On application completion, the service can clean up the application's shuffle data

**Block Upload Flow (for NettyBlockTransferService):**
1. Client sends `UploadBlock` with block data and metadata
2. Server stores the block and responds with an acknowledgment

## Error Handling

Protocol-level errors are typically handled through:
- **Message validation**: An unknown message type causes `Decoder.fromByteBuffer` to throw `IllegalArgumentException`
- **Serialization errors**: Encoding or decoding failures may indicate a protocol version mismatch
- **Network errors**: The transport layer handles connection failures and retries

```java
try {
    BlockTransferMessage msg = BlockTransferMessage.Decoder.fromByteBuffer(data);
    // Process message...
} catch (IllegalArgumentException e) {
    System.err.println("Unknown message type or corrupted data: " + e.getMessage());
    // Handle protocol error
}
```