or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aop.md, configuration.md, dependency-injection.md, functions.md, http-client.md, http-server.md, index.md, management.md, messaging.md, reactive.md, retry.md, scheduling.md, websocket.md

docs/messaging.md

0

# Messaging

1

2

Micronaut provides comprehensive support for building message-driven applications with various messaging systems including RabbitMQ, Kafka, and in-memory messaging.

3

4

## Capabilities

5

6

### Message Listeners

7

8

Create message listeners to handle incoming messages from various messaging systems.

9

10

```java { .api }

11

/**

12

* Message listener for handling messages

13

*/

14

@MessageListener

15

public class UserEventHandler {

16

17

@MessageMapping("user.created")

18

void handleUserCreated(@MessageBody User user, @MessageHeader String correlationId) {

19

// Handle user creation event

20

System.out.println("User created: " + user.getName() + " (ID: " + correlationId + ")");

21

}

22

23

@MessageMapping("user.updated")

24

void handleUserUpdated(@MessageBody User user, @MessageHeaders MessageHeaders headers) {

25

// Handle user update event with all headers

26

String timestamp = headers.get("timestamp", String.class).orElse("unknown");

27

System.out.println("User updated at: " + timestamp);

28

}

29

30

@MessageMapping("user.deleted")

31

Single<String> handleUserDeleted(@MessageBody DeleteEvent event) {

32

// Reactive message handling with return value

33

return userService.cleanup(event.getUserId())

34

.map(result -> "Cleanup completed for user: " + event.getUserId());

35

}

36

}

37

```

38

39

### Message Producers

40

41

Create message producers to send messages to messaging systems.

42

43

```java { .api }

44

/**

45

* Message producer interface

46

*/

47

@MessageProducer

48

public interface UserEventProducer {

49

50

@MessageMapping("user.created")

51

void sendUserCreated(@MessageBody User user);

52

53

@MessageMapping("user.updated")

54

@SendTo("user.events")

55

void sendUserUpdated(@MessageBody User user, @MessageHeader("correlationId") String id);

56

57

@MessageMapping("user.deleted")

58

Publisher<String> sendUserDeleted(@MessageBody DeleteEvent event);

59

}

60

```

61

62

### Message Configuration

63

64

Configure messaging systems and connection properties.

65

66

```java { .api }

67

/**

68

* RabbitMQ configuration

69

*/

70

@ConfigurationProperties("rabbitmq")

71

public class RabbitConfiguration {

72

private String uri = "amqp://localhost:5672";

73

private String username = "guest";

74

private String password = "guest";

75

76

// getters and setters

77

}

78

79

/**

80

* Kafka configuration

81

*/

82

@ConfigurationProperties("kafka")

83

public class KafkaConfiguration {

84

private String bootstrapServers = "localhost:9092";

85

private String groupId = "micronaut-app";

86

87

// getters and setters

88

}

89

```

90

91

### Error Handling

92

93

Handle message processing errors with retry and dead letter queues.

94

95

```java { .api }

96

/**

97

* Message error handling

98

*/

99

@MessageListener

100

public class ErrorHandlingListener {

101

102

@MessageMapping("orders.process")

103

@MessageErrorHandler(ErrorHandler.RETRY)

104

@Retryable(attempts = "3", delay = "1s")

105

void processOrder(@MessageBody Order order) {

106

// Process order with retry on failure

107

if (order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {

108

throw new IllegalArgumentException("Invalid order amount");

109

}

110

orderService.process(order);

111

}

112

113

@MessageMapping("orders.failed")

114

void handleFailedOrder(@MessageBody Order order, @MessageHeader String errorMessage) {

115

// Handle messages that failed processing

116

alertService.notifyFailedOrder(order, errorMessage);

117

}

118

}

119

```

120

121

### Reactive Messaging

122

123

Use reactive types for asynchronous message processing.

124

125

```java { .api }

126

/**

127

* Reactive message processing

128

*/

129

@MessageListener

130

public class ReactiveMessageHandler {

131

132

@MessageMapping("data.stream")

133

Flowable<ProcessedData> handleDataStream(@MessageBody Flowable<RawData> dataStream) {

134

return dataStream

135

.buffer(100)

136

.map(this::processBatch)

137

.flatMapIterable(list -> list);

138

}

139

140

@MessageMapping("notification.send")

141

Single<String> sendNotification(@MessageBody NotificationRequest request) {

142

return notificationService.sendAsync(request)

143

.map(result -> "Notification sent: " + result.getId());

144

}

145

}

146

```

147

148

### Message Serialization

149

150

Configure serialization for message bodies with various formats.

151

152

```java { .api }

153

/**

154

* Custom message serialization

155

*/

156

@Singleton

157

public class MessageSerializationConfiguration {

158

159

@Bean

160

@Primary

161

MessageBodySerializer<Object> jsonSerializer() {

162

return new JsonMessageBodySerializer();

163

}

164

165

@Bean

166

@Named("avro")

167

MessageBodySerializer<Object> avroSerializer() {

168

return new AvroMessageBodySerializer();

169

}

170

}

171

```

172

173

## Types

174

175

```java { .api }

176

// Messaging annotations

177

/**
 * Type-level marker annotation for message listener classes.
 *
 * <p>Retained at runtime and declares no elements.
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface MessageListener {
}

181

182

/**
 * Type-level marker annotation for message producer types.
 *
 * <p>Retained at runtime and declares no elements.
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface MessageProducer {
}

186

187

/**
 * Method-level annotation that carries the name a method is mapped to,
 * for example {@code "user.created"}.
 *
 * <p>Retained at runtime.
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MessageMapping {

    /** @return the mapped name; required, there is no default */
    String value();
}

192

193

/**
 * Method-level annotation that carries a destination name,
 * for example {@code "user.events"}.
 *
 * <p>Retained at runtime.
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface SendTo {

    /** @return the destination name; required, there is no default */
    String value();
}

198

199

/**
 * Parameter-level marker annotation for the parameter that receives or supplies
 * the message body.
 *
 * <p>Retained at runtime and declares no elements.
 */
@Target({ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
public @interface MessageBody {
}

203

204

/**
 * Parameter-level annotation for a parameter bound to a single message header.
 *
 * <p>Retained at runtime.
 */
@Target({ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
public @interface MessageHeader {

    /** @return the header name, for example {@code "correlationId"}; defaults to the empty string */
    String value() default "";
}

209

210

/**
 * Parameter-level marker annotation for a parameter that receives all headers
 * of a message.
 *
 * <p>Retained at runtime and declares no elements.
 */
// NOTE(review): an interface with the same simple name is listed alongside this
// annotation; the two cannot share one package. Confirm their packages.
@Target({ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
public @interface MessageHeaders {
}

214

215

// Core messaging interfaces

216

public interface Message<T> {

217

MessageHeaders getHeaders();

218

T getBody();

219

}

220

221

/**
 * Read access to the headers of a message.
 */
public interface MessageHeaders {

    /**
     * Looks up a single header as the requested type.
     *
     * @param name the header name
     * @param type the type the value is requested as
     * @param <T>  the requested type
     * @return the header value, or an empty {@code Optional}
     */
    <T> Optional<T> get(String name, Class<T> type);

    /** @return the header names */
    Set<String> names();

    /** @return the headers as a map from name to value */
    Map<String, Object> asMap();
}

226

227

public interface MessageBodySerializer<T> {

228

T serialize(Object object, MessageHeaders headers);

229

Object deserialize(T message, Class<?> type);

230

}

231

232

// Application lifecycle for messaging

233

public class MessagingApplication {

234

public static ApplicationContext run(String... args);

235

public static ApplicationContext run(Class<?> mainClass, String... args);

236

}

237

```