or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client.mdhandler.mdindex.mdmesos.mdprotocol.mdresolver.mdsecurity.md

protocol.mddocs/

0

# Protocol Messages

1

2

Structured protocol messages for client-server communication, including executor registration, block requests, and streaming handles using Netty encoding.

3

4

## Capabilities

5

6

### BlockTransferMessage Base Class

7

8

Abstract base class for all shuffle protocol messages with serialization support.

9

10

```java { .api }

11

/**

12

* Base class for messages handled by ExternalShuffleBlockHandler and NettyBlockTransferService.

13

* All messages implement Encodable for efficient Netty-based serialization.

14

*/

15

public abstract class BlockTransferMessage implements Encodable {

16

/**

17

* Returns the message type identifier.

18

*/

19

protected abstract Type type();

20

21

/**

22

* Serializes the message to a ByteBuffer.

23

* Includes type byte followed by message-specific data.

24

*/

25

public ByteBuffer toByteBuffer();

26

27

/**

28

* Message type enumeration for protocol identification.

29

*/

30

public enum Type {

31

OPEN_BLOCKS(0), // Request to open blocks for streaming

32

UPLOAD_BLOCK(1), // Upload block data (NettyBlockTransferService only)

33

REGISTER_EXECUTOR(2), // Register executor with shuffle service

34

STREAM_HANDLE(3), // Handle for streaming blocks

35

REGISTER_DRIVER(4), // Register Mesos driver

36

HEARTBEAT(5); // Mesos heartbeat message

37

38

public byte id();

39

}

40

41

/**

42

* Message decoder for deserializing incoming messages.

43

*/

44

public static class Decoder {

45

/**

46

* Deserializes ByteBuffer to appropriate message type.

47

* Reads type byte and delegates to specific decode method.

48

*/

49

public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);

50

}

51

}

52

```

53

54

### ExecutorShuffleInfo

55

56

Configuration message containing all information needed to locate executor shuffle files.

57

58

```java { .api }

59

/**

60

* Contains all configuration necessary for locating the shuffle files of an executor.

61

* Sent during executor registration to inform shuffle service about file locations.

62

*/

63

public class ExecutorShuffleInfo implements Encodable {

64

/** The base set of local directories that the executor stores its shuffle files in. */

65

public final String[] localDirs;

66

67

/** Number of subdirectories created within each localDir. */

68

public final int subDirsPerLocalDir;

69

70

/** Shuffle manager (e.g., SortShuffleManager) that the executor is using. */

71

public final String shuffleManager;

72

73

/**

74

* Creates executor shuffle configuration.

75

*

76

* @param localDirs array of local directory paths for shuffle files

77

* @param subDirsPerLocalDir number of subdirectories per local directory

78

* @param shuffleManager fully qualified shuffle manager class name

79

*/

80

public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);

81

82

/**

83

* Encodes the shuffle info to a Netty ByteBuf.

84

*/

85

@Override

86

public void encode(ByteBuf buf);

87

88

/**

89

* Decodes shuffle info from a Netty ByteBuf.

90

*/

91

public static ExecutorShuffleInfo decode(ByteBuf buf);

92

93

/**

94

* Returns the encoded length in bytes.

95

*/

96

@Override

97

public int encodedLength();

98

99

@Override

100

public boolean equals(Object other);

101

102

@Override

103

public int hashCode();

104

105

@Override

106

public String toString();

107

}

108

```

109

110

**Usage Example:**

111

112

```java

113

import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;

114

115

// Create executor shuffle configuration

116

String[] localDirs = {

117

"/tmp/spark-local-dir1",

118

"/tmp/spark-local-dir2",

119

"/mnt/spark-ssd/local"

120

};

121

int subDirsPerLocalDir = 64; // Hash-based directory distribution

122

String shuffleManager = "org.apache.spark.shuffle.sort.SortShuffleManager";

123

124

ExecutorShuffleInfo info = new ExecutorShuffleInfo(localDirs, subDirsPerLocalDir, shuffleManager);

125

126

// Shuffle info is automatically encoded when sent over network

127

System.out.println("Shuffle info: " + info.toString());

128

System.out.println("Encoded size: " + info.encodedLength() + " bytes");

129

```

130

131

### OpenBlocks

132

133

Message requesting the server to open shuffle blocks for streaming.

134

135

```java { .api }

136

/**

137

* Message to request opening blocks for streaming.

138

* Server responds with StreamHandle containing stream ID and chunk count.

139

*/

140

public class OpenBlocks extends BlockTransferMessage {

141

public final String appId;

142

public final String execId;

143

public final String[] blockIds;

144

145

/**

146

* Creates an open blocks request.

147

*

148

* @param appId application identifier

149

* @param execId executor identifier

150

* @param blockIds array of block identifiers to open

151

*/

152

public OpenBlocks(String appId, String execId, String[] blockIds);

153

154

@Override

155

protected Type type();

156

157

@Override

158

public void encode(ByteBuf buf);

159

160

public static OpenBlocks decode(ByteBuf buf);

161

162

@Override

163

public int encodedLength();

164

165

@Override

166

public boolean equals(Object other);

167

168

@Override

169

public int hashCode();

170

171

@Override

172

public String toString();

173

}

174

```

175

176

### RegisterExecutor

177

178

Message for registering an executor with the shuffle service.

179

180

```java { .api }

181

/**

182

* Message for registering an executor with the external shuffle service.

183

* Contains executor ID and shuffle configuration information.

184

*/

185

public class RegisterExecutor extends BlockTransferMessage {

186

public final String appId;

187

public final String execId;

188

public final ExecutorShuffleInfo executorInfo;

189

190

/**

191

* Creates an executor registration message.

192

*

193

* @param appId application identifier

194

* @param execId executor identifier

195

* @param executorInfo shuffle configuration for this executor

196

*/

197

public RegisterExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);

198

199

@Override

200

protected Type type();

201

202

@Override

203

public void encode(ByteBuf buf);

204

205

public static RegisterExecutor decode(ByteBuf buf);

206

207

@Override

208

public int encodedLength();

209

210

@Override

211

public boolean equals(Object other);

212

213

@Override

214

public int hashCode();

215

216

@Override

217

public String toString();

218

}

219

```

220

221

### StreamHandle

222

223

Response message containing stream information for block fetching.

224

225

```java { .api }

226

/**

227

* Handle for streaming blocks from server to client.

228

* Returned in response to OpenBlocks request.

229

*/

230

public class StreamHandle extends BlockTransferMessage {

231

public final long streamId;

232

public final int numChunks;

233

234

/**

235

* Creates a stream handle.

236

*

237

* @param streamId unique identifier for the stream

238

* @param numChunks number of chunks (blocks) in the stream

239

*/

240

public StreamHandle(long streamId, int numChunks);

241

242

@Override

243

protected Type type();

244

245

@Override

246

public void encode(ByteBuf buf);

247

248

public static StreamHandle decode(ByteBuf buf);

249

250

@Override

251

public int encodedLength();

252

253

@Override

254

public boolean equals(Object other);

255

256

@Override

257

public int hashCode();

258

259

@Override

260

public String toString();

261

}

262

```

263

264

### UploadBlock

265

266

Message for uploading block data (used by NettyBlockTransferService, not external shuffle service).

267

268

```java { .api }

269

/**

270

* Message for uploading block data to a remote executor.

271

* Used by NettyBlockTransferService, not by external shuffle service.

272

*/

273

public class UploadBlock extends BlockTransferMessage {

274

public final String appId;

275

public final String execId;

276

public final String blockId;

277

public final byte[] blockData;

278

public final String level;

279

280

/**

281

* Creates an upload block message.

282

*

283

* @param appId application identifier

284

* @param execId executor identifier

285

* @param blockId block identifier

286

* @param blockData block data bytes

287

* @param level storage level

288

*/

289

public UploadBlock(String appId, String execId, String blockId, byte[] blockData, String level);

290

291

@Override

292

protected Type type();

293

294

@Override

295

public void encode(ByteBuf buf);

296

297

public static UploadBlock decode(ByteBuf buf);

298

299

@Override

300

public int encodedLength();

301

302

@Override

303

public boolean equals(Object other);

304

305

@Override

306

public int hashCode();

307

308

@Override

309

public String toString();

310

}

311

```

312

313

## Protocol Usage Examples

314

315

### Client-Server Communication Flow

316

317

```java

318

// 1. Client registers executor with shuffle service

319

ExecutorShuffleInfo info = new ExecutorShuffleInfo(localDirs, 64, shuffleManager);

320

RegisterExecutor registerMsg = new RegisterExecutor(appId, execId, info);

321

ByteBuffer encoded = registerMsg.toByteBuffer();

322

client.sendRpcSync(encoded, timeout);

323

324

// 2. Client requests blocks for streaming

325

String[] blockIds = {"shuffle_0_1_0", "shuffle_0_1_1", "shuffle_0_2_0"};

326

OpenBlocks openMsg = new OpenBlocks(appId, execId, blockIds);

327

ByteBuffer request = openMsg.toByteBuffer();

328

ByteBuffer response = client.sendRpcSync(request, timeout);

329

330

// 3. Server responds with stream handle

331

BlockTransferMessage responseMsg = BlockTransferMessage.Decoder.fromByteBuffer(response);

332

if (responseMsg instanceof StreamHandle) {

333

StreamHandle handle = (StreamHandle) responseMsg;

334

System.out.println("Stream ID: " + handle.streamId);

335

System.out.println("Chunks: " + handle.numChunks);

336

337

// 4. Client fetches stream chunks using stream ID

338

// (handled by transport layer)

339

}

340

```

341

342

### Message Encoding and Decoding

343

344

```java

345

import org.apache.spark.network.shuffle.protocol.*;

346

import java.nio.ByteBuffer;

347

348

// Encoding messages for network transmission

349

RegisterExecutor message = new RegisterExecutor(appId, execId, executorInfo);

350

ByteBuffer encoded = message.toByteBuffer();

351

System.out.println("Encoded message size: " + encoded.remaining() + " bytes");

352

353

// Decoding messages from network

354

ByteBuffer received = // ... received from network

355

BlockTransferMessage decoded = BlockTransferMessage.Decoder.fromByteBuffer(received);

356

357

switch (decoded.type()) {

358

case REGISTER_EXECUTOR:

359

RegisterExecutor regMsg = (RegisterExecutor) decoded;

360

System.out.println("Received registration for executor: " + regMsg.execId);

361

break;

362

363

case OPEN_BLOCKS:

364

OpenBlocks openMsg = (OpenBlocks) decoded;

365

System.out.println("Received request for " + openMsg.blockIds.length + " blocks");

366

break;

367

368

case STREAM_HANDLE:

369

StreamHandle handle = (StreamHandle) decoded;

370

System.out.println("Received stream handle: " + handle.streamId);

371

break;

372

373

default:

374

System.err.println("Unknown message type: " + decoded.type());

375

}

376

```

377

378

### Custom Message Processing

379

380

```java

381

/**

382

* Example message handler implementation

383

*/

384

public class ShuffleMessageProcessor {

385

386

public void processMessage(ByteBuffer messageBuffer) {

387

try {

388

BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(messageBuffer);

389

390

switch (message.type()) {

391

case REGISTER_EXECUTOR:

392

handleRegisterExecutor((RegisterExecutor) message);

393

break;

394

395

case OPEN_BLOCKS:

396

handleOpenBlocks((OpenBlocks) message);

397

break;

398

399

default:

400

throw new UnsupportedOperationException("Unsupported message: " + message.type());

401

}

402

} catch (IllegalArgumentException e) {

403

System.err.println("Failed to decode message: " + e.getMessage());

404

}

405

}

406

407

private void handleRegisterExecutor(RegisterExecutor message) {

408

System.out.println("Registering executor " + message.execId +

409

" with " + message.executorInfo.localDirs.length + " local directories");

410

411

// Validate executor info

412

if (message.executorInfo.localDirs.length == 0) {

413

throw new IllegalArgumentException("Executor must have at least one local directory");

414

}

415

416

// Process registration...

417

}

418

419

private void handleOpenBlocks(OpenBlocks message) {

420

System.out.println("Opening " + message.blockIds.length + " blocks for executor " + message.execId);

421

422

// Validate block IDs

423

for (String blockId : message.blockIds) {

424

if (!blockId.startsWith("shuffle_")) {

425

throw new IllegalArgumentException("Invalid block ID format: " + blockId);

426

}

427

}

428

429

// Process block opening...

430

}

431

}

432

```

433

434

## Error Handling

435

436

Protocol messages can fail during encoding/decoding:

437

438

- **IllegalArgumentException**: Invalid message format, unknown message type

439

- **UnsupportedOperationException**: Unsupported message types

440

- **BufferUnderflowException**: Incomplete message data

441

- **BufferOverflowException**: Message too large for buffer

442

443

**Error Handling Example:**

444

445

```java

446

try {

447

BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(buffer);

448

// Process message

449

} catch (IllegalArgumentException e) {

450

System.err.println("Invalid message format: " + e.getMessage());

451

} catch (Exception e) {

452

System.err.println("Message processing failed: " + e.getMessage());

453

}

454

```