or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdconfirms-returns.mdconnection-channel.mdconsumer-api.mdconsuming.mderror-recovery.mdindex.mdobservability.mdpublishing.mdrpc.md

observability.mddocs/

0

# Observability and Metrics

1

2

Interfaces for collecting metrics and integrating with observability systems. The RabbitMQ Java client provides comprehensive metrics collection capabilities and distributed tracing integration.

3

4

## Capabilities

5

6

### Metrics Collection

7

8

Interface for collecting operational metrics from the RabbitMQ client.

9

10

```java { .api }

11

/**

12

* Interface to gather execution data of the client.

13

* Note transactions are not supported: they deal with

14

* publishing and acknowledgments and the collector contract

15

* assumes then that published messages and acks sent

16

* in a transaction are always counted, even if the

17

* transaction is rolled back.

18

*/

19

public interface MetricsCollector {

20

21

// Connection metrics

22

/**

23

* Called when a new connection is created

24

* @param connection - The new connection

25

*/

26

void newConnection(Connection connection);

27

28

/**

29

* Called when a connection is closed

30

* @param connection - The closed connection

31

*/

32

void closeConnection(Connection connection);

33

34

// Channel metrics

35

/**

36

* Called when a new channel is created

37

* @param channel - The new channel

38

*/

39

void newChannel(Channel channel);

40

41

/**

42

* Called when a channel is closed

43

* @param channel - The closed channel

44

*/

45

void closeChannel(Channel channel);

46

47

// Publishing metrics

48

/**

49

* Called when a message is published

50

* @param channel - Channel used for publishing

51

*/

52

void basicPublish(Channel channel);

53

54

/**

55

* Called when a message publishing fails (default method)

56

* @param channel - Channel used for publishing

57

* @param cause - Exception that caused failure

58

*/

59

default void basicPublishFailure(Channel channel, Throwable cause) {

60

// Default no-op implementation

61

}

62

63

/**

64

* Called when a publisher confirm (ack) is received (default method)

65

* @param channel - Channel that received confirm

66

* @param deliveryTag - Delivery tag of confirmed message

67

* @param multiple - Whether multiple messages were confirmed

68

*/

69

default void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) {

70

// Default no-op implementation

71

}

72

73

/**

74

* Called when a publisher nack is received (default method)

75

* @param channel - Channel that received nack

76

* @param deliveryTag - Delivery tag of nacked message

77

* @param multiple - Whether multiple messages were nacked

78

*/

79

default void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) {

80

// Default no-op implementation

81

}

82

83

/**

84

* Called when a published message is returned as unroutable (default method)

85

* @param channel - Channel that published the message

86

*/

87

default void basicPublishUnrouted(Channel channel) {

88

// Default no-op implementation

89

}

90

91

// Message consumption metrics

92

/**

93

* Called when a message is consumed (delivered to consumer)

94

* @param channel - Channel used for consuming

95

* @param deliveryTag - Delivery tag of consumed message

96

* @param autoAck - Whether auto-ack is enabled

97

*/

98

void consumedMessage(Channel channel, long deliveryTag, boolean autoAck);

99

100

/**

101

* Called when a message is consumed with consumer tag

102

* @param channel - Channel used for consuming

103

* @param deliveryTag - Delivery tag of consumed message

104

* @param consumerTag - Consumer tag

105

*/

106

void consumedMessage(Channel channel, long deliveryTag, String consumerTag);

107

108

// Message acknowledgment metrics

109

/**

110

* Called when a message is acknowledged

111

* @param channel - Channel used for ack

112

* @param deliveryTag - Delivery tag of acknowledged message

113

* @param multiple - Whether multiple messages were acknowledged

114

*/

115

void basicAck(Channel channel, long deliveryTag, boolean multiple);

116

117

/**

118

* Called when a message is negatively acknowledged

119

* @param channel - Channel used for nack

120

* @param deliveryTag - Delivery tag of nacked message

121

*/

122

void basicNack(Channel channel, long deliveryTag);

123

124

/**

125

* Called when a message is negatively acknowledged with requeue option (default method)

126

* @param channel - Channel used for nack

127

* @param deliveryTag - Delivery tag of nacked message

128

* @param requeue - Whether to requeue the message

129

*/

130

default void basicNack(Channel channel, long deliveryTag, boolean requeue) {

131

this.basicNack(channel, deliveryTag);

132

}

133

134

/**

135

* Called when a message is rejected

136

* @param channel - Channel used for reject

137

* @param deliveryTag - Delivery tag of rejected message

138

*/

139

void basicReject(Channel channel, long deliveryTag);

140

141

/**

142

* Called when a message is rejected with requeue option (default method)

143

* @param channel - Channel used for reject

144

* @param deliveryTag - Delivery tag of rejected message

145

* @param requeue - Whether to requeue the message

146

*/

147

default void basicReject(Channel channel, long deliveryTag, boolean requeue) {

148

this.basicReject(channel, deliveryTag);

149

}

150

151

// Consumer lifecycle metrics

152

/**

153

* Called when a consumer is created

154

* @param channel - Channel used for consuming

155

* @param consumerTag - Consumer tag

156

* @param autoAck - Whether auto-ack is enabled

157

*/

158

void basicConsume(Channel channel, String consumerTag, boolean autoAck);

159

160

/**

161

* Called when a consumer is cancelled

162

* @param channel - Channel used for consuming

163

* @param consumerTag - Consumer tag that was cancelled

164

*/

165

void basicCancel(Channel channel, String consumerTag);

166

}

167

```

168

169

### No-Op Metrics Collector

170

171

Default implementation that performs no operations - useful for disabling metrics collection.

172

173

```java { .api }

174

/**

175

* No-operation metrics collector that discards all metrics

176

*/

177

public class NoOpMetricsCollector implements MetricsCollector {

178

179

/**

180

* Singleton instance of the no-op collector

181

*/

182

public static final NoOpMetricsCollector INSTANCE = new NoOpMetricsCollector();

183

184

// All methods are no-op implementations

185

@Override public void newConnection(Connection connection) {}

186

@Override public void closeConnection(Connection connection) {}

187

@Override public void newChannel(Channel channel) {}

188

@Override public void closeChannel(Channel channel) {}

189

@Override public void basicPublish(Channel channel) {}

190

@Override public void basicConsume(Channel channel, String queue, boolean autoAck) {}

191

@Override public void basicCancel(Channel channel, String consumerTag) {}

192

@Override public void basicAck(Channel channel, long deliveryTag, boolean multiple) {}

193

@Override public void basicNack(Channel channel, long deliveryTag) {}

194

@Override public void basicReject(Channel channel, long deliveryTag) {}

195

@Override public void basicGet(Channel channel, String queue, boolean messageRetrieved) {}

196

@Override public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) {}

197

@Override public void consumedMessage(Channel channel, long deliveryTag, boolean multiple) {}

198

@Override public void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) {}

199

@Override public void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) {}

200

@Override public void basicPublishUnrouted(Channel channel) {}

201

}

202

```

203

204

### Observation Collection

205

206

Interface for collecting observations and telemetry data with more advanced features.

207

208

```java { .api }

209

/**

210

* Interface for collecting observations and telemetry from RabbitMQ operations

211

*/

212

public interface ObservationCollector {

213

214

/**

215

* Create observation for a publish operation

216

* @param exchange - Exchange name

217

* @param routingKey - Routing key

218

* @return Observation context for the publish operation

219

*/

220

Observation.Context newPublishObservation(String exchange, String routingKey);

221

222

/**

223

* Create observation for a consume operation

224

* @param queue - Queue name

225

* @return Observation context for the consume operation

226

*/

227

Observation.Context newConsumeObservation(String queue);

228

229

/**

230

* Start an observation

231

* @param context - Observation context

232

* @return Started observation

233

*/

234

Observation start(Observation.Context context);

235

236

/**

237

* Stop an observation

238

* @param observation - Observation to stop

239

*/

240

void stop(Observation observation);

241

242

/**

243

* Record an error in an observation

244

* @param observation - Observation to record error for

245

* @param error - Error that occurred

246

*/

247

void error(Observation observation, Throwable error);

248

}

249

```

250

251

**Usage Examples:**

252

253

```java

254

// Configure metrics collection on ConnectionFactory

255

ConnectionFactory factory = new ConnectionFactory();

256

factory.setHost("localhost");

257

258

// Custom metrics collector implementation

259

MetricsCollector metricsCollector = new CustomMetricsCollector();

260

factory.setMetricsCollector(metricsCollector);

261

262

Connection connection = factory.newConnection();

263

Channel channel = connection.createChannel();

264

265

// All operations will now be tracked by the metrics collector

266

channel.basicPublish("", "queue", null, "message".getBytes());

267

```

268

269

```java

270

// Using no-op collector to disable metrics

271

ConnectionFactory factory = new ConnectionFactory();

272

factory.setMetricsCollector(NoOpMetricsCollector.INSTANCE);

273

```

274

275

```java

276

// Example custom metrics collector implementation

277

public class CustomMetricsCollector implements MetricsCollector {

278

private final Counter connectionsCreated = Counter.builder("rabbitmq.connections.created").register();

279

private final Counter messagesPublished = Counter.builder("rabbitmq.messages.published").register();

280

private final Timer publishTimer = Timer.builder("rabbitmq.publish.duration").register();

281

282

@Override

283

public void newConnection(Connection connection) {

284

connectionsCreated.increment();

285

System.out.println("New connection created: " + connection.getId());

286

}

287

288

@Override

289

public void basicPublish(Channel channel) {

290

messagesPublished.increment();

291

// Record publish timing, etc.

292

}

293

294

// Implement other methods as needed...

295

}

296

```

297

298

## Types

299

300

### Traffic Listener

301

302

Interface for monitoring network traffic for debugging and analysis purposes.

303

304

```java { .api }

305

/**

306

* Interface for listening to network traffic

307

*/

308

public interface TrafficListener {

309

310

/**

311

* Called when data is read from the network

312

* @param data - Data that was read

313

*/

314

void read(byte[] data);

315

316

/**

317

* Called when data is written to the network

318

* @param data - Data that was written

319

*/

320

void write(byte[] data);

321

}

322

```

323

324

### Exception Handler

325

326

Interface for customizing how the client handles various types of exceptions.

327

328

```java { .api }

329

/**

330

* Interface for handling exceptions that occur in consumers and connections

331

*/

332

public interface ExceptionHandler {

333

334

/**

335

* Handle unexpected connection driver exceptions

336

* @param conn - Connection where exception occurred

337

* @param exception - The exception

338

*/

339

void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception);

340

341

/**

342

* Handle exceptions in return listeners

343

* @param channel - Channel where exception occurred

344

* @param exception - The exception

345

*/

346

void handleReturnListenerException(Channel channel, Throwable exception);

347

348

/**

349

* Handle exceptions in flow listeners

350

* @param channel - Channel where exception occurred

351

* @param exception - The exception

352

*/

353

void handleFlowListenerException(Channel channel, Throwable exception);

354

355

/**

356

* Handle exceptions in confirm listeners

357

* @param channel - Channel where exception occurred

358

* @param exception - The exception

359

*/

360

void handleConfirmListenerException(Channel channel, Throwable exception);

361

362

/**

363

* Handle exceptions in blocked listeners

364

* @param connection - Connection where exception occurred

365

* @param exception - The exception

366

*/

367

void handleBlockedListenerException(Connection connection, Throwable exception);

368

369

/**

370

* Handle exceptions in consumers

371

* @param channel - Channel where exception occurred

372

* @param exception - The exception

373

* @param consumer - Consumer that caused the exception

374

* @param consumerTag - Consumer tag

375

* @param methodName - Method where exception occurred

376

*/

377

void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName);

378

379

/**

380

* Handle connection recovery exceptions

381

* @param conn - Connection being recovered

382

* @param exception - The exception

383

*/

384

void handleConnectionRecoveryException(Connection conn, Throwable exception);

385

386

/**

387

* Handle channel recovery exceptions

388

* @param ch - Channel being recovered

389

* @param exception - The exception

390

*/

391

void handleChannelRecoveryException(Channel ch, Throwable exception);

392

393

/**

394

* Handle topology recovery exceptions

395

* @param conn - Connection where topology recovery failed

396

* @param ch - Channel where topology recovery failed

397

* @param exception - The exception

398

*/

399

void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception);

400

}

401

```