0
# Message Operations
1
2
Core messaging operations using RabbitTemplate and RabbitMessagingTemplate for publishing, consuming, and managing messages in RabbitMQ.
3
4
## RabbitTemplate Operations
5
6
### RabbitTemplate Configuration
7
8
```java { .api }
9
// Template Configurer
10
public class RabbitTemplateConfigurer {
11
public void configure(RabbitTemplate template, ConnectionFactory connectionFactory) {
12
// Configures template with default properties and connection factory
13
}
14
}
15
16
@Configuration
17
public class TemplateConfig {
18
19
@Bean
20
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
21
RabbitTemplateConfigurer configurer) {
22
RabbitTemplate template = new RabbitTemplate();
23
configurer.configure(template, connectionFactory);
24
return template;
25
}
26
}
27
```
28
29
### Basic Send Operations
30
31
```java { .api }
32
public class RabbitTemplate {
33
34
// Send with default exchange and routing key
35
void send(Message message);
36
37
// Send to specific routing key
38
void send(String routingKey, Message message);
39
40
// Send to specific exchange and routing key
41
void send(String exchange, String routingKey, Message message);
42
43
// Convert and send object
44
void convertAndSend(Object message);
45
void convertAndSend(String routingKey, Object message);
46
void convertAndSend(String exchange, String routingKey, Object message);
47
48
// Convert and send with message post processor
49
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor);
50
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor);
51
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor);
52
}
53
```
54
55
### Receive Operations
56
57
```java { .api }
58
public class RabbitTemplate {
59
60
// Receive message
61
Message receive();
62
Message receive(String queueName);
63
Message receive(long timeoutMillis);
64
Message receive(String queueName, long timeoutMillis);
65
66
// Receive and convert to object
67
<T> T receiveAndConvert();
68
<T> T receiveAndConvert(String queueName);
69
<T> T receiveAndConvert(long timeoutMillis);
70
<T> T receiveAndConvert(String queueName, long timeoutMillis);
71
<T> T receiveAndConvert(ParameterizedTypeReference<T> type);
72
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type);
73
}
74
```
75
76
### Request-Reply Operations
77
78
```java { .api }
79
public class RabbitTemplate {
80
81
// Send and receive reply
82
Message sendAndReceive(Message message);
83
Message sendAndReceive(String routingKey, Message message);
84
Message sendAndReceive(String exchange, String routingKey, Message message);
85
86
// Convert, send and receive reply
87
<T> T convertSendAndReceive(Object message, Class<T> responseType);
88
<T> T convertSendAndReceive(String routingKey, Object message, Class<T> responseType);
89
<T> T convertSendAndReceive(String exchange, String routingKey, Object message, Class<T> responseType);
90
91
// With message post processor
92
<T> T convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor, Class<T> responseType);
93
<T> T convertSendAndReceive(String routingKey, Object message, MessagePostProcessor messagePostProcessor, Class<T> responseType);
94
<T> T convertSendAndReceive(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, Class<T> responseType);
95
}
96
```
97
98
## RabbitMessagingTemplate Operations
99
100
### Spring Messaging Integration
101
102
```java { .api }
103
public class RabbitMessagingTemplate {
104
105
// Send operations using Spring Messaging API
106
void send(String destinationName, org.springframework.messaging.Message<?> message);
107
void send(org.springframework.messaging.Message<?> message);
108
109
// Convert and send
110
void convertAndSend(String destinationName, Object payload);
111
void convertAndSend(Object payload);
112
void convertAndSend(String destinationName, Object payload, Map<String, Object> headers);
113
void convertAndSend(Object payload, Map<String, Object> headers);
114
void convertAndSend(String destinationName, Object payload, MessagePostProcessor postProcessor);
115
void convertAndSend(Object payload, MessagePostProcessor postProcessor);
116
void convertAndSend(String destinationName, Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor);
117
void convertAndSend(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor);
118
119
// Receive operations
120
org.springframework.messaging.Message<?> receive(String destinationName);
121
org.springframework.messaging.Message<?> receive();
122
123
// Receive and convert
124
<T> T receiveAndConvert(String destinationName, Class<T> targetClass);
125
<T> T receiveAndConvert(Class<T> targetClass);
126
127
// Send and receive
128
org.springframework.messaging.Message<?> sendAndReceive(String destinationName, org.springframework.messaging.Message<?> requestMessage);
129
org.springframework.messaging.Message<?> sendAndReceive(org.springframework.messaging.Message<?> requestMessage);
130
131
// Convert, send and receive
132
<T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass);
133
<T> T convertSendAndReceive(Object request, Class<T> targetClass);
134
<T> T convertSendAndReceive(String destinationName, Object request, Map<String, Object> headers, Class<T> targetClass);
135
<T> T convertSendAndReceive(Object request, Map<String, Object> headers, Class<T> targetClass);
136
<T> T convertSendAndReceive(String destinationName, Object request, MessagePostProcessor requestPostProcessor, Class<T> targetClass);
137
<T> T convertSendAndReceive(Object request, MessagePostProcessor requestPostProcessor, Class<T> targetClass);
138
<T> T convertSendAndReceive(String destinationName, Object request, Map<String, Object> headers, MessagePostProcessor requestPostProcessor, Class<T> targetClass);
139
<T> T convertSendAndReceive(Object request, Map<String, Object> headers, MessagePostProcessor requestPostProcessor, Class<T> targetClass);
140
}
141
```
142
143
## Message Conversion
144
145
### Built-in Message Converters
146
147
```java { .api }
148
// JSON message converter
149
public class Jackson2JsonMessageConverter implements MessageConverter {
150
public Jackson2JsonMessageConverter();
151
public Jackson2JsonMessageConverter(ObjectMapper jsonObjectMapper);
152
}
153
154
// Simple message converter (default)
155
public class SimpleMessageConverter implements MessageConverter {
156
public SimpleMessageConverter();
157
}
158
159
// Content type based converter
160
public class ContentTypeDelegatingMessageConverter implements MessageConverter {
161
public ContentTypeDelegatingMessageConverter();
162
public ContentTypeDelegatingMessageConverter(MessageConverter defaultConverter);
163
}
164
```
165
166
### Custom Message Converter Configuration
167
168
```java { .api }
169
@Configuration
170
public class MessageConverterConfig {
171
172
@Bean
173
public MessageConverter messageConverter() {
174
return new Jackson2JsonMessageConverter();
175
}
176
177
@Bean
178
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
179
RabbitTemplate template = new RabbitTemplate(connectionFactory);
180
template.setMessageConverter(messageConverter);
181
return template;
182
}
183
}
184
```
185
186
## Message Properties and Headers
187
188
### Message Properties
189
190
```java { .api }
191
public class MessageProperties {
192
193
// Basic properties
194
void setContentType(String contentType);
195
String getContentType();
196
197
void setContentEncoding(String contentEncoding);
198
String getContentEncoding();
199
200
void setMessageId(String messageId);
201
String getMessageId();
202
203
void setCorrelationId(String correlationId);
204
String getCorrelationId();
205
206
void setReplyTo(String replyTo);
207
String getReplyTo();
208
209
void setExpiration(String expiration);
210
String getExpiration();
211
212
void setTimestamp(Date timestamp);
213
Date getTimestamp();
214
215
void setType(String type);
216
String getType();
217
218
void setUserId(String userId);
219
String getUserId();
220
221
void setAppId(String appId);
222
String getAppId();
223
224
void setClusterId(String clusterId);
225
String getClusterId();
226
227
// Delivery properties
228
void setDeliveryMode(MessageDeliveryMode deliveryMode);
229
MessageDeliveryMode getDeliveryMode();
230
231
void setPriority(Integer priority);
232
Integer getPriority();
233
234
// Headers
235
Map<String, Object> getHeaders();
236
void setHeaders(Map<String, Object> headers);
237
void setHeader(String key, Object value);
238
}
239
```
240
241
### Custom Headers Usage
242
243
```java
244
@Service
245
public class MessagingService {
246
247
@Autowired
248
private RabbitTemplate rabbitTemplate;
249
250
public void sendWithCustomHeaders(String routingKey, Object message) {
251
rabbitTemplate.convertAndSend(routingKey, message, msg -> {
252
msg.getMessageProperties().setHeader("custom-header", "value");
253
msg.getMessageProperties().setHeader("timestamp", System.currentTimeMillis());
254
msg.getMessageProperties().setPriority(5);
255
return msg;
256
});
257
}
258
}
259
```
260
261
## Batch Operations
262
263
### Batch Publishing
264
265
```java { .api }
266
public class BatchingRabbitTemplate extends RabbitTemplate {
267
268
public BatchingRabbitTemplate();
269
public BatchingRabbitTemplate(ConnectionFactory connectionFactory, BatchingStrategy batchingStrategy, TaskScheduler taskScheduler);
270
271
// Batching strategy configuration
272
void setBatchingStrategy(BatchingStrategy batchingStrategy);
273
BatchingStrategy getBatchingStrategy();
274
}
275
276
public interface BatchingStrategy {
277
boolean canDebatch(MessageProperties properties);
278
void addToBatch(String exchange, String routingKey, Message message);
279
Collection<Message> releaseBatch();
280
}
281
```
282
283
### Simple Batching Strategy
284
285
```java { .api }
286
public class SimpleBatchingStrategy implements BatchingStrategy {
287
288
public SimpleBatchingStrategy(int batchSize, int bufferLimit, long timeout);
289
290
void setBatchSize(int batchSize);
291
int getBatchSize();
292
293
void setBufferLimit(int bufferLimit);
294
int getBufferLimit();
295
296
void setTimeout(long timeout);
297
long getTimeout();
298
}
299
```
300
301
## Usage Examples
302
303
### Basic Message Publishing
304
305
```java
306
@Service
307
public class NotificationService {
308
309
@Autowired
310
private RabbitTemplate rabbitTemplate;
311
312
public void sendEmailNotification(String email, String subject, String body) {
313
EmailNotification notification = new EmailNotification(email, subject, body);
314
rabbitTemplate.convertAndSend("notifications.email", notification);
315
}
316
317
public void sendOrderConfirmation(Order order) {
318
rabbitTemplate.convertAndSend("orders.exchange", "order.confirmed", order, message -> {
319
message.getMessageProperties().setExpiration("300000"); // 5 minutes TTL
320
message.getMessageProperties().setPriority(5);
321
return message;
322
});
323
}
324
}
325
```
326
327
### Request-Reply Pattern
328
329
```java
330
@Service
331
public class UserService {
332
333
@Autowired
334
private RabbitTemplate rabbitTemplate;
335
336
public UserProfile getUserProfile(String userId) {
337
UserProfileRequest request = new UserProfileRequest(userId);
338
return rabbitTemplate.convertSendAndReceive("user.profile.queue", request, UserProfile.class);
339
}
340
341
public ValidationResult validateUser(User user) {
342
return rabbitTemplate.convertSendAndReceive("validation.exchange", "user.validate",
343
user, ValidationResult.class);
344
}
345
}
346
```
347
348
### Message Correlation
349
350
```java
351
@Service
352
public class OrderProcessingService {
353
354
@Autowired
355
private RabbitTemplate rabbitTemplate;
356
357
public void processOrder(Order order) {
358
String correlationId = UUID.randomUUID().toString();
359
360
rabbitTemplate.convertAndSend("orders.processing", order, message -> {
361
message.getMessageProperties().setCorrelationId(correlationId);
362
message.getMessageProperties().setReplyTo("orders.results");
363
return message;
364
});
365
}
366
}
367
```