0
# Core Messaging
1
2
Core messaging operations using the auto-configured `RabbitTemplate`, `RabbitMessagingTemplate`, and `AmqpAdmin` beans to send messages, receive replies, and manage RabbitMQ infrastructure components (queues, exchanges, and bindings).
3
4
## Capabilities
5
6
### RabbitTemplate
7
8
Primary template for RabbitMQ operations with support for message conversion, routing, publisher confirms, and mandatory returns.
9
10
```java { .api }
11
/**
12
* Main template for RabbitMQ operations
13
*/
14
public class RabbitTemplate implements RabbitOperations {
15
16
/** Send a message to the template's default exchange using the given routing key */
17
public void send(String routingKey, Message message);
18
19
/** Send a message to a specific exchange */
20
public void send(String exchange, String routingKey, Message message);
21
22
/** Convert and send an object as a message */
23
public void convertAndSend(String routingKey, Object object);
24
25
/** Convert and send an object to a specific exchange */
26
public void convertAndSend(String exchange, String routingKey, Object object);
27
28
/** Convert and send with message post-processor */
29
public void convertAndSend(String exchange, String routingKey, Object object,
30
MessagePostProcessor messagePostProcessor);
31
32
/** Receive a message from a queue */
33
public Message receive(String queueName);
34
35
/** Receive with timeout */
36
public Message receive(String queueName, long timeoutMillis);
37
38
/** Convert and receive an object */
39
public Object receiveAndConvert(String queueName);
40
41
/** Send and receive (RPC pattern) */
42
public Object convertSendAndReceive(String routingKey, Object object);
43
44
/** Send and receive with specific exchange */
45
public Object convertSendAndReceive(String exchange, String routingKey, Object object);
46
47
/** Execute operations within a channel callback */
48
public <T> T execute(ChannelCallback<T> action);
49
}
50
```
51
52
**Usage Examples:**
53
54
```java
55
import org.springframework.amqp.rabbit.core.RabbitTemplate;
56
import org.springframework.beans.factory.annotation.Autowired;
57
import org.springframework.stereotype.Service;
58
59
@Service
60
public class MessagingService {
61
62
@Autowired
63
private RabbitTemplate rabbitTemplate;
64
65
// Basic send
66
public void sendSimpleMessage(String message) {
67
rabbitTemplate.convertAndSend("myQueue", message);
68
}
69
70
// Send to specific exchange
71
public void sendToExchange(String exchange, String routingKey, Object data) {
72
rabbitTemplate.convertAndSend(exchange, routingKey, data);
73
}
74
75
// RPC call (returns null if no reply arrives within the reply timeout)
76
public String requestResponse(String message) {
77
return (String) rabbitTemplate.convertSendAndReceive("rpc.queue", message);
78
}
79
80
// Receive message synchronously (returns null if no message is available)
81
public String receiveMessage(String queueName) {
82
return (String) rabbitTemplate.receiveAndConvert(queueName);
83
}
84
}
85
```
86
87
### RabbitMessagingTemplate
88
89
Spring messaging template wrapper that provides integration with Spring's messaging abstraction and message conversion.
90
91
```java { .api }
92
/**
93
* Template that wraps RabbitTemplate with Spring messaging abstractions
94
*/
95
public class RabbitMessagingTemplate implements RabbitMessageOperations {
96
97
/** Convert the payload to a Spring messaging Message and send it to the destination */
98
public void convertAndSend(String destinationName, Object payload);
99
100
/** Convert and send with headers */
101
public void convertAndSend(String destinationName, Object payload, Map<String, Object> headers);
102
103
/** Convert and send with message post-processor */
104
public void convertAndSend(String destinationName, Object payload,
105
MessagePostProcessor postProcessor);
106
107
/** Receive and convert using Spring messaging abstractions */
108
public <T> T receiveAndConvert(String destinationName, Class<T> targetClass);
109
110
/** Send and receive (RPC) using Spring messaging */
111
public <T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass);
112
}
113
```
114
115
**Usage Example:**
116
117
```java
118
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
119
import org.springframework.messaging.support.MessageBuilder;
120
121
@Service
122
public class SpringMessagingService {
123
124
@Autowired
125
private RabbitMessagingTemplate messagingTemplate;
126
127
public void sendWithHeaders(String queue, Object payload, String userId) {
128
Map<String, Object> headers = new HashMap<>();
129
headers.put("userId", userId);
130
headers.put("timestamp", System.currentTimeMillis());
131
132
messagingTemplate.convertAndSend(queue, payload, headers);
133
}
134
}
135
```
136
137
### AmqpAdmin
138
139
Administrative interface for creating and managing queues, exchanges, and bindings programmatically.
140
141
```java { .api }
142
/**
143
* Administrative operations for AMQP infrastructure
144
*/
145
public interface AmqpAdmin {
146
147
/** Declare a queue */
148
String declareQueue(Queue queue);
149
150
/** Declare an exchange */
151
void declareExchange(Exchange exchange);
152
153
/** Declare a binding between queue and exchange */
154
void declareBinding(Binding binding);
155
156
/** Delete a queue */
157
boolean deleteQueue(String queueName);
158
159
/** Delete an exchange */
160
boolean deleteExchange(String exchangeName);
161
162
/** Remove a binding */
163
void removeBinding(Binding binding);
164
165
/** Get queue properties */
166
Properties getQueueProperties(String queueName);
167
168
/** Get queue info including message count */
169
QueueInformation getQueueInfo(String queueName);
170
171
/** Purge a queue (remove all messages) */
172
void purgeQueue(String queueName, boolean noWait);
173
}
174
```
175
176
**Usage Example:**
177
178
```java
179
import org.springframework.amqp.core.*;
180
import org.springframework.beans.factory.annotation.Autowired;
181
182
@Component
183
public class QueueSetup {
184
185
@Autowired
186
private AmqpAdmin amqpAdmin;
187
188
@PostConstruct
189
public void setupQueues() {
190
// Declare queue
191
Queue queue = QueueBuilder.durable("task.queue").build();
192
amqpAdmin.declareQueue(queue);
193
194
// Declare exchange
195
Exchange exchange = ExchangeBuilder.topicExchange("task.exchange").build();
196
amqpAdmin.declareExchange(exchange);
197
198
// Declare binding
199
Binding binding = BindingBuilder.bind(queue).to(TopicExchange.class.cast(exchange))
200
.with("task.*");
201
amqpAdmin.declareBinding(binding);
202
}
203
}
204
```
205
206
### RabbitConnectionDetails
207
208
Abstraction layer for RabbitMQ connection details that can be implemented for different configuration sources.
209
210
```java { .api }
211
/**
212
* Abstraction for RabbitMQ connection details
213
*/
214
public interface RabbitConnectionDetails {
215
216
/** Get connection username */
217
String getUsername();
218
219
/** Get connection password */
220
String getPassword();
221
222
/** Get RabbitMQ host */
223
String getHost();
224
225
/** Get RabbitMQ port */
226
int getPort();
227
228
/** Get virtual host */
229
String getVirtualHost();
230
231
/** Get connection addresses for cluster setup */
232
List<Address> getAddresses(); // Address is the nested record RabbitConnectionDetails.Address(host, port)
233
234
/** Get connection URI if available */
235
URI getUri();
236
}
237
238
/**
239
* Default implementation using RabbitProperties
240
*/
241
public class PropertiesRabbitConnectionDetails implements RabbitConnectionDetails {
242
// Implementation backed by RabbitProperties
243
}
244
```
245
246
### Connection Factory
247
248
Auto-configured caching connection factory with connection and channel caching, SSL support, and customization options.
249
250
```java { .api }
251
/**
252
* Auto-configured connection factory (typically CachingConnectionFactory)
253
*/
254
public interface ConnectionFactory {
255
256
/** Create an AMQP connection */
257
Connection createConnection() throws AmqpException;
258
259
/** Get connection host */
260
String getHost();
261
262
/** Get connection port */
263
int getPort();
264
265
/** Get virtual host */
266
String getVirtualHost();
267
268
/** Get username */
269
String getUsername();
270
}
271
272
/**
273
* Caching connection factory implementation
274
*/
275
public class CachingConnectionFactory extends AbstractConnectionFactory {
276
277
/** Set cache mode (CONNECTION or CHANNEL) */
278
public void setCacheMode(CacheMode cacheMode);
279
280
/** Set connection cache size */
281
public void setConnectionCacheSize(int connectionCacheSize);
282
283
/** Set channel cache size */
284
public void setChannelCacheSize(int channelCacheSize);
285
286
/** Set the publisher confirm type (NONE, SIMPLE, or CORRELATED) */
287
public void setPublisherConfirmType(ConfirmType confirmType);
288
289
/** Enable/disable publisher returns */
290
public void setPublisherReturns(boolean publisherReturns);
291
}
292
```
293
294
### Message Conversion
295
296
Support for automatic message conversion using configurable message converters.
297
298
```java { .api }
299
/**
300
* Message converter interface for automatic conversion
301
*/
302
public interface MessageConverter {
303
304
/** Convert object to Message */
305
Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException;
306
307
/** Convert Message to object */
308
Object fromMessage(Message message) throws MessageConversionException;
309
}
310
311
/**
312
* Common message converters available
313
*/
314
// Default (simple), JSON (Jackson), and XML (marshalling) converters
315
SimpleMessageConverter simpleMessageConverter;
316
Jackson2JsonMessageConverter jsonMessageConverter;
317
MarshallingMessageConverter xmlMessageConverter;
318
```
319
320
**Usage Example:**
321
322
```java
323
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
324
import org.springframework.context.annotation.Bean;
325
326
@Configuration
327
public class RabbitConfig {
328
329
@Bean
330
public Jackson2JsonMessageConverter messageConverter() {
331
return new Jackson2JsonMessageConverter();
332
}
333
}
334
```
335
336
### Customization Interfaces
337
338
Spring Boot provides several interfaces for customizing auto-configured AMQP components.
339
340
```java { .api }
341
/**
342
* Callback interface for customizing RabbitTemplate instances
343
*/
344
@FunctionalInterface
345
public interface RabbitTemplateCustomizer {
346
347
/** Customize a RabbitTemplate instance */
348
void customize(RabbitTemplate rabbitTemplate);
349
}
350
351
/**
352
* Callback interface for customizing the RabbitMQ ConnectionFactory
353
*/
354
@FunctionalInterface
355
public interface ConnectionFactoryCustomizer {
356
357
/** Customize the native RabbitMQ ConnectionFactory */
358
void customize(com.rabbitmq.client.ConnectionFactory factory);
359
}
360
361
/**
362
* Callback interface for customizing RabbitMQ retry templates
363
*/
364
@FunctionalInterface
365
public interface RabbitRetryTemplateCustomizer {
366
367
/** Customize the retry template */
368
void customize(Target target, RetryTemplate retryTemplate);
369
370
/** Target enumeration for retry customization */
371
enum Target {
372
SENDER, LISTENER
373
}
374
}
375
```
376
377
**Customization Usage Examples:**
378
379
```java
380
import org.springframework.boot.autoconfigure.amqp.RabbitTemplateCustomizer;
381
import org.springframework.boot.autoconfigure.amqp.ConnectionFactoryCustomizer;
382
383
@Configuration
384
public class RabbitCustomizationConfig {
385
386
@Bean
387
public RabbitTemplateCustomizer rabbitTemplateCustomizer() {
388
return (template) -> {
389
template.setReceiveTimeout(5000);
390
template.setReplyTimeout(10000);
391
template.setMandatory(true);
392
};
393
}
394
395
@Bean
396
public ConnectionFactoryCustomizer connectionFactoryCustomizer() {
397
return (factory) -> {
398
factory.setRequestedHeartbeat(30);
399
factory.setConnectionTimeout(10000);
400
factory.setHandshakeTimeout(20000);
401
};
402
}
403
}
404
```