Spring Boot starter for AMQP messaging with RabbitMQ including auto-configuration, connection management, and message processing capabilities
—
Health monitoring and metrics collection for RabbitMQ connections through Spring Boot Actuator endpoints, providing operational visibility into messaging infrastructure health and performance.
Auto-configured health indicator that monitors RabbitMQ connectivity and provides health status through the /actuator/health endpoint.
/**
* Auto-configuration for RabbitMQ health indicator.
*
* <p>Per its annotations, this configuration is ordered after
* {@code RabbitAutoConfiguration} and applies only when {@code RabbitTemplate}
* is on the classpath, a {@code RabbitTemplate} bean is present, and the
* {@code rabbit} health indicator is enabled.
*
* <p>This is a signature-only listing; the method body is not shown.
*/
@AutoConfiguration(after = RabbitAutoConfiguration.class)
@ConditionalOnClass(RabbitTemplate.class)
@ConditionalOnBean(RabbitTemplate.class)
@ConditionalOnEnabledHealthIndicator("rabbit")
public class RabbitHealthContributorAutoConfiguration {
/**
* Creates health contributor for RabbitMQ.
*
* <p>Not registered when a bean named {@code rabbitHealthIndicator} or
* {@code rabbitHealthContributor} already exists.
*
* @param beanFactory the bean factory passed to this factory method
* @return the RabbitMQ health contributor
*/
@Bean
@ConditionalOnMissingBean(name = {"rabbitHealthIndicator", "rabbitHealthContributor"})
public HealthContributor rabbitHealthContributor(ConfigurableListableBeanFactory beanFactory);
}
/**
* Health indicator implementation for RabbitMQ.
*
* <p>This is a signature-only listing; the body of {@code health()} is not shown.
*/
public class RabbitHealthIndicator implements HealthIndicator {
/**
* Check RabbitMQ health.
*
* @return the {@code Health} result of the check
*/
@Override
public Health health();
}
Health Response Examples:
// Healthy RabbitMQ connection
{
"status": "UP",
"components": {
"rabbit": {
"status": "UP",
"details": {
"version": "3.12.4"
}
}
}
}
// Unhealthy RabbitMQ connection
{
"status": "DOWN",
"components": {
"rabbit": {
"status": "DOWN",
"details": {
"error": "org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused"
}
}
}
}
Health Configuration:
management:
  health:
    rabbit:
      enabled: true
  endpoint:
    health:
      show-details: always
  endpoints:
    web:
      exposure:
        include: health
Auto-configured metrics collection for RabbitMQ connection factories using Micrometer, providing detailed operational metrics.
/**
* Auto-configuration for RabbitMQ metrics.
*
* <p>Per its annotations, this configuration is ordered after
* {@code MetricsAutoConfiguration} and {@code RabbitAutoConfiguration}, and
* applies only when the {@code ConnectionFactory} and
* {@code AbstractConnectionFactory} classes are present and beans of type
* {@code org.springframework.amqp.rabbit.connection.ConnectionFactory} and
* {@code MeterRegistry} exist.
*
* <p>This is a signature-only listing; the method body is not shown.
*/
@AutoConfiguration(after = {MetricsAutoConfiguration.class, RabbitAutoConfiguration.class})
@ConditionalOnClass({ConnectionFactory.class, AbstractConnectionFactory.class})
@ConditionalOnBean({org.springframework.amqp.rabbit.connection.ConnectionFactory.class, MeterRegistry.class})
public class RabbitMetricsAutoConfiguration {
/**
* Creates metrics post-processor for connection factories.
*
* <p>Declared {@code static}, as shown in the signature below.
*
* @param applicationContext the application context passed to this factory method
* @return the post-processor that adds metrics to connection factories
*/
@Bean
public static RabbitConnectionFactoryMetricsPostProcessor rabbitConnectionFactoryMetricsPostProcessor(
ApplicationContext applicationContext);
}
/**
* Post-processor that adds metrics to connection factories.
*
* <p>This is a signature-only listing; the method body is not shown.
*/
public class RabbitConnectionFactoryMetricsPostProcessor implements BeanPostProcessor {
/**
* Add metrics to connection factory after initialization.
*
* @param bean the bean instance being post-processed
* @param beanName the name of the bean
* @return the bean to use after post-processing
* @throws BeansException if post-processing fails
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;
}
Available Metrics:
/**
* RabbitMQ connection metrics available through Micrometer
*/
// Connection metrics
"rabbitmq.connections" - Gauge of active connections
"rabbitmq.connections.opened" - Counter of opened connections
"rabbitmq.connections.closed" - Counter of closed connections
// Channel metrics
"rabbitmq.channels" - Gauge of active channels
"rabbitmq.channels.opened" - Counter of opened channels
"rabbitmq.channels.closed" - Counter of closed channels
// Message metrics (when available)
"rabbitmq.published" - Counter of published messages
"rabbitmq.published.confirmed" - Counter of confirmed publishes
"rabbitmq.published.returned" - Counter of returned messages
"rabbitmq.consumed" - Counter of consumed messages
"rabbitmq.acknowledged" - Counter of acknowledged messages
"rabbitmq.rejected" - Counter of rejected messages
// Connection pool metrics (for caching connection factory)
"rabbitmq.connection.pool.size" - Current pool size
"rabbitmq.connection.pool.active" - Active connections in pool
"rabbitmq.connection.pool.idle" - Idle connections in pool
Metrics Configuration:
management:
  metrics:
    enable:
      rabbitmq: true
  endpoints:
    web:
      exposure:
        include: metrics, prometheus
Extending health checks with custom RabbitMQ health indicators for specific business requirements.
/**
 * Custom health indicator example.
 *
 * <p>Each check looks up the properties of {@code test.queue} through
 * {@code AmqpAdmin} and sends a {@code "ping"} payload via
 * {@code RabbitTemplate.convertAndSend("health.check", ...)}. Any exception
 * raised by either call is reported as {@code DOWN}.
 */
@Component
public class CustomRabbitHealthIndicator implements HealthIndicator {

    private final RabbitTemplate rabbitTemplate;
    private final AmqpAdmin amqpAdmin;

    /**
     * @param rabbitTemplate template used to send the probe message
     * @param amqpAdmin admin used to look up the probe queue
     */
    public CustomRabbitHealthIndicator(RabbitTemplate rabbitTemplate, AmqpAdmin amqpAdmin) {
        this.rabbitTemplate = rabbitTemplate;
        this.amqpAdmin = amqpAdmin;
    }

    /**
     * Runs the queue lookup and the probe send.
     *
     * @return {@code UP} with connection/queue/messaging details when both
     *         calls complete, otherwise {@code DOWN} carrying the exception
     */
    @Override
    public Health health() {
        try {
            // Test connection by checking queue
            Properties queueProperties = amqpAdmin.getQueueProperties("test.queue");
            // Test message operations
            rabbitTemplate.convertAndSend("health.check", "ping");
            return Health.up()
                    .withDetail("connection", "OK")
                    .withDetail("queues", queueProperties != null ? "accessible" : "limited")
                    .withDetail("messaging", "operational")
                    .build();
        } catch (Exception e) {
            // withException(e) already records the "error" detail. A separate
            // withDetail("error", e.getMessage()) would be overwritten by it and
            // would itself throw when the exception has no message, because
            // Health.Builder rejects null detail values.
            return Health.down()
                    .withException(e)
                    .build();
        }
    }
}
Adding custom metrics for application-specific RabbitMQ monitoring requirements.
/**
 * Custom metrics configuration.
 *
 * <p>Registers a counter of processed messages, a timer for processing
 * duration, and a gauge reporting the message count of
 * {@code important.queue}.
 */
@Component
public class CustomRabbitMetrics {

    private final MeterRegistry meterRegistry;
    private final Counter messagesProcessed;
    private final Timer processingTime;
    private final Gauge queueDepth;

    /**
     * @param meterRegistry registry the meters are registered with
     * @param amqpAdmin admin queried by the queue-depth gauge
     */
    public CustomRabbitMetrics(MeterRegistry meterRegistry, AmqpAdmin amqpAdmin) {
        this.meterRegistry = meterRegistry;
        this.messagesProcessed = Counter.builder("rabbitmq.messages.processed")
                .description("Number of messages processed successfully")
                .tag("application", "myapp")
                .register(meterRegistry);
        this.processingTime = Timer.builder("rabbitmq.message.processing.time")
                .description("Time taken to process messages")
                .register(meterRegistry);
        // Gauge.builder takes the observed object and value function up front;
        // register(...) accepts only the registry. Observing amqpAdmin (not
        // `this`) also avoids publishing a half-constructed instance.
        this.queueDepth = Gauge.builder("rabbitmq.queue.depth", amqpAdmin,
                        admin -> getQueueDepth(admin, "important.queue"))
                .description("Number of messages in specific queue")
                .tag("queue", "important.queue")
                .register(meterRegistry);
    }

    /** Record message processing */
    public void recordMessageProcessed() {
        messagesProcessed.increment();
    }

    /**
     * Start timing one message.
     *
     * @return a sample to be passed to {@link #stopProcessingTimer(Timer.Sample)}
     */
    public Timer.Sample startProcessingTimer() {
        // Timer.start accepts a registry (or clock), not a Timer.
        return Timer.start(meterRegistry);
    }

    /**
     * Stop a sample and record its duration on the processing timer.
     *
     * @param sample the sample returned by {@link #startProcessingTimer()}
     */
    public void stopProcessingTimer(Timer.Sample sample) {
        sample.stop(processingTime);
    }

    /**
     * Read the message count of a queue.
     *
     * @return the count, {@code 0} when no properties are returned for the
     *         queue, or {@code -1} when the count is missing, non-numeric, or
     *         the lookup throws
     */
    private static double getQueueDepth(AmqpAdmin admin, String queueName) {
        try {
            Properties props = admin.getQueueProperties(queueName);
            if (props == null) {
                return 0;
            }
            Object count = props.get("QUEUE_MESSAGE_COUNT");
            if (count instanceof Number) {
                return ((Number) count).doubleValue();
            }
            return -1; // Error state: count missing or not numeric
        } catch (Exception e) {
            return -1; // Error state
        }
    }
}
Usage in Message Handlers:
@Component
public class MetricEnabledMessageHandler {

    private final CustomRabbitMetrics metrics;

    public MetricEnabledMessageHandler(CustomRabbitMetrics metrics) {
        this.metrics = metrics;
    }

    /**
     * Handle one message from {@code monitored.queue}, timing the work and
     * counting it when processing completes without an exception.
     */
    @RabbitListener(queues = "monitored.queue")
    public void handleMessage(String message) {
        Timer.Sample timer = metrics.startProcessingTimer();
        try {
            // Process message (processMessage is application-specific and not
            // defined in this listing)
            processMessage(message);
            // Record success
            metrics.recordMessageProcessed();
        } finally {
            // A Timer.Sample must be stopped against a Timer, which the
            // metrics component owns.
            metrics.stopProcessingTimer(timer);
        }
    }
}
Additional management endpoints for RabbitMQ monitoring and administration.
/**
 * Custom management endpoint for RabbitMQ information.
 *
 * <p>Exposed under the endpoint id {@code rabbitmq}: a read operation for
 * connection settings, a read operation for one queue, and a write operation
 * that purges one queue.
 */
@Component
@Endpoint(id = "rabbitmq")
public class RabbitManagementEndpoint {

    private final CachingConnectionFactory connectionFactory;
    private final AmqpAdmin amqpAdmin;

    /**
     * @param connectionFactory connection factory whose settings are reported
     * @param amqpAdmin admin used for queue lookups and purges
     */
    public RabbitManagementEndpoint(CachingConnectionFactory connectionFactory, AmqpAdmin amqpAdmin) {
        this.connectionFactory = connectionFactory;
        this.amqpAdmin = amqpAdmin;
    }

    /**
     * Get connection information.
     *
     * @return host, port, virtual host, cache mode and channel cache size
     */
    @ReadOperation
    public Map<String, Object> connectionInfo() {
        Map<String, Object> info = new HashMap<>();
        info.put("host", connectionFactory.getHost());
        info.put("port", connectionFactory.getPort());
        info.put("virtualHost", connectionFactory.getVirtualHost());
        info.put("cacheMode", connectionFactory.getCacheMode());
        info.put("channelCacheSize", connectionFactory.getChannelCacheSize());
        return info;
    }

    /**
     * Get queue information.
     *
     * @param queueName name of the queue to look up
     * @return name, message count and consumer count, or a single
     *         {@code error} entry when the queue is not found or the lookup fails
     */
    @ReadOperation
    public Map<String, Object> queueInfo(@Selector String queueName) {
        try {
            Properties props = amqpAdmin.getQueueProperties(queueName);
            if (props == null) {
                return Map.of("error", "Queue not found");
            }
            Map<String, Object> queueInfo = new HashMap<>();
            queueInfo.put("name", queueName);
            queueInfo.put("messageCount", props.get("QUEUE_MESSAGE_COUNT"));
            queueInfo.put("consumerCount", props.get("QUEUE_CONSUMER_COUNT"));
            return queueInfo;
        } catch (Exception e) {
            return Map.of("error", describe(e));
        }
    }

    /**
     * Purge a queue.
     *
     * @param queueName name of the queue to purge
     * @return a status entry on success, or a single {@code error} entry on failure
     */
    @WriteOperation
    public Map<String, Object> purgeQueue(@Selector String queueName) {
        try {
            amqpAdmin.purgeQueue(queueName, false);
            return Map.of("status", "purged", "queue", queueName);
        } catch (Exception e) {
            return Map.of("error", describe(e));
        }
    }

    /**
     * Text for an {@code error} entry. Map.of rejects null values, so fall
     * back to the exception class name when there is no message.
     */
    private static String describe(Exception e) {
        String message = e.getMessage();
        return message != null ? message : e.getClass().getName();
    }
}
Management Endpoint Configuration:
management:
  endpoints:
    web:
      exposure:
        include: rabbitmq
  endpoint:
    rabbitmq:
      enabled: true
Endpoint Usage Examples:
# Get connection information
curl http://localhost:8080/actuator/rabbitmq
# Get specific queue information
curl http://localhost:8080/actuator/rabbitmq/task.queue
# Purge a queue
curl -X POST http://localhost:8080/actuator/rabbitmq/test.queue
Integration with monitoring systems for RabbitMQ alerts and notifications.
/**
 * Custom health indicator with alerting.
 *
 * <p>Publishes a {@code "ping"} payload on each check and notifies the
 * {@code AlertService} only on transitions: once when a check first fails and
 * once when a later check succeeds again.
 *
 * <p>{@code lastCheckFailed} is {@code volatile}, but it is read and then
 * written in separate steps, so overlapping checks may send the same alert
 * more than once.
 */
@Component
public class AlertingRabbitHealthIndicator implements HealthIndicator {

    private final RabbitTemplate rabbitTemplate;
    private final AlertService alertService;
    private volatile boolean lastCheckFailed = false;

    /**
     * @param rabbitTemplate template used to publish the probe message
     * @param alertService service notified on failure and recovery
     */
    public AlertingRabbitHealthIndicator(RabbitTemplate rabbitTemplate, AlertService alertService) {
        this.rabbitTemplate = rabbitTemplate;
        this.alertService = alertService;
    }

    /**
     * Publish the probe message and report the outcome.
     *
     * @return {@code UP} when the publish completes, otherwise {@code DOWN}
     *         carrying the exception
     */
    @Override
    public Health health() {
        try {
            // Perform health check
            rabbitTemplate.execute(channel -> {
                // Explicit charset: the no-arg getBytes() depends on the platform default.
                channel.basicPublish("", "health.check", null,
                        "ping".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                return null;
            });
            if (lastCheckFailed) {
                alertService.sendRecoveryAlert("RabbitMQ connection restored");
                lastCheckFailed = false;
            }
            return Health.up().build();
        } catch (Exception e) {
            if (!lastCheckFailed) {
                alertService.sendAlert("RabbitMQ connection failed: " + e.getMessage());
                lastCheckFailed = true;
            }
            return Health.down().withException(e).build();
        }
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-boot--spring-boot-starter-amqp