or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-api.md event-bus.md file-system.md http.md index.md networking.md utilities.md

event-bus.mddocs/

0

# Event Bus

1

2

Distributed messaging system for inter-verticle and inter-node communication with publish/subscribe and request/response patterns, custom codecs, and delivery options.

3

4

## Capabilities

5

6

### Event Bus Access

7

8

Get the event bus instance for messaging operations.

9

10

```java { .api }

11

/**

12

* Get the event bus instance

13

* @return EventBus instance

14

*/

15

EventBus eventBus();

16

17

/**

18

* Event Bus interface for distributed messaging

19

*/

20

interface EventBus extends Measured {

21

/**

22

* Send a message to an address

23

* @param address Target address

24

* @param message Message to send

25

* @return this for chaining

26

*/

27

EventBus send(String address, Object message);

28

29

/**

30

* Send a message with delivery options

31

* @param address Target address

32

* @param message Message to send

33

* @param options Delivery options

34

* @return this for chaining

35

*/

36

EventBus send(String address, Object message, DeliveryOptions options);

37

38

/**

39

* Publish a message to all subscribers

40

* @param address Target address

41

* @param message Message to publish

42

* @return this for chaining

43

*/

44

EventBus publish(String address, Object message);

45

46

/**

47

* Publish a message with delivery options

48

* @param address Target address

49

* @param message Message to publish

50

* @param options Delivery options

51

* @return this for chaining

52

*/

53

EventBus publish(String address, Object message, DeliveryOptions options);

54

55

/**

56

* Send a message and expect a reply

57

* @param address Target address

58

* @param message Message to send

59

* @return Future that completes with reply message

60

*/

61

<T> Future<Message<T>> request(String address, Object message);

62

63

/**

64

* Send a message and expect a reply with options

65

* @param address Target address

66

* @param message Message to send

67

* @param options Delivery options

68

* @return Future that completes with reply message

69

*/

70

<T> Future<Message<T>> request(String address, Object message, DeliveryOptions options);

71

72

/**

73

* Create a message consumer for an address

74

* @param address Address to consume from

75

* @return MessageConsumer instance

76

*/

77

<T> MessageConsumer<T> consumer(String address);

78

79

/**

80

* Create a message consumer with handler

81

* @param address Address to consume from

82

* @param handler Message handler

83

* @return MessageConsumer instance

84

*/

85

<T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler);

86

87

/**

88

* Create a local message consumer (local-only: the registration is not propagated across the cluster)

89

* @param address Address to consume from

90

* @return MessageConsumer instance

91

*/

92

<T> MessageConsumer<T> localConsumer(String address);

93

94

/**

95

* Create a local message consumer with handler

96

* @param address Address to consume from

97

* @param handler Message handler

98

* @return MessageConsumer instance

99

*/

100

<T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler);

101

102

/**

103

* Create a message sender for an address

104

* @param address Target address

105

* @return MessageProducer instance

106

*/

107

<T> MessageProducer<T> sender(String address);

108

109

/**

110

* Create a message sender with delivery options

111

* @param address Target address

112

* @param options Default delivery options

113

* @return MessageProducer instance

114

*/

115

<T> MessageProducer<T> sender(String address, DeliveryOptions options);

116

117

/**

118

* Create a message publisher for an address

119

* @param address Target address

120

* @return MessageProducer instance

121

*/

122

<T> MessageProducer<T> publisher(String address);

123

124

/**

125

* Create a message publisher with delivery options

126

* @param address Target address

127

* @param options Default delivery options

128

* @return MessageProducer instance

129

*/

130

<T> MessageProducer<T> publisher(String address, DeliveryOptions options);

131

132

/**

133

* Register a custom message codec

134

* @param codec Message codec to register

135

* @return this for chaining

136

*/

137

EventBus registerCodec(MessageCodec codec);

138

139

/**

140

* Unregister a message codec

141

* @param name Codec name to unregister

142

* @return this for chaining

143

*/

144

EventBus unregisterCodec(String name);

145

146

/**

147

* Register a default codec for a type

148

* @param clazz Class type

149

* @param codec Message codec for the type

150

* @return this for chaining

151

*/

152

<T> EventBus registerDefaultCodec(Class<T> clazz, MessageCodec<T, ?> codec);

153

154

/**

155

* Unregister a default codec

156

* @param clazz Class type

157

* @return this for chaining

158

*/

159

EventBus unregisterDefaultCodec(Class clazz);

160

161

/**

162

* Close the event bus

163

* @return Future that completes when closed

164

*/

165

Future<Void> close();

166

}

167

```

168

169

### Message Handling

170

171

Handle incoming messages with access to body, headers, and reply functionality.

172

173

```java { .api }

174

/**

175

* Event bus message interface

176

*/

177

interface Message<T> {

178

/**

179

* Get the message address

180

* @return Message address

181

*/

182

String address();

183

184

/**

185

* Get message headers

186

* @return Headers MultiMap

187

*/

188

MultiMap headers();

189

190

/**

191

* Get the message body

192

* @return Message body

193

*/

194

T body();

195

196

/**

197

* Get the reply address

198

* @return Reply address or null

199

*/

200

String replyAddress();

201

202

/**

203

* Check if this is a send (point-to-point) message

204

* @return true if send message

205

*/

206

boolean isSend();

207

208

/**

209

* Reply to this message

210

* @param message Reply message

211

*/

212

void reply(Object message);

213

214

/**

215

* Reply to this message with delivery options

216

* @param message Reply message

217

* @param options Delivery options for reply

218

*/

219

void reply(Object message, DeliveryOptions options);

220

221

/**

222

* Reply and expect a reply back

223

* @param message Reply message

224

* @return Future that completes with reply to reply

225

*/

226

<R> Future<Message<R>> replyAndRequest(Object message);

227

228

/**

229

* Reply and expect a reply back with options

230

* @param message Reply message

231

* @param options Delivery options

232

* @return Future that completes with reply to reply

233

*/

234

<R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions options);

235

236

/**

237

* Signal to the sender that processing of this message failed

238

* @param failureCode Failure code

239

* @param message Failure message

240

*/

241

void fail(int failureCode, String message);

242

}

243

```

244

245

### Message Consumers

246

247

Consume messages from event bus addresses with lifecycle management.

248

249

```java { .api }

250

/**

251

* Message consumer for receiving messages

252

*/

253

interface MessageConsumer<T> extends ReadStream<Message<T>> {

254

/**

255

* Get the consumer address

256

* @return Consumer address

257

*/

258

String address();

259

260

/**

261

* Set the maximum number of buffered messages

262

* @param maxBufferedMessages Maximum buffered messages

263

* @return this for chaining

264

*/

265

MessageConsumer<T> setMaxBufferedMessages(int maxBufferedMessages);

266

267

/**

268

* Check if consumer is registered

269

* @return true if registered

270

*/

271

boolean isRegistered();

272

273

/**

274

* Register the consumer

275

* @return Future that completes when registered

276

*/

277

Future<Void> register();

278

279

/**

280

* Unregister the consumer

281

* @return Future that completes when unregistered

282

*/

283

Future<Void> unregister();

284

285

/**

286

* Get completion future (completes when registered)

287

* @return Completion future

288

*/

289

Future<Void> completion();

290

291

/**

292

* Get stream of message bodies only

293

* @return ReadStream of message bodies

294

*/

295

ReadStream<T> bodyStream();

296

}

297

```

298

299

### Message Producers

300

301

Send messages to event bus addresses with producer pattern.

302

303

```java { .api }

304

/**

305

* Message producer for sending messages

306

*/

307

interface MessageProducer<T> extends WriteStream<T> {

308

/**

309

* Get the producer address

310

* @return Producer address

311

*/

312

String address();

313

314

/**

315

* Send a message

316

* @param message Message to send

317

* @return Future that completes when sent

318

*/

319

Future<Void> send(T message);

320

321

/**

322

* Write a message (alias for send)

323

* @param data Message to write

324

* @return Future that completes when written

325

*/

326

Future<Void> write(T data);

327

328

/**

329

* Get delivery options

330

* @return Current delivery options

331

*/

332

DeliveryOptions deliveryOptions();

333

334

/**

335

* Close the producer

336

* @return Future that completes when closed

337

*/

338

Future<Void> close();

339

}

340

```

341

342

### Custom Message Codecs

343

344

Define custom encoding/decoding for message types.

345

346

```java { .api }

347

/**

348

* Message codec for custom types

349

*/

350

interface MessageCodec<S, R> {

351

/**

352

* Encode message to wire format

353

* @param buffer Buffer to write to

354

* @param s Object to encode

355

*/

356

void encodeToWire(Buffer buffer, S s);

357

358

/**

359

* Decode message from wire format

360

* @param pos Position in buffer

361

* @param buffer Buffer to read from

362

* @return Decoded object

363

*/

364

R decodeFromWire(int pos, Buffer buffer);

365

366

/**

367

* Transform message for local delivery

368

* @param s Object to transform

369

* @return Transformed object

370

*/

371

R transform(S s);

372

373

/**

374

* Get codec name

375

* @return Codec name

376

*/

377

String name();

378

379

/**

380

* Get system codec ID

381

* @return System codec ID

382

*/

383

byte systemCodecID();

384

}

385

```

386

387

### Delivery Options

388

389

Configure message delivery behavior and headers.

390

391

```java { .api }

392

/**

393

* Options for message delivery

394

*/

395

class DeliveryOptions {

396

/**

397

* Set send timeout

398

* @param timeout Timeout in milliseconds

399

* @return this for chaining

400

*/

401

DeliveryOptions setSendTimeout(long timeout);

402

403

/**

404

* Add a header

405

* @param key Header key

406

* @param value Header value

407

* @return this for chaining

408

*/

409

DeliveryOptions addHeader(String key, String value);

410

411

/**

412

* Set headers

413

* @param headers Headers multimap

414

* @return this for chaining

415

*/

416

DeliveryOptions setHeaders(MultiMap headers);

417

418

/**

419

* Set local only delivery (no cluster)

420

* @param localOnly Whether to deliver locally only

421

* @return this for chaining

422

*/

423

DeliveryOptions setLocalOnly(boolean localOnly);

424

425

/**

426

* Set codec name for message encoding

427

* @param codecName Codec name

428

* @return this for chaining

429

*/

430

DeliveryOptions setCodecName(String codecName);

431

432

/**

433

* Set tracing policy

434

* @param tracingPolicy Tracing policy

435

* @return this for chaining

436

*/

437

DeliveryOptions setTracingPolicy(TracingPolicy tracingPolicy);

438

}

439

```

440

441

### Event Bus Configuration

442

443

Configure event bus behavior and clustering options.

444

445

```java { .api }

446

/**

447

* Event bus configuration options

448

*/

449

class EventBusOptions {

450

EventBusOptions setSendBufferSize(int sendBufferSize);

451

EventBusOptions setReceiveBufferSize(int receiveBufferSize);

452

EventBusOptions setClusterPublicHost(String clusterPublicHost);

453

EventBusOptions setClusterPublicPort(int clusterPublicPort);

454

EventBusOptions setClusterPingInterval(long clusterPingInterval);

455

EventBusOptions setClusterPingReplyInterval(long clusterPingReplyInterval);

456

EventBusOptions setPort(int port);

457

EventBusOptions setHost(String host);

458

EventBusOptions setAcceptBacklog(int acceptBacklog);

459

EventBusOptions setReconnectAttempts(int attempts);

460

EventBusOptions setReconnectInterval(long interval);

461

EventBusOptions setSsl(boolean ssl);

462

EventBusOptions setKeyCertOptions(KeyCertOptions options);

463

EventBusOptions setTrustOptions(TrustOptions options);

464

EventBusOptions setClientAuth(ClientAuth clientAuth);

465

}

466

```

467

468

### Exception Handling

469

470

Handle event bus failures and reply exceptions.

471

472

```java { .api }

473

/**

474

* Failure delivered to the requester when a request gets no successful reply (timeout, no handlers, or recipient failure)

475

*/

476

class ReplyException extends VertxException {

477

/**

478

* Get the failure type

479

* @return Failure type

480

*/

481

ReplyFailure failureType();

482

483

/**

484

* Get the failure code

485

* @return Failure code

486

*/

487

int failureCode();

488

489

/**

490

* Get the failure message

491

* @return Failure message

492

*/

493

String getMessage();

494

}

495

496

/**

497

* Types of reply failures

498

*/

499

enum ReplyFailure {

500

TIMEOUT, // Request timed out

501

NO_HANDLERS, // No handlers registered for address

502

RECIPIENT_FAILURE // Handler threw exception or called fail()

503

}

504

```

505

506

## Usage Examples

507

508

**Basic Message Sending:**

509

510

```java

511

import io.vertx.core.eventbus.EventBus;

512

513

EventBus eventBus = vertx.eventBus();

514

515

// Send a message

516

eventBus.send("user.notifications", "Hello User!");

517

518

// Publish to all subscribers

519

eventBus.publish("system.broadcast", "System maintenance in 5 minutes");

520

521

// Send with headers

522

DeliveryOptions options = new DeliveryOptions()

523

.addHeader("userId", "123")

524

.addHeader("priority", "high");

525

526

eventBus.send("user.email", "Important message", options);

527

```

528

529

**Message Consumer:**

530

531

```java

532

import io.vertx.core.eventbus.Message;

533

534

// Simple consumer

535

eventBus.consumer("user.notifications", message -> {

536

String body = message.body();

537

System.out.println("Received notification: " + body);

538

});

539

540

// Consumer with reply

541

eventBus.consumer("user.query", message -> {

542

String query = message.body();

543

System.out.println("Processing query: " + query);

544

545

// Process and reply

546

String result = processQuery(query);

547

message.reply(result);

548

});

549

550

// Consumer with error handling

551

eventBus.consumer("user.process", message -> {

552

try {

553

String data = message.body();

554

String result = processData(data);

555

message.reply(result);

556

} catch (Exception e) {

557

message.fail(500, "Processing failed: " + e.getMessage());

558

}

559

});

560

```

561

562

**Request-Response Pattern:**

563

564

```java

565

// Send request and handle response

566

eventBus.<String>request("user.service", "getUserInfo")

567

.onSuccess(reply -> {

568

String userInfo = reply.body();

569

System.out.println("User info: " + userInfo);

570

})

571

.onFailure(err -> {

572

if (err instanceof ReplyException) {

573

ReplyException replyErr = (ReplyException) err;

574

System.err.println("Request failed: " + replyErr.failureType() +

575

" - " + replyErr.getMessage());

576

}

577

});

578

579

// With timeout

580

DeliveryOptions options = new DeliveryOptions().setSendTimeout(5000);

581

eventBus.<JsonObject>request("slow.service", "process", options)

582

.onSuccess(reply -> {

583

JsonObject result = reply.body();

584

System.out.println("Got result: " + result.encode());

585

})

586

.onFailure(err -> {

587

System.err.println("Request timed out or failed: " + err.getMessage());

588

});

589

```

590

591

**Message Producer Pattern:**

592

593

```java

594

import io.vertx.core.eventbus.MessageProducer;

595

596

// Create a producer for repeated sending

597

MessageProducer<String> producer = eventBus.sender("log.events");

598

599

// Send messages

600

producer.send("Application started");

601

producer.send("User logged in");

602

producer.send("Processing completed");

603

604

// Close when done

605

producer.close();

606

607

// Publisher for broadcast

608

MessageProducer<JsonObject> publisher = eventBus.publisher("system.events");

609

610

JsonObject event = new JsonObject()

611

.put("type", "user_login")

612

.put("userId", "user123")

613

.put("timestamp", System.currentTimeMillis());

614

615

publisher.write(event);

616

```

617

618

**Custom Message Codec:**

619

620

```java

621

import io.vertx.core.buffer.Buffer;

622

import io.vertx.core.eventbus.MessageCodec;

623

624

public class PersonCodec implements MessageCodec<Person, Person> {

625

626

@Override

627

public void encodeToWire(Buffer buffer, Person person) {

628

byte[] nameBytes = person.getName().getBytes();

629

buffer.appendInt(nameBytes.length);

630

buffer.appendBytes(nameBytes);

631

buffer.appendInt(person.getAge());

632

}

633

634

@Override

635

public Person decodeFromWire(int pos, Buffer buffer) {

636

int nameLength = buffer.getInt(pos);

637

pos += 4;

638

String name = buffer.getString(pos, pos + nameLength);

639

pos += nameLength;

640

int age = buffer.getInt(pos);

641

return new Person(name, age);

642

}

643

644

@Override

645

public Person transform(Person person) {

646

// For local delivery, return as-is

647

return person;

648

}

649

650

@Override

651

public String name() {

652

return "personCodec";

653

}

654

655

@Override

656

public byte systemCodecID() {

657

return -1; // User codec

658

}

659

}

660

661

// Register the codec

662

eventBus.registerCodec(new PersonCodec());

663

664

// Use with custom objects

665

Person person = new Person("John", 30);

666

eventBus.send("person.created", person);

667

```

668

669

**Clustered Event Bus:**

670

671

```java

672

// In clustered Vert.x, messages are distributed across nodes

673

VertxOptions options = new VertxOptions()

674

.setHAEnabled(true)

675

.setEventBusOptions(new EventBusOptions()

676

.setClusterPublicHost("192.168.1.100")

677

.setClusterPublicPort(15701));

678

679

Vertx.clusteredVertx(options).onSuccess(vertx -> {

680

EventBus eventBus = vertx.eventBus();

681

682

// This consumer will receive messages from any node

683

eventBus.consumer("cluster.broadcast", message -> {

684

System.out.println("Received from cluster: " + message.body());

685

});

686

687

// Publish a message that is delivered to every consumer of this address on any node

688

eventBus.publish("cluster.broadcast", "Hello from node " +

689

System.getProperty("node.id"));

690

});

691

```