# Observability and Metrics

Interfaces for collecting metrics and integrating with observability systems. The RabbitMQ Java client provides comprehensive metrics collection capabilities and distributed tracing integration.

## Capabilities

### Metrics Collection

Interface for collecting operational metrics from the RabbitMQ client.

```java { .api }
/**
12
* Interface to gather execution data of the client.
13
* Note transactions are not supported: they deal with
14
* publishing and acknowledgments and the collector contract
15
* assumes then that published messages and acks sent
16
* in a transaction are always counted, even if the
17
* transaction is rolled back.
18
*/
19
public interface MetricsCollector {
20
21
// Connection metrics
22
/**
23
* Called when a new connection is created
24
* @param connection - The new connection
25
*/
26
void newConnection(Connection connection);
27
28
/**
29
* Called when a connection is closed
30
* @param connection - The closed connection
31
*/
32
void closeConnection(Connection connection);
33
34
// Channel metrics
35
/**
36
* Called when a new channel is created
37
* @param channel - The new channel
38
*/
39
void newChannel(Channel channel);
40
41
/**
42
* Called when a channel is closed
43
* @param channel - The closed channel
44
*/
45
void closeChannel(Channel channel);
46
47
// Publishing metrics
48
/**
49
* Called when a message is published
50
* @param channel - Channel used for publishing
51
*/
52
void basicPublish(Channel channel);
53
54
/**
55
* Called when a message publishing fails (default method)
56
* @param channel - Channel used for publishing
57
* @param cause - Exception that caused failure
58
*/
59
default void basicPublishFailure(Channel channel, Throwable cause) {
60
// Default no-op implementation
61
}
62
63
/**
64
* Called when a publisher confirm (ack) is received (default method)
65
* @param channel - Channel that received confirm
66
* @param deliveryTag - Delivery tag of confirmed message
67
* @param multiple - Whether multiple messages were confirmed
68
*/
69
default void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) {
70
// Default no-op implementation
71
}
72
73
/**
74
* Called when a publisher nack is received (default method)
75
* @param channel - Channel that received nack
76
* @param deliveryTag - Delivery tag of nacked message
77
* @param multiple - Whether multiple messages were nacked
78
*/
79
default void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) {
80
// Default no-op implementation
81
}
82
83
/**
84
* Called when a published message is returned as unroutable (default method)
85
* @param channel - Channel that published the message
86
*/
87
default void basicPublishUnrouted(Channel channel) {
88
// Default no-op implementation
89
}
90
91
// Message consumption metrics
92
/**
93
* Called when a message is consumed (delivered to consumer)
94
* @param channel - Channel used for consuming
95
* @param deliveryTag - Delivery tag of consumed message
96
* @param autoAck - Whether auto-ack is enabled
97
*/
98
void consumedMessage(Channel channel, long deliveryTag, boolean autoAck);
99
100
/**
101
* Called when a message is consumed with consumer tag
102
* @param channel - Channel used for consuming
103
* @param deliveryTag - Delivery tag of consumed message
104
* @param consumerTag - Consumer tag
105
*/
106
void consumedMessage(Channel channel, long deliveryTag, String consumerTag);
107
108
// Message acknowledgment metrics
109
/**
110
* Called when a message is acknowledged
111
* @param channel - Channel used for ack
112
* @param deliveryTag - Delivery tag of acknowledged message
113
* @param multiple - Whether multiple messages were acknowledged
114
*/
115
void basicAck(Channel channel, long deliveryTag, boolean multiple);
116
117
/**
118
* Called when a message is negatively acknowledged
119
* @param channel - Channel used for nack
120
* @param deliveryTag - Delivery tag of nacked message
121
*/
122
void basicNack(Channel channel, long deliveryTag);
123
124
/**
125
* Called when a message is negatively acknowledged with requeue option (default method)
126
* @param channel - Channel used for nack
127
* @param deliveryTag - Delivery tag of nacked message
128
* @param requeue - Whether to requeue the message
129
*/
130
default void basicNack(Channel channel, long deliveryTag, boolean requeue) {
131
this.basicNack(channel, deliveryTag);
132
}
133
134
/**
135
* Called when a message is rejected
136
* @param channel - Channel used for reject
137
* @param deliveryTag - Delivery tag of rejected message
138
*/
139
void basicReject(Channel channel, long deliveryTag);
140
141
/**
142
* Called when a message is rejected with requeue option (default method)
143
* @param channel - Channel used for reject
144
* @param deliveryTag - Delivery tag of rejected message
145
* @param requeue - Whether to requeue the message
146
*/
147
default void basicReject(Channel channel, long deliveryTag, boolean requeue) {
148
this.basicReject(channel, deliveryTag);
149
}
150
151
// Consumer lifecycle metrics
152
/**
153
* Called when a consumer is created
154
* @param channel - Channel used for consuming
155
* @param consumerTag - Consumer tag
156
* @param autoAck - Whether auto-ack is enabled
157
*/
158
void basicConsume(Channel channel, String consumerTag, boolean autoAck);
159
160
/**
161
* Called when a consumer is cancelled
162
* @param channel - Channel used for consuming
163
* @param consumerTag - Consumer tag that was cancelled
164
*/
165
void basicCancel(Channel channel, String consumerTag);
166
}
```

### No-Op Metrics Collector

Default implementation that performs no operations; useful for disabling metrics collection.

```java { .api }
/**
175
* No-operation metrics collector that discards all metrics
176
*/
177
public class NoOpMetricsCollector implements MetricsCollector {
178
179
/**
180
* Singleton instance of the no-op collector
181
*/
182
public static final NoOpMetricsCollector INSTANCE = new NoOpMetricsCollector();
183
184
// All methods are no-op implementations
185
@Override public void newConnection(Connection connection) {}
186
@Override public void closeConnection(Connection connection) {}
187
@Override public void newChannel(Channel channel) {}
188
@Override public void closeChannel(Channel channel) {}
189
@Override public void basicPublish(Channel channel) {}
190
@Override public void basicConsume(Channel channel, String queue, boolean autoAck) {}
191
@Override public void basicCancel(Channel channel, String consumerTag) {}
192
@Override public void basicAck(Channel channel, long deliveryTag, boolean multiple) {}
193
@Override public void basicNack(Channel channel, long deliveryTag) {}
194
@Override public void basicReject(Channel channel, long deliveryTag) {}
195
@Override public void basicGet(Channel channel, String queue, boolean messageRetrieved) {}
196
@Override public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) {}
197
@Override public void consumedMessage(Channel channel, long deliveryTag, boolean multiple) {}
198
@Override public void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) {}
199
@Override public void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) {}
200
@Override public void basicPublishUnrouted(Channel channel) {}
201
}
```

### Observation Collection

Interface for creating, starting, stopping, and recording errors on observations of publish and consume operations.

```java { .api }
/**
210
* Interface for collecting observations and telemetry from RabbitMQ operations
211
*/
212
public interface ObservationCollector {
213
214
/**
215
* Create observation for a publish operation
216
* @param exchange - Exchange name
217
* @param routingKey - Routing key
218
* @return Observation context for the publish operation
219
*/
220
Observation.Context newPublishObservation(String exchange, String routingKey);
221
222
/**
223
* Create observation for a consume operation
224
* @param queue - Queue name
225
* @return Observation context for the consume operation
226
*/
227
Observation.Context newConsumeObservation(String queue);
228
229
/**
230
* Start an observation
231
* @param context - Observation context
232
* @return Started observation
233
*/
234
Observation start(Observation.Context context);
235
236
/**
237
* Stop an observation
238
* @param observation - Observation to stop
239
*/
240
void stop(Observation observation);
241
242
/**
243
* Record an error in an observation
244
* @param observation - Observation to record error for
245
* @param error - Error that occurred
246
*/
247
void error(Observation observation, Throwable error);
248
}
```

**Usage Examples:**

```java
// Configure metrics collection on ConnectionFactory
255
ConnectionFactory factory = new ConnectionFactory();
256
factory.setHost("localhost");
257
258
// Custom metrics collector implementation
259
MetricsCollector metricsCollector = new CustomMetricsCollector();
260
factory.setMetricsCollector(metricsCollector);
261
262
Connection connection = factory.newConnection();
263
Channel channel = connection.createChannel();
264
265
// All operations will now be tracked by the metrics collector
266
channel.basicPublish("", "queue", null, "message".getBytes());
```

```java
// Using no-op collector to disable metrics
271
ConnectionFactory factory = new ConnectionFactory();
272
factory.setMetricsCollector(NoOpMetricsCollector.INSTANCE);
```

```java
// Example custom metrics collector implementation
277
public class CustomMetricsCollector implements MetricsCollector {
278
private final Counter connectionsCreated = Counter.builder("rabbitmq.connections.created").register();
279
private final Counter messagesPublished = Counter.builder("rabbitmq.messages.published").register();
280
private final Timer publishTimer = Timer.builder("rabbitmq.publish.duration").register();
281
282
@Override
283
public void newConnection(Connection connection) {
284
connectionsCreated.increment();
285
System.out.println("New connection created: " + connection.getId());
286
}
287
288
@Override
289
public void basicPublish(Channel channel) {
290
messagesPublished.increment();
291
// Record publish timing, etc.
292
}
293
294
// Implement other methods as needed...
295
}
```

## Types

### Traffic Listener

Interface for monitoring network traffic for debugging and analysis purposes.

```java { .api }
/**
 * Listener notified of network traffic.
 */
public interface TrafficListener {

    /**
     * Invoked when data is read from the network.
     *
     * @param data the data that was read
     */
    void read(byte[] data);

    /**
     * Invoked when data is written to the network.
     *
     * @param data the data that was written
     */
    void write(byte[] data);
}
```

### Exception Handler

Interface for customizing how the client handles various types of exceptions.

```java { .api }
/**
330
* Interface for handling exceptions that occur in consumers and connections
331
*/
332
public interface ExceptionHandler {
333
334
/**
335
* Handle unexpected connection driver exceptions
336
* @param conn - Connection where exception occurred
337
* @param exception - The exception
338
*/
339
void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception);
340
341
/**
342
* Handle exceptions in return listeners
343
* @param channel - Channel where exception occurred
344
* @param exception - The exception
345
*/
346
void handleReturnListenerException(Channel channel, Throwable exception);
347
348
/**
349
* Handle exceptions in flow listeners
350
* @param channel - Channel where exception occurred
351
* @param exception - The exception
352
*/
353
void handleFlowListenerException(Channel channel, Throwable exception);
354
355
/**
356
* Handle exceptions in confirm listeners
357
* @param channel - Channel where exception occurred
358
* @param exception - The exception
359
*/
360
void handleConfirmListenerException(Channel channel, Throwable exception);
361
362
/**
363
* Handle exceptions in blocked listeners
364
* @param connection - Connection where exception occurred
365
* @param exception - The exception
366
*/
367
void handleBlockedListenerException(Connection connection, Throwable exception);
368
369
/**
370
* Handle exceptions in consumers
371
* @param channel - Channel where exception occurred
372
* @param exception - The exception
373
* @param consumer - Consumer that caused the exception
374
* @param consumerTag - Consumer tag
375
* @param methodName - Method where exception occurred
376
*/
377
void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName);
378
379
/**
380
* Handle connection recovery exceptions
381
* @param conn - Connection being recovered
382
* @param exception - The exception
383
*/
384
void handleConnectionRecoveryException(Connection conn, Throwable exception);
385
386
/**
387
* Handle channel recovery exceptions
388
* @param ch - Channel being recovered
389
* @param exception - The exception
390
*/
391
void handleChannelRecoveryException(Channel ch, Throwable exception);
392
393
/**
394
* Handle topology recovery exceptions
395
* @param conn - Connection where topology recovery failed
396
* @param ch - Channel where topology recovery failed
397
* @param exception - The exception
398
*/
399
void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception);
400
}
```