0
# Consumer API
1
2
Interfaces and implementations for consuming messages asynchronously with callbacks. The Consumer API provides both functional callback interfaces and full Consumer implementations for handling message delivery, cancellation, and shutdown events.
3
4
## Capabilities
5
6
### Functional Callback Interfaces
7
8
Modern callback interfaces for handling message delivery and consumer events.
9
10
```java { .api }
11
/**
12
* Callback interface for message delivery
13
*/
14
@FunctionalInterface
15
public interface DeliverCallback {
16
/**
17
* Called when a message is delivered to the consumer
18
* @param consumerTag - Consumer identifier
19
* @param delivery - Message delivery information
20
*/
21
void handle(String consumerTag, Delivery delivery) throws IOException;
22
}
23
24
/**
25
* Callback interface for consumer cancellation
26
*/
27
@FunctionalInterface
28
public interface CancelCallback {
29
/**
30
* Called when the consumer is cancelled
31
* @param consumerTag - Consumer identifier that was cancelled
32
*/
33
void handle(String consumerTag) throws IOException;
34
}
35
36
/**
37
* Callback interface for consumer shutdown signals
38
*/
39
@FunctionalInterface
40
public interface ConsumerShutdownSignalCallback {
41
/**
42
* Called when the consumer receives a shutdown signal
43
* @param consumerTag - Consumer identifier
44
* @param sig - Shutdown signal with reason and details
45
*/
46
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
47
}
48
```
49
50
**Usage Examples:**
51
52
```java
53
// Lambda-based message handling
54
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
55
String message = new String(delivery.getBody(), "UTF-8");
56
System.out.println("[" + consumerTag + "] Received: " + message);
57
58
// Process based on message properties
59
AMQP.BasicProperties props = delivery.getProperties();
60
if ("urgent".equals(props.getType())) {
61
handleUrgentMessage(message);
62
} else {
63
handleRegularMessage(message);
64
}
65
};
66
67
CancelCallback cancelCallback = consumerTag -> {
68
System.out.println("Consumer " + consumerTag + " was cancelled");
69
// Cleanup resources
70
cleanupConsumerResources(consumerTag);
71
};
72
73
ConsumerShutdownSignalCallback shutdownCallback = (consumerTag, sig) -> {
74
System.out.println("Consumer " + consumerTag + " shutdown: " + sig.getReason());
75
if (!sig.isInitiatedByApplication()) {
76
// Handle unexpected shutdown
77
scheduleReconnection();
78
}
79
};
80
81
// Start consuming with callbacks
82
String consumerTag = channel.basicConsume("work.queue", false,
83
deliverCallback, cancelCallback, shutdownCallback);
84
```
85
86
```java
87
// Method reference usage
88
public class MessageProcessor {
89
public void handleDelivery(String consumerTag, Delivery delivery) throws IOException {
90
// Process message
91
String message = new String(delivery.getBody(), "UTF-8");
92
processBusinessLogic(message);
93
94
// Manual acknowledgment
95
Channel channel = ((Consumer) this).getChannel();
96
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
97
}
98
99
public void handleCancel(String consumerTag) throws IOException {
100
System.out.println("Consumer cancelled: " + consumerTag);
101
}
102
}
103
104
MessageProcessor processor = new MessageProcessor();
105
channel.basicConsume("queue", false, processor::handleDelivery, processor::handleCancel);
106
```
107
108
### Consumer Interface
109
110
Full consumer interface for handling all consumer events.
111
112
```java { .api }
113
/**
114
* Interface for implementing message consumers
115
*/
116
public interface Consumer {
117
/**
118
* Called when a message is delivered
119
* @param consumerTag - Consumer identifier
120
* @param envelope - Delivery envelope with routing info
121
* @param properties - Message properties
122
* @param body - Message body
123
*/
124
void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;
125
126
/**
127
* Called when the consumer is cancelled by the server
128
* @param consumerTag - Consumer identifier
129
*/
130
void handleCancel(String consumerTag) throws IOException;
131
132
/**
133
* Called when basicCancel is called
134
* @param consumerTag - Consumer identifier
135
*/
136
void handleCancelOk(String consumerTag);
137
138
/**
139
* Called when the consumer is registered
140
* @param consumerTag - Consumer identifier
141
*/
142
void handleConsumeOk(String consumerTag);
143
144
/**
145
* Called when basicRecover is called
146
* @param consumerTag - Consumer identifier
147
*/
148
void handleRecoverOk(String consumerTag);
149
150
/**
151
* Called when a shutdown signal is received
152
* @param consumerTag - Consumer identifier
153
* @param sig - Shutdown signal
154
*/
155
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
156
157
/**
158
* Get the consumer tag
159
* @return Consumer identifier
160
*/
161
String getConsumerTag();
162
}
163
```
164
165
### DefaultConsumer
166
167
Default implementation of the Consumer interface providing base functionality.
168
169
```java { .api }
170
/**
171
* Default implementation of Consumer interface.
172
* Extends this class and override methods as needed.
173
*/
174
public class DefaultConsumer implements Consumer {
175
/**
176
* Constructor taking a channel
177
* @param channel - Channel for this consumer
178
*/
179
public DefaultConsumer(Channel channel);
180
181
/**
182
* Get the channel this consumer is associated with
183
* @return Channel instance
184
*/
185
public Channel getChannel();
186
187
/**
188
* Get the consumer tag
189
* @return Consumer tag string
190
*/
191
public String getConsumerTag();
192
193
/**
194
* Set the consumer tag (called by the library)
195
* @param consumerTag - Consumer identifier
196
*/
197
public void setConsumerTag(String consumerTag);
198
199
// Default implementations of Consumer interface methods
200
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;
201
public void handleCancel(String consumerTag) throws IOException;
202
public void handleCancelOk(String consumerTag);
203
public void handleConsumeOk(String consumerTag);
204
public void handleRecoverOk(String consumerTag);
205
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
206
}
207
```
208
209
**Usage Examples:**
210
211
```java
212
// Custom consumer extending DefaultConsumer
213
public class WorkQueueConsumer extends DefaultConsumer {
214
215
public WorkQueueConsumer(Channel channel) {
216
super(channel);
217
}
218
219
@Override
220
public void handleDelivery(String consumerTag, Envelope envelope,
221
AMQP.BasicProperties properties, byte[] body) throws IOException {
222
String message = new String(body, "UTF-8");
223
224
try {
225
// Simulate work
226
processWork(message);
227
228
// Acknowledge successful processing
229
getChannel().basicAck(envelope.getDeliveryTag(), false);
230
231
} catch (Exception e) {
232
System.err.println("Error processing message: " + e.getMessage());
233
234
// Reject and requeue for retry
235
getChannel().basicNack(envelope.getDeliveryTag(), false, true);
236
}
237
}
238
239
@Override
240
public void handleCancel(String consumerTag) throws IOException {
241
System.out.println("Consumer cancelled: " + consumerTag);
242
// Perform cleanup
243
cleanup();
244
}
245
246
@Override
247
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
248
System.out.println("Consumer shutdown: " + sig.getReason());
249
if (!sig.isInitiatedByApplication()) {
250
// Handle unexpected shutdown
251
reconnect();
252
}
253
}
254
255
private void processWork(String message) throws Exception {
256
// Business logic here
257
Thread.sleep(1000); // Simulate processing time
258
System.out.println("Processed: " + message);
259
}
260
261
private void cleanup() {
262
// Cleanup resources
263
}
264
265
private void reconnect() {
266
// Reconnection logic
267
}
268
}
269
270
// Use the custom consumer
271
Channel channel = connection.createChannel();
272
WorkQueueConsumer consumer = new WorkQueueConsumer(channel);
273
channel.basicConsume("work.queue", false, consumer);
274
```
275
276
```java
277
// Simple consumer for logging messages
278
public class LoggingConsumer extends DefaultConsumer {
279
280
public LoggingConsumer(Channel channel) {
281
super(channel);
282
}
283
284
@Override
285
public void handleDelivery(String consumerTag, Envelope envelope,
286
AMQP.BasicProperties properties, byte[] body) throws IOException {
287
String message = new String(body, "UTF-8");
288
String routingKey = envelope.getRoutingKey();
289
String exchange = envelope.getExchange();
290
291
System.out.printf("[%s] %s->%s: %s%n",
292
consumerTag, exchange, routingKey, message);
293
294
// Auto-acknowledge for logging (fire-and-forget)
295
getChannel().basicAck(envelope.getDeliveryTag(), false);
296
}
297
}
298
299
channel.basicConsume("logs.queue", false, new LoggingConsumer(channel));
300
```
301
302
### Consumer Exception Handling
303
304
```java { .api }
305
/**
306
* Exception thrown when a consumer is cancelled
307
*/
308
public class ConsumerCancelledException extends RuntimeException {
309
public ConsumerCancelledException();
310
}
311
```
312
313
**Usage Examples:**
314
315
```java
316
// Handling consumer cancellation in application code
317
try {
318
// Consumer processing logic
319
while (isRunning) {
320
// Process messages or wait
321
Thread.sleep(1000);
322
}
323
} catch (ConsumerCancelledException e) {
324
System.out.println("Consumer was cancelled");
325
// Restart consumer or exit gracefully
326
restartConsumer();
327
}
328
```
329
330
### Advanced Consumer Patterns
331
332
**Multi-threaded Consumer:**
333
334
```java
335
public class ThreadedConsumer extends DefaultConsumer {
336
private final ExecutorService executor;
337
338
public ThreadedConsumer(Channel channel, ExecutorService executor) {
339
super(channel);
340
this.executor = executor;
341
}
342
343
@Override
344
public void handleDelivery(String consumerTag, Envelope envelope,
345
AMQP.BasicProperties properties, byte[] body) throws IOException {
346
347
// Submit to thread pool for processing
348
executor.submit(() -> {
349
try {
350
String message = new String(body, "UTF-8");
351
processMessage(message);
352
353
// Acknowledge in the callback thread
354
getChannel().basicAck(envelope.getDeliveryTag(), false);
355
356
} catch (Exception e) {
357
try {
358
getChannel().basicNack(envelope.getDeliveryTag(), false, true);
359
} catch (IOException ex) {
360
System.err.println("Failed to nack message: " + ex.getMessage());
361
}
362
}
363
});
364
}
365
}
366
```
367
368
**Retry Logic Consumer:**
369
370
```java
371
public class RetryConsumer extends DefaultConsumer {
372
private static final int MAX_RETRIES = 3;
373
374
public RetryConsumer(Channel channel) {
375
super(channel);
376
}
377
378
@Override
379
public void handleDelivery(String consumerTag, Envelope envelope,
380
AMQP.BasicProperties properties, byte[] body) throws IOException {
381
382
Map<String, Object> headers = properties.getHeaders();
383
int retryCount = headers != null && headers.containsKey("x-retry-count") ?
384
(Integer) headers.get("x-retry-count") : 0;
385
386
try {
387
String message = new String(body, "UTF-8");
388
processMessage(message);
389
getChannel().basicAck(envelope.getDeliveryTag(), false);
390
391
} catch (Exception e) {
392
if (retryCount < MAX_RETRIES) {
393
// Republish with incremented retry count
394
republishWithRetry(envelope, properties, body, retryCount + 1);
395
getChannel().basicAck(envelope.getDeliveryTag(), false);
396
} else {
397
// Send to dead letter queue or discard
398
getChannel().basicNack(envelope.getDeliveryTag(), false, false);
399
}
400
}
401
}
402
403
private void republishWithRetry(Envelope envelope, AMQP.BasicProperties props,
404
byte[] body, int retryCount) throws IOException {
405
406
Map<String, Object> headers = new HashMap<>(props.getHeaders() != null ? props.getHeaders() : new HashMap<>());
407
headers.put("x-retry-count", retryCount);
408
409
AMQP.BasicProperties newProps = new AMQP.BasicProperties.Builder()
410
.headers(headers)
411
.contentType(props.getContentType())
412
.deliveryMode(props.getDeliveryMode())
413
.build();
414
415
// Republish to retry exchange/queue
416
getChannel().basicPublish("retry.exchange", envelope.getRoutingKey(), newProps, body);
417
}
418
}
419
```
420
421
## Types
422
423
### Consumer-Related Types
424
425
```java { .api }
426
// Consumer tag management
427
public interface ConsumerTagSupplier {
428
String get();
429
}
430
431
// Message delivery data
432
public class Delivery {
433
public Envelope getEnvelope();
434
public AMQP.BasicProperties getProperties();
435
public byte[] getBody();
436
}
437
438
// Envelope with message routing information
439
public class Envelope {
440
public long getDeliveryTag();
441
public boolean isRedeliver();
442
public String getExchange();
443
public String getRoutingKey();
444
}
445
```