0
# Event Bus
1
2
Distributed messaging system for inter-verticle and inter-node communication with point-to-point, publish/subscribe, and request/response patterns, custom codecs, and delivery options.
3
4
## Capabilities
5
6
### Event Bus Access
7
8
Get the event bus instance for messaging operations.
9
10
```java { .api }
11
/**
12
* Get the event bus instance
13
* @return EventBus instance
14
*/
15
EventBus eventBus();
16
17
/**
18
* Event Bus interface for distributed messaging
19
*/
20
interface EventBus extends Measured {
21
/**
22
* Send a message to an address (point-to-point: delivered to at most one handler registered there)
23
* @param address Target address
24
* @param message Message to send
25
* @return this for chaining
26
*/
27
EventBus send(String address, Object message);
28
29
/**
30
* Send a message with delivery options
31
* @param address Target address
32
* @param message Message to send
33
* @param options Delivery options
34
* @return this for chaining
35
*/
36
EventBus send(String address, Object message, DeliveryOptions options);
37
38
/**
39
* Publish a message to all subscribers
40
* @param address Target address
41
* @param message Message to publish
42
* @return this for chaining
43
*/
44
EventBus publish(String address, Object message);
45
46
/**
47
* Publish a message with delivery options
48
* @param address Target address
49
* @param message Message to publish
50
* @param options Delivery options
51
* @return this for chaining
52
*/
53
EventBus publish(String address, Object message, DeliveryOptions options);
54
55
/**
56
* Send a message and expect a reply
57
* @param address Target address
58
* @param message Message to send
59
* @return Future that completes with reply message
60
*/
61
<T> Future<Message<T>> request(String address, Object message);
62
63
/**
64
* Send a message and expect a reply with options
65
* @param address Target address
66
* @param message Message to send
67
* @param options Delivery options
68
* @return Future that completes with reply message
69
*/
70
<T> Future<Message<T>> request(String address, Object message, DeliveryOptions options);
71
72
/**
73
* Create a message consumer for an address
74
* @param address Address to consume from
75
* @return MessageConsumer instance
76
*/
77
<T> MessageConsumer<T> consumer(String address);
78
79
/**
80
* Create a message consumer with handler
81
* @param address Address to consume from
82
* @param handler Message handler
83
* @return MessageConsumer instance
84
*/
85
<T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler);
86
87
/**
88
* Create a local message consumer (local only: the registration is not propagated across the cluster)
89
* @param address Address to consume from
90
* @return MessageConsumer instance
91
*/
92
<T> MessageConsumer<T> localConsumer(String address);
93
94
/**
95
* Create a local message consumer with handler
96
* @param address Address to consume from
97
* @param handler Message handler
98
* @return MessageConsumer instance
99
*/
100
<T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler);
101
102
/**
103
* Create a message sender for an address
104
* @param address Target address
105
* @return MessageProducer instance
106
*/
107
<T> MessageProducer<T> sender(String address);
108
109
/**
110
* Create a message sender with delivery options
111
* @param address Target address
112
* @param options Default delivery options
113
* @return MessageProducer instance
114
*/
115
<T> MessageProducer<T> sender(String address, DeliveryOptions options);
116
117
/**
118
* Create a message publisher for an address
119
* @param address Target address
120
* @return MessageProducer instance
121
*/
122
<T> MessageProducer<T> publisher(String address);
123
124
/**
125
* Create a message publisher with delivery options
126
* @param address Target address
127
* @param options Default delivery options
128
* @return MessageProducer instance
129
*/
130
<T> MessageProducer<T> publisher(String address, DeliveryOptions options);
131
132
/**
133
* Register a custom message codec
134
* @param codec Message codec to register
135
* @return this for chaining
136
*/
137
EventBus registerCodec(MessageCodec codec);
138
139
/**
140
* Unregister a message codec
141
* @param name Codec name to unregister
142
* @return this for chaining
143
*/
144
EventBus unregisterCodec(String name);
145
146
/**
147
* Register a default codec for a type
148
* @param clazz Class type
149
* @param codec Message codec for the type
150
* @return this for chaining
151
*/
152
<T> EventBus registerDefaultCodec(Class<T> clazz, MessageCodec<T, ?> codec);
153
154
/**
155
* Unregister a default codec
156
* @param clazz Class type
157
* @return this for chaining
158
*/
159
EventBus unregisterDefaultCodec(Class clazz);
160
161
/**
162
* Close the event bus
163
* @return Future that completes when closed
164
*/
165
Future<Void> close();
166
}
167
```
168
169
### Message Handling
170
171
Handle incoming messages with access to body, headers, and reply functionality.
172
173
```java { .api }
174
/**
175
* Event bus message interface
176
*/
177
interface Message<T> {
178
/**
179
* Get the message address
180
* @return Message address
181
*/
182
String address();
183
184
/**
185
* Get message headers
186
* @return Headers MultiMap
187
*/
188
MultiMap headers();
189
190
/**
191
* Get the message body
192
* @return Message body
193
*/
194
T body();
195
196
/**
197
* Get the reply address
198
* @return Reply address or null
199
*/
200
String replyAddress();
201
202
/**
203
* Check if this is a send (point-to-point) message
204
* @return true if send message
205
*/
206
boolean isSend();
207
208
/**
209
* Reply to this message
210
* @param message Reply message
211
*/
212
void reply(Object message);
213
214
/**
215
* Reply to this message with delivery options
216
* @param message Reply message
217
* @param options Delivery options for reply
218
*/
219
void reply(Object message, DeliveryOptions options);
220
221
/**
222
* Reply and expect a reply back
223
* @param message Reply message
224
* @return Future that completes with reply to reply
225
*/
226
<R> Future<Message<R>> replyAndRequest(Object message);
227
228
/**
229
* Reply and expect a reply back with options
230
* @param message Reply message
231
* @param options Delivery options
232
* @return Future that completes with reply to reply
233
*/
234
<R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions options);
235
236
/**
237
* Fail the message handling
238
* @param failureCode Failure code
239
* @param message Failure message
240
*/
241
void fail(int failureCode, String message);
242
}
243
```
244
245
### Message Consumers
246
247
Consume messages from event bus addresses with lifecycle management.
248
249
```java { .api }
250
/**
251
* Message consumer for receiving messages
252
*/
253
interface MessageConsumer<T> extends ReadStream<Message<T>> {
254
/**
255
* Get the consumer address
256
* @return Consumer address
257
*/
258
String address();
259
260
/**
261
* Set the maximum number of buffered messages
262
* @param maxBufferedMessages Maximum buffered messages
263
* @return this for chaining
264
*/
265
MessageConsumer<T> setMaxBufferedMessages(int maxBufferedMessages);
266
267
/**
268
* Check if consumer is registered
269
* @return true if registered
270
*/
271
boolean isRegistered();
272
273
/**
274
* Register the consumer
275
* @return Future that completes when registered
276
*/
277
Future<Void> register();
278
279
/**
280
* Unregister the consumer
281
* @return Future that completes when unregistered
282
*/
283
Future<Void> unregister();
284
285
/**
286
* Get completion future (completes when registered)
287
* @return Completion future
288
*/
289
Future<Void> completion();
290
291
/**
292
* Get stream of message bodies only
293
* @return ReadStream of message bodies
294
*/
295
ReadStream<T> bodyStream();
296
}
297
```
298
299
### Message Producers
300
301
Send messages to event bus addresses with producer pattern.
302
303
```java { .api }
304
/**
305
* Message producer for sending messages
306
*/
307
interface MessageProducer<T> extends WriteStream<T> {
308
/**
309
* Get the producer address
310
* @return Producer address
311
*/
312
String address();
313
314
/**
315
* Send a message
316
* @param message Message to send
317
* @return Future that completes when sent
318
*/
319
Future<Void> send(T message);
320
321
/**
322
* Write a message (alias for send)
323
* @param data Message to write
324
* @return Future that completes when written
325
*/
326
Future<Void> write(T data);
327
328
/**
329
* Get delivery options
330
* @return Current delivery options
331
*/
332
DeliveryOptions deliveryOptions();
333
334
/**
335
* Close the producer
336
* @return Future that completes when closed
337
*/
338
Future<Void> close();
339
}
340
```
341
342
### Custom Message Codecs
343
344
Define custom encoding/decoding for message types.
345
346
```java { .api }
347
/**
348
* Message codec for custom types
349
*/
350
interface MessageCodec<S, R> {
351
/**
352
* Encode message to wire format
353
* @param buffer Buffer to write to
354
* @param s Object to encode
355
*/
356
void encodeToWire(Buffer buffer, S s);
357
358
/**
359
* Decode message from wire format
360
* @param pos Position in buffer
361
* @param buffer Buffer to read from
362
* @return Decoded object
363
*/
364
R decodeFromWire(int pos, Buffer buffer);
365
366
/**
367
* Transform message for local delivery
368
* @param s Object to transform
369
* @return Transformed object
370
*/
371
R transform(S s);
372
373
/**
374
* Get codec name
375
* @return Codec name
376
*/
377
String name();
378
379
/**
380
* Get system codec ID
381
* @return System codec ID; user-defined codecs must always return -1
382
*/
383
byte systemCodecID();
384
}
385
```
386
387
### Delivery Options
388
389
Configure message delivery behavior and headers.
390
391
```java { .api }
392
/**
393
* Options for message delivery
394
*/
395
class DeliveryOptions {
396
/**
397
* Set send timeout: how long a request waits for a reply before failing (default 30 seconds)
398
* @param timeout Timeout in milliseconds
399
* @return this for chaining
400
*/
401
DeliveryOptions setSendTimeout(long timeout);
402
403
/**
404
* Add a header
405
* @param key Header key
406
* @param value Header value
407
* @return this for chaining
408
*/
409
DeliveryOptions addHeader(String key, String value);
410
411
/**
412
* Set headers
413
* @param headers Headers multimap
414
* @return this for chaining
415
*/
416
DeliveryOptions setHeaders(MultiMap headers);
417
418
/**
419
* Set local only delivery (no cluster)
420
* @param localOnly Whether to deliver locally only
421
* @return this for chaining
422
*/
423
DeliveryOptions setLocalOnly(boolean localOnly);
424
425
/**
426
* Set codec name for message encoding
427
* @param codecName Codec name
428
* @return this for chaining
429
*/
430
DeliveryOptions setCodecName(String codecName);
431
432
/**
433
* Set tracing policy
434
* @param tracingPolicy Tracing policy
435
* @return this for chaining
436
*/
437
DeliveryOptions setTracingPolicy(TracingPolicy tracingPolicy);
438
}
439
```
440
441
### Event Bus Configuration
442
443
Configure event bus behavior and clustering options.
444
445
```java { .api }
446
/**
447
* Event bus configuration options
448
*/
449
class EventBusOptions {
450
EventBusOptions setSendBufferSize(int sendBufferSize);
451
EventBusOptions setReceiveBufferSize(int receiveBufferSize);
452
EventBusOptions setClusterPublicHost(String clusterPublicHost);
453
EventBusOptions setClusterPublicPort(int clusterPublicPort);
454
EventBusOptions setClusterPingInterval(long clusterPingInterval);
455
EventBusOptions setClusterPingReplyInterval(long clusterPingReplyInterval);
456
EventBusOptions setPort(int port);
457
EventBusOptions setHost(String host);
458
EventBusOptions setAcceptBacklog(int acceptBacklog);
459
EventBusOptions setReconnectAttempts(int attempts);
460
EventBusOptions setReconnectInterval(long interval);
461
EventBusOptions setSsl(boolean ssl);
462
EventBusOptions setKeyCertOptions(KeyCertOptions options);
463
EventBusOptions setTrustOptions(TrustOptions options);
464
EventBusOptions setClientAuth(ClientAuth clientAuth);
465
}
466
```
467
468
### Exception Handling
469
470
Handle event bus failures and reply exceptions.
471
472
```java { .api }
473
/**
474
* Exception thrown when reply fails
475
*/
476
class ReplyException extends VertxException {
477
/**
478
* Get the failure type
479
* @return Failure type
480
*/
481
ReplyFailure failureType();
482
483
/**
484
* Get the failure code
485
* @return Failure code
486
*/
487
int failureCode();
488
489
/**
490
* Get the failure message
491
* @return Failure message
492
*/
493
String getMessage();
494
}
495
496
/**
497
* Types of reply failures
498
*/
499
enum ReplyFailure {
500
TIMEOUT, // Request timed out
501
NO_HANDLERS, // No handlers registered for address
502
RECIPIENT_FAILURE // Handler threw exception or called fail()
503
}
504
```
505
506
## Usage Examples
507
508
**Basic Message Sending:**
509
510
```java
511
import io.vertx.core.eventbus.EventBus;
512
513
EventBus eventBus = vertx.eventBus();
514
515
// Send a message
516
eventBus.send("user.notifications", "Hello User!");
517
518
// Publish to all subscribers
519
eventBus.publish("system.broadcast", "System maintenance in 5 minutes");
520
521
// Send with headers
522
DeliveryOptions options = new DeliveryOptions()
523
.addHeader("userId", "123")
524
.addHeader("priority", "high");
525
526
eventBus.send("user.email", "Important message", options);
527
```
528
529
**Message Consumer:**
530
531
```java
532
import io.vertx.core.eventbus.Message;
533
534
// Simple consumer
535
eventBus.consumer("user.notifications", message -> {
536
String body = message.body();
537
System.out.println("Received notification: " + body);
538
});
539
540
// Consumer with reply
541
eventBus.consumer("user.query", message -> {
542
String query = message.body();
543
System.out.println("Processing query: " + query);
544
545
// Process and reply
546
String result = processQuery(query);
547
message.reply(result);
548
});
549
550
// Consumer with error handling
551
eventBus.consumer("user.process", message -> {
552
try {
553
String data = message.body();
554
String result = processData(data);
555
message.reply(result);
556
} catch (Exception e) {
557
message.fail(500, "Processing failed: " + e.getMessage());
558
}
559
});
560
```
561
562
**Request-Response Pattern:**
563
564
```java
565
// Send request and handle response
566
eventBus.<String>request("user.service", "getUserInfo")
567
.onSuccess(reply -> {
568
String userInfo = reply.body();
569
System.out.println("User info: " + userInfo);
570
})
571
.onFailure(err -> {
572
if (err instanceof ReplyException) {
573
ReplyException replyErr = (ReplyException) err;
574
System.err.println("Request failed: " + replyErr.failureType() +
575
" - " + replyErr.getMessage());
576
}
577
});
578
579
// With timeout
580
DeliveryOptions options = new DeliveryOptions().setSendTimeout(5000);
581
eventBus.<JsonObject>request("slow.service", "process", options)
582
.onSuccess(reply -> {
583
JsonObject result = reply.body();
584
System.out.println("Got result: " + result.encode());
585
})
586
.onFailure(err -> {
587
System.err.println("Request timed out or failed: " + err.getMessage());
588
});
589
```
590
591
**Message Producer Pattern:**
592
593
```java
594
import io.vertx.core.eventbus.MessageProducer;
595
596
// Create a producer for repeated sending
597
MessageProducer<String> producer = eventBus.sender("log.events");
598
599
// Send messages
600
producer.send("Application started");
601
producer.send("User logged in");
602
producer.send("Processing completed");
603
604
// Close when done
605
producer.close();
606
607
// Publisher for broadcast
608
MessageProducer<JsonObject> publisher = eventBus.publisher("system.events");
609
610
JsonObject event = new JsonObject()
611
.put("type", "user_login")
612
.put("userId", "user123")
613
.put("timestamp", System.currentTimeMillis());
614
615
publisher.write(event);
616
```
617
618
**Custom Message Codec:**
619
620
```java
621
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import java.nio.charset.StandardCharsets;
623
624
public class PersonCodec implements MessageCodec<Person, Person> {
625
626
@Override
627
public void encodeToWire(Buffer buffer, Person person) {
628
byte[] nameBytes = person.getName().getBytes();
629
buffer.appendInt(nameBytes.length);
630
buffer.appendBytes(nameBytes);
631
buffer.appendInt(person.getAge());
632
}
633
634
@Override
635
public Person decodeFromWire(int pos, Buffer buffer) {
636
int nameLength = buffer.getInt(pos);
637
pos += 4;
638
String name = buffer.getString(pos, pos + nameLength);
639
pos += nameLength;
640
int age = buffer.getInt(pos);
641
return new Person(name, age);
642
}
643
644
@Override
645
public Person transform(Person person) {
646
// For local delivery, return as-is
647
return person;
648
}
649
650
@Override
651
public String name() {
652
return "personCodec";
653
}
654
655
@Override
656
public byte systemCodecID() {
657
return -1; // User codec
658
}
659
}
660
661
// Register the codec
662
eventBus.registerCodec(new PersonCodec());
663
664
// Use with custom objects
665
Person person = new Person("John", 30);
666
eventBus.send("person.created", person);
667
```
668
669
**Clustered Event Bus:**
670
671
```java
672
// In clustered Vert.x, messages are distributed across nodes
673
VertxOptions options = new VertxOptions()
674
.setHAEnabled(true)
675
.setEventBusOptions(new EventBusOptions()
676
.setClusterPublicHost("192.168.1.100")
677
.setClusterPublicPort(15701));
678
679
Vertx.clusteredVertx(options).onSuccess(vertx -> {
680
EventBus eventBus = vertx.eventBus();
681
682
// This consumer will receive messages from any node
683
eventBus.consumer("cluster.broadcast", message -> {
684
System.out.println("Received from cluster: " + message.body());
685
});
686
687
// Publish a message that is delivered to the consumers on every node
688
eventBus.publish("cluster.broadcast", "Hello from node " +
689
System.getProperty("node.id"));
690
});
691
```