Vert.x Core is a high-performance, reactive application toolkit for the JVM providing fundamental building blocks for asynchronous I/O operations.
—
Distributed messaging system for inter-verticle and inter-node communication with publish/subscribe and request/response patterns, custom codecs, and delivery options.
Get the event bus instance for messaging operations.
/**
* Get the event bus instance
* @return EventBus instance
*/
EventBus eventBus();
/**
* Event Bus interface for distributed messaging
*/
interface EventBus extends Measured {
/**
* Send a message to an address
* @param address Target address
* @param message Message to send
* @return this for chaining
*/
EventBus send(String address, Object message);
/**
* Send a message with delivery options
* @param address Target address
* @param message Message to send
* @param options Delivery options
* @return this for chaining
*/
EventBus send(String address, Object message, DeliveryOptions options);
/**
* Publish a message to all subscribers
* @param address Target address
* @param message Message to publish
* @return this for chaining
*/
EventBus publish(String address, Object message);
/**
* Publish a message with delivery options
* @param address Target address
* @param message Message to publish
* @param options Delivery options
* @return this for chaining
*/
EventBus publish(String address, Object message, DeliveryOptions options);
/**
* Send a message and expect a reply
* @param address Target address
* @param message Message to send
* @return Future that completes with reply message
*/
<T> Future<Message<T>> request(String address, Object message);
/**
* Send a message and expect a reply with options
* @param address Target address
* @param message Message to send
* @param options Delivery options
* @return Future that completes with reply message
*/
<T> Future<Message<T>> request(String address, Object message, DeliveryOptions options);
/**
* Create a message consumer for an address
* @param address Address to consume from
* @return MessageConsumer instance
*/
<T> MessageConsumer<T> consumer(String address);
/**
* Create a message consumer with handler
* @param address Address to consume from
* @param handler Message handler
* @return MessageConsumer instance
*/
<T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler);
/**
* Create a local message consumer (local to this node only; the address is not propagated across the cluster)
* @param address Address to consume from
* @return MessageConsumer instance
*/
<T> MessageConsumer<T> localConsumer(String address);
/**
* Create a local message consumer with handler
* @param address Address to consume from
* @param handler Message handler
* @return MessageConsumer instance
*/
<T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler);
/**
* Create a message sender for an address
* @param address Target address
* @return MessageProducer instance
*/
<T> MessageProducer<T> sender(String address);
/**
* Create a message sender with delivery options
* @param address Target address
* @param options Default delivery options
* @return MessageProducer instance
*/
<T> MessageProducer<T> sender(String address, DeliveryOptions options);
/**
* Create a message publisher for an address
* @param address Target address
* @return MessageProducer instance
*/
<T> MessageProducer<T> publisher(String address);
/**
* Create a message publisher with delivery options
* @param address Target address
* @param options Default delivery options
* @return MessageProducer instance
*/
<T> MessageProducer<T> publisher(String address, DeliveryOptions options);
/**
* Register a custom message codec
* @param codec Message codec to register
* @return this for chaining
*/
EventBus registerCodec(MessageCodec codec);
/**
* Unregister a message codec
* @param name Codec name to unregister
* @return this for chaining
*/
EventBus unregisterCodec(String name);
/**
* Register a default codec for a type
* @param clazz Class type
* @param codec Message codec for the type
* @return this for chaining
*/
<T> EventBus registerDefaultCodec(Class<T> clazz, MessageCodec<T, ?> codec);
/**
* Unregister a default codec
* @param clazz Class type
* @return this for chaining
*/
EventBus unregisterDefaultCodec(Class clazz);
/**
* Close the event bus
* @return Future that completes when closed
*/
Future<Void> close();
}
Handle incoming messages with access to body, headers, and reply functionality.
/**
* Event bus message interface
*/
interface Message<T> {
/**
* Get the message address
* @return Message address
*/
String address();
/**
* Get message headers
* @return Headers MultiMap
*/
MultiMap headers();
/**
* Get the message body
* @return Message body
*/
T body();
/**
* Get the reply address
* @return Reply address or null
*/
String replyAddress();
/**
* Check if this is a send (point-to-point) message
* @return true if send message
*/
boolean isSend();
/**
* Reply to this message
* @param message Reply message
*/
void reply(Object message);
/**
* Reply to this message with delivery options
* @param message Reply message
* @param options Delivery options for reply
*/
void reply(Object message, DeliveryOptions options);
/**
* Reply and expect a reply back
* @param message Reply message
* @return Future that completes with reply to reply
*/
<R> Future<Message<R>> replyAndRequest(Object message);
/**
* Reply and expect a reply back with options
* @param message Reply message
* @param options Delivery options
* @return Future that completes with reply to reply
*/
<R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions options);
/**
* Fail the message handling
* @param failureCode Failure code
* @param message Failure message
*/
void fail(int failureCode, String message);
}
Consume messages from event bus addresses with lifecycle management.
/**
* Message consumer for receiving messages
*/
interface MessageConsumer<T> extends ReadStream<Message<T>> {
/**
* Get the consumer address
* @return Consumer address
*/
String address();
/**
* Set the maximum number of buffered messages
* @param maxBufferedMessages Maximum buffered messages
* @return this for chaining
*/
MessageConsumer<T> setMaxBufferedMessages(int maxBufferedMessages);
/**
* Check if consumer is registered
* @return true if registered
*/
boolean isRegistered();
/**
* Register the consumer
* @return Future that completes when registered
*/
Future<Void> register();
/**
* Unregister the consumer
* @return Future that completes when unregistered
*/
Future<Void> unregister();
/**
* Get completion future (completes when registered)
* @return Completion future
*/
Future<Void> completion();
/**
* Get stream of message bodies only
* @return ReadStream of message bodies
*/
ReadStream<T> bodyStream();
}
Send messages to event bus addresses with producer pattern.
/**
* Message producer for sending messages
*/
interface MessageProducer<T> extends WriteStream<T> {
/**
* Get the producer address
* @return Producer address
*/
String address();
/**
* Send a message
* @param message Message to send
* @return Future that completes when sent
*/
Future<Void> send(T message);
/**
* Write a message (alias for send)
* @param data Message to write
* @return Future that completes when written
*/
Future<Void> write(T data);
/**
* Get delivery options
* @return Current delivery options
*/
DeliveryOptions deliveryOptions();
/**
* Close the producer
* @return Future that completes when closed
*/
Future<Void> close();
}
Define custom encoding/decoding for message types.
/**
* Message codec for custom types
*/
interface MessageCodec<S, R> {
/**
* Encode message to wire format
* @param buffer Buffer to write to
* @param s Object to encode
*/
void encodeToWire(Buffer buffer, S s);
/**
* Decode message from wire format
* @param pos Position in buffer
* @param buffer Buffer to read from
* @return Decoded object
*/
R decodeFromWire(int pos, Buffer buffer);
/**
* Transform message for local delivery
* @param s Object to transform
* @return Transformed object
*/
R transform(S s);
/**
* Get codec name
* @return Codec name
*/
String name();
/**
* Get system codec ID
* @return System codec ID
*/
byte systemCodecID();
}
Configure message delivery behavior and headers.
/**
* Options for message delivery
*/
class DeliveryOptions {
/**
* Set send timeout
* @param timeout Timeout in milliseconds
* @return this for chaining
*/
DeliveryOptions setSendTimeout(long timeout);
/**
* Add a header
* @param key Header key
* @param value Header value
* @return this for chaining
*/
DeliveryOptions addHeader(String key, String value);
/**
* Set headers
* @param headers Headers multimap
* @return this for chaining
*/
DeliveryOptions setHeaders(MultiMap headers);
/**
* Set local only delivery (no cluster)
* @param localOnly Whether to deliver locally only
* @return this for chaining
*/
DeliveryOptions setLocalOnly(boolean localOnly);
/**
* Set codec name for message encoding
* @param codecName Codec name
* @return this for chaining
*/
DeliveryOptions setCodecName(String codecName);
/**
* Set tracing policy
* @param tracingPolicy Tracing policy
* @return this for chaining
*/
DeliveryOptions setTracingPolicy(TracingPolicy tracingPolicy);
}
Configure event bus behavior and clustering options.
/**
* Event bus configuration options
*/
class EventBusOptions {
EventBusOptions setSendBufferSize(int sendBufferSize);
EventBusOptions setReceiveBufferSize(int receiveBufferSize);
EventBusOptions setClusterPublicHost(String clusterPublicHost);
EventBusOptions setClusterPublicPort(int clusterPublicPort);
EventBusOptions setClusterPingInterval(long clusterPingInterval);
EventBusOptions setClusterPingReplyInterval(long clusterPingReplyInterval);
EventBusOptions setPort(int port);
EventBusOptions setHost(String host);
EventBusOptions setAcceptBacklog(int acceptBacklog);
EventBusOptions setReconnectAttempts(int attempts);
EventBusOptions setReconnectInterval(long interval);
EventBusOptions setSsl(boolean ssl);
EventBusOptions setKeyCertOptions(KeyCertOptions options);
EventBusOptions setTrustOptions(TrustOptions options);
EventBusOptions setClientAuth(ClientAuth clientAuth);
}
Handle event bus failures and reply exceptions.
/**
* Exception thrown when reply fails
*/
class ReplyException extends VertxException {
/**
* Get the failure type
* @return Failure type
*/
ReplyFailure failureType();
/**
* Get the failure code
* @return Failure code
*/
int failureCode();
/**
* Get the failure message
* @return Failure message
*/
String getMessage();
}
/**
* Types of reply failures
*/
enum ReplyFailure {
TIMEOUT, // Request timed out
NO_HANDLERS, // No handlers registered for address
RECIPIENT_FAILURE // Handler threw exception or called fail()
}
Basic Message Sending:
import io.vertx.core.eventbus.EventBus;
EventBus eventBus = vertx.eventBus();
// Send a message
eventBus.send("user.notifications", "Hello User!");
// Publish to all subscribers
eventBus.publish("system.broadcast", "System maintenance in 5 minutes");
// Send with headers
DeliveryOptions options = new DeliveryOptions()
.addHeader("userId", "123")
.addHeader("priority", "high");
eventBus.send("user.email", "Important message", options);
Message Consumer:
import io.vertx.core.eventbus.Message;
// Simple consumer.
// The explicit <String> type witness is required: with an implicitly typed lambda in
// statement position, T would otherwise be inferred as Object and the assignment
// `String body = message.body()` would not compile.
eventBus.<String>consumer("user.notifications", message -> {
  String body = message.body();
  System.out.println("Received notification: " + body);
});
// Consumer with reply.
// <String> fixes the body type; without it message.body() is an Object and cannot be
// assigned to a String.
eventBus.<String>consumer("user.query", message -> {
  String query = message.body();
  System.out.println("Processing query: " + query);
  // Process the query and send the result back to the requester
  String result = processQuery(query);
  message.reply(result);
});
// Consumer with error handling.
// <String> fixes the body type; without it message.body() is an Object and cannot be
// assigned to a String.
eventBus.<String>consumer("user.process", message -> {
  try {
    String data = message.body();
    String result = processData(data);
    message.reply(result);
  } catch (Exception e) {
    // Report the failure to the sender instead of leaving the request unanswered
    message.fail(500, "Processing failed: " + e.getMessage());
  }
});
Request-Response Pattern:
// Send request and handle response
eventBus.<String>request("user.service", "getUserInfo")
.onSuccess(reply -> {
String userInfo = reply.body();
System.out.println("User info: " + userInfo);
})
.onFailure(err -> {
if (err instanceof ReplyException) {
ReplyException replyErr = (ReplyException) err;
System.err.println("Request failed: " + replyErr.failureType() +
" - " + replyErr.getMessage());
}
});
// With timeout
DeliveryOptions options = new DeliveryOptions().setSendTimeout(5000);
eventBus.<JsonObject>request("slow.service", "process", options)
.onSuccess(reply -> {
JsonObject result = reply.body();
System.out.println("Got result: " + result.encode());
})
.onFailure(err -> {
System.err.println("Request timed out or failed: " + err.getMessage());
});
Message Producer Pattern:
import io.vertx.core.eventbus.MessageProducer;
// Create a producer for repeated sending
MessageProducer<String> producer = eventBus.sender("log.events");
// Send messages
producer.send("Application started");
producer.send("User logged in");
producer.send("Processing completed");
// Close when done
producer.close();
// Publisher for broadcast
MessageProducer<JsonObject> publisher = eventBus.publisher("system.events");
JsonObject event = new JsonObject()
.put("type", "user_login")
.put("userId", "user123")
.put("timestamp", System.currentTimeMillis());
publisher.write(event);
Custom Message Codec:
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import java.nio.charset.StandardCharsets;
/**
 * Example codec that lets {@code Person} objects be sent over the event bus.
 *
 * <p>Wire layout produced by {@link #encodeToWire}: a 4-byte name length, the name bytes
 * encoded as UTF-8, then a 4-byte age.
 */
public class PersonCodec implements MessageCodec<Person, Person> {

  /**
   * Appends the wire representation of {@code person} to {@code buffer}.
   *
   * @param buffer buffer to append to
   * @param person object to encode
   */
  @Override
  public void encodeToWire(Buffer buffer, Person person) {
    // Encode explicitly as UTF-8: decodeFromWire reads the name back with
    // Buffer.getString(start, end), which interprets the bytes as UTF-8. Using the
    // platform default charset here would corrupt non-ASCII names on JVMs whose
    // default charset is not UTF-8.
    byte[] nameBytes = person.getName().getBytes(StandardCharsets.UTF_8);
    buffer.appendInt(nameBytes.length);
    buffer.appendBytes(nameBytes);
    buffer.appendInt(person.getAge());
  }

  /**
   * Rebuilds a {@code Person} from the layout written by {@link #encodeToWire}.
   *
   * @param pos position in {@code buffer} where the encoded person starts
   * @param buffer buffer to read from
   * @return the decoded person
   */
  @Override
  public Person decodeFromWire(int pos, Buffer buffer) {
    int nameLength = buffer.getInt(pos);
    pos += 4; // skip the 4-byte length prefix
    String name = buffer.getString(pos, pos + nameLength);
    pos += nameLength;
    int age = buffer.getInt(pos);
    return new Person(name, age);
  }

  /**
   * Returns the same instance for local delivery.
   *
   * @param person object being delivered locally
   * @return {@code person} itself
   */
  @Override
  public Person transform(Person person) {
    // For local delivery, return as-is
    return person;
  }

  /**
   * @return the name this codec is registered under
   */
  @Override
  public String name() {
    return "personCodec";
  }

  /**
   * @return {@code -1}, marking this as a user codec
   */
  @Override
  public byte systemCodecID() {
    return -1; // User codec
  }
}
// Register the codec
eventBus.registerCodec(new PersonCodec());
// Use with custom objects
Person person = new Person("John", 30);
eventBus.send("person.created", person);Clustered Event Bus:
// In clustered Vert.x, messages are distributed across nodes
VertxOptions options = new VertxOptions()
.setHAEnabled(true)
.setEventBusOptions(new EventBusOptions()
.setClusterPublicHost("192.168.1.100")
.setClusterPublicPort(15701));
Vertx.clusteredVertx(options).onSuccess(vertx -> {
EventBus eventBus = vertx.eventBus();
// This consumer will receive messages from any node
eventBus.consumer("cluster.broadcast", message -> {
System.out.println("Received from cluster: " + message.body());
});
// Send message that can be received by any node
eventBus.publish("cluster.broadcast", "Hello from node " +
System.getProperty("node.id"));
});
Install with Tessl CLI
npx tessl i tessl/maven-io-vertx--vertx-core