or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · confirms-returns.md · connection-channel.md · consumer-api.md · consuming.md · error-recovery.md · index.md · observability.md · publishing.md · rpc.md

confirms-returns.mddocs/

0

# Publisher Confirms and Returns

1

2

Mechanisms for reliable message publishing with publisher confirms and handling returned messages. These features provide acknowledgment that messages have been received and processed by the broker, and notification when messages cannot be routed to queues.

3

4

## Capabilities

5

6

### Publisher Confirms

7

8

Mechanism for getting acknowledgments from the broker that messages have been received and processed.

9

10

```java { .api }

11

/**

12

* Enable publisher confirms on this channel

13

* All subsequently published messages will be confirmed

14

*/

15

void confirmSelect() throws IOException;

16

17

/**

18

* Wait for all outstanding confirms to be received

19

* @return true if all outstanding messages were acked, false if any message was nacked

20

*/

21

boolean waitForConfirms() throws InterruptedException;

22

23

/**

24

* Wait for confirms with timeout

25

* @param timeout - Maximum time to wait in milliseconds

26

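* @return true if all outstanding messages were acked within the timeout, false if any message was nacked (an expired timeout throws TimeoutException instead of returning false)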

27

*/

28

boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

29

30

/**

31

* Wait until all outstanding messages have been acked or nacked by the broker

32

* Throws exception if any message is nacked

33

*/

34

void waitForConfirmsOrDie() throws IOException, InterruptedException;

35

36

/**

37

* Wait for confirms with timeout, throw exception on nack

38

* @param timeout - Maximum time to wait in milliseconds

39

*/

40

void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

41

42

/**

43

* Get the next publish sequence number

44

* @return Sequence number for next published message

45

*/

46

long getNextPublishSeqNo();

47

48

/**

49

* Add a confirm listener for asynchronous confirm handling

50

* @param listener - Listener to receive confirm/nack notifications

51

*/

52

void addConfirmListener(ConfirmListener listener);

53

54

/**

55

* Remove a confirm listener

56

* @param listener - Listener to remove

57

* @return true if listener was removed

58

*/

59

boolean removeConfirmListener(ConfirmListener listener);

60

61

/**

62

* Clear all confirm listeners

63

*/

64

void clearConfirmListeners();

65

```

66

67

**Usage Examples:**

68

69

```java

70

// Synchronous confirms - wait for each message

71

Channel channel = connection.createChannel();

72

channel.confirmSelect();

73

74

for (int i = 0; i < 1000; i++) {

75

String message = "Message " + i;

76

channel.basicPublish("exchange", "routing.key", null, message.getBytes());

77

78

// Wait for this message to be confirmed

79

if (channel.waitForConfirms(5000)) {

80

System.out.println("Message " + i + " confirmed");

81

} else {

82

System.out.println("Message " + i + " not confirmed within timeout");

83

}

84

}

85

```

86

87

```java

88

// Batch confirms - publish multiple messages then wait

89

channel.confirmSelect();

90

int batchSize = 100;

91

92

for (int i = 0; i < 1000; i++) {

93

String message = "Message " + i;

94

channel.basicPublish("exchange", "routing.key", null, message.getBytes());

95

96

if ((i + 1) % batchSize == 0) {

97

// Wait for batch to be confirmed

98

try {

99

channel.waitForConfirmsOrDie(10000);

100

System.out.println("Batch " + ((i + 1) / batchSize) + " confirmed");

101

} catch (IOException e) {

102

System.out.println("Batch failed: " + e.getMessage());

103

}

104

}

105

}

106

```

107

108

### Asynchronous Confirm Handling

109

110

Interfaces and callbacks for handling confirms asynchronously without blocking.

111

112

```java { .api }

113

/**

114

* Listener interface for publisher confirms

115

*/

116

public interface ConfirmListener {

117

/**

118

* Called when message(s) are acknowledged by broker

119

* @param deliveryTag - Delivery tag of confirmed message

120

* @param multiple - True if multiple messages confirmed (up to and including deliveryTag)

121

*/

122

void handleAck(long deliveryTag, boolean multiple) throws IOException;

123

124

/**

125

* Called when message(s) are rejected by broker

126

* @param deliveryTag - Delivery tag of rejected message

127

* @param multiple - True if multiple messages rejected (up to and including deliveryTag)

128

*/

129

void handleNack(long deliveryTag, boolean multiple) throws IOException;

130

}

131

132

/**

133

* Functional interface for confirm acknowledgments

134

*/

135

@FunctionalInterface

136

public interface ConfirmCallback {

137

/**

138

* Handle confirm acknowledgment

139

* @param deliveryTag - Delivery tag of confirmed message

140

* @param multiple - True if multiple messages confirmed

141

*/

142

void handle(long deliveryTag, boolean multiple) throws IOException;

143

}

144

```

145

146

**Usage Examples:**

147

148

```java

149

// Asynchronous confirms with listener

150

channel.confirmSelect();

151

152

channel.addConfirmListener(new ConfirmListener() {

153

@Override

154

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

155

if (multiple) {

156

System.out.println("Messages up to " + deliveryTag + " confirmed");

157

// Remove confirmed messages from tracking

158

removeConfirmedMessages(deliveryTag);

159

} else {

160

System.out.println("Message " + deliveryTag + " confirmed");

161

removeConfirmedMessage(deliveryTag);

162

}

163

}

164

165

@Override

166

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

167

if (multiple) {

168

System.out.println("Messages up to " + deliveryTag + " rejected");

169

// Handle rejected messages

170

handleRejectedMessages(deliveryTag);

171

} else {

172

System.out.println("Message " + deliveryTag + " rejected");

173

handleRejectedMessage(deliveryTag);

174

}

175

}

176

});

177

178

// Publish with tracking

179

Map<Long, String> outstandingConfirms = new ConcurrentHashMap<>();

180

181

for (int i = 0; i < 1000; i++) {

182

String message = "Message " + i;

183

long deliveryTag = channel.getNextPublishSeqNo();

184

outstandingConfirms.put(deliveryTag, message);

185

186

channel.basicPublish("exchange", "routing.key", null, message.getBytes());

187

}

188

```

189

190

```java

191

// Using functional confirm callbacks

192

ConfirmCallback ackCallback = (deliveryTag, multiple) -> {

193

System.out.println("Acked: " + deliveryTag + " (multiple: " + multiple + ")");

194

confirmTracker.handleAck(deliveryTag, multiple);

195

};

196

197

ConfirmCallback nackCallback = (deliveryTag, multiple) -> {

198

System.out.println("Nacked: " + deliveryTag + " (multiple: " + multiple + ")");

199

confirmTracker.handleNack(deliveryTag, multiple);

200

// Retry logic here

201

};

202

203

channel.addConfirmListener(ackCallback, nackCallback);

204

```

205

206

### Message Returns

207

208

Handling of messages that cannot be routed to any queue (when published with mandatory flag).

209

210

```java { .api }

211

/**

212

* Add a return listener to handle returned messages

213

* @param listener - Listener to receive returned messages

214

*/

215

void addReturnListener(ReturnListener listener);

216

217

/**

218

* Remove a return listener

219

* @param listener - Listener to remove

220

* @return true if listener was removed

221

*/

222

boolean removeReturnListener(ReturnListener listener);

223

224

/**

225

* Clear all return listeners

226

*/

227

void clearReturnListeners();

228

229

/**

230

* Listener interface for returned messages

231

*/

232

public interface ReturnListener {

233

/**

234

* Called when a message is returned by the broker

235

* @param replyCode - Reply code indicating why message was returned

236

* @param replyText - Human-readable reply text

237

* @param exchange - Exchange the message was published to

238

* @param routingKey - Routing key used for publishing

239

* @param properties - Message properties

240

* @param body - Message body

241

*/

242

void handleReturn(int replyCode, String replyText, String exchange, String routingKey,

243

AMQP.BasicProperties properties, byte[] body) throws IOException;

244

}

245

246

/**

247

* Functional interface for handling returned messages

248

*/

249

@FunctionalInterface

250

public interface ReturnCallback {

251

/**

252

* Handle returned message

253

* @param returnMessage - Return information

254

*/

255

void handle(Return returnMessage) throws IOException;

256

}

257

258

/**

259

* Class representing a returned message

260

*/

261

public class Return {

262

public int getReplyCode();

263

public String getReplyText();

264

public String getExchange();

265

public String getRoutingKey();

266

public AMQP.BasicProperties getProperties();

267

public byte[] getBody();

268

}

269

```

270

271

**Usage Examples:**

272

273

```java

274

// Handle returned messages with listener

275

channel.addReturnListener(new ReturnListener() {

276

@Override

277

public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,

278

AMQP.BasicProperties properties, byte[] body) throws IOException {

279

280

String message = new String(body, "UTF-8");

281

System.out.printf("Message returned: %d - %s%n", replyCode, replyText);

282

System.out.printf("Exchange: %s, Routing Key: %s%n", exchange, routingKey);

283

System.out.printf("Message: %s%n", message);

284

285

// Handle based on reason

286

switch (replyCode) {

287

case 312: // NO_ROUTE

288

System.out.println("No route found for message");

289

// Try alternative routing or store for retry

290

handleUnroutableMessage(exchange, routingKey, properties, body);

291

break;

292

case 313: // NO_CONSUMERS

293

System.out.println("No consumers available");

294

// Queue exists but no consumers

295

handleNoConsumers(exchange, routingKey, properties, body);

296

break;

297

default:

298

System.out.println("Unknown return reason: " + replyCode);

299

handleUnknownReturn(replyCode, replyText, properties, body);

300

}

301

}

302

});

303

304

// Publish with mandatory flag

305

String message = "Important message";

306

boolean mandatory = true; // Return if unroutable

307

channel.basicPublish("my.exchange", "nonexistent.key", mandatory, null, message.getBytes());

308

```

309

310

```java

311

// Using functional return callback

312

ReturnCallback returnCallback = returnMessage -> {

313

System.out.println("Returned: " + returnMessage.getReplyText());

314

System.out.println("Exchange: " + returnMessage.getExchange());

315

System.out.println("Routing Key: " + returnMessage.getRoutingKey());

316

317

String messageBody = new String(returnMessage.getBody(), "UTF-8");

318

System.out.println("Message: " + messageBody);

319

320

// Store for retry or send to dead letter

321

storeForRetry(returnMessage);

322

};

323

324

channel.addReturnListener(returnCallback);

325

```

326

327

### Combined Confirms and Returns

328

329

Example showing how to use both confirms and returns together for robust publishing.

330

331

**Robust Publisher Implementation:**

332

333

```java

334

public class RobustPublisher {

335

private final Channel channel;

336

private final Map<Long, PendingMessage> pendingConfirms = new ConcurrentHashMap<>();

337

private final AtomicLong publishSeqNo = new AtomicLong(0);

338

339

public RobustPublisher(Channel channel) throws IOException {

340

this.channel = channel;

341

channel.confirmSelect();

342

setupListeners();

343

}

344

345

private void setupListeners() {

346

// Handle confirms

347

channel.addConfirmListener(new ConfirmListener() {

348

@Override

349

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

350

if (multiple) {

351

// Remove all confirmed messages up to deliveryTag

352

pendingConfirms.entrySet().removeIf(entry -> entry.getKey() <= deliveryTag);

353

} else {

354

pendingConfirms.remove(deliveryTag);

355

}

356

System.out.println("Confirmed: " + deliveryTag + " (multiple: " + multiple + ")");

357

}

358

359

@Override

360

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

361

if (multiple) {

362

// Handle all nacked messages up to deliveryTag

363

pendingConfirms.entrySet().stream()

364

.filter(entry -> entry.getKey() <= deliveryTag)

365

.forEach(entry -> handleNackedMessage(entry.getValue()));

366

pendingConfirms.entrySet().removeIf(entry -> entry.getKey() <= deliveryTag);

367

} else {

368

PendingMessage message = pendingConfirms.remove(deliveryTag);

369

if (message != null) {

370

handleNackedMessage(message);

371

}

372

}

373

System.out.println("Nacked: " + deliveryTag + " (multiple: " + multiple + ")");

374

}

375

});

376

377

// Handle returns

378

channel.addReturnListener(returnMessage -> {

379

System.out.println("Message returned: " + returnMessage.getReplyText());

380

381

// Find the corresponding pending message and mark as returned

382

String correlationId = returnMessage.getProperties().getCorrelationId();

383

if (correlationId != null) {

384

handleReturnedMessage(correlationId, returnMessage);

385

}

386

});

387

}

388

389

public void publishReliably(String exchange, String routingKey, AMQP.BasicProperties props,

390

byte[] body) throws IOException {

391

392

// Generate correlation ID for tracking

393

String correlationId = UUID.randomUUID().toString();

394

AMQP.BasicProperties propsWithCorrelation = new AMQP.BasicProperties.Builder()

395

.correlationId(correlationId)

396

.contentType(props != null ? props.getContentType() : null)

397

.deliveryMode(props != null ? props.getDeliveryMode() : null)

398

.headers(props != null ? props.getHeaders() : null)

399

.build();

400

401

// Track the message

402

long seqNo = channel.getNextPublishSeqNo();

403

PendingMessage pending = new PendingMessage(correlationId, exchange, routingKey,

404

propsWithCorrelation, body, System.currentTimeMillis());

405

pendingConfirms.put(seqNo, pending);

406

407

// Publish with mandatory flag

408

channel.basicPublish(exchange, routingKey, true, propsWithCorrelation, body);

409

410

System.out.println("Published message " + seqNo + " with correlation ID: " + correlationId);

411

}

412

413

private void handleNackedMessage(PendingMessage message) {

414

System.out.println("Message nacked: " + message.getCorrelationId());

415

// Implement retry logic or dead letter handling

416

scheduleRetry(message);

417

}

418

419

private void handleReturnedMessage(String correlationId, Return returnMessage) {

420

System.out.println("Message returned: " + correlationId);

421

// Handle unroutable message

422

storeInDeadLetter(correlationId, returnMessage);

423

}

424

425

// Helper class for tracking pending messages

426

private static class PendingMessage {

427

private final String correlationId;

428

private final String exchange;

429

private final String routingKey;

430

private final AMQP.BasicProperties properties;

431

private final byte[] body;

432

private final long timestamp;

433

434

// Constructor and getters...

435

}

436

}

437

```

438

439

**Usage:**

440

441

```java

442

Channel channel = connection.createChannel();

443

RobustPublisher publisher = new RobustPublisher(channel);

444

445

// Publish messages reliably

446

for (int i = 0; i < 100; i++) {

447

String message = "Message " + i;

448

AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;

449

450

publisher.publishReliably("my.exchange", "routing.key", props, message.getBytes());

451

}

452

453

// Monitor pending confirms

454

Thread.sleep(5000);

455

System.out.println("Pending confirms: " + publisher.getPendingConfirmCount());

456

```

457

458

## Types

459

460

### Confirm and Return Types

461

462

```java { .api }

463

// Return message information

464

public class Return {

465

public int getReplyCode(); // AMQP reply code (312=NO_ROUTE, 313=NO_CONSUMERS)

466

public String getReplyText(); // Human-readable explanation

467

public String getExchange(); // Exchange message was published to

468

public String getRoutingKey(); // Routing key used

469

public AMQP.BasicProperties getProperties(); // Message properties

470

public byte[] getBody(); // Message body

471

}

472

473

// Common AMQP reply codes for returns

474

public static final int REPLY_SUCCESS = 200;

475

public static final int NO_ROUTE = 312; // No route found for message

476

public static final int NO_CONSUMERS = 313; // Queue exists but no consumers

477

```