0
# Publisher Confirms and Returns
1
2
Mechanisms for reliable message publishing with publisher confirms and handling returned messages. These features provide acknowledgment that messages have been received and processed by the broker, and notification when messages cannot be routed to queues.
3
4
## Capabilities
5
6
### Publisher Confirms
7
8
Mechanism for getting acknowledgments from the broker that messages have been received and processed.
9
10
```java { .api }
11
/**
12
* Enable publisher confirms on this channel
13
* All subsequently published messages will be confirmed
14
*/
15
void confirmSelect() throws IOException;
16
17
/**
18
* Wait for all outstanding confirms to be received
19
* @return true if all outstanding messages were acked, false if any message was nacked
20
*/
21
boolean waitForConfirms() throws InterruptedException;
22
23
/**
24
* Wait for confirms with timeout
25
* @param timeout - Maximum time to wait in milliseconds
26
* @return true if all outstanding messages were acked, false if any was nacked (a timeout is signalled by TimeoutException, not by the return value)
27
*/
28
boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;
29
30
/**
31
* Wait until all messages published since the last call have been acked or nacked
32
* Throws exception if any message is nacked
33
*/
34
void waitForConfirmsOrDie() throws IOException, InterruptedException;
35
36
/**
37
* Wait for confirms with timeout, throw exception on nack
38
* @param timeout - Maximum time to wait in milliseconds
39
*/
40
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;
41
42
/**
43
* Get the next publish sequence number
44
* @return Sequence number for next published message
45
*/
46
long getNextPublishSeqNo();
47
48
/**
49
* Add a confirm listener for asynchronous confirm handling
50
* @param listener - Listener to receive confirm/nack notifications
51
*/
52
void addConfirmListener(ConfirmListener listener);
53
54
/**
55
* Remove a confirm listener
56
* @param listener - Listener to remove
57
* @return true if listener was removed
58
*/
59
boolean removeConfirmListener(ConfirmListener listener);
60
61
/**
62
* Clear all confirm listeners
63
*/
64
void clearConfirmListeners();
65
```
66
67
**Usage Examples:**
68
69
```java
70
// Synchronous confirms - wait for each message
71
Channel channel = connection.createChannel();
72
channel.confirmSelect();
73
74
for (int i = 0; i < 1000; i++) {
75
String message = "Message " + i;
76
channel.basicPublish("exchange", "routing.key", null, message.getBytes());
77
78
// Wait for this message to be confirmed
79
if (channel.waitForConfirms(5000)) {
80
System.out.println("Message " + i + " confirmed");
81
} else {
82
System.out.println("Message " + i + " not confirmed within timeout");
83
}
84
}
85
```
86
87
```java
88
// Batch confirms - publish multiple messages then wait
89
channel.confirmSelect();
90
int batchSize = 100;
91
92
for (int i = 0; i < 1000; i++) {
93
String message = "Message " + i;
94
channel.basicPublish("exchange", "routing.key", null, message.getBytes());
95
96
if ((i + 1) % batchSize == 0) {
97
// Wait for batch to be confirmed
98
try {
99
channel.waitForConfirmsOrDie(10000);
100
System.out.println("Batch " + ((i + 1) / batchSize) + " confirmed");
101
} catch (IOException e) {
102
System.out.println("Batch failed: " + e.getMessage());
103
}
104
}
105
}
106
```
107
108
### Asynchronous Confirm Handling
109
110
Interfaces and callbacks for handling confirms asynchronously without blocking.
111
112
```java { .api }
113
/**
114
* Listener interface for publisher confirms
115
*/
116
public interface ConfirmListener {
117
/**
118
* Called when message(s) are acknowledged by broker
119
* @param deliveryTag - Delivery tag of confirmed message
120
* @param multiple - True if multiple messages confirmed (up to and including deliveryTag)
121
*/
122
void handleAck(long deliveryTag, boolean multiple) throws IOException;
123
124
/**
125
* Called when message(s) are rejected by broker
126
* @param deliveryTag - Delivery tag of rejected message
127
* @param multiple - True if multiple messages rejected (up to and including deliveryTag)
128
*/
129
void handleNack(long deliveryTag, boolean multiple) throws IOException;
130
}
131
132
/**
133
* Functional interface for confirm acknowledgments
134
*/
135
@FunctionalInterface
136
public interface ConfirmCallback {
137
/**
138
* Handle confirm acknowledgment
139
* @param deliveryTag - Delivery tag of confirmed message
140
* @param multiple - True if multiple messages confirmed
141
*/
142
void handle(long deliveryTag, boolean multiple) throws IOException;
143
}
144
```
145
146
**Usage Examples:**
147
148
```java
149
// Asynchronous confirms with listener
150
channel.confirmSelect();
151
152
channel.addConfirmListener(new ConfirmListener() {
153
@Override
154
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
155
if (multiple) {
156
System.out.println("Messages up to " + deliveryTag + " confirmed");
157
// Remove confirmed messages from tracking
158
removeConfirmedMessages(deliveryTag);
159
} else {
160
System.out.println("Message " + deliveryTag + " confirmed");
161
removeConfirmedMessage(deliveryTag);
162
}
163
}
164
165
@Override
166
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
167
if (multiple) {
168
System.out.println("Messages up to " + deliveryTag + " rejected");
169
// Handle rejected messages
170
handleRejectedMessages(deliveryTag);
171
} else {
172
System.out.println("Message " + deliveryTag + " rejected");
173
handleRejectedMessage(deliveryTag);
174
}
175
}
176
});
177
178
// Publish with tracking
179
Map<Long, String> outstandingConfirms = new ConcurrentHashMap<>();
180
181
for (int i = 0; i < 1000; i++) {
182
String message = "Message " + i;
183
long deliveryTag = channel.getNextPublishSeqNo();
184
outstandingConfirms.put(deliveryTag, message);
185
186
channel.basicPublish("exchange", "routing.key", null, message.getBytes());
187
}
188
```
189
190
```java
191
// Using functional confirm callbacks
192
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
193
System.out.println("Acked: " + deliveryTag + " (multiple: " + multiple + ")");
194
confirmTracker.handleAck(deliveryTag, multiple);
195
};
196
197
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
198
System.out.println("Nacked: " + deliveryTag + " (multiple: " + multiple + ")");
199
confirmTracker.handleNack(deliveryTag, multiple);
200
// Retry logic here
201
};
202
203
channel.addConfirmListener(ackCallback, nackCallback);
204
```
205
206
### Message Returns
207
208
Handling of messages that cannot be routed to any queue (when published with mandatory flag).
209
210
```java { .api }
211
/**
212
* Add a return listener to handle returned messages
213
* @param listener - Listener to receive returned messages
214
*/
215
void addReturnListener(ReturnListener listener);
216
217
/**
218
* Remove a return listener
219
* @param listener - Listener to remove
220
* @return true if listener was removed
221
*/
222
boolean removeReturnListener(ReturnListener listener);
223
224
/**
225
* Clear all return listeners
226
*/
227
void clearReturnListeners();
228
229
/**
230
* Listener interface for returned messages
231
*/
232
public interface ReturnListener {
233
/**
234
* Called when a message is returned by the broker
235
* @param replyCode - Reply code indicating why message was returned
236
* @param replyText - Human-readable reply text
237
* @param exchange - Exchange the message was published to
238
* @param routingKey - Routing key used for publishing
239
* @param properties - Message properties
240
* @param body - Message body
241
*/
242
void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
243
AMQP.BasicProperties properties, byte[] body) throws IOException;
244
}
245
246
/**
247
* Functional interface for handling returned messages
248
*/
249
@FunctionalInterface
250
public interface ReturnCallback {
251
/**
252
* Handle returned message
253
* @param returnMessage - Return information
254
*/
255
void handle(Return returnMessage) throws IOException;
256
}
257
258
/**
259
* Class representing a returned message
260
*/
261
public class Return {
262
public int getReplyCode();
263
public String getReplyText();
264
public String getExchange();
265
public String getRoutingKey();
266
public AMQP.BasicProperties getProperties();
267
public byte[] getBody();
268
}
269
```
270
271
**Usage Examples:**
272
273
```java
274
// Handle returned messages with listener
275
channel.addReturnListener(new ReturnListener() {
276
@Override
277
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
278
AMQP.BasicProperties properties, byte[] body) throws IOException {
279
280
String message = new String(body, "UTF-8");
281
System.out.printf("Message returned: %d - %s%n", replyCode, replyText);
282
System.out.printf("Exchange: %s, Routing Key: %s%n", exchange, routingKey);
283
System.out.printf("Message: %s%n", message);
284
285
// Handle based on reason
286
switch (replyCode) {
287
case 312: // NO_ROUTE
288
System.out.println("No route found for message");
289
// Try alternative routing or store for retry
290
handleUnroutableMessage(exchange, routingKey, properties, body);
291
break;
292
case 313: // NO_CONSUMERS
293
System.out.println("No consumers available");
294
// Queue exists but no consumers
295
handleNoConsumers(exchange, routingKey, properties, body);
296
break;
297
default:
298
System.out.println("Unknown return reason: " + replyCode);
299
handleUnknownReturn(replyCode, replyText, properties, body);
300
}
301
}
302
});
303
304
// Publish with mandatory flag
305
String message = "Important message";
306
boolean mandatory = true; // Return if unroutable
307
channel.basicPublish("my.exchange", "nonexistent.key", mandatory, null, message.getBytes());
308
```
309
310
```java
311
// Using functional return callback
312
ReturnCallback returnCallback = returnMessage -> {
313
System.out.println("Returned: " + returnMessage.getReplyText());
314
System.out.println("Exchange: " + returnMessage.getExchange());
315
System.out.println("Routing Key: " + returnMessage.getRoutingKey());
316
317
String messageBody = new String(returnMessage.getBody(), "UTF-8");
318
System.out.println("Message: " + messageBody);
319
320
// Store for retry or send to dead letter
321
storeForRetry(returnMessage);
322
};
323
324
channel.addReturnListener(returnCallback);
325
```
326
327
### Combined Confirms and Returns
328
329
Example showing how to use both confirms and returns together for robust publishing.
330
331
**Robust Publisher Implementation:**
332
333
```java
334
public class RobustPublisher {
335
private final Channel channel;
336
private final Map<Long, PendingMessage> pendingConfirms = new ConcurrentHashMap<>();
337
private final AtomicLong publishSeqNo = new AtomicLong(0);
338
339
public RobustPublisher(Channel channel) throws IOException {
340
this.channel = channel;
341
channel.confirmSelect();
342
setupListeners();
343
}
344
345
private void setupListeners() {
346
// Handle confirms
347
channel.addConfirmListener(new ConfirmListener() {
348
@Override
349
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
350
if (multiple) {
351
// Remove all confirmed messages up to deliveryTag
352
pendingConfirms.entrySet().removeIf(entry -> entry.getKey() <= deliveryTag);
353
} else {
354
pendingConfirms.remove(deliveryTag);
355
}
356
System.out.println("Confirmed: " + deliveryTag + " (multiple: " + multiple + ")");
357
}
358
359
@Override
360
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
361
if (multiple) {
362
// Handle all nacked messages up to deliveryTag
363
pendingConfirms.entrySet().stream()
364
.filter(entry -> entry.getKey() <= deliveryTag)
365
.forEach(entry -> handleNackedMessage(entry.getValue()));
366
pendingConfirms.entrySet().removeIf(entry -> entry.getKey() <= deliveryTag);
367
} else {
368
PendingMessage message = pendingConfirms.remove(deliveryTag);
369
if (message != null) {
370
handleNackedMessage(message);
371
}
372
}
373
System.out.println("Nacked: " + deliveryTag + " (multiple: " + multiple + ")");
374
}
375
});
376
377
// Handle returns
378
channel.addReturnListener(returnMessage -> {
379
System.out.println("Message returned: " + returnMessage.getReplyText());
380
381
// Find the corresponding pending message and mark as returned
382
String correlationId = returnMessage.getProperties().getCorrelationId();
383
if (correlationId != null) {
384
handleReturnedMessage(correlationId, returnMessage);
385
}
386
});
387
}
388
389
public void publishReliably(String exchange, String routingKey, AMQP.BasicProperties props,
390
byte[] body) throws IOException {
391
392
// Generate correlation ID for tracking
393
String correlationId = UUID.randomUUID().toString();
394
AMQP.BasicProperties propsWithCorrelation = new AMQP.BasicProperties.Builder()
395
.correlationId(correlationId)
396
.contentType(props != null ? props.getContentType() : null)
397
.deliveryMode(props != null ? props.getDeliveryMode() : null)
398
.headers(props != null ? props.getHeaders() : null)
399
.build();
400
401
// Track the message
402
long seqNo = channel.getNextPublishSeqNo();
403
PendingMessage pending = new PendingMessage(correlationId, exchange, routingKey,
404
propsWithCorrelation, body, System.currentTimeMillis());
405
pendingConfirms.put(seqNo, pending);
406
407
// Publish with mandatory flag
408
channel.basicPublish(exchange, routingKey, true, propsWithCorrelation, body);
409
410
System.out.println("Published message " + seqNo + " with correlation ID: " + correlationId);
411
}
412
413
private void handleNackedMessage(PendingMessage message) {
414
System.out.println("Message nacked: " + message.getCorrelationId());
415
// Implement retry logic or dead letter handling
416
scheduleRetry(message);
417
}
418
419
private void handleReturnedMessage(String correlationId, Return returnMessage) {
420
System.out.println("Message returned: " + correlationId);
421
// Handle unroutable message
422
storeInDeadLetter(correlationId, returnMessage);
423
}
424
425
// Helper class for tracking pending messages
426
private static class PendingMessage {
427
private final String correlationId;
428
private final String exchange;
429
private final String routingKey;
430
private final AMQP.BasicProperties properties;
431
private final byte[] body;
432
private final long timestamp;
433
434
// Constructor and getters...
435
}
436
}
437
```
438
439
**Usage:**
440
441
```java
442
Channel channel = connection.createChannel();
443
RobustPublisher publisher = new RobustPublisher(channel);
444
445
// Publish messages reliably
446
for (int i = 0; i < 100; i++) {
447
String message = "Message " + i;
448
AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
449
450
publisher.publishReliably("my.exchange", "routing.key", props, message.getBytes());
451
}
452
453
// Monitor pending confirms
454
Thread.sleep(5000);
455
System.out.println("Pending confirms: " + publisher.getPendingConfirmCount());
456
```
457
458
## Types
459
460
### Confirm and Return Types
461
462
```java { .api }
463
// Return message information
464
public class Return {
465
public int getReplyCode(); // AMQP reply code (312=NO_ROUTE, 313=NO_CONSUMERS)
466
public String getReplyText(); // Human-readable explanation
467
public String getExchange(); // Exchange message was published to
468
public String getRoutingKey(); // Routing key used
469
public AMQP.BasicProperties getProperties(); // Message properties
470
public byte[] getBody(); // Message body
471
}
472
473
// Common AMQP reply codes for returns
474
public static final int REPLY_SUCCESS = 200;
475
public static final int NO_ROUTE = 312; // No route found for message
476
public static final int NO_CONSUMERS = 313; // Queue exists but no consumers
477
```