or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

buffer-management.mdclient-operations.mdconfiguration-management.mdindex.mdmessage-protocol.mdsecurity-authentication.mdserver-operations.mdshuffle-database.mdtransport-context.md

message-protocol.mddocs/

0

# Message Protocol

1

2

The message protocol API defines the comprehensive communication system for Apache Spark's networking layer. It provides a structured approach to different types of network communication including RPC calls, streaming operations, and chunk fetching through a type-safe message system built on Netty.

3

4

## Capabilities

5

6

### Message Interface

7

8

Base interface for all network messages in the Spark transport protocol.

9

10

```java { .api }

11

public interface Message extends Encodable {

12

/**

13

* Get the type of this message

14

* @return Type enum indicating the specific message type

15

*/

16

Type type();

17

18

/**

19

* Get the body data of this message

20

* @return ManagedBuffer containing the message payload

21

*/

22

ManagedBuffer body();

23

24

/**

25

* Check if the message body is included in the frame

26

* @return boolean indicating if body is in-frame (true) or separate (false)

27

*/

28

boolean isBodyInFrame();

29

30

/**

31

* Enumeration of all supported message types

32

*/

33

enum Type {

34

ChunkFetchRequest(0), ChunkFetchSuccess(1), ChunkFetchFailure(2),

35

RpcRequest(3), RpcResponse(4), RpcFailure(5),

36

StreamRequest(6), StreamResponse(7), StreamFailure(8),

37

OneWayMessage(9), UploadStream(10),

38

MergedBlockMetaRequest(11), MergedBlockMetaSuccess(12),

39

User(-1);

40

41

private final byte id;

42

Type(int id) { this.id = (byte) id; }

43

public byte id() { return id; }

44

}

45

}

46

```

47

48

### Encodable Interface

49

50

Interface for objects that can be encoded to ByteBuf for network transmission.

51

52

```java { .api }

53

public interface Encodable {

54

/**

55

* Get the encoded length of this object in bytes

56

* @return int representing the number of bytes needed for encoding

57

*/

58

int encodedLength();

59

60

/**

61

* Encode this object into the provided ByteBuf

62

* @param buf - ByteBuf to write the encoded data to

63

*/

64

void encode(ByteBuf buf);

65

}

66

```

67

68

### Message Categories

69

70

Messages are categorized into request and response types for structured communication patterns.

71

72

```java { .api }

73

/**

74

* Marker interface for request messages

75

*/

76

public interface RequestMessage extends Message {

77

}

78

79

/**

80

* Marker interface for response messages

81

*/

82

public interface ResponseMessage extends Message {

83

}

84

```

85

86

## RPC Messages

87

88

### RpcRequest

89

90

Message for sending remote procedure calls to the server.

91

92

```java { .api }

93

public final class RpcRequest extends AbstractMessage implements RequestMessage {

94

/**

95

* Create an RPC request message

96

* @param requestId - Unique identifier for this RPC request

97

* @param message - ManagedBuffer containing the RPC data

98

*/

99

public RpcRequest(long requestId, ManagedBuffer message);

100

101

/**

102

* Get the request identifier

103

* @return long representing the unique request ID

104

*/

105

public long requestId();

106

107

@Override

108

public Type type();

109

110

@Override

111

public ManagedBuffer body();

112

113

@Override

114

public boolean isBodyInFrame();

115

}

116

```

117

118

### RpcResponse

119

120

Message for returning successful RPC call results.

121

122

```java { .api }

123

public final class RpcResponse extends AbstractResponseMessage {

124

/**

125

* Create an RPC response message

126

* @param requestId - ID of the original RPC request

127

* @param message - ManagedBuffer containing the response data

128

*/

129

public RpcResponse(long requestId, ManagedBuffer message);

130

131

/**

132

* Get the request identifier this response corresponds to

133

* @return long representing the original request ID

134

*/

135

public long requestId();

136

137

@Override

138

public Type type();

139

140

@Override

141

public ManagedBuffer body();

142

}

143

```

144

145

### RpcFailure

146

147

Message for returning RPC call failures and error information.

148

149

```java { .api }

150

public final class RpcFailure extends AbstractResponseMessage {

151

/**

152

* Create an RPC failure message

153

* @param requestId - ID of the failed RPC request

154

* @param errorString - String describing the error that occurred

155

*/

156

public RpcFailure(long requestId, String errorString);

157

158

/**

159

* Get the request identifier this failure corresponds to

160

* @return long representing the original request ID

161

*/

162

public long requestId();

163

164

/**

165

* Get the error message describing the failure

166

* @return String containing error details

167

*/

168

public String errorString();

169

170

@Override

171

public Type type();

172

}

173

```

174

175

## Chunk Transfer Messages

176

177

### ChunkFetchRequest

178

179

Message for requesting specific chunks of data from a stream.

180

181

```java { .api }

182

public final class ChunkFetchRequest extends AbstractMessage implements RequestMessage {

183

/**

184

* Create a chunk fetch request

185

* @param streamChunkId - StreamChunkId identifying the chunk to fetch

186

*/

187

public ChunkFetchRequest(StreamChunkId streamChunkId);

188

189

/**

190

* Get the stream chunk identifier

191

* @return StreamChunkId specifying which chunk to fetch

192

*/

193

public StreamChunkId streamChunkId();

194

195

@Override

196

public Type type();

197

}

198

```

199

200

### ChunkFetchSuccess

201

202

Message for returning successfully fetched chunk data.

203

204

```java { .api }

205

public final class ChunkFetchSuccess extends AbstractResponseMessage {

206

/**

207

* Create a successful chunk fetch response

208

* @param streamChunkId - StreamChunkId of the fetched chunk

209

* @param buffer - ManagedBuffer containing the chunk data

210

*/

211

public ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer);

212

213

/**

214

* Get the stream chunk identifier

215

* @return StreamChunkId of the returned chunk

216

*/

217

public StreamChunkId streamChunkId();

218

219

@Override

220

public Type type();

221

222

@Override

223

public ManagedBuffer body();

224

}

225

```

226

227

### ChunkFetchFailure

228

229

Message for reporting chunk fetch failures.

230

231

```java { .api }

232

public final class ChunkFetchFailure extends AbstractResponseMessage {

233

/**

234

* Create a chunk fetch failure response

235

* @param streamChunkId - StreamChunkId of the failed chunk

236

* @param errorString - String describing the fetch error

237

*/

238

public ChunkFetchFailure(StreamChunkId streamChunkId, String errorString);

239

240

/**

241

* Get the stream chunk identifier

242

* @return StreamChunkId of the failed chunk

243

*/

244

public StreamChunkId streamChunkId();

245

246

/**

247

* Get the error message describing the failure

248

* @return String containing error details

249

*/

250

public String errorString();

251

252

@Override

253

public Type type();

254

}

255

```

256

257

### StreamChunkId

258

259

Identifier for specific chunks within streams.

260

261

```java { .api }

262

public final class StreamChunkId implements Encodable {

263

/**

264

* Create a stream chunk identifier

265

* @param streamId - Identifier of the stream

266

* @param chunkIndex - Index of the chunk within the stream

267

*/

268

public StreamChunkId(long streamId, int chunkIndex);

269

270

/**

271

* Get the stream identifier

272

* @return long representing the stream ID

273

*/

274

public long streamId();

275

276

/**

277

* Get the chunk index

278

* @return int representing the chunk index within the stream

279

*/

280

public int chunkIndex();

281

282

@Override

283

public int encodedLength();

284

285

@Override

286

public void encode(ByteBuf buf);

287

}

288

```

289

290

## Streaming Messages

291

292

### StreamRequest

293

294

Message for requesting to open a named stream.

295

296

```java { .api }

297

public final class StreamRequest extends AbstractMessage implements RequestMessage {

298

/**

299

* Create a stream request

300

* @param streamId - String identifier of the stream to open

301

*/

302

public StreamRequest(String streamId);

303

304

/**

305

* Get the stream identifier

306

* @return String representing the stream ID

307

*/

308

public String streamId();

309

310

@Override

311

public Type type();

312

}

313

```

314

315

### StreamResponse

316

317

Message for returning streaming data.

318

319

```java { .api }

320

public final class StreamResponse extends AbstractResponseMessage {

321

/**

322

* Create a stream response

323

* @param streamId - String identifier of the stream

324

* @param byteCount - Number of bytes in the stream

325

* @param body - ManagedBuffer containing the stream data

326

*/

327

public StreamResponse(String streamId, long byteCount, ManagedBuffer body);

328

329

/**

330

* Get the stream identifier

331

* @return String representing the stream ID

332

*/

333

public String streamId();

334

335

/**

336

* Get the number of bytes in the stream

337

* @return long representing the byte count

338

*/

339

public long byteCount();

340

341

@Override

342

public Type type();

343

344

@Override

345

public ManagedBuffer body();

346

}

347

```

348

349

### StreamFailure

350

351

Message for reporting stream operation failures.

352

353

```java { .api }

354

public final class StreamFailure extends AbstractResponseMessage {

355

/**

356

* Create a stream failure response

357

* @param streamId - String identifier of the failed stream

358

* @param errorString - String describing the stream error

359

*/

360

public StreamFailure(String streamId, String errorString);

361

362

/**

363

* Get the stream identifier

364

* @return String representing the failed stream ID

365

*/

366

public String streamId();

367

368

/**

369

* Get the error message

370

* @return String containing error details

371

*/

372

public String errorString();

373

374

@Override

375

public Type type();

376

}

377

```

378

379

## Upload and One-Way Messages

380

381

### UploadStream

382

383

Message for uploading stream data to the server.

384

385

```java { .api }

386

public final class UploadStream extends AbstractMessage implements RequestMessage {

387

/**

388

* Create an upload stream message

389

* @param requestId - Unique identifier for this upload request

390

* @param meta - ManagedBuffer containing metadata about the upload

391

* @param body - ManagedBuffer containing the data to upload

392

*/

393

public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body);

394

395

/**

396

* Get the request identifier

397

* @return long representing the upload request ID

398

*/

399

public long requestId();

400

401

/**

402

* Get the metadata buffer

403

* @return ManagedBuffer containing upload metadata

404

*/

405

public ManagedBuffer meta();

406

407

@Override

408

public Type type();

409

410

@Override

411

public ManagedBuffer body();

412

}

413

```

414

415

### OneWayMessage

416

417

Message for one-way communication where no response is expected.

418

419

```java { .api }

420

public final class OneWayMessage extends AbstractMessage implements RequestMessage {

421

/**

422

* Create a one-way message

423

* @param body - ManagedBuffer containing the message data

424

*/

425

public OneWayMessage(ManagedBuffer body);

426

427

@Override

428

public Type type();

429

430

@Override

431

public ManagedBuffer body();

432

433

@Override

434

public boolean isBodyInFrame();

435

}

436

```

437

438

## Merged Block Messages

439

440

### MergedBlockMetaRequest

441

442

Message for requesting metadata about merged blocks in shuffle operations.

443

444

```java { .api }

445

public final class MergedBlockMetaRequest extends AbstractMessage implements RequestMessage {

446

/**

447

* Create a merged block metadata request

448

* @param requestId - Unique identifier for this request

449

* @param appId - Application identifier

450

* @param shuffleId - Shuffle operation identifier

451

* @param shuffleMergeId - Shuffle merge identifier

452

* @param reduceId - Reducer task identifier

453

*/

454

public MergedBlockMetaRequest(long requestId, String appId, int shuffleId, int shuffleMergeId, int reduceId);

455

456

/**

457

* Get the request identifier

458

* @return long representing the request ID

459

*/

460

public long requestId();

461

462

/**

463

* Get the application identifier

464

* @return String representing the app ID

465

*/

466

public String appId();

467

468

/**

469

* Get the shuffle identifier

470

* @return int representing the shuffle ID

471

*/

472

public int shuffleId();

473

474

/**

475

* Get the shuffle merge identifier

476

* @return int representing the shuffle merge ID

477

*/

478

public int shuffleMergeId();

479

480

/**

481

* Get the reduce task identifier

482

* @return int representing the reduce ID

483

*/

484

public int reduceId();

485

486

@Override

487

public Type type();

488

}

489

```

490

491

### MergedBlockMetaSuccess

492

493

Message for returning successful merged block metadata.

494

495

```java { .api }

496

public final class MergedBlockMetaSuccess extends AbstractResponseMessage {

497

/**

498

* Create a successful merged block metadata response

499

* @param requestId - ID of the original request

500

* @param numChunks - Number of chunks in the merged block

501

* @param buffer - ManagedBuffer containing the metadata

502

*/

503

public MergedBlockMetaSuccess(long requestId, int numChunks, ManagedBuffer buffer);

504

505

/**

506

* Get the request identifier

507

* @return long representing the original request ID

508

*/

509

public long requestId();

510

511

/**

512

* Get the number of chunks

513

* @return int representing the chunk count

514

*/

515

public int numChunks();

516

517

@Override

518

public Type type();

519

520

@Override

521

public ManagedBuffer body();

522

}

523

```

524

525

## Message Encoding and Decoding

526

527

### Encoders Utility Class

528

529

Utility class for encoding and decoding protocol messages.

530

531

```java { .api }

532

public class Encoders {

533

/**

534

* Decode a message from ByteBuf

535

* @param msgType - Message type byte

536

* @param in - ByteBuf containing the encoded message

537

* @return Message instance decoded from the buffer

538

*/

539

public static Message decode(Message.Type msgType, ByteBuf in);

540

541

/**

542

* Encode a message to ByteBuf

543

* @param msg - Message to encode

544

* @param out - ByteBuf to write the encoded message to

545

*/

546

public static void encode(Message msg, ByteBuf out);

547

}

548

```

549

550

### MessageEncoder

551

552

Netty encoder for converting Message objects to ByteBuf for network transmission.

553

554

```java { .api }

555

public final class MessageEncoder extends MessageToByteEncoder<Message> {

556

public static final MessageEncoder INSTANCE = new MessageEncoder();

557

558

@Override

559

protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception;

560

}

561

```

562

563

### MessageDecoder

564

565

Netty decoder for converting ByteBuf data into Message objects.

566

567

```java { .api }

568

public final class MessageDecoder extends LengthFieldBasedFrameDecoder {

569

public static final MessageDecoder INSTANCE = new MessageDecoder();

570

571

@Override

572

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception;

573

}

574

```

575

576

## Usage Examples

577

578

### Creating and Sending RPC Messages

579

580

```java

581

import org.apache.spark.network.protocol.*;

582

import org.apache.spark.network.buffer.NioManagedBuffer;

583

584

// Create RPC request

585

String rpcData = "{ \"method\": \"process\", \"params\": [1, 2, 3] }";

586

ManagedBuffer requestBuffer = new NioManagedBuffer(ByteBuffer.wrap(rpcData.getBytes()));

587

long requestId = System.currentTimeMillis();

588

RpcRequest request = new RpcRequest(requestId, requestBuffer);

589

590

System.out.println("Created RPC request with ID: " + request.requestId());

591

System.out.println("Message type: " + request.type());

592

System.out.println("Body in frame: " + request.isBodyInFrame());

593

594

// Create RPC response

595

String responseData = "{ \"result\": 6, \"status\": \"success\" }";

596

ManagedBuffer responseBuffer = new NioManagedBuffer(ByteBuffer.wrap(responseData.getBytes()));

597

RpcResponse response = new RpcResponse(requestId, responseBuffer);

598

599

// Create RPC failure

600

RpcFailure failure = new RpcFailure(requestId, "Method not found: process");

601

System.out.println("Failure message: " + failure.errorString());

602

```

603

604

### Working with Chunk Fetch Messages

605

606

```java

607

import org.apache.spark.network.protocol.*;

608

609

// Create stream chunk identifier

610

long streamId = 12345L;

611

int chunkIndex = 0;

612

StreamChunkId chunkId = new StreamChunkId(streamId, chunkIndex);

613

614

System.out.println("Stream ID: " + chunkId.streamId());

615

System.out.println("Chunk index: " + chunkId.chunkIndex());

616

617

// Create chunk fetch request

618

ChunkFetchRequest fetchRequest = new ChunkFetchRequest(chunkId);

619

System.out.println("Fetch request type: " + fetchRequest.type());

620

621

// Create successful chunk response

622

byte[] chunkData = "This is chunk data".getBytes();

623

ManagedBuffer chunkBuffer = new NioManagedBuffer(ByteBuffer.wrap(chunkData));

624

ChunkFetchSuccess fetchSuccess = new ChunkFetchSuccess(chunkId, chunkBuffer);

625

626

System.out.println("Success response type: " + fetchSuccess.type());

627

System.out.println("Chunk size: " + fetchSuccess.body().size());

628

629

// Create chunk fetch failure

630

ChunkFetchFailure fetchFailure = new ChunkFetchFailure(chunkId, "Chunk not found");

631

System.out.println("Failure error: " + fetchFailure.errorString());

632

```

633

634

### Streaming Message Examples

635

636

```java

637

// Create stream request

638

String streamId = "data-stream-001";

639

StreamRequest streamRequest = new StreamRequest(streamId);

640

System.out.println("Requesting stream: " + streamRequest.streamId());

641

642

// Create stream response

643

String streamData = "Line 1\nLine 2\nLine 3\n";

644

ManagedBuffer streamBuffer = new NioManagedBuffer(ByteBuffer.wrap(streamData.getBytes()));

645

StreamResponse streamResponse = new StreamResponse(streamId, streamData.length(), streamBuffer);

646

647

System.out.println("Stream response for: " + streamResponse.streamId());

648

System.out.println("Byte count: " + streamResponse.byteCount());

649

650

// Create stream failure

651

StreamFailure streamFailure = new StreamFailure(streamId, "Stream corrupted");

652

System.out.println("Stream failure: " + streamFailure.errorString());

653

```

654

655

### Upload and One-Way Messages

656

657

```java

658

// Create upload stream message

659

String metadata = "{ \"filename\": \"data.txt\", \"size\": 1024 }";

660

ManagedBuffer metaBuffer = new NioManagedBuffer(ByteBuffer.wrap(metadata.getBytes()));

661

662

String uploadData = "File content to upload";

663

ManagedBuffer dataBuffer = new NioManagedBuffer(ByteBuffer.wrap(uploadData.getBytes()));

664

665

long uploadRequestId = System.currentTimeMillis();

666

UploadStream uploadMessage = new UploadStream(uploadRequestId, metaBuffer, dataBuffer);

667

668

System.out.println("Upload request ID: " + uploadMessage.requestId());

669

System.out.println("Upload metadata size: " + uploadMessage.meta().size());

670

System.out.println("Upload data size: " + uploadMessage.body().size());

671

672

// Create one-way message

673

String oneWayData = "Fire and forget message";

674

ManagedBuffer oneWayBuffer = new NioManagedBuffer(ByteBuffer.wrap(oneWayData.getBytes()));

675

OneWayMessage oneWayMessage = new OneWayMessage(oneWayBuffer);

676

677

System.out.println("One-way message type: " + oneWayMessage.type());

678

System.out.println("One-way body in frame: " + oneWayMessage.isBodyInFrame());

679

```

680

681

### Message Encoding and Decoding

682

683

```java

684

import io.netty.buffer.ByteBuf;

685

import io.netty.buffer.Unpooled;

686

687

// Encode a message

688

RpcRequest request = new RpcRequest(123L, new NioManagedBuffer(ByteBuffer.wrap("test".getBytes())));

689

ByteBuf encodedBuffer = Unpooled.buffer();

690

691

try {

692

Encoders.encode(request, encodedBuffer);

693

System.out.println("Encoded message size: " + encodedBuffer.readableBytes());

694

695

// Decode the message back

696

encodedBuffer.resetReaderIndex();

697

byte msgTypeByte = encodedBuffer.readByte();

698

Message.Type msgType = Message.Type.values()[msgTypeByte];

699

700

Message decodedMessage = Encoders.decode(msgType, encodedBuffer);

701

System.out.println("Decoded message type: " + decodedMessage.type());

702

703

if (decodedMessage instanceof RpcRequest) {

704

RpcRequest decodedRequest = (RpcRequest) decodedMessage;

705

System.out.println("Decoded request ID: " + decodedRequest.requestId());

706

}

707

708

} finally {

709

encodedBuffer.release();

710

}

711

```

712

713

### Custom Message Handling

714

715

```java

716

// Process different message types

717

public void handleMessage(Message message) {

718

switch (message.type()) {

719

case RpcRequest:

720

RpcRequest rpcReq = (RpcRequest) message;

721

System.out.println("Handling RPC request: " + rpcReq.requestId());

722

break;

723

724

case ChunkFetchRequest:

725

ChunkFetchRequest chunkReq = (ChunkFetchRequest) message;

726

System.out.println("Handling chunk fetch for stream: " + chunkReq.streamChunkId().streamId());

727

break;

728

729

case StreamRequest:

730

StreamRequest streamReq = (StreamRequest) message;

731

System.out.println("Handling stream request: " + streamReq.streamId());

732

break;

733

734

case OneWayMessage:

735

OneWayMessage oneWay = (OneWayMessage) message;

736

System.out.println("Handling one-way message with body size: " + oneWay.body().size());

737

break;

738

739

default:

740

System.out.println("Unknown message type: " + message.type());

741

}

742

}

743

```

744

745

## Abstract Base Classes

746

747

### AbstractMessage

748

749

Base implementation for common message functionality.

750

751

```java { .api }

752

public abstract class AbstractMessage implements Message {

753

@Override

754

public boolean isBodyInFrame();

755

756

@Override

757

public int encodedLength();

758

759

@Override

760

public void encode(ByteBuf buf);

761

}

762

```

763

764

### AbstractResponseMessage

765

766

Base implementation for response messages.

767

768

```java { .api }

769

public abstract class AbstractResponseMessage extends AbstractMessage implements ResponseMessage {

770

@Override

771

public ManagedBuffer body();

772

773

@Override

774

public boolean isBodyInFrame();

775

}

776

```