or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actuator.mdconfiguration.mdcore-messaging.mdindex.mdlisteners.mdstreams.md

actuator.mddocs/

0

# Actuator Integration

1

2

Health monitoring and metrics collection for RabbitMQ connections through Spring Boot Actuator endpoints, providing operational visibility into messaging infrastructure health and performance.

3

4

## Capabilities

5

6

### Health Indicator

7

8

Auto-configured health indicator that monitors RabbitMQ connectivity and provides health status through the `/actuator/health` endpoint.

9

10

```java { .api }

11

/**

12

* Auto-configuration for RabbitMQ health indicator

13

*/

14

@AutoConfiguration(after = RabbitAutoConfiguration.class)

15

@ConditionalOnClass(RabbitTemplate.class)

16

@ConditionalOnBean(RabbitTemplate.class)

17

@ConditionalOnEnabledHealthIndicator("rabbit")

18

public class RabbitHealthContributorAutoConfiguration {

19

20

/** Creates health contributor for RabbitMQ */

21

@Bean

22

@ConditionalOnMissingBean(name = {"rabbitHealthIndicator", "rabbitHealthContributor"})

23

public HealthContributor rabbitHealthContributor(ConfigurableListableBeanFactory beanFactory);

24

}

25

26

/**

27

* Health indicator implementation

28

*/

29

public class RabbitHealthIndicator implements HealthIndicator {

30

31

/** Check RabbitMQ health */

32

@Override

33

public Health health();

34

}

35

```

36

37

**Health Response Examples:**

38

39

```json

40

// Healthy RabbitMQ connection

41

{

42

"status": "UP",

43

"components": {

44

"rabbit": {

45

"status": "UP",

46

"details": {

47

"version": "3.12.4"

48

}

49

}

50

}

51

}

52

53

// Unhealthy RabbitMQ connection

54

{

55

"status": "DOWN",

56

"components": {

57

"rabbit": {

58

"status": "DOWN",

59

"details": {

60

"error": "org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused"

61

}

62

}

63

}

64

}

65

```

66

67

**Health Configuration:**

68

69

```yaml

70

management:

71

health:

72

rabbit:

73

enabled: true

74

endpoint:

75

health:

76

show-details: always

77

endpoints:

78

web:

79

exposure:

80

include: health

81

```

82

83

### Metrics Collection

84

85

Auto-configured metrics collection for RabbitMQ connection factories using Micrometer, providing detailed operational metrics.

86

87

```java { .api }

88

/**

89

* Auto-configuration for RabbitMQ metrics

90

*/

91

@AutoConfiguration(after = {MetricsAutoConfiguration.class, RabbitAutoConfiguration.class})

92

@ConditionalOnClass({ConnectionFactory.class, AbstractConnectionFactory.class})

93

@ConditionalOnBean({org.springframework.amqp.rabbit.connection.ConnectionFactory.class, MeterRegistry.class})

94

public class RabbitMetricsAutoConfiguration {

95

96

/** Creates metrics post-processor for connection factories */

97

@Bean

98

public static RabbitConnectionFactoryMetricsPostProcessor rabbitConnectionFactoryMetricsPostProcessor(

99

ApplicationContext applicationContext);

100

}

101

102

/**

103

* Post-processor that adds metrics to connection factories

104

*/

105

public class RabbitConnectionFactoryMetricsPostProcessor implements BeanPostProcessor {

106

107

/** Add metrics to connection factory after initialization */

108

@Override

109

public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;

110

}

111

```

112

113

**Available Metrics:**

114

115

```java { .api }

116

/**

117

* RabbitMQ connection metrics available through Micrometer

118

*/

119

// Connection metrics

120

"rabbitmq.connections" - Gauge of active connections

121

"rabbitmq.connections.opened" - Counter of opened connections

122

"rabbitmq.connections.closed" - Counter of closed connections

123

124

// Channel metrics

125

"rabbitmq.channels" - Gauge of active channels

126

"rabbitmq.channels.opened" - Counter of opened channels

127

"rabbitmq.channels.closed" - Counter of closed channels

128

129

// Message metrics (when available)

130

"rabbitmq.published" - Counter of published messages

131

"rabbitmq.published.confirmed" - Counter of confirmed publishes

132

"rabbitmq.published.returned" - Counter of returned messages

133

"rabbitmq.consumed" - Counter of consumed messages

134

"rabbitmq.acknowledged" - Counter of acknowledged messages

135

"rabbitmq.rejected" - Counter of rejected messages

136

137

// Connection pool metrics (for caching connection factory)

138

"rabbitmq.connection.pool.size" - Current pool size

139

"rabbitmq.connection.pool.active" - Active connections in pool

140

"rabbitmq.connection.pool.idle" - Idle connections in pool

141

```

142

143

**Metrics Configuration:**

144

145

```yaml

146

management:

147

metrics:

148

enable:

149

rabbitmq: true

150

endpoints:

151

web:

152

exposure:

153

include: metrics, prometheus

154

```

155

156

### Custom Health Checks

157

158

Extending health checks with custom RabbitMQ health indicators for specific business requirements.

159

160

```java { .api }

161

/**

162

* Custom health indicator example

163

*/

164

@Component

165

public class CustomRabbitHealthIndicator implements HealthIndicator {

166

167

private final RabbitTemplate rabbitTemplate;

168

private final AmqpAdmin amqpAdmin;

169

170

public CustomRabbitHealthIndicator(RabbitTemplate rabbitTemplate, AmqpAdmin amqpAdmin) {

171

this.rabbitTemplate = rabbitTemplate;

172

this.amqpAdmin = amqpAdmin;

173

}

174

175

@Override

176

public Health health() {

177

try {

178

// Test connection by checking queue

179

Properties queueProperties = amqpAdmin.getQueueProperties("test.queue");

180

181

// Test message operations

182

rabbitTemplate.convertAndSend("health.check", "ping");

183

184

return Health.up()

185

.withDetail("connection", "OK")

186

.withDetail("queues", queueProperties != null ? "accessible" : "limited")

187

.withDetail("messaging", "operational")

188

.build();

189

190

} catch (Exception e) {

191

return Health.down()

192

.withDetail("error", e.getMessage())

193

.withException(e)

194

.build();

195

}

196

}

197

}

198

```

199

200

### Custom Metrics

201

202

Adding custom metrics for application-specific RabbitMQ monitoring requirements.

203

204

```java { .api }

205

/**

206

* Custom metrics configuration

207

*/

208

@Component

209

public class CustomRabbitMetrics {

210

211

private final Counter messagesProcessed;

212

private final Timer processingTime;

213

private final Gauge queueDepth;

214

215

public CustomRabbitMetrics(MeterRegistry meterRegistry, AmqpAdmin amqpAdmin) {

216

this.messagesProcessed = Counter.builder("rabbitmq.messages.processed")

217

.description("Number of messages processed successfully")

218

.tag("application", "myapp")

219

.register(meterRegistry);

220

221

this.processingTime = Timer.builder("rabbitmq.message.processing.time")

222

.description("Time taken to process messages")

223

.register(meterRegistry);

224

225

this.queueDepth = Gauge.builder("rabbitmq.queue.depth")

226

.description("Number of messages in specific queue")

227

.tag("queue", "important.queue")

228

.register(meterRegistry, this, metrics -> getQueueDepth(amqpAdmin, "important.queue"));

229

}

230

231

/** Record message processing */

232

public void recordMessageProcessed() {

233

messagesProcessed.increment();

234

}

235

236

/** Record processing time */

237

public Timer.Sample startProcessingTimer() {

238

return Timer.start(processingTime);

239

}

240

241

private double getQueueDepth(AmqpAdmin admin, String queueName) {

242

try {

243

Properties props = admin.getQueueProperties(queueName);

244

return props != null ? (Integer) props.get("QUEUE_MESSAGE_COUNT") : 0;

245

} catch (Exception e) {

246

return -1; // Error state

247

}

248

}

249

}

250

```

251

252

**Usage in Message Handlers:**

253

254

```java

255

@Component

256

public class MetricEnabledMessageHandler {

257

258

private final CustomRabbitMetrics metrics;

259

260

public MetricEnabledMessageHandler(CustomRabbitMetrics metrics) {

261

this.metrics = metrics;

262

}

263

264

@RabbitListener(queues = "monitored.queue")

265

public void handleMessage(String message) {

266

Timer.Sample timer = metrics.startProcessingTimer();

267

268

try {

269

// Process message

270

processMessage(message);

271

272

// Record success

273

metrics.recordMessageProcessed();

274

275

} finally {

276

timer.stop();

277

}

278

}

279

}

280

```

281

282

### Management Endpoints

283

284

Additional management endpoints for RabbitMQ monitoring and administration.

285

286

```java { .api }

287

/**

288

* Custom management endpoint for RabbitMQ information

289

*/

290

@Component

291

@Endpoint(id = "rabbitmq")

292

public class RabbitManagementEndpoint {

293

294

private final CachingConnectionFactory connectionFactory;

295

private final AmqpAdmin amqpAdmin;

296

297

/** Get connection information */

298

@ReadOperation

299

public Map<String, Object> connectionInfo() {

300

Map<String, Object> info = new HashMap<>();

301

info.put("host", connectionFactory.getHost());

302

info.put("port", connectionFactory.getPort());

303

info.put("virtualHost", connectionFactory.getVirtualHost());

304

info.put("cacheMode", connectionFactory.getCacheMode());

305

info.put("channelCacheSize", connectionFactory.getChannelCacheSize());

306

return info;

307

}

308

309

/** Get queue information */

310

@ReadOperation

311

public Map<String, Object> queueInfo(@Selector String queueName) {

312

try {

313

Properties props = amqpAdmin.getQueueProperties(queueName);

314

if (props != null) {

315

Map<String, Object> queueInfo = new HashMap<>();

316

queueInfo.put("name", queueName);

317

queueInfo.put("messageCount", props.get("QUEUE_MESSAGE_COUNT"));

318

queueInfo.put("consumerCount", props.get("QUEUE_CONSUMER_COUNT"));

319

return queueInfo;

320

}

321

return Map.of("error", "Queue not found");

322

} catch (Exception e) {

323

return Map.of("error", e.getMessage());

324

}

325

}

326

327

/** Purge a queue */

328

@WriteOperation

329

public Map<String, Object> purgeQueue(@Selector String queueName) {

330

try {

331

amqpAdmin.purgeQueue(queueName, false);

332

return Map.of("status", "purged", "queue", queueName);

333

} catch (Exception e) {

334

return Map.of("error", e.getMessage());

335

}

336

}

337

}

338

```

339

340

**Management Endpoint Configuration:**

341

342

```yaml

343

management:

344

endpoints:

345

web:

346

exposure:

347

include: rabbitmq

348

endpoint:

349

rabbitmq:

350

enabled: true

351

```

352

353

**Endpoint Usage Examples:**

354

355

```bash

356

# Get connection information

357

curl http://localhost:8080/actuator/rabbitmq

358

359

# Get specific queue information

360

curl http://localhost:8080/actuator/rabbitmq/task.queue

361

362

# Purge a queue

363

curl -X POST http://localhost:8080/actuator/rabbitmq/test.queue

364

```

365

366

### Alerting Integration

367

368

Integration with monitoring systems for RabbitMQ alerts and notifications.

369

370

```java { .api }

371

/**

372

* Custom health indicator with alerting

373

*/

374

@Component

375

public class AlertingRabbitHealthIndicator implements HealthIndicator {

376

377

private final RabbitTemplate rabbitTemplate;

378

private final AlertService alertService;

379

private volatile boolean lastCheckFailed = false;

380

381

@Override

382

public Health health() {

383

try {

384

// Perform health check

385

rabbitTemplate.execute(channel -> {

386

channel.basicPublish("", "health.check", null, "ping".getBytes());

387

return null;

388

});

389

390

if (lastCheckFailed) {

391

alertService.sendRecoveryAlert("RabbitMQ connection restored");

392

lastCheckFailed = false;

393

}

394

395

return Health.up().build();

396

397

} catch (Exception e) {

398

if (!lastCheckFailed) {

399

alertService.sendAlert("RabbitMQ connection failed: " + e.getMessage());

400

lastCheckFailed = true;

401

}

402

403

return Health.down().withException(e).build();

404

}

405

}

406

}

407

```