CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-io-vertx--vertx-core

Vert.x Core is a high-performance, reactive application toolkit for the JVM providing fundamental building blocks for asynchronous I/O operations.

Pending
Overview
Eval results
Files

docs/event-bus.md

Event Bus

Distributed messaging system for inter-verticle and inter-node communication with publish/subscribe and request/response patterns, custom codecs, and delivery options.

Capabilities

Event Bus Access

Get the event bus instance for messaging operations.

/**
 * Get the event bus instance
 * @return EventBus instance
 */
EventBus eventBus();

/**
 * Event Bus interface for distributed messaging
 */
interface EventBus extends Measured {
  /**
   * Send a message to an address
   * @param address Target address
   * @param message Message to send
   * @return this for chaining
   */
  EventBus send(String address, Object message);

  /**
   * Send a message with delivery options
   * @param address Target address
   * @param message Message to send
   * @param options Delivery options
   * @return this for chaining
   */
  EventBus send(String address, Object message, DeliveryOptions options);

  /**
   * Publish a message to all subscribers
   * @param address Target address
   * @param message Message to publish
   * @return this for chaining
   */
  EventBus publish(String address, Object message);

  /**
   * Publish a message with delivery options
   * @param address Target address
   * @param message Message to publish
   * @param options Delivery options
   * @return this for chaining
   */
  EventBus publish(String address, Object message, DeliveryOptions options);

  /**
   * Send a message and expect a reply
   * @param address Target address
   * @param message Message to send
   * @return Future that completes with reply message
   */
  <T> Future<Message<T>> request(String address, Object message);

  /**
   * Send a message and expect a reply with options
   * @param address Target address
   * @param message Message to send
   * @param options Delivery options
   * @return Future that completes with reply message
   */
  <T> Future<Message<T>> request(String address, Object message, DeliveryOptions options);

  /**
   * Create a message consumer for an address
   * @param address Address to consume from
   * @return MessageConsumer instance
   */
  <T> MessageConsumer<T> consumer(String address);

  /**
   * Create a message consumer with handler
   * @param address Address to consume from
   * @param handler Message handler
   * @return MessageConsumer instance
   */
  <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler);

  /**
   * Create a local message consumer. The registration is local-only: the address is
   * not propagated across the cluster, so only messages sent from this Vert.x
   * instance reach the consumer.
   * @param address Address to consume from
   * @return MessageConsumer instance
   */
  <T> MessageConsumer<T> localConsumer(String address);

  /**
   * Create a local message consumer with handler
   * @param address Address to consume from
   * @param handler Message handler
   * @return MessageConsumer instance
   */
  <T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler);

  /**
   * Create a message sender for an address
   * @param address Target address
   * @return MessageProducer instance
   */
  <T> MessageProducer<T> sender(String address);

  /**
   * Create a message sender with delivery options
   * @param address Target address
   * @param options Default delivery options
   * @return MessageProducer instance
   */
  <T> MessageProducer<T> sender(String address, DeliveryOptions options);

  /**
   * Create a message publisher for an address
   * @param address Target address
   * @return MessageProducer instance
   */
  <T> MessageProducer<T> publisher(String address);

  /**
   * Create a message publisher with delivery options
   * @param address Target address
   * @param options Default delivery options
   * @return MessageProducer instance
   */
  <T> MessageProducer<T> publisher(String address, DeliveryOptions options);

  /**
   * Register a custom message codec
   * @param codec Message codec to register
   * @return this for chaining
   */
  EventBus registerCodec(MessageCodec codec);

  /**
   * Unregister a message codec
   * @param name Codec name to unregister
   * @return this for chaining
   */
  EventBus unregisterCodec(String name);

  /**
   * Register a default codec for a type
   * @param clazz Class type
   * @param codec Message codec for the type
   * @return this for chaining
   */
  <T> EventBus registerDefaultCodec(Class<T> clazz, MessageCodec<T, ?> codec);

  /**
   * Unregister a default codec
   * @param clazz Class type
   * @return this for chaining
   */
  EventBus unregisterDefaultCodec(Class clazz);

  /**
   * Close the event bus
   * @return Future that completes when closed
   */
  Future<Void> close();
}

Message Handling

Handle incoming messages with access to body, headers, and reply functionality.

/**
 * Event bus message interface
 */
interface Message<T> {
  /**
   * Get the message address
   * @return Message address
   */
  String address();

  /**
   * Get message headers
   * @return Headers MultiMap
   */
  MultiMap headers();

  /**
   * Get the message body
   * @return Message body
   */
  T body();

  /**
   * Get the reply address
   * @return Reply address or null
   */
  String replyAddress();

  /**
   * Check if this is a send (point-to-point) message
   * @return true if send message
   */
  boolean isSend();

  /**
   * Reply to this message
   * @param message Reply message
   */
  void reply(Object message);

  /**
   * Reply to this message with delivery options
   * @param message Reply message
   * @param options Delivery options for reply
   */
  void reply(Object message, DeliveryOptions options);

  /**
   * Reply and expect a reply back
   * @param message Reply message
   * @return Future that completes with reply to reply
   */
  <R> Future<Message<R>> replyAndRequest(Object message);

  /**
   * Reply and expect a reply back with options
   * @param message Reply message
   * @param options Delivery options
   * @return Future that completes with reply to reply
   */
  <R> Future<Message<R>> replyAndRequest(Object message, DeliveryOptions options);

  /**
   * Fail the message handling
   * @param failureCode Failure code
   * @param message Failure message
   */
  void fail(int failureCode, String message);
}

Message Consumers

Consume messages from event bus addresses with lifecycle management.

/**
 * Message consumer for receiving messages
 */
interface MessageConsumer<T> extends ReadStream<Message<T>> {
  /**
   * Get the consumer address
   * @return Consumer address
   */
  String address();

  /**
   * Set the maximum number of buffered messages
   * @param maxBufferedMessages Maximum buffered messages
   * @return this for chaining
   */
  MessageConsumer<T> setMaxBufferedMessages(int maxBufferedMessages);

  /**
   * Check if consumer is registered
   * @return true if registered
   */
  boolean isRegistered();

  /**
   * Register the consumer
   * @return Future that completes when registered
   */
  Future<Void> register();

  /**
   * Unregister the consumer
   * @return Future that completes when unregistered
   */
  Future<Void> unregister();

  /**
   * Get completion future (completes when registered)
   * @return Completion future
   */
  Future<Void> completion();

  /**
   * Get stream of message bodies only
   * @return ReadStream of message bodies
   */
  ReadStream<T> bodyStream();
}

Message Producers

Send messages to event bus addresses with producer pattern.

/**
 * Message producer for sending messages
 */
interface MessageProducer<T> extends WriteStream<T> {
  /**
   * Get the producer address
   * @return Producer address
   */
  String address();

  /**
   * Send a message
   * @param message Message to send
   * @return Future that completes when sent
   */
  Future<Void> send(T message);

  /**
   * Write a message (alias for send)
   * @param data Message to write
   * @return Future that completes when written
   */
  Future<Void> write(T data);

  /**
   * Get delivery options
   * @return Current delivery options
   */
  DeliveryOptions deliveryOptions();

  /**
   * Close the producer
   * @return Future that completes when closed
   */
  Future<Void> close();
}

Custom Message Codecs

Define custom encoding/decoding for message types.

/**
 * Message codec for custom types
 */
interface MessageCodec<S, R> {
  /**
   * Encode message to wire format
   * @param buffer Buffer to write to
   * @param s Object to encode
   */
  void encodeToWire(Buffer buffer, S s);

  /**
   * Decode message from wire format
   * @param pos Position in buffer
   * @param buffer Buffer to read from
   * @return Decoded object
   */
  R decodeFromWire(int pos, Buffer buffer);

  /**
   * Transform message for local delivery
   * @param s Object to transform
   * @return Transformed object
   */
  R transform(S s);

  /**
   * Get codec name
   * @return Codec name
   */
  String name();

  /**
   * Get system codec ID
   * @return System codec ID; user-defined codecs return -1
   */
  byte systemCodecID();
}

Delivery Options

Configure message delivery behavior and headers.

/**
 * Options for message delivery
 */
class DeliveryOptions {
  /**
   * Set send timeout
   * @param timeout Timeout in milliseconds
   * @return this for chaining
   */
  DeliveryOptions setSendTimeout(long timeout);

  /**
   * Add a header
   * @param key Header key
   * @param value Header value
   * @return this for chaining
   */
  DeliveryOptions addHeader(String key, String value);

  /**
   * Set headers
   * @param headers Headers multimap
   * @return this for chaining
   */
  DeliveryOptions setHeaders(MultiMap headers);

  /**
   * Set local only delivery (no cluster)
   * @param localOnly Whether to deliver locally only
   * @return this for chaining
   */
  DeliveryOptions setLocalOnly(boolean localOnly);

  /**
   * Set codec name for message encoding
   * @param codecName Codec name
   * @return this for chaining
   */
  DeliveryOptions setCodecName(String codecName);

  /**
   * Set tracing policy
   * @param tracingPolicy Tracing policy
   * @return this for chaining
   */
  DeliveryOptions setTracingPolicy(TracingPolicy tracingPolicy);
}

Event Bus Configuration

Configure event bus behavior and clustering options.

/**
 * Event bus configuration options
 */
class EventBusOptions {
  EventBusOptions setSendBufferSize(int sendBufferSize);
  EventBusOptions setReceiveBufferSize(int receiveBufferSize);
  EventBusOptions setClusterPublicHost(String clusterPublicHost);
  EventBusOptions setClusterPublicPort(int clusterPublicPort);
  EventBusOptions setClusterPingInterval(long clusterPingInterval);
  EventBusOptions setClusterPingReplyInterval(long clusterPingReplyInterval);
  EventBusOptions setPort(int port);
  EventBusOptions setHost(String host);
  EventBusOptions setAcceptBacklog(int acceptBacklog);
  EventBusOptions setReconnectAttempts(int attempts);
  EventBusOptions setReconnectInterval(long interval);
  EventBusOptions setSsl(boolean ssl);
  EventBusOptions setKeyCertOptions(KeyCertOptions options);
  EventBusOptions setTrustOptions(TrustOptions options);
  EventBusOptions setClientAuth(ClientAuth clientAuth);
}

Exception Handling

Handle event bus failures and reply exceptions.

/**
 * Exception thrown when reply fails
 */
class ReplyException extends VertxException {
  /**
   * Get the failure type
   * @return Failure type
   */
  ReplyFailure failureType();

  /**
   * Get the failure code
   * @return Failure code
   */
  int failureCode();

  /**
   * Get the failure message
   * @return Failure message
   */
  String getMessage();
}

/**
 * Types of reply failures
 */
enum ReplyFailure {
  TIMEOUT,           // Request timed out
  NO_HANDLERS,       // No handlers registered for address
  RECIPIENT_FAILURE  // Handler threw exception or called fail()
}

Usage Examples

Basic Message Sending:

import io.vertx.core.eventbus.EventBus;

EventBus eventBus = vertx.eventBus();

// Send a message
eventBus.send("user.notifications", "Hello User!");

// Publish to all subscribers
eventBus.publish("system.broadcast", "System maintenance in 5 minutes");

// Send with headers
DeliveryOptions options = new DeliveryOptions()
  .addHeader("userId", "123")
  .addHeader("priority", "high");

eventBus.send("user.email", "Important message", options);

Message Consumer:

import io.vertx.core.eventbus.Message;

// Simple consumer.
// The explicit <String> type witness is required: without it T is inferred as
// Object and `String body = message.body()` does not compile.
eventBus.<String>consumer("user.notifications", message -> {
  String body = message.body();
  System.out.println("Received notification: " + body);
});

// Consumer with reply
eventBus.<String>consumer("user.query", message -> {
  String query = message.body();
  System.out.println("Processing query: " + query);

  // Process and reply
  String result = processQuery(query);
  message.reply(result);
});

// Consumer with error handling
eventBus.<String>consumer("user.process", message -> {
  try {
    String data = message.body();
    String result = processData(data);
    message.reply(result);
  } catch (Exception e) {
    // Signal the failure back to the requester instead of leaving it to time out
    message.fail(500, "Processing failed: " + e.getMessage());
  }
});

Request-Response Pattern:

// Send request and handle response
eventBus.<String>request("user.service", "getUserInfo")
  .onSuccess(reply -> {
    String userInfo = reply.body();
    System.out.println("User info: " + userInfo);
  })
  .onFailure(err -> {
    if (err instanceof ReplyException) {
      ReplyException replyErr = (ReplyException) err;
      System.err.println("Request failed: " + replyErr.failureType() + 
                        " - " + replyErr.getMessage());
    }
  });

// With timeout
DeliveryOptions options = new DeliveryOptions().setSendTimeout(5000);
eventBus.<JsonObject>request("slow.service", "process", options)
  .onSuccess(reply -> {
    JsonObject result = reply.body();
    System.out.println("Got result: " + result.encode());
  })
  .onFailure(err -> {
    System.err.println("Request timed out or failed: " + err.getMessage());
  });

Message Producer Pattern:

import io.vertx.core.eventbus.MessageProducer;

// Create a producer for repeated sending
MessageProducer<String> producer = eventBus.sender("log.events");

// Write messages to the producer's address.
// write(T) is used instead of send(T): write is the method MessageProducer
// exposes in Vert.x 4, and it is what the publisher example below uses too.
producer.write("Application started");
producer.write("User logged in");
producer.write("Processing completed");

// Close when done
producer.close();

// Publisher for broadcast
MessageProducer<JsonObject> publisher = eventBus.publisher("system.events");

JsonObject event = new JsonObject()
  .put("type", "user_login")
  .put("userId", "user123")
  .put("timestamp", System.currentTimeMillis());

publisher.write(event);

Custom Message Codec:

import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;

/**
 * Event bus codec for {@code Person} messages.
 *
 * <p>Wire layout: a 4-byte name length, the name bytes encoded as UTF-8,
 * then a 4-byte age.
 */
public class PersonCodec implements MessageCodec<Person, Person> {

  /**
   * Appends the wire representation of {@code person} to {@code buffer}.
   *
   * @param buffer Buffer to write to
   * @param person Person to encode
   */
  @Override
  public void encodeToWire(Buffer buffer, Person person) {
    // Encode explicitly as UTF-8. The no-arg getBytes() uses the platform default
    // charset, which may differ between nodes and from the UTF-8 decoding done by
    // Buffer.getString(start, end) in decodeFromWire.
    byte[] nameBytes = person.getName().getBytes(java.nio.charset.StandardCharsets.UTF_8);
    buffer.appendInt(nameBytes.length);
    buffer.appendBytes(nameBytes);
    buffer.appendInt(person.getAge());
  }

  /**
   * Reads a {@code Person} from {@code buffer}, starting at {@code pos}.
   *
   * @param pos Position in the buffer where the encoded person starts
   * @param buffer Buffer to read from
   * @return Decoded person
   */
  @Override
  public Person decodeFromWire(int pos, Buffer buffer) {
    int nameLength = buffer.getInt(pos);
    int nameStart = pos + 4; // skip the 4-byte length prefix
    int nameEnd = nameStart + nameLength;
    String name = buffer.getString(nameStart, nameEnd);
    int age = buffer.getInt(nameEnd);
    return new Person(name, age);
  }

  /**
   * Transforms the message for local delivery.
   *
   * @param person Person being sent
   * @return The same instance, unchanged
   */
  @Override
  public Person transform(Person person) {
    // For local delivery, return as-is
    return person;
  }

  /**
   * @return The name under which this codec is registered
   */
  @Override
  public String name() {
    return "personCodec";
  }

  /**
   * @return -1, marking this as a user codec
   */
  @Override
  public byte systemCodecID() {
    return -1; // User codec
  }
}

// Register the codec
eventBus.registerCodec(new PersonCodec());

// Use with custom objects
Person person = new Person("John", 30);
eventBus.send("person.created", person);

Clustered Event Bus:

// In clustered Vert.x, messages are distributed across nodes
VertxOptions options = new VertxOptions()
  .setHAEnabled(true)
  .setEventBusOptions(new EventBusOptions()
    .setClusterPublicHost("192.168.1.100")
    .setClusterPublicPort(15701));

Vertx.clusteredVertx(options).onSuccess(vertx -> {
  EventBus eventBus = vertx.eventBus();
  
  // This consumer will receive messages from any node
  eventBus.consumer("cluster.broadcast", message -> {
    System.out.println("Received from cluster: " + message.body());
  });
  
  // Send message that can be received by any node
  eventBus.publish("cluster.broadcast", "Hello from node " + 
                   System.getProperty("node.id"));
});

Install with Tessl CLI

npx tessl i tessl/maven-io-vertx--vertx-core

docs

core-api.md

event-bus.md

file-system.md

http.md

index.md

networking.md

utilities.md

tile.json