or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · confirms-returns.md · connection-channel.md · consumer-api.md · consuming.md · error-recovery.md · index.md · observability.md · publishing.md · rpc.md

docs/consuming.md

0

# Message Consuming

1

2

Operations for consuming messages from queues and managing queue topology. Consuming allows applications to receive messages from RabbitMQ queues either by pushing messages to consumers or by polling for messages.

3

4

## Capabilities

5

6

### Queue Management

7

8

Operations for declaring, deleting, binding, and purging queues.

9

10

```java { .api }

11

/**

12

* Declare a queue with default settings

13

* @param queue - Queue name (empty string for server-generated name)

14

* @return Queue.DeclareOk with queue information

15

*/

16

AMQP.Queue.DeclareOk queueDeclare(String queue) throws IOException;

17

18

/**

19

* Declare a queue with full options

20

* @param queue - Queue name (empty string for server-generated name)

21

* @param durable - Queue survives server restarts

22

* @param exclusive - Queue is exclusive to this connection

23

* @param autoDelete - Queue deleted when no longer in use

24

* @param arguments - Optional queue arguments

25

*/

26

AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;

27

28

/**

29

* Declare a queue passively (check if exists without creating)

30

* @param queue - Queue name

31

*/

32

AMQP.Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

33

34

/**

35

* Delete a queue

36

* @param queue - Queue name

37

*/

38

AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException;

39

40

/**

41

* Delete a queue with conditions

42

* @param queue - Queue name

43

* @param ifUnused - Only delete if queue has no consumers

44

* @param ifEmpty - Only delete if queue is empty

45

*/

46

AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

47

48

/**

49

* Bind a queue to an exchange

50

* @param queue - Queue name

51

* @param exchange - Exchange name

52

* @param routingKey - Routing key for binding

53

*/

54

AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

55

56

/**

57

* Bind queue with arguments

58

* @param queue - Queue name

59

* @param exchange - Exchange name

60

* @param routingKey - Routing key

61

* @param arguments - Binding arguments

62

*/

63

AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

64

65

/**

66

* Unbind a queue from an exchange

67

* @param queue - Queue name

68

* @param exchange - Exchange name

69

* @param routingKey - Routing key to unbind

70

*/

71

AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;

72

73

/**

74

* Unbind queue with arguments

75

* @param queue - Queue name

76

* @param exchange - Exchange name

77

* @param routingKey - Routing key

78

* @param arguments - Binding arguments to match

79

*/

80

AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

81

82

/**

83

* Purge all messages from a queue

84

* @param queue - Queue name

85

* @return Queue.PurgeOk with message count purged

86

*/

87

AMQP.Queue.PurgeOk queuePurge(String queue) throws IOException;

88

```

89

90

**Usage Examples:**

91

92

```java

93

Channel channel = connection.createChannel();

94

95

// Declare different types of queues

96

channel.queueDeclare("work.queue", true, false, false, null); // Durable work queue

97

channel.queueDeclare("temp.queue", false, true, true, null); // Temporary exclusive queue

98

99

// Server-generated queue name

100

AMQP.Queue.DeclareOk result = channel.queueDeclare("", false, true, true, null);

101

String queueName = result.getQueue();

102

103

// Queue with arguments (TTL, max length, etc.)

104

Map<String, Object> args = new HashMap<>();

105

args.put("x-message-ttl", 60000); // Message TTL

106

args.put("x-max-length", 1000); // Max queue length

107

args.put("x-overflow", "reject-publish"); // Overflow behavior

108

channel.queueDeclare("limited.queue", true, false, false, args);

109

```

110

111

```java

112

// Bind queue to exchanges

113

channel.queueBind("user.notifications", "user.events", "user.created");

114

channel.queueBind("user.notifications", "user.events", "user.updated");

115

116

// Topic binding with wildcards

117

channel.queueBind("error.logs", "logs.topic", "*.error.*");

118

channel.queueBind("all.logs", "logs.topic", "#");

119

120

// Headers binding

121

Map<String, Object> bindingArgs = new HashMap<>();

122

bindingArgs.put("x-match", "any");

123

bindingArgs.put("type", "notification");

124

bindingArgs.put("priority", "high");

125

channel.queueBind("priority.queue", "headers.exchange", "", bindingArgs);

126

```

127

128

### Push-Based Consuming

129

130

Asynchronous message consumption using callbacks where messages are pushed to consumers.

131

132

```java { .api }

133

/**

134

* Start consuming messages with callbacks

135

* @param queue - Queue name to consume from

136

* @param autoAck - Auto-acknowledge messages

137

* @param deliverCallback - Callback for message delivery

138

* @param cancelCallback - Callback for consumer cancellation

139

* @return Consumer tag for managing the consumer

140

*/

141

String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

142

143

/**

144

* Start consuming with additional options

145

* @param queue - Queue name

146

* @param autoAck - Auto-acknowledge messages

147

* @param consumerTag - Custom consumer tag (empty string for server-generated)

148

* @param deliverCallback - Message delivery callback

149

* @param cancelCallback - Consumer cancellation callback

150

*/

151

String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

152

153

/**

154

* Start consuming with full options

155

* @param queue - Queue name

156

* @param autoAck - Auto-acknowledge messages

157

* @param consumerTag - Custom consumer tag

158

* @param noLocal - Don't deliver messages published by this connection (note: the RabbitMQ server does not support this flag)

159

* @param exclusive - Exclusive consumer

160

* @param arguments - Consumer arguments

161

* @param deliverCallback - Message delivery callback

162

* @param cancelCallback - Consumer cancellation callback

163

*/

164

String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

165

166

/**

167

* Start consuming with Consumer interface

168

* @param queue - Queue name

169

* @param consumer - Consumer implementation

170

*/

171

String basicConsume(String queue, Consumer consumer) throws IOException;

172

173

/**

174

* Start consuming with Consumer and options

175

* @param queue - Queue name

176

* @param autoAck - Auto-acknowledge messages

177

* @param consumer - Consumer implementation

178

*/

179

String basicConsume(String queue, boolean autoAck, Consumer consumer) throws IOException;

180

181

/**

182

* Cancel a consumer

183

* @param consumerTag - Consumer tag to cancel

184

*/

185

AMQP.Basic.CancelOk basicCancel(String consumerTag) throws IOException;

186

```

187

188

**Usage Examples:**

189

190

```java

191

// Simple callback-based consumer

192

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

193

String message = new String(delivery.getBody(), "UTF-8");

194

System.out.println("Received: " + message);

195

196

// Access message metadata

197

Envelope envelope = delivery.getEnvelope();

198

System.out.println("Delivery tag: " + envelope.getDeliveryTag());

199

System.out.println("Exchange: " + envelope.getExchange());

200

System.out.println("Routing key: " + envelope.getRoutingKey());

201

202

// Access message properties

203

AMQP.BasicProperties props = delivery.getProperties();

204

if (props.getContentType() != null) {

205

System.out.println("Content type: " + props.getContentType());

206

}

207

};

208

209

CancelCallback cancelCallback = consumerTag -> {

210

System.out.println("Consumer cancelled: " + consumerTag);

211

};

212

213

String consumerTag = channel.basicConsume("work.queue", true, deliverCallback, cancelCallback);

214

```

215

216

```java

217

// Manual acknowledgment consumer

218

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

219

try {

220

String message = new String(delivery.getBody(), "UTF-8");

221

222

// Process message

223

processMessage(message);

224

225

// Manually acknowledge successful processing

226

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

227

228

} catch (Exception e) {

229

// Reject and requeue on error

230

channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);

231

}

232

};

233

234

channel.basicConsume("work.queue", false, deliverCallback, cancelCallback);

235

```

236

237

```java

238

// Consumer with shutdown signal callback

239

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

240

// Handle delivery

241

};

242

243

CancelCallback cancelCallback = consumerTag -> {

244

System.out.println("Consumer cancelled: " + consumerTag);

245

};

246

247

ConsumerShutdownSignalCallback shutdownCallback = (consumerTag, sig) -> {

248

System.out.println("Consumer shutdown: " + consumerTag);

249

};

250

251

String consumerTag = channel.basicConsume("queue", true,

252

deliverCallback, cancelCallback, shutdownCallback);

253

```

254

255

### Pull-Based Consuming

256

257

Synchronous message consumption by polling for messages.

258

259

```java { .api }

260

/**

261

* Get a single message from queue (polling)

262

* @param queue - Queue name

263

* @param autoAck - Auto-acknowledge the message

264

* @return GetResponse with message data, or null if no message available

265

*/

266

GetResponse basicGet(String queue, boolean autoAck) throws IOException;

267

```

268

269

**Usage Examples:**

270

271

```java

272

// Poll for messages

273

while (true) {

274

GetResponse response = channel.basicGet("work.queue", false);

275

276

if (response != null) {

277

String message = new String(response.getBody(), "UTF-8");

278

System.out.println("Got message: " + message);

279

System.out.println("Messages remaining: " + response.getMessageCount());

280

281

// Process message

282

try {

283

processMessage(message);

284

// Acknowledge after successful processing

285

channel.basicAck(response.getEnvelope().getDeliveryTag(), false);

286

} catch (Exception e) {

287

// Reject and requeue on error

288

channel.basicNack(response.getEnvelope().getDeliveryTag(), false, true);

289

}

290

} else {

291

// No messages available

292

Thread.sleep(1000);

293

}

294

}

295

```

296

297

### Message Acknowledgment

298

299

Operations for acknowledging, rejecting, and recovering messages.

300

301

```java { .api }

302

/**

303

* Acknowledge one or more messages

304

* @param deliveryTag - Delivery tag from the message

305

* @param multiple - Acknowledge all messages up to and including this delivery tag

306

*/

307

void basicAck(long deliveryTag, boolean multiple) throws IOException;

308

309

/**

310

* Reject one or more messages with requeue option

311

* @param deliveryTag - Delivery tag from the message

312

* @param multiple - Reject all messages up to and including this delivery tag

313

* @param requeue - Requeue the rejected messages

314

*/

315

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

316

317

/**

318

* Reject a single message

319

* @param deliveryTag - Delivery tag from the message

320

* @param requeue - Requeue the rejected message

321

*/

322

void basicReject(long deliveryTag, boolean requeue) throws IOException;

323

324

/**

325

* Recover unacknowledged messages (redelivers them)

326

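* @param requeue - If true, messages are requeued and may be delivered to a different consumer; if false, they are redelivered to the same consumer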

327

*/

328

AMQP.Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

329

330

/**

331

* Recover unacknowledged messages; equivalent to calling basicRecover(true)

332

*/

333

AMQP.Basic.RecoverOk basicRecover() throws IOException;

334

```

335

336

**Usage Examples:**

337

338

```java

339

// Single message acknowledgment

340

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

341

342

// Acknowledge multiple messages (up to and including the delivery tag)

343

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);

344

345

// Reject and requeue single message

346

channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);

347

348

// Reject and discard message (send to dead letter exchange if configured)

349

channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);

350

351

// Recover all unacknowledged messages in this channel

352

channel.basicRecover(true);

353

```

354

355

### Quality of Service (QoS)

356

357

Control message prefetching and processing limits.

358

359

```java { .api }

360

/**

361

* Set QoS parameters for the channel

362

* @param prefetchCount - Maximum number of unacknowledged messages

363

*/

364

void basicQos(int prefetchCount) throws IOException;

365

366

/**

367

* Set QoS with size limit

368

* @param prefetchSize - Maximum amount of unacknowledged content, in octets (0 = no limit)

369

* @param prefetchCount - Maximum number of unacknowledged messages

370

* @param global - true applies the limits to the entire channel (shared by all its consumers); false applies them to each consumer individually

371

*/

372

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

373

```

374

375

**Usage Examples:**

376

377

```java

378

// Limit to processing 1 message at a time (useful for work queues)

379

channel.basicQos(1);

380

381

// Limit to 10 messages at a time

382

channel.basicQos(10);

383

384

// Set prefetch count and size limits

385

channel.basicQos(0, 5, false); // No size limit, max 5 unacknowledged messages, applied per consumer

386

```

387

388

## Types

389

390

### Queue and Consuming Types

391

392

```java { .api }

393

// Queue operation results

394

public static class AMQP.Queue {

395

public static class DeclareOk {

396

public String getQueue(); // Queue name (useful for server-generated names)

397

public int getMessageCount(); // Current message count

398

public int getConsumerCount(); // Current consumer count

399

}

400

401

public static class DeleteOk {

402

public int getMessageCount(); // Number of messages deleted

403

}

404

405

public static class BindOk {

406

// Confirmation of queue binding

407

}

408

409

public static class UnbindOk {

410

// Confirmation of queue unbinding

411

}

412

413

public static class PurgeOk {

414

public int getMessageCount(); // Number of messages purged

415

}

416

}

417

418

// Basic operation results

419

public static class AMQP.Basic {

420

public static class CancelOk {

421

public String getConsumerTag();

422

}

423

424

public static class RecoverOk {

425

// Confirmation of message recovery

426

}

427

}

428

429

// Message delivery information

430

public class Delivery {

431

public Envelope getEnvelope();

432

public AMQP.BasicProperties getProperties();

433

public byte[] getBody();

434

}

435

436

// Message envelope with routing information

437

public class Envelope {

438

public long getDeliveryTag();

439

public boolean isRedeliver();

440

public String getExchange();

441

public String getRoutingKey();

442

}

443

444

// Response from basicGet operation

445

public class GetResponse {

446

public Envelope getEnvelope();

447

public AMQP.BasicProperties getProps();

448

public byte[] getBody();

449

public int getMessageCount(); // Remaining messages in queue

450

}

451

```