or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md confirms-returns.md connection-channel.md consumer-api.md consuming.md error-recovery.md index.md observability.md publishing.md rpc.md

consumer-api.mddocs/

0

# Consumer API

1

2

Interfaces and implementations for consuming messages asynchronously with callbacks. The Consumer API provides both functional callback interfaces and full Consumer implementations for handling message delivery, cancellation, and shutdown events.

3

4

## Capabilities

5

6

### Functional Callback Interfaces

7

8

Modern callback interfaces for handling message delivery and consumer events.

9

10

```java { .api }

11

/**

12

* Callback interface for message delivery

13

*/

14

@FunctionalInterface

15

public interface DeliverCallback {

16

/**

17

* Called when a message is delivered to the consumer

18

* @param consumerTag - Consumer identifier

19

* @param delivery - Message delivery information

20

*/

21

void handle(String consumerTag, Delivery delivery) throws IOException;

22

}

23

24

/**

25

* Callback interface for consumer cancellation

26

*/

27

@FunctionalInterface

28

public interface CancelCallback {

29

/**

30

* Called when the consumer is cancelled

31

* @param consumerTag - Consumer identifier that was cancelled

32

*/

33

void handle(String consumerTag) throws IOException;

34

}

35

36

/**

37

* Callback interface for consumer shutdown signals

38

*/

39

@FunctionalInterface

40

public interface ConsumerShutdownSignalCallback {

41

/**

42

* Called when the consumer receives a shutdown signal

43

* @param consumerTag - Consumer identifier

44

* @param sig - Shutdown signal with reason and details

45

*/

46

void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

47

}

48

```

49

50

**Usage Examples:**

51

52

```java

53

// Lambda-based message handling

54

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

55

String message = new String(delivery.getBody(), "UTF-8");

56

System.out.println("[" + consumerTag + "] Received: " + message);

57

58

// Process based on message properties

59

AMQP.BasicProperties props = delivery.getProperties();

60

if ("urgent".equals(props.getType())) {

61

handleUrgentMessage(message);

62

} else {

63

handleRegularMessage(message);

64

}

65

};

66

67

CancelCallback cancelCallback = consumerTag -> {

68

System.out.println("Consumer " + consumerTag + " was cancelled");

69

// Cleanup resources

70

cleanupConsumerResources(consumerTag);

71

};

72

73

ConsumerShutdownSignalCallback shutdownCallback = (consumerTag, sig) -> {

74

System.out.println("Consumer " + consumerTag + " shutdown: " + sig.getReason());

75

if (!sig.isInitiatedByApplication()) {

76

// Handle unexpected shutdown

77

scheduleReconnection();

78

}

79

};

80

81

// Start consuming with callbacks

82

String consumerTag = channel.basicConsume("work.queue", false,

83

deliverCallback, cancelCallback, shutdownCallback);

84

```

85

86

```java

87

// Method reference usage

88

public class MessageProcessor {

89

public void handleDelivery(String consumerTag, Delivery delivery) throws IOException {

90

// Process message

91

String message = new String(delivery.getBody(), "UTF-8");

92

processBusinessLogic(message);

93

94

// Manual acknowledgment

95

Channel channel = ((Consumer) this).getChannel();

96

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

97

}

98

99

public void handleCancel(String consumerTag) throws IOException {

100

System.out.println("Consumer cancelled: " + consumerTag);

101

}

102

}

103

104

MessageProcessor processor = new MessageProcessor();

105

channel.basicConsume("queue", false, processor::handleDelivery, processor::handleCancel);

106

```

107

108

### Consumer Interface

109

110

Full consumer interface for handling all consumer events.

111

112

```java { .api }

113

/**

114

* Interface for implementing message consumers

115

*/

116

public interface Consumer {

117

/**

118

* Called when a message is delivered

119

* @param consumerTag - Consumer identifier

120

* @param envelope - Delivery envelope with routing info

121

* @param properties - Message properties

122

* @param body - Message body

123

*/

124

void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;

125

126

/**

127

* Called when the consumer is cancelled for reasons other than a call to Channel.basicCancel (for example, the queue was deleted)

128

* @param consumerTag - Consumer identifier

129

*/

130

void handleCancel(String consumerTag) throws IOException;

131

132

/**

133

* Called when the consumer is cancelled by an explicit call to Channel.basicCancel

134

* @param consumerTag - Consumer identifier

135

*/

136

void handleCancelOk(String consumerTag);

137

138

/**

139

* Called when the consumer is registered

140

* @param consumerTag - Consumer identifier

141

*/

142

void handleConsumeOk(String consumerTag);

143

144

/**

145

* Called when a basic.recover-ok is received in reply to a Channel.basicRecover call

146

* @param consumerTag - Consumer identifier

147

*/

148

void handleRecoverOk(String consumerTag);

149

150

/**

151

* Called when a shutdown signal is received

152

* @param consumerTag - Consumer identifier

153

* @param sig - Shutdown signal

154

*/

155

void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

156

157

/**

158

* Get the consumer tag

159

* @return Consumer identifier

160

*/

161

String getConsumerTag();

162

}

163

```

164

165

### DefaultConsumer

166

167

Default implementation of the Consumer interface providing base functionality.

168

169

```java { .api }

170

/**

171

* Default implementation of Consumer interface.

172

* Extend this class and override methods as needed.

173

*/

174

public class DefaultConsumer implements Consumer {

175

/**

176

* Constructor taking a channel

177

* @param channel - Channel for this consumer

178

*/

179

public DefaultConsumer(Channel channel);

180

181

/**

182

* Get the channel this consumer is associated with

183

* @return Channel instance

184

*/

185

public Channel getChannel();

186

187

/**

188

* Get the consumer tag

189

* @return Consumer tag string

190

*/

191

public String getConsumerTag();

192

193

/**

194

* Set the consumer tag (called by the library)

195

* @param consumerTag - Consumer identifier

196

*/

197

public void setConsumerTag(String consumerTag);

198

199

// Default implementations of Consumer interface methods

200

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;

201

public void handleCancel(String consumerTag) throws IOException;

202

public void handleCancelOk(String consumerTag);

203

public void handleConsumeOk(String consumerTag);

204

public void handleRecoverOk(String consumerTag);

205

public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

206

}

207

```

208

209

**Usage Examples:**

210

211

```java

212

// Custom consumer extending DefaultConsumer

213

public class WorkQueueConsumer extends DefaultConsumer {

214

215

public WorkQueueConsumer(Channel channel) {

216

super(channel);

217

}

218

219

@Override

220

public void handleDelivery(String consumerTag, Envelope envelope,

221

AMQP.BasicProperties properties, byte[] body) throws IOException {

222

String message = new String(body, "UTF-8");

223

224

try {

225

// Simulate work

226

processWork(message);

227

228

// Acknowledge successful processing

229

getChannel().basicAck(envelope.getDeliveryTag(), false);

230

231

} catch (Exception e) {

232

System.err.println("Error processing message: " + e.getMessage());

233

234

// Reject and requeue for retry

235

getChannel().basicNack(envelope.getDeliveryTag(), false, true);

236

}

237

}

238

239

@Override

240

public void handleCancel(String consumerTag) throws IOException {

241

System.out.println("Consumer cancelled: " + consumerTag);

242

// Perform cleanup

243

cleanup();

244

}

245

246

@Override

247

public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

248

System.out.println("Consumer shutdown: " + sig.getReason());

249

if (!sig.isInitiatedByApplication()) {

250

// Handle unexpected shutdown

251

reconnect();

252

}

253

}

254

255

private void processWork(String message) throws Exception {

256

// Business logic here

257

Thread.sleep(1000); // Simulate processing time

258

System.out.println("Processed: " + message);

259

}

260

261

private void cleanup() {

262

// Cleanup resources

263

}

264

265

private void reconnect() {

266

// Reconnection logic

267

}

268

}

269

270

// Use the custom consumer

271

Channel channel = connection.createChannel();

272

WorkQueueConsumer consumer = new WorkQueueConsumer(channel);

273

channel.basicConsume("work.queue", false, consumer);

274

```

275

276

```java

277

// Simple consumer for logging messages

278

public class LoggingConsumer extends DefaultConsumer {

279

280

public LoggingConsumer(Channel channel) {

281

super(channel);

282

}

283

284

@Override

285

public void handleDelivery(String consumerTag, Envelope envelope,

286

AMQP.BasicProperties properties, byte[] body) throws IOException {

287

String message = new String(body, "UTF-8");

288

String routingKey = envelope.getRoutingKey();

289

String exchange = envelope.getExchange();

290

291

System.out.printf("[%s] %s->%s: %s%n",

292

consumerTag, exchange, routingKey, message);

293

294

// Auto-acknowledge for logging (fire-and-forget)

295

getChannel().basicAck(envelope.getDeliveryTag(), false);

296

}

297

}

298

299

channel.basicConsume("logs.queue", false, new LoggingConsumer(channel));

300

```

301

302

### Consumer Exception Handling

303

304

```java { .api }

305

/**

306

* Exception thrown when a consumer is cancelled

307

*/

308

public class ConsumerCancelledException extends RuntimeException {

309

public ConsumerCancelledException();

310

}

311

```

312

313

**Usage Examples:**

314

315

```java

316

// Handling consumer cancellation in application code

317

try {
    // Consumer processing logic
    while (isRunning) {
        // Process messages or wait
        Thread.sleep(1000);
    }
} catch (ConsumerCancelledException e) {
    System.out.println("Consumer was cancelled");
    // Restart consumer or exit gracefully
    restartConsumer();
} catch (InterruptedException e) {
    // Thread.sleep throws this checked exception; restore the interrupt flag
    // so the code that owns this thread can see the interruption and stop.
    Thread.currentThread().interrupt();
}

328

```

329

330

### Advanced Consumer Patterns

331

332

**Multi-threaded Consumer:**

333

334

```java

335

public class ThreadedConsumer extends DefaultConsumer {

336

private final ExecutorService executor;

337

338

public ThreadedConsumer(Channel channel, ExecutorService executor) {

339

super(channel);

340

this.executor = executor;

341

}

342

343

@Override

344

public void handleDelivery(String consumerTag, Envelope envelope,

345

AMQP.BasicProperties properties, byte[] body) throws IOException {

346

347

// Submit to thread pool for processing

348

executor.submit(() -> {

349

try {

350

String message = new String(body, "UTF-8");

351

processMessage(message);

352

353

// Acknowledge in the callback thread

354

getChannel().basicAck(envelope.getDeliveryTag(), false);

355

356

} catch (Exception e) {

357

try {

358

getChannel().basicNack(envelope.getDeliveryTag(), false, true);

359

} catch (IOException ex) {

360

System.err.println("Failed to nack message: " + ex.getMessage());

361

}

362

}

363

});

364

}

365

}

366

```

367

368

**Retry Logic Consumer:**

369

370

```java

371

public class RetryConsumer extends DefaultConsumer {

372

private static final int MAX_RETRIES = 3;

373

374

public RetryConsumer(Channel channel) {

375

super(channel);

376

}

377

378

@Override

379

public void handleDelivery(String consumerTag, Envelope envelope,

380

AMQP.BasicProperties properties, byte[] body) throws IOException {

381

382

Map<String, Object> headers = properties.getHeaders();

383

int retryCount = headers != null && headers.containsKey("x-retry-count") ?

384

(Integer) headers.get("x-retry-count") : 0;

385

386

try {

387

String message = new String(body, "UTF-8");

388

processMessage(message);

389

getChannel().basicAck(envelope.getDeliveryTag(), false);

390

391

} catch (Exception e) {

392

if (retryCount < MAX_RETRIES) {

393

// Republish with incremented retry count

394

republishWithRetry(envelope, properties, body, retryCount + 1);

395

getChannel().basicAck(envelope.getDeliveryTag(), false);

396

} else {

397

// Send to dead letter queue or discard

398

getChannel().basicNack(envelope.getDeliveryTag(), false, false);

399

}

400

}

401

}

402

403

private void republishWithRetry(Envelope envelope, AMQP.BasicProperties props,

404

byte[] body, int retryCount) throws IOException {

405

406

Map<String, Object> headers = new HashMap<>(props.getHeaders() != null ? props.getHeaders() : new HashMap<>());

407

headers.put("x-retry-count", retryCount);

408

409

AMQP.BasicProperties newProps = new AMQP.BasicProperties.Builder()

410

.headers(headers)

411

.contentType(props.getContentType())

412

.deliveryMode(props.getDeliveryMode())

413

.build();

414

415

// Republish to retry exchange/queue

416

getChannel().basicPublish("retry.exchange", envelope.getRoutingKey(), newProps, body);

417

}

418

}

419

```

420

421

## Types

422

423

### Consumer-Related Types

424

425

```java { .api }

426

// Consumer tag management

427

public interface ConsumerTagSupplier {

428

String get();

429

}

430

431

// Message delivery data

432

public class Delivery {

433

public Envelope getEnvelope();

434

public AMQP.BasicProperties getProperties();

435

public byte[] getBody();

436

}

437

438

// Envelope with message routing information

439

public class Envelope {

440

public long getDeliveryTag();

441

public boolean isRedeliver();

442

public String getExchange();

443

public String getRoutingKey();

444

}

445

```