Modern, JVM-based framework for building modular, easily testable microservice and serverless applications with compile-time DI and fast startup.
—
Micronaut provides comprehensive support for building message-driven applications with various messaging systems including RabbitMQ, Kafka, and in-memory messaging.
Create message listeners to handle incoming messages from various messaging systems.
/**
 * Listener bean that reacts to user lifecycle messages.
 *
 * <p>Each handler method is tied to a destination through {@code @MessageMapping}.
 */
@MessageListener
public class UserEventHandler {

    // NOTE(review): userService is referenced by handleUserDeleted but is not declared
    // anywhere in this snippet; presumably an injected collaborator — confirm.

    /**
     * Prints the created user's name together with the correlation id header.
     *
     * @param user          the message body
     * @param correlationId the value supplied for the {@code @MessageHeader} parameter
     */
    @MessageMapping("user.created")
    void handleUserCreated(@MessageBody User user, @MessageHeader String correlationId) {
        String line = "User created: " + user.getName() + " (ID: " + correlationId + ")";
        System.out.println(line);
    }

    /**
     * Prints the "timestamp" header of an update message, or "unknown" when it is absent.
     *
     * @param user    the message body (not read by this handler)
     * @param headers all headers of the incoming message
     */
    @MessageMapping("user.updated")
    void handleUserUpdated(@MessageBody User user, @MessageHeaders MessageHeaders headers) {
        System.out.println("User updated at: " + headers.get("timestamp", String.class).orElse("unknown"));
    }

    /**
     * Delegates cleanup for the deleted user and maps the outcome to a confirmation text.
     *
     * @param event the message body carrying the user id
     * @return a {@code Single} emitting the confirmation message
     */
    @MessageMapping("user.deleted")
    Single<String> handleUserDeleted(@MessageBody DeleteEvent event) {
        return userService
                .cleanup(event.getUserId())
                .map(ignored -> "Cleanup completed for user: " + event.getUserId());
    }
}

Create message producers to send messages to messaging systems.
/**
* Message producer interface.
* <p>
* Declares one send method per user event; no implementation is shown in this snippet.
* NOTE(review): presumably the framework supplies the implementation for
* {@code @MessageProducer} interfaces — confirm.
*/
@MessageProducer
public interface UserEventProducer {
/**
* Sends the given user as the message body, mapped to "user.created".
*
* @param user the message body
*/
@MessageMapping("user.created")
void sendUserCreated(@MessageBody User user);
/**
* Sends the given user as the message body with a "correlationId" header.
* Carries both a "user.updated" mapping and a "user.events" {@code @SendTo} value.
* NOTE(review): which of the two names is the actual destination is not visible here — confirm.
*
* @param user the message body
* @param id   the value of the "correlationId" header
*/
@MessageMapping("user.updated")
@SendTo("user.events")
void sendUserUpdated(@MessageBody User user, @MessageHeader("correlationId") String id);
/**
* Sends the given delete event as the message body, mapped to "user.deleted".
*
* @param event the message body
* @return a {@code Publisher} of {@code String}; what it emits is not shown in this snippet
*/
@MessageMapping("user.deleted")
Publisher<String> sendUserDeleted(@MessageBody DeleteEvent event);
}

Configure messaging systems and connection properties.
/**
 * RabbitMQ connection settings, annotated with {@code @ConfigurationProperties("rabbitmq")}.
 *
 * <p>Every field starts from a default value that is used until a setter replaces it.
 * NOTE(review): the guest/guest credentials are presumably meant for local development
 * only — confirm before relying on them elsewhere.
 */
@ConfigurationProperties("rabbitmq")
public class RabbitConfiguration {

    private static final String DEFAULT_URI = "amqp://localhost:5672";
    private static final String DEFAULT_USERNAME = "guest";
    private static final String DEFAULT_PASSWORD = "guest";

    private String uri = DEFAULT_URI;
    private String username = DEFAULT_USERNAME;
    private String password = DEFAULT_PASSWORD;

    // getters and setters
}
/**
 * Kafka connection settings, annotated with {@code @ConfigurationProperties("kafka")}.
 *
 * <p>Every field starts from a default value that is used until a setter replaces it.
 */
@ConfigurationProperties("kafka")
public class KafkaConfiguration {

    private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String DEFAULT_GROUP_ID = "micronaut-app";

    private String bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS;
    private String groupId = DEFAULT_GROUP_ID;

    // getters and setters
}

Handle message processing errors with retry and dead letter queues.
/**
 * Listener showing retry-based error handling for order messages.
 */
@MessageListener
public class ErrorHandlingListener {

    // NOTE(review): orderService and alertService are used below but are not declared in
    // this snippet; presumably injected collaborators — confirm.

    /**
     * Validates and processes an order received on "orders.process".
     *
     * <p>{@code @Retryable} is configured for three attempts with a one second delay.
     * {@link IllegalArgumentException} is excluded from retry: an invalid amount is a
     * deterministic validation failure, so running the handler again cannot succeed and
     * would only delay the failure.
     *
     * @param order the order carried in the message body
     * @throws IllegalArgumentException if the order amount is missing, zero or negative
     */
    @MessageMapping("orders.process")
    @MessageErrorHandler(ErrorHandler.RETRY)
    @Retryable(attempts = "3", delay = "1s", excludes = IllegalArgumentException.class)
    void processOrder(@MessageBody Order order) {
        BigDecimal amount = order.getAmount();
        // A missing amount is rejected the same way as a non-positive one instead of
        // surfacing as a NullPointerException.
        if (amount == null || amount.compareTo(BigDecimal.ZERO) <= 0) {
            throw new IllegalArgumentException("Invalid order amount");
        }
        orderService.process(order);
    }

    /**
     * Forwards an order received on "orders.failed", together with its error message
     * header, to the alert service.
     *
     * @param order        the order carried in the message body
     * @param errorMessage the value supplied for the {@code @MessageHeader} parameter
     */
    @MessageMapping("orders.failed")
    void handleFailedOrder(@MessageBody Order order, @MessageHeader String errorMessage) {
        alertService.notifyFailedOrder(order, errorMessage);
    }
}

Use reactive types for asynchronous message processing.
/**
 * Listener whose handlers accept and return reactive types ({@code Flowable}, {@code Single}).
 */
@MessageListener
public class ReactiveMessageHandler {

    // NOTE(review): processBatch and notificationService are used below but are not
    // declared in this snippet — confirm where they come from.

    /**
     * Groups the incoming stream into batches of 100, runs each batch through
     * {@code processBatch}, and flattens the per-batch results back into one stream.
     *
     * @param dataStream the incoming raw data
     * @return the processed items as a single stream
     */
    @MessageMapping("data.stream")
    Flowable<ProcessedData> handleDataStream(@MessageBody Flowable<RawData> dataStream) {
        Flowable<ProcessedData> processed = dataStream
                .buffer(100)
                .map(batch -> this.processBatch(batch))
                .flatMapIterable(results -> results);
        return processed;
    }

    /**
     * Sends the notification asynchronously and maps the result to a confirmation text.
     *
     * @param request the notification request carried in the message body
     * @return a {@code Single} emitting "Notification sent: " followed by the result id
     */
    @MessageMapping("notification.send")
    Single<String> sendNotification(@MessageBody NotificationRequest request) {
        return notificationService
                .sendAsync(request)
                .map(sent -> {
                    return "Notification sent: " + sent.getId();
                });
    }
}

Configure serialization for message bodies with various formats.
/**
 * Factory that registers the message body serializers.
 *
 * <p>Declared with {@code @Factory} (io.micronaut.context.annotation.Factory): Micronaut
 * only turns {@code @Bean} methods into bean definitions when they are declared on a
 * factory class. On a class that is merely {@code @Singleton} the methods below would not
 * be registered as beans. The singleton scope is therefore placed on each factory method.
 */
@Factory
public class MessageSerializationConfiguration {

    /**
     * Creates the JSON serializer. {@code @Primary} marks it as the preferred candidate
     * when more than one serializer bean matches.
     *
     * @return a new {@code JsonMessageBodySerializer}
     */
    @Bean
    @Singleton
    @Primary
    MessageBodySerializer<Object> jsonSerializer() {
        return new JsonMessageBodySerializer();
    }

    /**
     * Creates the Avro serializer, registered under the name "avro".
     *
     * @return a new {@code AvroMessageBodySerializer}
     */
    @Bean
    @Singleton
    @Named("avro")
    MessageBodySerializer<Object> avroSerializer() {
        return new AvroMessageBodySerializer();
    }
}
// Messaging annotations
/**
 * Marker annotation placed on message listener classes.
 * Applicable to types only and retained at runtime; it declares no members.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface MessageListener {
}
/**
 * Marker annotation placed on message producer types.
 * Applicable to types only and retained at runtime; it declares no members.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface MessageProducer {
}
/**
 * Associates a method with a named mapping such as "user.created".
 * Applicable to methods only and retained at runtime.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MessageMapping {

    /**
     * @return the mapping name; required, there is no default
     */
    String value();
}
/**
 * Names a target such as "user.events" on a method.
 * Applicable to methods only and retained at runtime.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface SendTo {

    /**
     * @return the target name; required, there is no default
     */
    String value();
}
/**
 * Marks the parameter that carries the message body.
 * Applicable to parameters only and retained at runtime; it declares no members.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface MessageBody {
}
/**
 * Marks a parameter that carries a single message header.
 * Applicable to parameters only and retained at runtime.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface MessageHeader {

    /**
     * @return the header name; defaults to the empty string when not given
     */
    String value() default "";
}
/**
 * Marks the parameter that receives all headers of a message.
 * Applicable to parameters only and retained at runtime; it declares no members.
 *
 * <p>NOTE(review): this annotation has the same simple name as the {@code MessageHeaders}
 * interface listed under "Core messaging interfaces"; presumably the two live in
 * different packages — confirm.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface MessageHeaders {
}
// Core messaging interfaces
/**
* A message made up of headers and a typed body.
*
* @param <T> the type of the message body
*/
public interface Message<T> {
/**
* @return the headers of this message
*/
MessageHeaders getHeaders();
/**
* @return the body of this message
*/
T getBody();
}
/**
* Lookup operations over the headers of a message.
*/
public interface MessageHeaders {
/**
* Looks up a header by name for the requested type.
*
* @param name the header name
* @param type the class of the expected value
* @param <T>  the expected value type
* @return an {@code Optional} holding the value, if any
*/
<T> Optional<T> get(String name, Class<T> type);
/**
* @return the set of header names
*/
Set<String> names();
/**
* @return the headers as a map (presumably keyed by header name — confirm)
*/
Map<String, Object> asMap();
}
/**
* Converts objects to and from a serialized message representation.
*
* @param <T> the serialized representation type
*/
public interface MessageBodySerializer<T> {
/**
* Serializes an object into the representation type.
*
* @param object  the object to serialize
* @param headers the message headers passed alongside the object
* @return the serialized representation
*/
T serialize(Object object, MessageHeaders headers);
/**
* Deserializes a serialized message for the requested type.
*
* @param message the serialized representation
* @param type    the class requested for the result
* @return the deserialized object (declared as {@code Object}; callers must cast)
*/
Object deserialize(T message, Class<?> type);
}
// Application lifecycle for messaging
/**
* Lists the static {@code run} overloads, each returning an {@code ApplicationContext}.
* NOTE(review): the methods appear as signatures only (no bodies), so this reads as an
* API summary rather than compilable source — confirm against the real class.
*/
public class MessagingApplication {
/**
* @param args the arguments passed to the application
* @return the resulting {@code ApplicationContext}
*/
public static ApplicationContext run(String... args);
/**
* @param mainClass the application's main class
* @param args      the arguments passed to the application
* @return the resulting {@code ApplicationContext}
*/
public static ApplicationContext run(Class<?> mainClass, String... args);
}

Install with Tessl CLI
npx tessl i tessl/maven-micronaut