or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-apis.mdindex.mdprotocol-messages.mdsecurity.mdserver-apis.md

protocol-messages.mddocs/

0

# Protocol Messages

1

2

Structured communication protocol between shuffle clients and servers with efficient binary serialization using Netty ByteBuf for network transport.

3

4

## Capabilities

5

6

### Block Transfer Message Base

7

8

Abstract base class for all protocol messages handled by the external shuffle block handler.

9

10

```java { .api }

11

/**

12

* Base class for messages handled by ExternalShuffleBlockHandler

13

*/

14

public abstract class BlockTransferMessage implements Encodable {

15

/**

16

* Get the message type identifier

17

* @return Message type enum value

18

*/

19

protected abstract Type type();

20

21

/**

22

* Serialize message to byte buffer for network transport

23

* @return ByteBuffer containing serialized message

24

*/

25

public ByteBuffer toByteBuffer();

26

27

/**

28

* Message type enumeration

29

*/

30

public static enum Type {

31

/** Request to open blocks for reading */

32

OPEN_BLOCKS(0),

33

/** Request to upload a block */

34

UPLOAD_BLOCK(1),

35

/** Request to register an executor */

36

REGISTER_EXECUTOR(2),

37

/** Response containing stream handle */

38

STREAM_HANDLE(3),

39

/** Request to register a driver (Mesos) */

40

REGISTER_DRIVER(4);

41

42

/**

43

* Get the numeric identifier for this type

44

* @return Numeric type ID

45

*/

46

public byte id();

47

}

48

49

/**

50

* Message decoder for deserializing from byte buffers

51

*/

52

public static class Decoder {

53

/**

54

* Decode message from byte buffer

55

* @param msg - ByteBuffer containing serialized message

56

* @return Decoded BlockTransferMessage instance

57

*/

58

public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);

59

}

60

}

61

```

62

63

### Executor Shuffle Information

64

65

Configuration information for locating shuffle files of a registered executor.

66

67

```java { .api }

68

/**

69

* Contains configuration for locating shuffle files of an executor

70

*/

71

public class ExecutorShuffleInfo implements Encodable {

72

/** Array of local directories where shuffle files are stored */

73

public final String[] localDirs;

74

/** Number of subdirectories per local directory */

75

public final int subDirsPerLocalDir;

76

/** Shuffle manager class name */

77

public final String shuffleManager;

78

79

/**

80

* Create executor shuffle information

81

* @param localDirs - Array of local directories for shuffle files

82

* @param subDirsPerLocalDir - Number of subdirectories per local directory

83

* @param shuffleManager - Shuffle manager class name

84

*/

85

public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);

86

87

/**

88

* Generate hash code for this configuration

89

* @return hash code

90

*/

91

public int hashCode();

92

93

/**

94

* String representation of this configuration

95

* @return string representation

96

*/

97

public String toString();

98

99

/**

100

* Check equality with another ExecutorShuffleInfo

101

* @param other - Object to compare

102

* @return true if equal

103

*/

104

public boolean equals(Object other);

105

106

/**

107

* Get encoded length of this message

108

* @return Length in bytes when encoded

109

*/

110

public int encodedLength();

111

112

/**

113

* Encode this message to byte buffer

114

* @param buf - ByteBuf to write encoded data

115

*/

116

public void encode(ByteBuf buf);

117

118

/**

119

* Decode ExecutorShuffleInfo from byte buffer

120

* @param buf - ByteBuf containing encoded data

121

* @return Decoded ExecutorShuffleInfo instance

122

*/

123

public static ExecutorShuffleInfo decode(ByteBuf buf);

124

}

125

```

126

127

**Usage Example:**

128

129

```java

130

// Create executor shuffle info

131

ExecutorShuffleInfo shuffleInfo = new ExecutorShuffleInfo(

132

new String[]{"/tmp/spark-local-1", "/tmp/spark-local-2"},

133

64,

134

"org.apache.spark.shuffle.sort.SortShuffleManager"

135

);

136

137

// Serialize for network transport

138

ByteBuf buffer = Unpooled.buffer(shuffleInfo.encodedLength());

139

shuffleInfo.encode(buffer);

140

141

// Deserialize from network

142

ExecutorShuffleInfo decoded = ExecutorShuffleInfo.decode(buffer);

143

```

144

145

### Open Blocks Request

146

147

Request message to read a set of blocks, which returns a StreamHandle for streaming the block data.

148

149

```java { .api }

150

/**

151

* Request to read a set of blocks, returns StreamHandle

152

*/

153

public class OpenBlocks extends BlockTransferMessage {

154

/** Application identifier */

155

public final String appId;

156

/** Executor identifier */

157

public final String execId;

158

/** Array of block identifiers to open */

159

public final String[] blockIds;

160

161

/**

162

* Create open blocks request

163

* @param appId - Application identifier

164

* @param execId - Executor identifier

165

* @param blockIds - Array of block identifiers to open

166

*/

167

public OpenBlocks(String appId, String execId, String[] blockIds);

168

169

/**

170

* Generate hash code for this request

171

* @return hash code

172

*/

173

public int hashCode();

174

175

/**

176

* String representation of this request

177

* @return string representation

178

*/

179

public String toString();

180

181

/**

182

* Check equality with another OpenBlocks

183

* @param other - Object to compare

184

* @return true if equal

185

*/

186

public boolean equals(Object other);

187

188

/**

189

* Get encoded length of this message

190

* @return Length in bytes when encoded

191

*/

192

public int encodedLength();

193

194

/**

195

* Encode this message to byte buffer

196

* @param buf - ByteBuf to write encoded data

197

*/

198

public void encode(ByteBuf buf);

199

200

/**

201

* Decode OpenBlocks from byte buffer

202

* @param buf - ByteBuf containing encoded data

203

* @return Decoded OpenBlocks instance

204

*/

205

public static OpenBlocks decode(ByteBuf buf);

206

}

207

```

208

209

### Register Executor Request

210

211

Initial registration message sent between executor and shuffle server to establish the executor's configuration.

212

213

```java { .api }

214

/**

215

* Initial registration message between executor and shuffle server

216

*/

217

public class RegisterExecutor extends BlockTransferMessage {

218

/** Application identifier */

219

public final String appId;

220

/** Executor identifier */

221

public final String execId;

222

/** Executor configuration information */

223

public final ExecutorShuffleInfo executorInfo;

224

225

/**

226

* Create register executor request

227

* @param appId - Application identifier

228

* @param execId - Executor identifier

229

* @param executorInfo - Executor configuration information

230

*/

231

public RegisterExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);

232

233

/**

234

* Generate hash code for this request

235

* @return hash code

236

*/

237

public int hashCode();

238

239

/**

240

* String representation of this request

241

* @return string representation

242

*/

243

public String toString();

244

245

/**

246

* Check equality with another RegisterExecutor

247

* @param other - Object to compare

248

* @return true if equal

249

*/

250

public boolean equals(Object other);

251

252

/**

253

* Get encoded length of this message

254

* @return Length in bytes when encoded

255

*/

256

public int encodedLength();

257

258

/**

259

* Encode this message to byte buffer

260

* @param buf - ByteBuf to write encoded data

261

*/

262

public void encode(ByteBuf buf);

263

264

/**

265

* Decode RegisterExecutor from byte buffer

266

* @param buf - ByteBuf containing encoded data

267

* @return Decoded RegisterExecutor instance

268

*/

269

public static RegisterExecutor decode(ByteBuf buf);

270

}

271

```

272

273

### Stream Handle Response

274

275

Response message containing an identifier for a fixed number of chunks to read from a stream.

276

277

```java { .api }

278

/**

279

* Identifier for fixed number of chunks to read from stream

280

*/

281

public class StreamHandle extends BlockTransferMessage {

282

/** Stream identifier for reading chunks */

283

public final long streamId;

284

/** Number of chunks available in the stream */

285

public final int numChunks;

286

287

/**

288

* Create stream handle response

289

* @param streamId - Stream identifier for reading chunks

290

* @param numChunks - Number of chunks available in the stream

291

*/

292

public StreamHandle(long streamId, int numChunks);

293

294

/**

295

* Generate hash code for this handle

296

* @return hash code

297

*/

298

public int hashCode();

299

300

/**

301

* String representation of this handle

302

* @return string representation

303

*/

304

public String toString();

305

306

/**

307

* Check equality with another StreamHandle

308

* @param other - Object to compare

309

* @return true if equal

310

*/

311

public boolean equals(Object other);

312

313

/**

314

* Get encoded length of this message

315

* @return Length in bytes when encoded

316

*/

317

public int encodedLength();

318

319

/**

320

* Encode this message to byte buffer

321

* @param buf - ByteBuf to write encoded data

322

*/

323

public void encode(ByteBuf buf);

324

325

/**

326

* Decode StreamHandle from byte buffer

327

* @param buf - ByteBuf containing encoded data

328

* @return Decoded StreamHandle instance

329

*/

330

public static StreamHandle decode(ByteBuf buf);

331

}

332

```

333

334

### Upload Block Request

335

336

Request message to upload a block with associated metadata and storage level information.

337

338

```java { .api }

339

/**

340

* Request to upload a block with StorageLevel

341

*/

342

public class UploadBlock extends BlockTransferMessage {

343

/** Application identifier */

344

public final String appId;

345

/** Executor identifier */

346

public final String execId;

347

/** Block identifier */

348

public final String blockId;

349

/** Block metadata as byte array */

350

public final byte[] metadata;

351

/** Block data as byte array */

352

public final byte[] blockData;

353

354

/**

355

* Create upload block request

356

* @param appId - Application identifier

357

* @param execId - Executor identifier

358

* @param blockId - Block identifier

359

* @param metadata - Block metadata as byte array

360

* @param blockData - Block data as byte array

361

*/

362

public UploadBlock(String appId, String execId, String blockId, byte[] metadata, byte[] blockData);

363

364

/**

365

* Generate hash code for this request

366

* @return hash code

367

*/

368

public int hashCode();

369

370

/**

371

* String representation of this request

372

* @return string representation

373

*/

374

public String toString();

375

376

/**

377

* Check equality with another UploadBlock

378

* @param other - Object to compare

379

* @return true if equal

380

*/

381

public boolean equals(Object other);

382

383

/**

384

* Get encoded length of this message

385

* @return Length in bytes when encoded

386

*/

387

public int encodedLength();

388

389

/**

390

* Encode this message to byte buffer

391

* @param buf - ByteBuf to write encoded data

392

*/

393

public void encode(ByteBuf buf);

394

395

/**

396

* Decode UploadBlock from byte buffer

397

* @param buf - ByteBuf containing encoded data

398

* @return Decoded UploadBlock instance

399

*/

400

public static UploadBlock decode(ByteBuf buf);

401

}

402

```

403

404

## Mesos Protocol Extension

405

406

### Register Driver Request

407

408

Special registration message for driver registration with MesosExternalShuffleService in coarse-grained mode.

409

410

```java { .api }

411

/**

412

* Message for driver registration with MesosExternalShuffleService

413

*/

414

public class RegisterDriver extends BlockTransferMessage {

415

/**

416

* Create register driver request

417

* @param appId - Application identifier

418

*/

419

public RegisterDriver(String appId);

420

421

/**

422

* Get application identifier

423

* @return Application identifier

424

*/

425

public String getAppId();

426

427

/**

428

* Get encoded length of this message

429

* @return Length in bytes when encoded

430

*/

431

public int encodedLength();

432

433

/**

434

* Encode this message to byte buffer

435

* @param buf - ByteBuf to write encoded data

436

*/

437

public void encode(ByteBuf buf);

438

439

/**

440

* Generate hash code for this request

441

* @return hash code

442

*/

443

public int hashCode();

444

445

/**

446

* Decode RegisterDriver from byte buffer

447

* @param buf - ByteBuf containing encoded data

448

* @return Decoded RegisterDriver instance

449

*/

450

public static RegisterDriver decode(ByteBuf buf);

451

}

452

```

453

454

## Protocol Usage Examples

455

456

### Client-Server Communication Flow

457

458

```java

459

// 1. Client registers executor with server

460

ExecutorShuffleInfo shuffleInfo = new ExecutorShuffleInfo(

461

new String[]{"/tmp/spark-local"},

462

64,

463

"org.apache.spark.shuffle.sort.SortShuffleManager"

464

);

465

RegisterExecutor registerMsg = new RegisterExecutor("app-123", "executor-1", shuffleInfo);

466

467

// 2. Client requests to open blocks

468

OpenBlocks openMsg = new OpenBlocks("app-123", "executor-1",

469

new String[]{"shuffle_0_1_0", "shuffle_0_1_1"});

470

471

// 3. Server responds with stream handle

472

StreamHandle handleMsg = new StreamHandle(12345L, 2);

473

474

// 4. Client can now read chunks using the stream ID

475

```

476

477

### Message Serialization

478

479

```java

480

// Serialize any protocol message

481

BlockTransferMessage message = new OpenBlocks("app-123", "executor-1", new String[]{"shuffle_0_1_0"});

482

ByteBuffer serialized = message.toByteBuffer();

483

484

// Deserialize from byte buffer

485

BlockTransferMessage deserialized = BlockTransferMessage.Decoder.fromByteBuffer(serialized);

486

487

// Type-safe casting

488

if (deserialized instanceof OpenBlocks) {

489

OpenBlocks openBlocks = (OpenBlocks) deserialized;

490

// Process open blocks request

491

}

492

```