Spring Boot starter for AMQP messaging with RabbitMQ, including auto-configuration, connection management, and message processing capabilities.
—
Core messaging operations using auto-configured RabbitTemplate, RabbitMessagingTemplate, and AmqpAdmin for sending messages, receiving responses, and managing RabbitMQ infrastructure components.
Primary template for RabbitMQ operations with support for message conversion, routing, publisher confirms, and mandatory returns.
/**
* Main template for RabbitMQ operations.
* <p>
* This is an API summary: only method signatures are listed, without bodies.
*/
public class RabbitTemplate implements RabbitOperations {
/**
* Send a message to the default exchange.
* @param routingKey the routing key to publish with
* @param message the message to send
*/
public void send(String routingKey, Message message);
/**
* Send a message to a specific exchange.
* @param exchange the name of the target exchange
* @param routingKey the routing key to publish with
* @param message the message to send
*/
public void send(String exchange, String routingKey, Message message);
/**
* Convert an object to a message and send it.
* @param routingKey the routing key to publish with
* @param object the object to convert and send
*/
public void convertAndSend(String routingKey, Object object);
/**
* Convert an object to a message and send it to a specific exchange.
* @param exchange the name of the target exchange
* @param routingKey the routing key to publish with
* @param object the object to convert and send
*/
public void convertAndSend(String exchange, String routingKey, Object object);
/**
* Convert and send with a message post-processor.
* @param exchange the name of the target exchange
* @param routingKey the routing key to publish with
* @param object the object to convert and send
* @param messagePostProcessor the post-processor supplied for the message
*/
public void convertAndSend(String exchange, String routingKey, Object object,
MessagePostProcessor messagePostProcessor);
/**
* Receive a message from a queue.
* @param queueName the name of the queue to receive from
* @return the received message
*/
public Message receive(String queueName);
/**
* Receive a message from a queue, with a timeout.
* @param queueName the name of the queue to receive from
* @param timeoutMillis the timeout, in milliseconds
* @return the received message
*/
public Message receive(String queueName, long timeoutMillis);
/**
* Receive a message from a queue and convert it to an object.
* @param queueName the name of the queue to receive from
* @return the converted object
*/
public Object receiveAndConvert(String queueName);
/**
* Send and receive (RPC pattern).
* @param routingKey the routing key to publish the request with
* @param object the request object to convert and send
* @return the reply object
*/
public Object convertSendAndReceive(String routingKey, Object object);
/**
* Send and receive (RPC pattern) through a specific exchange.
* @param exchange the name of the target exchange
* @param routingKey the routing key to publish the request with
* @param object the request object to convert and send
* @return the reply object
*/
public Object convertSendAndReceive(String exchange, String routingKey, Object object);
/**
* Execute operations within a channel callback.
* @param action the callback to execute
* @param <T> the result type of the callback
* @return the callback result
*/
public <T> T execute(ChannelCallback<T> action);
}Usage Examples:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Example service demonstrating the basic {@code RabbitTemplate} calls:
 * fire-and-forget sends, an RPC-style request/reply, and a synchronous receive.
 */
@Service
public class MessagingService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Basic send: converts the text and publishes it with the routing key {@code "myQueue"}.
     *
     * @param message the text payload to send
     */
    public void sendSimpleMessage(String message) {
        final String routingKey = "myQueue";
        rabbitTemplate.convertAndSend(routingKey, message);
    }

    /**
     * Sends a payload to a specific exchange.
     *
     * @param exchange the name of the target exchange
     * @param routingKey the routing key to publish with
     * @param data the payload to convert and send
     */
    public void sendToExchange(String exchange, String routingKey, Object data) {
        rabbitTemplate.convertAndSend(exchange, routingKey, data);
    }

    /**
     * RPC call: sends the request with the routing key {@code "rpc.queue"} and returns the reply.
     *
     * @param message the request text
     * @return the reply, cast to {@code String}
     */
    public String requestResponse(String message) {
        Object reply = rabbitTemplate.convertSendAndReceive("rpc.queue", message);
        return (String) reply;
    }

    /**
     * Receives a message synchronously from the given queue.
     *
     * @param queueName the name of the queue to read from
     * @return the converted payload, cast to {@code String}
     */
    public String receiveMessage(String queueName) {
        Object payload = rabbitTemplate.receiveAndConvert(queueName);
        return (String) payload;
    }
}

Spring messaging template wrapper that provides integration with Spring's messaging abstraction and message conversion.
/**
* Template that wraps RabbitTemplate with Spring messaging abstractions.
* <p>
* This is an API summary: only method signatures are listed, without bodies.
*/
public class RabbitMessagingTemplate implements RabbitMessageOperations {
/**
* Convert a payload and send it to the named destination.
* @param destinationName the name of the destination
* @param payload the payload to convert and send
*/
public void convertAndSend(String destinationName, Object payload);
/**
* Convert and send with headers.
* @param destinationName the name of the destination
* @param payload the payload to convert and send
* @param headers the headers to send along with the payload
*/
public void convertAndSend(String destinationName, Object payload, Map<String, Object> headers);
/**
* Convert and send with a message post-processor.
* @param destinationName the name of the destination
* @param payload the payload to convert and send
* @param postProcessor the post-processor supplied for the message
*/
public void convertAndSend(String destinationName, Object payload,
MessagePostProcessor postProcessor);
/**
* Receive and convert using Spring messaging abstractions.
* @param destinationName the name of the destination to receive from
* @param targetClass the type to convert the payload to
* @param <T> the payload type
* @return the converted payload
*/
public <T> T receiveAndConvert(String destinationName, Class<T> targetClass);
/**
* Send and receive (RPC) using Spring messaging.
* @param destinationName the name of the destination to send the request to
* @param request the request payload
* @param targetClass the type to convert the reply to
* @param <T> the reply type
* @return the converted reply
*/
public <T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass);
}Usage Example:
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

/**
 * Example service that sends a payload together with custom message headers
 * through {@code RabbitMessagingTemplate}.
 */
@Service
public class SpringMessagingService {

    @Autowired
    private RabbitMessagingTemplate messagingTemplate;

    /**
     * Sends {@code payload} to {@code queue} with a {@code userId} header and a
     * {@code timestamp} header holding the current time in epoch milliseconds.
     *
     * @param queue the destination name
     * @param payload the payload to convert and send
     * @param userId the value stored under the {@code "userId"} header
     */
    public void sendWithHeaders(String queue, Object payload, String userId) {
        // HashMap (not Map.of) so that a null userId is still accepted.
        Map<String, Object> headers = new HashMap<>();
        headers.put("userId", userId);
        headers.put("timestamp", System.currentTimeMillis());
        messagingTemplate.convertAndSend(queue, payload, headers);
    }
}

Administrative interface for creating and managing queues, exchanges, and bindings programmatically.
/**
 * Administrative operations for AMQP infrastructure: declaring, inspecting and
 * removing queues, exchanges and bindings.
 * <p>
 * This is an API summary: only a subset of the interface is listed.
 */
public interface AmqpAdmin {

    /**
     * Declare a queue.
     *
     * @param queue the queue to declare
     * @return the name of the declared queue
     */
    String declareQueue(Queue queue);

    /**
     * Declare an exchange.
     *
     * @param exchange the exchange to declare
     */
    void declareExchange(Exchange exchange);

    /**
     * Declare a binding between a queue and an exchange.
     *
     * @param binding the binding to declare
     */
    void declareBinding(Binding binding);

    /**
     * Delete a queue.
     *
     * @param queueName the name of the queue to delete
     * @return {@code true} if the queue was deleted
     */
    boolean deleteQueue(String queueName);

    /**
     * Delete an exchange.
     *
     * @param exchangeName the name of the exchange to delete
     * @return {@code true} if the exchange was deleted
     */
    boolean deleteExchange(String exchangeName);

    /**
     * Remove a binding.
     *
     * @param binding the binding to remove
     */
    void removeBinding(Binding binding);

    /**
     * Get queue properties.
     *
     * @param queueName the name of the queue to inspect
     * @return the queue properties
     */
    Properties getQueueProperties(String queueName);

    /**
     * Get queue info including message count.
     *
     * @param queueName the name of the queue to inspect
     * @return the queue information
     */
    QueueInformation getQueueInfo(String queueName);

    /**
     * Purge a queue (remove all messages).
     *
     * @param queueName the name of the queue to purge
     * @param noWait {@code true} to not wait for the purge to complete
     */
    void purgeQueue(String queueName, boolean noWait);
}

Usage Example:
import jakarta.annotation.PostConstruct;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Example component that declares a queue, a topic exchange and the binding
 * between them once the bean has been initialised.
 * <p>
 * {@code jakarta.annotation.PostConstruct} applies to Spring Boot 3.x; on
 * Spring Boot 2.x import {@code javax.annotation.PostConstruct} instead.
 */
@Component
public class QueueSetup {

    @Autowired
    private AmqpAdmin amqpAdmin;

    /**
     * Declares the durable queue {@code task.queue}, the topic exchange
     * {@code task.exchange}, and binds them with the routing pattern {@code task.*}.
     */
    @PostConstruct
    public void setupQueues() {
        // Declare queue
        Queue queue = QueueBuilder.durable("task.queue").build();
        amqpAdmin.declareQueue(queue);

        // Declare exchange
        Exchange exchange = ExchangeBuilder.topicExchange("task.exchange").build();
        amqpAdmin.declareExchange(exchange);

        // Declare binding. The generic to(Exchange) overload avoids a reflective
        // downcast to TopicExchange; noargs() finishes the binding without arguments.
        Binding binding = BindingBuilder.bind(queue)
                .to(exchange)
                .with("task.*")
                .noargs();
        amqpAdmin.declareBinding(binding);
    }
}

Abstraction layer for RabbitMQ connection details that can be implemented for different configuration sources.
/**
* Abstraction for RabbitMQ connection details.
* <p>
* NOTE(review): this listing is a summary and may not match the upstream Spring Boot
* interface member-for-member (for example, how addresses are typed) - confirm
* against the Spring Boot Javadoc before implementing it.
*/
public interface RabbitConnectionDetails {
/**
* Get connection username.
* @return the username
*/
String getUsername();
/**
* Get connection password.
* @return the password
*/
String getPassword();
/**
* Get RabbitMQ host.
* @return the host
*/
String getHost();
/**
* Get RabbitMQ port.
* @return the port
*/
int getPort();
/**
* Get virtual host.
* @return the virtual host
*/
String getVirtualHost();
/**
* Get connection addresses for cluster setup.
* @return the addresses
*/
List<String> getAddresses();
/**
* Get connection URI if available.
* @return the connection URI
*/
URI getUri();
}
/**
* Default implementation using RabbitProperties.
* <p>
* Only the class declaration is listed here; no members are shown.
*/
public class PropertiesRabbitConnectionDetails implements RabbitConnectionDetails {
// Implementation backed by RabbitProperties
}Auto-configured caching connection factory with connection pooling, SSL support, and customization options.
/**
* Auto-configured connection factory (typically CachingConnectionFactory).
* <p>
* This is an API summary: only method signatures are listed.
*/
public interface ConnectionFactory {
/**
* Create an AMQP connection.
* @return the connection
* @throws AmqpException declared by the signature; the listing does not show when it is raised
*/
Connection createConnection() throws AmqpException;
/**
* Get connection host.
* @return the host
*/
String getHost();
/**
* Get connection port.
* @return the port
*/
int getPort();
/**
* Get virtual host.
* @return the virtual host
*/
String getVirtualHost();
/**
* Get username.
* @return the username
*/
String getUsername();
}
/**
* Caching connection factory implementation.
* <p>
* This is an API summary: only a few configuration setters are listed.
*/
public class CachingConnectionFactory extends AbstractConnectionFactory {
/**
* Set cache mode (CONNECTION or CHANNEL).
* @param cacheMode the cache mode to use
*/
public void setCacheMode(CacheMode cacheMode);
/**
* Set connection cache size.
* @param connectionCacheSize the connection cache size
*/
public void setConnectionCacheSize(int connectionCacheSize);
/**
* Set channel cache size.
* @param channelCacheSize the channel cache size
*/
public void setChannelCacheSize(int channelCacheSize);
/**
* Select the publisher confirm type (takes a ConfirmType value, not a boolean flag).
* @param confirmType the confirm type to use
*/
public void setPublisherConfirmType(ConfirmType confirmType);
/**
* Enable/disable publisher returns.
* @param publisherReturns whether publisher returns are enabled
*/
public void setPublisherReturns(boolean publisherReturns);
}Support for automatic message conversion using configurable message converters.
/**
* Message converter interface for automatic conversion between objects and messages.
*/
public interface MessageConverter {
/**
* Convert object to Message.
* @param object the object to convert
* @param messageProperties the message properties supplied for the conversion
* @return the resulting message
* @throws MessageConversionException declared by the signature for conversion failures
*/
Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException;
/**
* Convert Message to object.
* @param message the message to convert
* @return the converted object
* @throws MessageConversionException declared by the signature for conversion failures
*/
Object fromMessage(Message message) throws MessageConversionException;
}
/**
 * Common message converters available
 */
// Simple converter for String, byte[] and Serializable payloads
SimpleMessageConverter simpleMessageConverter;
// JSON converter using Jackson
Jackson2JsonMessageConverter jsonMessageConverter;
// XML converter using a Spring OXM marshaller
MarshallingMessageConverter xmlMessageConverter;

Usage Example:
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Example configuration that registers a Jackson-based JSON message converter bean.
 */
@Configuration
public class RabbitConfig {

    /**
     * Creates the JSON message converter bean.
     *
     * @return a new {@code Jackson2JsonMessageConverter}
     */
    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

Spring Boot provides several interfaces for customizing auto-configured AMQP components.
/**
* Callback interface for customizing RabbitTemplate instances.
*/
@FunctionalInterface
public interface RabbitTemplateCustomizer {
/**
* Customize a RabbitTemplate instance.
* @param rabbitTemplate the template to customize
*/
void customize(RabbitTemplate rabbitTemplate);
}
/**
* Callback interface for customizing the RabbitMQ ConnectionFactory.
*/
@FunctionalInterface
public interface ConnectionFactoryCustomizer {
/**
* Customize the native RabbitMQ ConnectionFactory.
* @param factory the RabbitMQ client connection factory to customize
*/
void customize(com.rabbitmq.client.ConnectionFactory factory);
}
/**
* Callback interface for customizing RabbitMQ retry templates.
*/
@FunctionalInterface
public interface RabbitRetryTemplateCustomizer {
/**
* Customize the retry template.
* @param target which side the retry template is for (SENDER or LISTENER)
* @param retryTemplate the retry template to customize
*/
void customize(Target target, RetryTemplate retryTemplate);
/** Target enumeration for retry customization: SENDER or LISTENER. */
enum Target {
SENDER, LISTENER
}
}Customization Usage Examples:
import org.springframework.boot.autoconfigure.amqp.ConnectionFactoryCustomizer;
import org.springframework.boot.autoconfigure.amqp.RabbitTemplateCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Example configuration that contributes customizer beans for the auto-configured
 * {@code RabbitTemplate} and the native RabbitMQ connection factory.
 */
@Configuration
public class RabbitCustomizationConfig {

    /**
     * Customizes the auto-configured {@code RabbitTemplate}.
     *
     * @return a customizer that sets the receive/reply timeouts and the mandatory flag
     */
    @Bean
    public RabbitTemplateCustomizer rabbitTemplateCustomizer() {
        return (template) -> {
            template.setReceiveTimeout(5000);   // milliseconds
            template.setReplyTimeout(10000);    // milliseconds
            template.setMandatory(true);
        };
    }

    /**
     * Customizes the native RabbitMQ client connection factory.
     *
     * @return a customizer that sets the heartbeat and the connection/handshake timeouts
     */
    @Bean
    public ConnectionFactoryCustomizer connectionFactoryCustomizer() {
        return (factory) -> {
            factory.setRequestedHeartbeat(30);    // seconds
            factory.setConnectionTimeout(10000);  // milliseconds
            factory.setHandshakeTimeout(20000);   // milliseconds
        };
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-boot--spring-boot-starter-amqp