CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-springframework-boot--spring-boot-starter-amqp

Spring Boot starter for AMQP messaging with RabbitMQ including auto-configuration, connection management, and message processing capabilities

Pending
Overview
Eval results
Files

docs/actuator.md

Actuator Integration

Health monitoring and metrics collection for RabbitMQ connections through Spring Boot Actuator endpoints, providing operational visibility into messaging infrastructure health and performance.

Capabilities

Health Indicator

Auto-configured health indicator that monitors RabbitMQ connectivity and provides health status through the /actuator/health endpoint.

/**
 * Auto-configuration for RabbitMQ health indicator.
 *
 * <p>Ordered after {@code RabbitAutoConfiguration}. It applies only when
 * {@code RabbitTemplate} is on the classpath, a {@code RabbitTemplate} bean is
 * defined, and the {@code rabbit} health indicator is enabled.
 *
 * <p>This listing is an API summary: method bodies are omitted.
 */
@AutoConfiguration(after = RabbitAutoConfiguration.class)
@ConditionalOnClass(RabbitTemplate.class)
@ConditionalOnBean(RabbitTemplate.class)
@ConditionalOnEnabledHealthIndicator("rabbit")
public class RabbitHealthContributorAutoConfiguration {
    
    /**
     * Creates health contributor for RabbitMQ.
     *
     * <p>Backs off when a bean named {@code rabbitHealthIndicator} or
     * {@code rabbitHealthContributor} is already defined, so an application
     * can supply its own under either name.
     *
     * @param beanFactory the bean factory passed to this factory method
     * @return the RabbitMQ health contributor
     */
    @Bean
    @ConditionalOnMissingBean(name = {"rabbitHealthIndicator", "rabbitHealthContributor"})
    public HealthContributor rabbitHealthContributor(ConfigurableListableBeanFactory beanFactory);
}

/**
 * Health indicator implementation for RabbitMQ.
 *
 * <p>This listing is an API summary: the method body is omitted.
 *
 * <p>NOTE(review): the class shipped with Spring Boot may extend
 * {@code AbstractHealthIndicator} rather than implement
 * {@code HealthIndicator} directly; confirm against the Boot version in use.
 */
public class RabbitHealthIndicator implements HealthIndicator {
    
    /**
     * Check RabbitMQ health.
     *
     * @return the resulting {@code Health}
     */
    @Override
    public Health health();
}

Health Response Examples:

// Healthy RabbitMQ connection
{
  "status": "UP",
  "components": {
    "rabbit": {
      "status": "UP",
      "details": {
        "version": "3.12.4"
      }
    }
  }
}

// Unhealthy RabbitMQ connection
{
  "status": "DOWN",
  "components": {
    "rabbit": {
      "status": "DOWN",
      "details": {
        "error": "org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused"
      }
    }
  }
}

Health Configuration:

# Actuator settings for the RabbitMQ health indicator
management:
  health:
    rabbit:
      enabled: true            # turn the "rabbit" health indicator on
  endpoint:
    health:
      # Always include component details in the response.
      # NOTE(review): consider "when-authorized" outside development - confirm policy.
      show-details: always
  endpoints:
    web:
      exposure:
        include: health        # expose the health endpoint over HTTP

Metrics Collection

Auto-configured metrics collection for RabbitMQ connection factories using Micrometer, providing detailed operational metrics.

/**
 * Auto-configuration for RabbitMQ metrics.
 *
 * <p>Ordered after {@code MetricsAutoConfiguration} and
 * {@code RabbitAutoConfiguration}. It applies only when the
 * {@code ConnectionFactory} and {@code AbstractConnectionFactory} classes are
 * present and both a Spring AMQP {@code ConnectionFactory} bean and a
 * {@code MeterRegistry} bean are defined.
 *
 * <p>This listing is an API summary: method bodies are omitted.
 */
@AutoConfiguration(after = {MetricsAutoConfiguration.class, RabbitAutoConfiguration.class})
@ConditionalOnClass({ConnectionFactory.class, AbstractConnectionFactory.class})
@ConditionalOnBean({org.springframework.amqp.rabbit.connection.ConnectionFactory.class, MeterRegistry.class})
public class RabbitMetricsAutoConfiguration {
    
    /**
     * Creates metrics post-processor for connection factories.
     *
     * <p>Declared {@code static}, as is usual for bean post-processor factory
     * methods, so it can be created without instantiating this configuration
     * class first.
     *
     * @param applicationContext the application context passed to the post-processor factory
     * @return the post-processor that instruments connection factories
     */
    @Bean
    public static RabbitConnectionFactoryMetricsPostProcessor rabbitConnectionFactoryMetricsPostProcessor(
        ApplicationContext applicationContext);
}

/**
 * Post-processor that adds metrics to connection factories.
 *
 * <p>This listing is an API summary: the method body is omitted.
 */
public class RabbitConnectionFactoryMetricsPostProcessor implements BeanPostProcessor {
    
    /**
     * Add metrics to connection factory after initialization.
     *
     * @param bean the bean instance that has just been initialized
     * @param beanName the name of the bean
     * @return the bean instance to use
     * @throws BeansException in case of errors
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;
}

Available Metrics:

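/**
 * RabbitMQ connection metrics available through Micrometer.
 *
 * The names below are indicative. The exact set of meters depends on the
 * RabbitMQ Java client and Spring Boot versions in use, so check
 * /actuator/metrics for the meters your application actually registers.
 */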
// Connection metrics
"rabbitmq.connections" - Gauge of active connections
"rabbitmq.connections.opened" - Counter of opened connections  
"rabbitmq.connections.closed" - Counter of closed connections

// Channel metrics
"rabbitmq.channels" - Gauge of active channels
"rabbitmq.channels.opened" - Counter of opened channels
"rabbitmq.channels.closed" - Counter of closed channels

// Message metrics (when available)
"rabbitmq.published" - Counter of published messages
"rabbitmq.published.confirmed" - Counter of confirmed publishes
"rabbitmq.published.returned" - Counter of returned messages
"rabbitmq.consumed" - Counter of consumed messages
"rabbitmq.acknowledged" - Counter of acknowledged messages
"rabbitmq.rejected" - Counter of rejected messages

// Connection pool metrics (for caching connection factory)
"rabbitmq.connection.pool.size" - Current pool size
"rabbitmq.connection.pool.active" - Active connections in pool
"rabbitmq.connection.pool.idle" - Idle connections in pool

Metrics Configuration:

# Actuator settings for RabbitMQ metrics
management:
  metrics:
    enable:
      rabbitmq: true           # enable meters whose names start with "rabbitmq"
  endpoints:
    web:
      exposure:
        include: metrics, prometheus   # expose the metrics and prometheus endpoints over HTTP

Custom Health Checks

Extending health checks with custom RabbitMQ health indicators for specific business requirements.

/**
 * Custom health indicator example.
 *
 * <p>Reports {@code UP} when the broker answers a queue-properties lookup for
 * {@code test.queue} and accepts a probe message; reports {@code DOWN},
 * carrying the causing exception, when either call fails.
 *
 * <p>Every invocation publishes a {@code "ping"} message with routing key
 * {@code health.check}, so polling this indicator generates broker traffic.
 */
@Component
public class CustomRabbitHealthIndicator implements HealthIndicator {

    private final RabbitTemplate rabbitTemplate;
    private final AmqpAdmin amqpAdmin;

    /**
     * @param rabbitTemplate template used to publish the probe message
     * @param amqpAdmin admin used to look up queue properties
     */
    public CustomRabbitHealthIndicator(RabbitTemplate rabbitTemplate, AmqpAdmin amqpAdmin) {
        this.rabbitTemplate = rabbitTemplate;
        this.amqpAdmin = amqpAdmin;
    }

    /**
     * Probes the broker and translates the outcome into a {@link Health}.
     *
     * @return {@code UP} with connection/queue/messaging details, or
     *         {@code DOWN} with the exception that made the probe fail
     */
    @Override
    public Health health() {
        try {
            // Test connection by checking queue; a null result means the queue
            // could not be described, which is reported as "limited" access.
            Properties queueProperties = amqpAdmin.getQueueProperties("test.queue");

            // Test message operations
            rabbitTemplate.convertAndSend("health.check", "ping");

            return Health.up()
                .withDetail("connection", "OK")
                .withDetail("queues", queueProperties != null ? "accessible" : "limited")
                .withDetail("messaging", "operational")
                .build();

        } catch (Exception e) {
            // withException already records an "error" detail of the form
            // "<exception class>: <message>". A separate
            // withDetail("error", e.getMessage()) would be overwritten by it and
            // would itself throw when the message is null, because detail
            // values must be non-null.
            return Health.down()
                .withException(e)
                .build();
        }
    }
}

Custom Metrics

Adding custom metrics for application-specific RabbitMQ monitoring requirements.

/**
 * Custom metrics configuration.
 *
 * <p>Registers three meters with the supplied {@code MeterRegistry}:
 * a counter of processed messages, a timer of message processing time, and a
 * gauge that samples the depth of {@code important.queue} through
 * {@code AmqpAdmin} each time the gauge is read.
 */
@Component
public class CustomRabbitMetrics {

    /** Queue whose depth is published by the gauge. */
    private static final String MONITORED_QUEUE = "important.queue";

    private final MeterRegistry meterRegistry;
    private final Counter messagesProcessed;
    private final Timer processingTime;
    private final Gauge queueDepth;

    /**
     * @param meterRegistry registry the meters are registered with
     * @param amqpAdmin admin used to read the monitored queue's properties
     */
    public CustomRabbitMetrics(MeterRegistry meterRegistry, AmqpAdmin amqpAdmin) {
        this.meterRegistry = meterRegistry;

        this.messagesProcessed = Counter.builder("rabbitmq.messages.processed")
            .description("Number of messages processed successfully")
            .tag("application", "myapp")
            .register(meterRegistry);

        this.processingTime = Timer.builder("rabbitmq.message.processing.time")
            .description("Time taken to process messages")
            .register(meterRegistry);

        // Gauge.builder takes the state object and value function up front;
        // register() accepts only the registry.
        this.queueDepth = Gauge.builder("rabbitmq.queue.depth", amqpAdmin,
                admin -> getQueueDepth(admin, MONITORED_QUEUE))
            .description("Number of messages in specific queue")
            .tag("queue", MONITORED_QUEUE)
            .register(meterRegistry);
    }

    /** Record message processing */
    public void recordMessageProcessed() {
        messagesProcessed.increment();
    }

    /**
     * Starts timing one unit of message processing.
     *
     * @return a sample to be passed to {@link #stopProcessingTimer(Timer.Sample)}
     */
    public Timer.Sample startProcessingTimer() {
        // A sample is started against a registry (its clock), not a Timer;
        // the Timer is supplied when the sample is stopped.
        return Timer.start(meterRegistry);
    }

    /**
     * Stops a sample and records its duration in the processing timer.
     *
     * @param sample sample obtained from {@link #startProcessingTimer()}
     * @return the recorded duration in nanoseconds
     */
    public long stopProcessingTimer(Timer.Sample sample) {
        return sample.stop(processingTime);
    }

    /**
     * Reads the current message count of a queue.
     *
     * @param admin admin used for the lookup
     * @param queueName queue to inspect
     * @return the message count; {@code 0} when no properties are returned for
     *         the queue; {@code -1} when the lookup fails or the count is absent
     */
    private static double getQueueDepth(AmqpAdmin admin, String queueName) {
        try {
            Properties props = admin.getQueueProperties(queueName);
            if (props == null) {
                return 0;
            }
            Object count = props.get("QUEUE_MESSAGE_COUNT");
            // Explicit type check instead of relying on a cast/unboxing
            // exception to reach the error value.
            return count instanceof Number ? ((Number) count).doubleValue() : -1;
        } catch (RuntimeException ignored) {
            // A gauge callback must not throw; -1 marks the error state.
            return -1;
        }
    }
}

Usage in Message Handlers:

/**
 * Example listener that times each message and counts successful ones
 * through {@code CustomRabbitMetrics}.
 */
@Component
public class MetricEnabledMessageHandler {
    
    private final CustomRabbitMetrics metrics;
    
    /**
     * @param metrics metrics facade used to time and count message handling
     */
    public MetricEnabledMessageHandler(CustomRabbitMetrics metrics) {
        this.metrics = metrics;
    }
    
    /**
     * Handles one message from {@code monitored.queue}.
     *
     * <p>The success counter is incremented only when processing completes
     * without throwing; the timing sample is stopped in {@code finally}, so
     * failed attempts are timed as well.
     *
     * @param message the message payload
     */
    @RabbitListener(queues = "monitored.queue")
    public void handleMessage(String message) {
        Timer.Sample timer = metrics.startProcessingTimer();
        
        try {
            // Process message
            // NOTE(review): processMessage is not defined in this snippet - supply the
            // application's own processing method.
            processMessage(message);
            
            // Record success
            metrics.recordMessageProcessed();
            
        } finally {
            // NOTE(review): Micrometer's Timer.Sample appears to offer only stop(Timer),
            // not a no-arg stop(); confirm, and pass the target Timer (or delegate the
            // stop to the metrics component) so the duration is actually recorded.
            timer.stop();
        }
    }
}

Management Endpoints

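Example of a custom Actuator endpoint for RabbitMQ monitoring and administration. This endpoint is not provided by the starter; define it in your own application if you need it.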

/**
 * Custom management endpoint for RabbitMQ information.
 *
 * <p>Exposed under the endpoint id {@code rabbitmq}: a read of the endpoint
 * root returns connection settings, a read with a queue-name selector returns
 * that queue's counters, and a write with a queue-name selector purges the
 * queue.
 */
@Component
@Endpoint(id = "rabbitmq")
public class RabbitManagementEndpoint {

    private final CachingConnectionFactory connectionFactory;
    private final AmqpAdmin amqpAdmin;

    /**
     * @param connectionFactory connection factory whose settings are reported
     * @param amqpAdmin admin used for queue lookups and purges
     */
    public RabbitManagementEndpoint(CachingConnectionFactory connectionFactory, AmqpAdmin amqpAdmin) {
        this.connectionFactory = connectionFactory;
        this.amqpAdmin = amqpAdmin;
    }

    /**
     * Get connection information.
     *
     * @return host, port, virtual host, cache mode and channel cache size
     */
    @ReadOperation
    public Map<String, Object> connectionInfo() {
        Map<String, Object> info = new HashMap<>();
        info.put("host", connectionFactory.getHost());
        info.put("port", connectionFactory.getPort());
        info.put("virtualHost", connectionFactory.getVirtualHost());
        info.put("cacheMode", connectionFactory.getCacheMode());
        info.put("channelCacheSize", connectionFactory.getChannelCacheSize());
        return info;
    }

    /**
     * Get queue information.
     *
     * @param queueName name of the queue, taken from the request path
     * @return the queue's name, message count and consumer count, or a
     *         single-entry {@code error} map when the queue is not found or
     *         the lookup fails
     */
    @ReadOperation
    public Map<String, Object> queueInfo(@Selector String queueName) {
        try {
            Properties props = amqpAdmin.getQueueProperties(queueName);
            if (props == null) {
                return Map.of("error", "Queue not found");
            }
            // HashMap rather than Map.of: the looked-up counters may be absent (null).
            Map<String, Object> queueInfo = new HashMap<>();
            queueInfo.put("name", queueName);
            queueInfo.put("messageCount", props.get("QUEUE_MESSAGE_COUNT"));
            queueInfo.put("consumerCount", props.get("QUEUE_CONSUMER_COUNT"));
            return queueInfo;
        } catch (Exception e) {
            return errorResponse(e);
        }
    }

    /**
     * Purge a queue.
     *
     * @param queueName name of the queue, taken from the request path
     * @return a {@code status}/{@code queue} map on success, or a single-entry
     *         {@code error} map when the purge fails
     */
    @WriteOperation
    public Map<String, Object> purgeQueue(@Selector String queueName) {
        try {
            // Second argument is noWait: false waits for the purge to complete.
            amqpAdmin.purgeQueue(queueName, false);
            return Map.of("status", "purged", "queue", queueName);
        } catch (Exception e) {
            return errorResponse(e);
        }
    }

    /**
     * Builds the error payload for a failed operation. Map.of rejects null
     * values, so an exception without a message falls back to its class name
     * instead of triggering a NullPointerException while reporting the error.
     */
    private static Map<String, Object> errorResponse(Exception e) {
        String message = e.getMessage();
        return Map.of("error", message != null ? message : e.getClass().getName());
    }
}

Management Endpoint Configuration:

# Actuator settings for the custom "rabbitmq" endpoint
management:
  endpoints:
    web:
      exposure:
        # Expose the custom endpoint over HTTP.
        # NOTE(review): this list defines what is exposed - add "health", "metrics", etc.
        # here as well if those endpoints must remain reachable.
        include: rabbitmq
  endpoint:
    rabbitmq:
      enabled: true            # enable the endpoint with id "rabbitmq"

Endpoint Usage Examples:

# Get connection information
curl http://localhost:8080/actuator/rabbitmq

# Get specific queue information  
curl http://localhost:8080/actuator/rabbitmq/task.queue

# Purge a queue
curl -X POST http://localhost:8080/actuator/rabbitmq/test.queue

Alerting Integration

Integration with monitoring systems for RabbitMQ alerts and notifications.

/**
 * Custom health indicator with alerting.
 *
 * <p>Probes the broker by publishing a {@code "ping"} message and notifies the
 * {@code AlertService} on state transitions only: once when the probe starts
 * failing and once when it succeeds again.
 *
 * <p>Health checks may run concurrently, so the last-known state is kept in an
 * atomic flag and each transition is claimed with a compare-and-set; only the
 * thread that flips the flag sends the alert.
 */
@Component
public class AlertingRabbitHealthIndicator implements HealthIndicator {

    private final RabbitTemplate rabbitTemplate;
    private final AlertService alertService;
    private final java.util.concurrent.atomic.AtomicBoolean lastCheckFailed =
        new java.util.concurrent.atomic.AtomicBoolean(false);

    /**
     * @param rabbitTemplate template used to run the probe on a channel
     * @param alertService service notified of failure and recovery
     */
    public AlertingRabbitHealthIndicator(RabbitTemplate rabbitTemplate, AlertService alertService) {
        this.rabbitTemplate = rabbitTemplate;
        this.alertService = alertService;
    }

    /**
     * Runs the probe and reports the result.
     *
     * <p>Only the probe is guarded by the {@code try}: an exception thrown by
     * the alert service propagates to the caller instead of being reported as
     * a RabbitMQ failure.
     *
     * @return {@code UP} when the probe succeeds, otherwise {@code DOWN} with
     *         the exception that made it fail
     */
    @Override
    public Health health() {
        try {
            // Perform health check
            rabbitTemplate.execute(channel -> {
                // Explicit charset so the payload does not depend on the platform default.
                channel.basicPublish("", "health.check", null,
                    "ping".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                return null;
            });
        } catch (Exception e) {
            // healthy -> failed transition: alert once.
            if (lastCheckFailed.compareAndSet(false, true)) {
                alertService.sendAlert("RabbitMQ connection failed: " + e.getMessage());
            }
            return Health.down().withException(e).build();
        }

        // failed -> healthy transition: alert once.
        if (lastCheckFailed.compareAndSet(true, false)) {
            alertService.sendRecoveryAlert("RabbitMQ connection restored");
        }
        return Health.up().build();
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-springframework-boot--spring-boot-starter-amqp

docs

actuator.md

configuration.md

core-messaging.md

index.md

listeners.md

streams.md

tile.json