0
# Messaging
1
2
Micronaut provides comprehensive support for building message-driven applications with various messaging systems including RabbitMQ, Kafka, and in-memory messaging.
3
4
## Capabilities
5
6
### Message Listeners
7
8
Create message listeners to handle incoming messages from various messaging systems.
9
10
```java { .api }
11
/**
12
* Message listener for handling messages
13
*/
14
@MessageListener
15
public class UserEventHandler {
16
17
@MessageMapping("user.created")
18
void handleUserCreated(@MessageBody User user, @MessageHeader String correlationId) {
19
// Handle user creation event
20
System.out.println("User created: " + user.getName() + " (ID: " + correlationId + ")");
21
}
22
23
@MessageMapping("user.updated")
24
void handleUserUpdated(@MessageBody User user, @MessageHeaders MessageHeaders headers) {
25
// Handle user update event with all headers
26
String timestamp = headers.get("timestamp", String.class).orElse("unknown");
27
System.out.println("User updated at: " + timestamp);
28
}
29
30
@MessageMapping("user.deleted")
31
Single<String> handleUserDeleted(@MessageBody DeleteEvent event) {
32
// Reactive message handling with return value
33
return userService.cleanup(event.getUserId())
34
.map(result -> "Cleanup completed for user: " + event.getUserId());
35
}
36
}
37
```
38
39
### Message Producers
40
41
Create message producers to send messages to messaging systems.
42
43
```java { .api }
44
/**
45
* Message producer interface
46
*/
47
@MessageProducer
48
public interface UserEventProducer {
49
50
@MessageMapping("user.created")
51
void sendUserCreated(@MessageBody User user);
52
53
@MessageMapping("user.updated")
54
@SendTo("user.events")
55
void sendUserUpdated(@MessageBody User user, @MessageHeader("correlationId") String id);
56
57
@MessageMapping("user.deleted")
58
Publisher<String> sendUserDeleted(@MessageBody DeleteEvent event);
59
}
60
```
61
62
### Message Configuration
63
64
Configure messaging systems and connection properties.
65
66
```java { .api }
67
/**
68
* RabbitMQ configuration
69
*/
70
@ConfigurationProperties("rabbitmq")
71
public class RabbitConfiguration {
72
private String uri = "amqp://localhost:5672";
73
private String username = "guest";
74
private String password = "guest";
75
76
// getters and setters
77
}
78
79
/**
80
* Kafka configuration
81
*/
82
@ConfigurationProperties("kafka")
83
public class KafkaConfiguration {
84
private String bootstrapServers = "localhost:9092";
85
private String groupId = "micronaut-app";
86
87
// getters and setters
88
}
89
```
90
91
### Error Handling
92
93
Handle message processing errors with retry and dead letter queues.
94
95
```java { .api }
96
/**
97
* Message error handling
98
*/
99
@MessageListener
100
public class ErrorHandlingListener {
101
102
@MessageMapping("orders.process")
103
@MessageErrorHandler(ErrorHandler.RETRY)
104
@Retryable(attempts = "3", delay = "1s")
105
void processOrder(@MessageBody Order order) {
106
// Process order with retry on failure
107
if (order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
108
throw new IllegalArgumentException("Invalid order amount");
109
}
110
orderService.process(order);
111
}
112
113
@MessageMapping("orders.failed")
114
void handleFailedOrder(@MessageBody Order order, @MessageHeader String errorMessage) {
115
// Handle messages that failed processing
116
alertService.notifyFailedOrder(order, errorMessage);
117
}
118
}
119
```
120
121
### Reactive Messaging
122
123
Use reactive types for asynchronous message processing.
124
125
```java { .api }
126
/**
127
* Reactive message processing
128
*/
129
@MessageListener
130
public class ReactiveMessageHandler {
131
132
@MessageMapping("data.stream")
133
Flowable<ProcessedData> handleDataStream(@MessageBody Flowable<RawData> dataStream) {
134
return dataStream
135
.buffer(100)
136
.map(this::processBatch)
137
.flatMapIterable(list -> list);
138
}
139
140
@MessageMapping("notification.send")
141
Single<String> sendNotification(@MessageBody NotificationRequest request) {
142
return notificationService.sendAsync(request)
143
.map(result -> "Notification sent: " + result.getId());
144
}
145
}
146
```
147
148
### Message Serialization
149
150
Configure serialization for message bodies with various formats.
151
152
```java { .api }
153
/**
154
* Custom message serialization
155
*/
156
@Singleton
157
public class MessageSerializationConfiguration {
158
159
@Bean
160
@Primary
161
MessageBodySerializer<Object> jsonSerializer() {
162
return new JsonMessageBodySerializer();
163
}
164
165
@Bean
166
@Named("avro")
167
MessageBodySerializer<Object> avroSerializer() {
168
return new AvroMessageBodySerializer();
169
}
170
}
171
```
172
173
## Types
174
175
```java { .api }
176
// Messaging annotations
177
@Target({ElementType.TYPE})
178
@Retention(RetentionPolicy.RUNTIME)
179
public @interface MessageListener {
180
}
181
182
@Target({ElementType.TYPE})
183
@Retention(RetentionPolicy.RUNTIME)
184
public @interface MessageProducer {
185
}
186
187
@Target({ElementType.METHOD})
188
@Retention(RetentionPolicy.RUNTIME)
189
public @interface MessageMapping {
190
String value();
191
}
192
193
@Target({ElementType.METHOD})
194
@Retention(RetentionPolicy.RUNTIME)
195
public @interface SendTo {
196
String value();
197
}
198
199
@Target({ElementType.PARAMETER})
200
@Retention(RetentionPolicy.RUNTIME)
201
public @interface MessageBody {
202
}
203
204
@Target({ElementType.PARAMETER})
205
@Retention(RetentionPolicy.RUNTIME)
206
public @interface MessageHeader {
207
String value() default "";
208
}
209
210
@Target({ElementType.PARAMETER})
211
@Retention(RetentionPolicy.RUNTIME)
212
public @interface MessageHeaders {
213
}
214
215
// Core messaging interfaces
216
public interface Message<T> {
217
MessageHeaders getHeaders();
218
T getBody();
219
}
220
221
public interface MessageHeaders {
222
<T> Optional<T> get(String name, Class<T> type);
223
Set<String> names();
224
Map<String, Object> asMap();
225
}
226
227
public interface MessageBodySerializer<T> {
228
T serialize(Object object, MessageHeaders headers);
229
Object deserialize(T message, Class<?> type);
230
}
231
232
// Application lifecycle for messaging
233
public class MessagingApplication {
234
public static ApplicationContext run(String... args);
235
public static ApplicationContext run(Class<?> mainClass, String... args);
236
}
237
```