or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md, block-fetching.md, file-management.md, index.md, mesos.md, protocol.md, shuffle-client.md, shuffle-server.md

docs/protocol.md

0

# Protocol Messages

1

2

Serializable message classes for communication between shuffle clients and servers, including registration, block requests, and data transfers.

3

4

## Capabilities

5

6

### BlockTransferMessage Base Class

7

8

Base class for all shuffle protocol messages.

9

10

```java { .api }

11

/**

12

* Base class for all shuffle protocol messages

13

*/

14

public abstract class BlockTransferMessage implements Encodable {

15

/**

16

* Convert the message to a ByteBuffer for network transmission

17

* @return ByteBuffer containing the serialized message

18

*/

19

public ByteBuffer toByteBuffer();

20

21

/**

22

* Enumeration of all supported message types

23

*/

24

public enum Type {

25

OPEN_BLOCKS, UPLOAD_BLOCK, REGISTER_EXECUTOR, STREAM_HANDLE,

26

REGISTER_DRIVER, HEARTBEAT, UPLOAD_BLOCK_STREAM

27

}

28

29

/**

30

* Decoder for deserializing messages from ByteBuffer

31

*/

32

public static class Decoder {

33

/**

34

* Deserialize a message from ByteBuffer

35

* @param msg - ByteBuffer containing serialized message

36

* @return Deserialized BlockTransferMessage

37

*/

38

public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);

39

}

40

}

41

```

42

43

### ExecutorShuffleInfo

44

45

Contains executor configuration for locating shuffle files.

46

47

```java { .api }

48

/**

49

* Contains executor configuration for locating shuffle files

50

*/

51

public class ExecutorShuffleInfo implements Encodable {

52

/**

53

* Local directories where shuffle files are stored

54

*/

55

public final String[] localDirs;

56

57

/**

58

* Number of subdirectories per local directory

59

*/

60

public final int subDirsPerLocalDir;

61

62

/**

63

* Shuffle manager class name

64

*/

65

public final String shuffleManager;

66

67

/**

68

* Create executor shuffle information

69

* @param localDirs - Array of local directory paths for shuffle files

70

* @param subDirsPerLocalDir - Number of subdirectories per local directory

71

* @param shuffleManager - Name of the shuffle manager implementation

72

*/

73

public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);

74

75

public boolean equals(Object other);

76

public int hashCode();

77

public String toString();

78

}

79

```

80

81

### OpenBlocks Message

82

83

Request to read a set of blocks.

84

85

```java { .api }

86

/**

87

* Request to read a set of blocks from the shuffle service

88

*/

89

public class OpenBlocks extends BlockTransferMessage {

90

/**

91

* Application ID

92

*/

93

public final String appId;

94

95

/**

96

* Executor ID

97

*/

98

public final String execId;

99

100

/**

101

* Array of block IDs to open

102

*/

103

public final String[] blockIds;

104

105

/**

106

* Create an open blocks request

107

* @param appId - Application ID

108

* @param execId - Executor ID

109

* @param blockIds - Array of block IDs to request

110

*/

111

public OpenBlocks(String appId, String execId, String[] blockIds);

112

113

public boolean equals(Object other);

114

public int hashCode();

115

public String toString();

116

}

117

```

118

119

### RegisterExecutor Message

120

121

Initial registration message between executor and shuffle server.

122

123

```java { .api }

124

/**

125

* Initial registration message between executor and shuffle server

126

*/

127

public class RegisterExecutor extends BlockTransferMessage {

128

/**

129

* Application ID

130

*/

131

public final String appId;

132

133

/**

134

* Executor ID

135

*/

136

public final String execId;

137

138

/**

139

* Executor shuffle configuration information

140

*/

141

public final ExecutorShuffleInfo executorInfo;

142

143

/**

144

* Create an executor registration message

145

* @param appId - Application ID

146

* @param execId - Executor ID

147

* @param executorInfo - Executor shuffle configuration

148

*/

149

public RegisterExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);

150

151

public boolean equals(Object other);

152

public int hashCode();

153

public String toString();

154

}

155

```

156

157

### StreamHandle Message

158

159

Identifier for a fixed number of chunks in a stream.

160

161

```java { .api }

162

/**

163

* Identifier for a fixed number of chunks in a stream

164

*/

165

public class StreamHandle extends BlockTransferMessage {

166

/**

167

* Stream identifier

168

*/

169

public final long streamId;

170

171

/**

172

* Number of chunks in the stream

173

*/

174

public final int numChunks;

175

176

/**

177

* Create a stream handle

178

* @param streamId - Unique stream identifier

179

* @param numChunks - Number of chunks in the stream

180

*/

181

public StreamHandle(long streamId, int numChunks);

182

183

public boolean equals(Object other);

184

public int hashCode();

185

public String toString();

186

}

187

```

188

189

### UploadBlock Message

190

191

Request to upload a block with a storage level.

192

193

```java { .api }

194

/**

195

* Request to upload a block with a storage level

196

*/

197

public class UploadBlock extends BlockTransferMessage {

198

/**

199

* Application ID

200

*/

201

public final String appId;

202

203

/**

204

* Executor ID

205

*/

206

public final String execId;

207

208

/**

209

* Block ID

210

*/

211

public final String blockId;

212

213

/**

214

* Block metadata

215

*/

216

public final byte[] metadata;

217

218

/**

219

* Block data

220

*/

221

public final byte[] blockData;

222

223

/**

224

* Create an upload block request

225

* @param appId - Application ID

226

* @param execId - Executor ID

227

* @param blockId - Block ID to upload

228

* @param metadata - Block metadata

229

* @param blockData - Block data bytes

230

*/

231

public UploadBlock(String appId, String execId, String blockId, byte[] metadata, byte[] blockData);

232

233

public boolean equals(Object other);

234

public int hashCode();

235

public String toString();

236

}

237

```

238

239

### UploadBlockStream Message

240

241

Request to upload a block as a stream.

242

243

```java { .api }

244

/**

245

* Request to upload a block as a stream

246

*/

247

public class UploadBlockStream extends BlockTransferMessage {

248

/**

249

* Block ID

250

*/

251

public final String blockId;

252

253

/**

254

* Block metadata

255

*/

256

public final byte[] metadata;

257

258

/**

259

* Create an upload block stream request

260

* @param blockId - Block ID to upload

261

* @param metadata - Block metadata

262

*/

263

public UploadBlockStream(String blockId, byte[] metadata);

264

265

public boolean equals(Object other);

266

public int hashCode();

267

public String toString();

268

}

269

```

270

271

### Mesos Protocol Messages

272

273

#### RegisterDriver Message

274

275

Message for driver registration with Mesos external shuffle service.

276

277

```java { .api }

278

/**

279

* Message for driver registration with Mesos external shuffle service

280

*/

281

public class RegisterDriver extends BlockTransferMessage {

282

/**

283

* Create a driver registration message

284

* @param appId - Application ID

285

* @param heartbeatTimeoutMs - Heartbeat timeout in milliseconds

286

*/

287

public RegisterDriver(String appId, long heartbeatTimeoutMs);

288

289

/**

290

* Get the application ID

291

* @return Application ID

292

*/

293

public String getAppId();

294

295

/**

296

* Get the heartbeat timeout

297

* @return Heartbeat timeout in milliseconds

298

*/

299

public long getHeartbeatTimeoutMs();

300

301

public boolean equals(Object other);

302

public int hashCode();

303

public String toString();

304

}

305

```

306

307

#### ShuffleServiceHeartbeat Message

308

309

Heartbeat message from driver to Mesos external shuffle service.

310

311

```java { .api }

312

/**

313

* Heartbeat message from driver to Mesos external shuffle service

314

*/

315

public class ShuffleServiceHeartbeat extends BlockTransferMessage {

316

/**

317

* Create a heartbeat message

318

* @param appId - Application ID

319

*/

320

public ShuffleServiceHeartbeat(String appId);

321

322

/**

323

* Get the application ID

324

* @return Application ID

325

*/

326

public String getAppId();

327

328

public boolean equals(Object other);

329

public int hashCode();

330

public String toString();

331

}

332

```

333

334

**Usage Examples:**

335

336

```java

337

import org.apache.spark.network.shuffle.protocol.*;

338

import org.apache.spark.network.shuffle.protocol.mesos.*;

339

340

// Example 1: Executor registration

341

String[] localDirs = {"/tmp/spark-local-1", "/tmp/spark-local-2"};

342

ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(localDirs, 64, "sort");

343

RegisterExecutor registerMsg = new RegisterExecutor("app-001", "executor-1", executorInfo);

344

345

// Serialize for network transmission

346

ByteBuffer serialized = registerMsg.toByteBuffer();

347

System.out.println("Serialized registration message: " + serialized.remaining() + " bytes");

348

349

// Deserialize received message

350

BlockTransferMessage received = BlockTransferMessage.Decoder.fromByteBuffer(serialized);

351

if (received instanceof RegisterExecutor) {

352

RegisterExecutor regMsg = (RegisterExecutor) received;

353

System.out.println("Received registration for app: " + regMsg.appId +

354

", executor: " + regMsg.execId);

355

}

356

357

// Example 2: Block request

358

String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_1_0"};

359

OpenBlocks openBlocks = new OpenBlocks("app-001", "executor-1", blockIds);

360

361

ByteBuffer openBlocksBuffer = openBlocks.toByteBuffer();

362

System.out.println("Open blocks request size: " + openBlocksBuffer.remaining() + " bytes");

363

364

// Example 3: Stream handling

365

StreamHandle streamHandle = new StreamHandle(12345L, 3);

366

System.out.println("Stream handle: ID=" + streamHandle.streamId +

367

", chunks=" + streamHandle.numChunks);

368

369

// Example 4: Block upload

370

byte[] metadata = "block-metadata".getBytes();

371

byte[] blockData = "actual-block-data-here".getBytes();

372

UploadBlock uploadBlock = new UploadBlock("app-001", "executor-1", "block-123", metadata, blockData);

373

374

System.out.println("Upload block: " + uploadBlock.blockId +

375

", metadata size: " + uploadBlock.metadata.length +

376

", data size: " + uploadBlock.blockData.length);

377

378

// Example 5: Mesos driver registration

379

RegisterDriver driverReg = new RegisterDriver("app-001", 30000L);

380

System.out.println("Mesos driver registration: app=" + driverReg.getAppId() +

381

", timeout=" + driverReg.getHeartbeatTimeoutMs() + "ms");

382

383

// Example 6: Heartbeat

384

ShuffleServiceHeartbeat heartbeat = new ShuffleServiceHeartbeat("app-001");

385

System.out.println("Heartbeat for app: " + heartbeat.getAppId());

386

```

387

388

### Protocol Message Flow

389

390

The typical message flow between shuffle clients and servers:

391

392

1. **Registration Phase**:

393

- Client sends `RegisterExecutor` with executor configuration

394

- Server acknowledges and stores executor information

395

396

2. **Block Request Phase**:

397

- Client sends `OpenBlocks` with list of required block IDs

398

- Server responds with `StreamHandle` containing stream information

399

400

3. **Data Transfer Phase**:

401

- Client receives block data through the established stream

402

- Multiple blocks can be transferred through a single stream

403

404

4. **Upload Operations** (if needed):

405

- Client sends `UploadBlock` or `UploadBlockStream` for block storage

406

- Server stores blocks according to shuffle configuration

407

408

5. **Mesos-Specific Flow**:

409

- Driver sends `RegisterDriver` to establish connection

410

- Periodic `ShuffleServiceHeartbeat` messages maintain connection

411

412

### Message Serialization

413

414

All protocol messages implement the `Encodable` interface and provide:

415

416

- **Encoding**: Convert message objects to ByteBuffer for network transmission

417

- **Decoding**: Reconstruct message objects from received ByteBuffer

418

- **Type Safety**: Message type identification for proper deserialization

419

- **Efficiency**: Optimized serialization for high-throughput shuffle operations

420

421

### Error Handling

422

423

Protocol messages include built-in error handling:

424

425

- **Validation**: Input parameter validation during message creation

426

- **Serialization Errors**: Proper exception handling during encoding/decoding

427

- **Version Compatibility**: Forward/backward compatibility for protocol evolution

428

- **Corruption Detection**: Built-in mechanisms to detect corrupted messages