CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-micronaut

Modern, JVM-based framework for building modular, easily testable microservice and serverless applications with compile-time DI and fast startup.

Pending
Overview
Eval results
Files

docs/messaging.md

Messaging

Micronaut provides comprehensive support for building message-driven applications with various messaging systems including RabbitMQ, Kafka, and in-memory messaging.

Capabilities

Message Listeners

Create message listeners to handle incoming messages from various messaging systems.

/**
 * Listener that reacts to user lifecycle messages.
 *
 * <p>Each handler is bound to one mapping through {@code @MessageMapping}.
 * NOTE(review): {@code userService} is referenced below but not declared in this
 * snippet; presumably an injected collaborator - confirm.
 */
@MessageListener
public class UserEventHandler {

    /**
     * Prints the created user's name together with the correlation id.
     *
     * @param user          the message body
     * @param correlationId the value bound through {@code @MessageHeader}
     */
    @MessageMapping("user.created")
    void handleUserCreated(@MessageBody User user, @MessageHeader String correlationId) {
        String line = "User created: " + user.getName() + " (ID: " + correlationId + ")";
        System.out.println(line);
    }

    /**
     * Prints the "timestamp" header of an update message.
     *
     * @param user    the message body (not read by this handler)
     * @param headers all headers of the incoming message
     */
    @MessageMapping("user.updated")
    void handleUserUpdated(@MessageBody User user, @MessageHeaders MessageHeaders headers) {
        // Falls back to "unknown" when no "timestamp" header is present.
        String updatedAt = headers.get("timestamp", String.class).orElse("unknown");
        System.out.println("User updated at: " + updatedAt);
    }

    /**
     * Delegates cleanup to {@code userService} and maps its result to a
     * confirmation text.
     *
     * @param event the message body carrying the user id
     * @return a reactive result that yields the confirmation text
     */
    @MessageMapping("user.deleted")
    Single<String> handleUserDeleted(@MessageBody DeleteEvent event) {
        return userService
            .cleanup(event.getUserId())
            // The cleanup result itself is not used; only its completion matters here.
            .map(ignored -> "Cleanup completed for user: " + event.getUserId());
    }
}

Message Producers

Create message producers to send messages to messaging systems.

/**
 * Message producer interface for user lifecycle events.
 *
 * <p>Only method declarations are shown; no implementation is visible in this
 * document. NOTE(review): presumably the framework supplies the implementation
 * for types marked {@code @MessageProducer} - confirm.
 */
@MessageProducer
public interface UserEventProducer {
    
    /**
     * Sends the given user as the message body, mapped to "user.created".
     *
     * @param user the message body
     */
    @MessageMapping("user.created")
    void sendUserCreated(@MessageBody User user);
    
    /**
     * Sends the given user as the message body, mapped to "user.updated" and
     * additionally annotated with {@code @SendTo("user.events")}.
     *
     * @param user the message body
     * @param id   value for the "correlationId" header
     */
    @MessageMapping("user.updated")
    @SendTo("user.events")
    void sendUserUpdated(@MessageBody User user, @MessageHeader("correlationId") String id);
    
    /**
     * Sends the given event as the message body, mapped to "user.deleted".
     *
     * @param event the message body
     * @return a {@code Publisher<String>}; what it emits is not shown here
     */
    @MessageMapping("user.deleted")
    Publisher<String> sendUserDeleted(@MessageBody DeleteEvent event);
}

Message Configuration

Configure messaging systems and connection properties.

/**
 * RabbitMQ connection settings, annotated with
 * {@code @ConfigurationProperties("rabbitmq")}.
 *
 * <p>The field initializers below are the defaults used when nothing else is
 * assigned. NOTE(review): the default credentials are the literal
 * "guest"/"guest" pair - presumably meant for local development only; confirm
 * they are overridden in real deployments.
 */
@ConfigurationProperties("rabbitmq")
public class RabbitConfiguration {
    // Broker URI; defaults to a local broker on port 5672.
    private String uri = "amqp://localhost:5672";
    private String username = "guest";
    private String password = "guest";
    
    // getters and setters
}

/**
 * Kafka connection settings, annotated with
 * {@code @ConfigurationProperties("kafka")}.
 *
 * <p>The field initializers below are the defaults used when nothing else is
 * assigned.
 */
@ConfigurationProperties("kafka")
public class KafkaConfiguration {
    // Defaults to a single local broker on port 9092.
    private String bootstrapServers = "localhost:9092";
    private String groupId = "micronaut-app";
    
    // getters and setters
}

Error Handling

Handle message-processing errors with retries and dead-letter queues.

/**
 * Listener that demonstrates error handling for order messages.
 *
 * <p>NOTE(review): {@code orderService} and {@code alertService} are referenced
 * below but not declared in this snippet; presumably injected collaborators -
 * confirm.
 */
@MessageListener
public class ErrorHandlingListener {

    /**
     * Validates the order amount and hands the order to {@code orderService}.
     *
     * @param order the message body
     * @throws IllegalArgumentException if the order amount is zero or negative
     */
    @MessageMapping("orders.process")
    @MessageErrorHandler(ErrorHandler.RETRY)
    @Retryable(attempts = "3", delay = "1s")
    void processOrder(@MessageBody Order order) {
        // A non-positive amount is rejected before any processing happens.
        final int comparedToZero = order.getAmount().compareTo(BigDecimal.ZERO);
        if (comparedToZero <= 0) {
            throw new IllegalArgumentException("Invalid order amount");
        }

        orderService.process(order);
    }

    /**
     * Forwards a failed order and its error message to {@code alertService}.
     *
     * @param order        the message body
     * @param errorMessage the value bound through {@code @MessageHeader}
     */
    @MessageMapping("orders.failed")
    void handleFailedOrder(@MessageBody Order order, @MessageHeader String errorMessage) {
        alertService.notifyFailedOrder(order, errorMessage);
    }
}

Reactive Messaging

Use reactive types for asynchronous message processing.

/**
 * Listener whose handlers consume and return reactive types.
 *
 * <p>NOTE(review): {@code processBatch} and {@code notificationService} are
 * referenced below but not declared in this snippet - confirm where they come
 * from.
 */
@MessageListener
public class ReactiveMessageHandler {

    /**
     * Groups the incoming stream into chunks of 100, processes each chunk with
     * {@code processBatch}, and flattens the processed chunks back into single
     * items.
     *
     * @param dataStream the message body as a stream of raw items
     * @return the stream of processed items
     */
    @MessageMapping("data.stream")
    Flowable<ProcessedData> handleDataStream(@MessageBody Flowable<RawData> dataStream) {
        return dataStream
            .buffer(100)
            .map(this::processBatch)
            .flatMapIterable(batch -> batch);
    }

    /**
     * Sends the notification through {@code notificationService} and maps the
     * outcome to a confirmation text containing the result's id.
     *
     * @param request the message body
     * @return a reactive result that yields the confirmation text
     */
    @MessageMapping("notification.send")
    Single<String> sendNotification(@MessageBody NotificationRequest request) {
        return notificationService
            .sendAsync(request)
            .map(receipt -> "Notification sent: " + receipt.getId());
    }
}

Message Serialization

Configure serialization for message bodies with various formats.

/**
 * Factory that registers the message body serializers as beans.
 *
 * <p>The class is a {@code @Factory} (io.micronaut.context.annotation.Factory)
 * rather than a plain {@code @Singleton}: Micronaut only processes
 * {@code @Bean} methods declared on factory classes, so on a {@code @Singleton}
 * class the methods below would never be registered as beans.
 *
 * <p>{@code @Singleton} is placed on each factory method so that a single
 * serializer instance is shared; without a scope annotation a factory method
 * produces a new instance per injection point.
 */
@Factory
public class MessageSerializationConfiguration {

    /**
     * Creates the JSON serializer. Marked {@code @Primary} so it is chosen when
     * a {@code MessageBodySerializer<Object>} is requested without a qualifier.
     *
     * @return a new {@code JsonMessageBodySerializer}
     */
    @Bean
    @Singleton
    @Primary
    MessageBodySerializer<Object> jsonSerializer() {
        return new JsonMessageBodySerializer();
    }

    /**
     * Creates the Avro serializer, selectable with the qualifier
     * {@code @Named("avro")}.
     *
     * @return a new {@code AvroMessageBodySerializer}
     */
    @Bean
    @Singleton
    @Named("avro")
    MessageBodySerializer<Object> avroSerializer() {
        return new AvroMessageBodySerializer();
    }
}

Types

// Messaging annotations
/**
 * Type-level marker annotation, retained at runtime.
 *
 * <p>In the examples of this document it is placed on classes whose methods
 * carry {@code @MessageMapping}. It declares no members.
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface MessageListener {
}

/**
 * Type-level marker annotation, retained at runtime.
 *
 * <p>In the examples of this document it is placed on an interface whose
 * methods declare outgoing messages. It declares no members.
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME) 
public @interface MessageProducer {
}

/**
 * Method-level annotation, retained at runtime, that carries a single required
 * string.
 *
 * <p>The examples in this document pass names such as "user.created" or
 * "orders.process". NOTE(review): presumably the destination/topic/queue name -
 * confirm against the messaging implementation in use.
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MessageMapping {
    /** The mapping value; required, no default. */
    String value();
}

/**
 * Method-level annotation, retained at runtime, that carries a single required
 * string.
 *
 * <p>Used in this document alongside {@code @MessageMapping} on a producer
 * method, with the value "user.events". NOTE(review): presumably names the
 * destination the message or result is forwarded to - confirm.
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface SendTo {
    /** The target value; required, no default. */
    String value();
}

/**
 * Parameter-level marker annotation, retained at runtime.
 *
 * <p>In the examples of this document it marks the parameter that receives or
 * supplies the message payload. It declares no members.
 */
@Target({ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
public @interface MessageBody {
}

/**
 * Parameter-level annotation, retained at runtime, that binds a parameter to a
 * single message header.
 *
 * <p>The examples in this document use it both with an explicit name
 * ({@code @MessageHeader("correlationId")}) and without one.
 */
@Target({ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
public @interface MessageHeader {
    /**
     * The header name; defaults to the empty string.
     * NOTE(review): presumably the parameter name is used when this is empty -
     * confirm.
     */
    String value() default "";
}

/**
 * Parameter-level marker annotation, retained at runtime.
 *
 * <p>In the examples of this document it marks a parameter of type
 * {@code MessageHeaders} that receives all headers of a message.
 * NOTE(review): this annotation shares its simple name with the
 * {@code MessageHeaders} interface listed in this same section; the two
 * presumably live in different packages - confirm, since they cannot coexist
 * in one package.
 */
@Target({ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
public @interface MessageHeaders {
}

// Core messaging interfaces
/**
 * A message consisting of headers and a typed body.
 *
 * @param <T> the body type
 */
public interface Message<T> {
    /** Returns the headers of this message. */
    MessageHeaders getHeaders();
    /** Returns the body of this message. */
    T getBody();
}

/**
 * Read access to the headers of a message.
 */
public interface MessageHeaders {
    /**
     * Looks up a header by name, requested as the given type.
     *
     * @param name the header name
     * @param type the requested value type
     * @return an {@code Optional} holding the value, if any; the examples in
     *         this document call {@code orElse} on it to supply a fallback
     */
    <T> Optional<T> get(String name, Class<T> type);
    /** Returns the header names. */
    Set<String> names();
    /** Returns the headers as a name-to-value map. */
    Map<String, Object> asMap();
}

/**
 * Converts between objects and a message representation of type {@code T}.
 *
 * @param <T> the serialized message representation
 */
public interface MessageBodySerializer<T> {
    /**
     * Serializes an object into the message representation.
     *
     * @param object  the object to serialize
     * @param headers the message headers made available to the serializer
     * @return the serialized representation
     */
    T serialize(Object object, MessageHeaders headers);
    /**
     * Deserializes a message representation into an object.
     *
     * @param message the serialized representation
     * @param type    the requested target type
     * @return the deserialized object
     */
    Object deserialize(T message, Class<?> type);
}

// Application lifecycle for messaging
/**
 * Entry points that return an {@code ApplicationContext}.
 *
 * <p>NOTE(review): the methods are listed as signatures without bodies, i.e.
 * as an API summary; this listing is not compilable Java as written.
 */
public class MessagingApplication {
    /**
     * @param args the arguments passed to the application
     * @return the resulting {@code ApplicationContext}
     */
    public static ApplicationContext run(String... args);
    /**
     * @param mainClass the application's main class
     * @param args      the arguments passed to the application
     * @return the resulting {@code ApplicationContext}
     */
    public static ApplicationContext run(Class<?> mainClass, String... args);
}

Install with Tessl CLI

npx tessl i tessl/maven-micronaut

docs

aop.md

configuration.md

dependency-injection.md

functions.md

http-client.md

http-server.md

index.md

management.md

messaging.md

reactive.md

retry.md

scheduling.md

websocket.md

tile.json