0
# Actuator Integration
1
2
Health monitoring and metrics collection for RabbitMQ connections through Spring Boot Actuator endpoints, providing operational visibility into messaging infrastructure health and performance.
3
4
## Capabilities
5
6
### Health Indicator
7
8
Auto-configured health indicator that monitors RabbitMQ connectivity and provides health status through the `/actuator/health` endpoint.
9
10
```java { .api }
11
/**
12
* Auto-configuration for RabbitMQ health indicator
13
*/
14
@AutoConfiguration(after = RabbitAutoConfiguration.class)
15
@ConditionalOnClass(RabbitTemplate.class)
16
@ConditionalOnBean(RabbitTemplate.class)
17
@ConditionalOnEnabledHealthIndicator("rabbit")
18
public class RabbitHealthContributorAutoConfiguration {
19
20
/** Creates health contributor for RabbitMQ */
21
@Bean
22
@ConditionalOnMissingBean(name = {"rabbitHealthIndicator", "rabbitHealthContributor"})
23
public HealthContributor rabbitHealthContributor(ConfigurableListableBeanFactory beanFactory);
24
}
25
26
/**
27
* Health indicator implementation
28
*/
29
public class RabbitHealthIndicator implements HealthIndicator {
30
31
/** Check RabbitMQ health */
32
@Override
33
public Health health();
34
}
35
```
36
37
**Health Response Examples:**
38
39
```json
40
// Healthy RabbitMQ connection
41
{
42
"status": "UP",
43
"components": {
44
"rabbit": {
45
"status": "UP",
46
"details": {
47
"version": "3.12.4"
48
}
49
}
50
}
51
}
52
53
// Unhealthy RabbitMQ connection
54
{
55
"status": "DOWN",
56
"components": {
57
"rabbit": {
58
"status": "DOWN",
59
"details": {
60
"error": "org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused"
61
}
62
}
63
}
64
}
65
```
66
67
**Health Configuration:**
68
69
```yaml
70
management:
71
health:
72
rabbit:
73
enabled: true
74
endpoint:
75
health:
76
show-details: always
77
endpoints:
78
web:
79
exposure:
80
include: health
81
```
82
83
### Metrics Collection
84
85
Auto-configured metrics collection for RabbitMQ connection factories using Micrometer, providing detailed operational metrics.
86
87
```java { .api }
88
/**
89
* Auto-configuration for RabbitMQ metrics
90
*/
91
@AutoConfiguration(after = {MetricsAutoConfiguration.class, RabbitAutoConfiguration.class})
92
@ConditionalOnClass({ConnectionFactory.class, AbstractConnectionFactory.class})
93
@ConditionalOnBean({org.springframework.amqp.rabbit.connection.ConnectionFactory.class, MeterRegistry.class})
94
public class RabbitMetricsAutoConfiguration {
95
96
/** Creates metrics post-processor for connection factories */
97
@Bean
98
public static RabbitConnectionFactoryMetricsPostProcessor rabbitConnectionFactoryMetricsPostProcessor(
99
ApplicationContext applicationContext);
100
}
101
102
/**
103
* Post-processor that adds metrics to connection factories
104
*/
105
public class RabbitConnectionFactoryMetricsPostProcessor implements BeanPostProcessor {
106
107
/** Add metrics to connection factory after initialization */
108
@Override
109
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;
110
}
111
```
112
113
**Available Metrics:**
114
115
```java { .api }
116
/**
117
* RabbitMQ connection metrics available through Micrometer
118
*/
119
// Connection metrics
120
"rabbitmq.connections" - Gauge of active connections
121
"rabbitmq.connections.opened" - Counter of opened connections
122
"rabbitmq.connections.closed" - Counter of closed connections
123
124
// Channel metrics
125
"rabbitmq.channels" - Gauge of active channels
126
"rabbitmq.channels.opened" - Counter of opened channels
127
"rabbitmq.channels.closed" - Counter of closed channels
128
129
// Message metrics (when available)
130
"rabbitmq.published" - Counter of published messages
131
"rabbitmq.published.confirmed" - Counter of confirmed publishes
132
"rabbitmq.published.returned" - Counter of returned messages
133
"rabbitmq.consumed" - Counter of consumed messages
134
"rabbitmq.acknowledged" - Counter of acknowledged messages
135
"rabbitmq.rejected" - Counter of rejected messages
136
137
// Connection pool metrics (for caching connection factory)
138
"rabbitmq.connection.pool.size" - Current pool size
139
"rabbitmq.connection.pool.active" - Active connections in pool
140
"rabbitmq.connection.pool.idle" - Idle connections in pool
141
```
142
143
**Metrics Configuration:**
144
145
```yaml
146
management:
147
metrics:
148
enable:
149
rabbitmq: true
150
endpoints:
151
web:
152
exposure:
153
include: metrics, prometheus
154
```
155
156
### Custom Health Checks
157
158
Extending health checks with custom RabbitMQ health indicators for specific business requirements.
159
160
```java { .api }
161
/**
162
* Custom health indicator example
163
*/
164
@Component
165
public class CustomRabbitHealthIndicator implements HealthIndicator {
166
167
private final RabbitTemplate rabbitTemplate;
168
private final AmqpAdmin amqpAdmin;
169
170
public CustomRabbitHealthIndicator(RabbitTemplate rabbitTemplate, AmqpAdmin amqpAdmin) {
171
this.rabbitTemplate = rabbitTemplate;
172
this.amqpAdmin = amqpAdmin;
173
}
174
175
@Override
176
public Health health() {
177
try {
178
// Test connection by checking queue
179
Properties queueProperties = amqpAdmin.getQueueProperties("test.queue");
180
181
// Test message operations
182
rabbitTemplate.convertAndSend("health.check", "ping");
183
184
return Health.up()
185
.withDetail("connection", "OK")
186
.withDetail("queues", queueProperties != null ? "accessible" : "limited")
187
.withDetail("messaging", "operational")
188
.build();
189
190
} catch (Exception e) {
191
return Health.down()
192
.withDetail("error", e.getMessage())
193
.withException(e)
194
.build();
195
}
196
}
197
}
198
```
199
200
### Custom Metrics
201
202
Adding custom metrics for application-specific RabbitMQ monitoring requirements.
203
204
```java { .api }
205
/**
 * Custom metrics configuration.
 *
 * <p>Registers a counter of processed messages, a timer for message processing
 * and a gauge reporting the depth of {@code important.queue}.
 */
@Component
public class CustomRabbitMetrics {

    private static final String MONITORED_QUEUE = "important.queue";

    private final MeterRegistry meterRegistry;
    private final Counter messagesProcessed;
    private final Timer processingTime;
    private final Gauge queueDepth;

    /**
     * @param meterRegistry registry the meters are registered with
     * @param amqpAdmin     admin queried for the queue depth each time the gauge is read
     */
    public CustomRabbitMetrics(MeterRegistry meterRegistry, AmqpAdmin amqpAdmin) {
        this.meterRegistry = meterRegistry;

        this.messagesProcessed = Counter.builder("rabbitmq.messages.processed")
                .description("Number of messages processed successfully")
                .tag("application", "myapp")
                .register(meterRegistry);

        this.processingTime = Timer.builder("rabbitmq.message.processing.time")
                .description("Time taken to process messages")
                .register(meterRegistry);

        // Gauge.builder takes the observed object and the value function up front;
        // register() only takes the registry.
        this.queueDepth = Gauge.builder("rabbitmq.queue.depth", amqpAdmin,
                        admin -> getQueueDepth(admin, MONITORED_QUEUE))
                .description("Number of messages in specific queue")
                .tag("queue", MONITORED_QUEUE)
                .register(meterRegistry);
    }

    /** Record message processing */
    public void recordMessageProcessed() {
        messagesProcessed.increment();
    }

    /**
     * Starts timing a message.
     *
     * @return a sample to hand back to {@link #stopProcessingTimer(Timer.Sample)}
     */
    public Timer.Sample startProcessingTimer() {
        // A sample is started against a registry (or clock), not against a Timer.
        return Timer.start(meterRegistry);
    }

    /**
     * Stops a sample and records its duration on the processing timer.
     *
     * @param sample sample obtained from {@link #startProcessingTimer()}
     * @return the recorded duration in nanoseconds
     */
    public long stopProcessingTimer(Timer.Sample sample) {
        return sample.stop(processingTime);
    }

    /**
     * Reads the message count of a queue.
     *
     * @return the message count, {@code 0} when the queue properties or the
     *         count are unavailable, {@code -1} when the lookup fails
     */
    private static double getQueueDepth(AmqpAdmin admin, String queueName) {
        try {
            Properties props = admin.getQueueProperties(queueName);
            if (props == null) {
                return 0;
            }
            // Avoid unboxing a missing value: a null count reads as 0, not as an error.
            Object count = props.get("QUEUE_MESSAGE_COUNT");
            return count instanceof Number ? ((Number) count).doubleValue() : 0;
        } catch (Exception e) {
            return -1; // Error state
        }
    }
}
```

**Usage in Message Handlers:**

```java
/**
 * Message handler that records a processed-message count and processing time
 * through {@code CustomRabbitMetrics}.
 */
@Component
public class MetricEnabledMessageHandler {

    private final CustomRabbitMetrics metrics;

    public MetricEnabledMessageHandler(CustomRabbitMetrics metrics) {
        this.metrics = metrics;
    }

    /** Handles a message from {@code monitored.queue}, timing the whole attempt. */
    @RabbitListener(queues = "monitored.queue")
    public void handleMessage(String message) {
        Timer.Sample sample = metrics.startProcessingTimer();

        try {
            // Process message
            processMessage(message);

            // Record success
            metrics.recordMessageProcessed();

        } finally {
            // Timer.Sample.stop needs the target Timer, which the metrics component owns.
            metrics.stopProcessingTimer(sample);
        }
    }

    /** Application-specific processing; placeholder for this example. */
    private void processMessage(String message) {
        // business logic goes here
    }
}
280
```
281
282
### Management Endpoints
283
284
Additional management endpoints for RabbitMQ monitoring and administration.
285
286
```java { .api }
287
/**
288
* Custom management endpoint for RabbitMQ information
289
*/
290
@Component
291
@Endpoint(id = "rabbitmq")
292
public class RabbitManagementEndpoint {
293
294
private final CachingConnectionFactory connectionFactory;
295
private final AmqpAdmin amqpAdmin;
296
297
/** Get connection information */
298
@ReadOperation
299
public Map<String, Object> connectionInfo() {
300
Map<String, Object> info = new HashMap<>();
301
info.put("host", connectionFactory.getHost());
302
info.put("port", connectionFactory.getPort());
303
info.put("virtualHost", connectionFactory.getVirtualHost());
304
info.put("cacheMode", connectionFactory.getCacheMode());
305
info.put("channelCacheSize", connectionFactory.getChannelCacheSize());
306
return info;
307
}
308
309
/** Get queue information */
310
@ReadOperation
311
public Map<String, Object> queueInfo(@Selector String queueName) {
312
try {
313
Properties props = amqpAdmin.getQueueProperties(queueName);
314
if (props != null) {
315
Map<String, Object> queueInfo = new HashMap<>();
316
queueInfo.put("name", queueName);
317
queueInfo.put("messageCount", props.get("QUEUE_MESSAGE_COUNT"));
318
queueInfo.put("consumerCount", props.get("QUEUE_CONSUMER_COUNT"));
319
return queueInfo;
320
}
321
return Map.of("error", "Queue not found");
322
} catch (Exception e) {
323
return Map.of("error", e.getMessage());
324
}
325
}
326
327
/** Purge a queue */
328
@WriteOperation
329
public Map<String, Object> purgeQueue(@Selector String queueName) {
330
try {
331
amqpAdmin.purgeQueue(queueName, false);
332
return Map.of("status", "purged", "queue", queueName);
333
} catch (Exception e) {
334
return Map.of("error", e.getMessage());
335
}
336
}
337
}
338
```
339
340
**Management Endpoint Configuration:**
341
342
```yaml
343
management:
344
endpoints:
345
web:
346
exposure:
347
include: rabbitmq
348
endpoint:
349
rabbitmq:
350
enabled: true
351
```
352
353
**Endpoint Usage Examples:**
354
355
```bash
356
# Get connection information
357
curl http://localhost:8080/actuator/rabbitmq
358
359
# Get specific queue information
360
curl http://localhost:8080/actuator/rabbitmq/task.queue
361
362
# Purge a queue
363
curl -X POST http://localhost:8080/actuator/rabbitmq/test.queue
364
```
365
366
### Alerting Integration
367
368
Integration with monitoring systems for RabbitMQ alerts and notifications.
369
370
```java { .api }
371
/**
372
* Custom health indicator with alerting
373
*/
374
@Component
375
public class AlertingRabbitHealthIndicator implements HealthIndicator {
376
377
private final RabbitTemplate rabbitTemplate;
378
private final AlertService alertService;
379
private volatile boolean lastCheckFailed = false;
380
381
@Override
382
public Health health() {
383
try {
384
// Perform health check
385
rabbitTemplate.execute(channel -> {
386
channel.basicPublish("", "health.check", null, "ping".getBytes());
387
return null;
388
});
389
390
if (lastCheckFailed) {
391
alertService.sendRecoveryAlert("RabbitMQ connection restored");
392
lastCheckFailed = false;
393
}
394
395
return Health.up().build();
396
397
} catch (Exception e) {
398
if (!lastCheckFailed) {
399
alertService.sendAlert("RabbitMQ connection failed: " + e.getMessage());
400
lastCheckFailed = true;
401
}
402
403
return Health.down().withException(e).build();
404
}
405
}
406
}
407
```