or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication-security.mdclient-management.mdindex.mdmessage-consumption.mdmessage-production.mdmessage-reading.mdschema-serialization.mdtransaction-support.md

message-consumption.mddocs/

0

# Message Consumption

1

2

Subscribing to topics with various subscription types, acknowledgment patterns, message processing strategies, and advanced consumption features.

3

4

## Capabilities

5

6

### Consumer Interface

7

8

Core interface for consuming messages from Pulsar topics.

9

10

```java { .api }

11

/**

12

* Interface for consuming messages from topics

13

* Thread-safe and supports various subscription types and acknowledgment patterns

14

*/

15

interface Consumer<T> extends Closeable {

16

/** Get topic name */

17

String getTopic();

18

19

/** Get subscription name */

20

String getSubscription();

21

22

/** Get consumer name */

23

String getConsumerName();

24

25

/** Receive message synchronously (blocks until message available) */

26

Message<T> receive() throws PulsarClientException;

27

28

/** Receive message asynchronously */

29

CompletableFuture<Message<T>> receiveAsync();

30

31

/** Receive message with timeout */

32

Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException;

33

34

/** Batch receive messages synchronously */

35

Messages<T> batchReceive() throws PulsarClientException;

36

37

/** Batch receive messages asynchronously */

38

CompletableFuture<Messages<T>> batchReceiveAsync();

39

40

/** Acknowledge message receipt */

41

void acknowledge(Message<?> message) throws PulsarClientException;

42

43

/** Acknowledge message by MessageId */

44

void acknowledge(MessageId messageId) throws PulsarClientException;

45

46

/** Acknowledge message asynchronously */

47

CompletableFuture<Void> acknowledgeAsync(Message<?> message);

48

49

/** Acknowledge message by MessageId asynchronously */

50

CompletableFuture<Void> acknowledgeAsync(MessageId messageId);

51

52

/** Acknowledge all messages up to and including specified message */

53

void acknowledgeCumulative(Message<?> message) throws PulsarClientException;

54

55

/** Acknowledge all messages up to and including specified MessageId */

56

void acknowledgeCumulative(MessageId messageId) throws PulsarClientException;

57

58

/** Acknowledge cumulatively asynchronously */

59

CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message);

60

61

/** Acknowledge cumulatively by MessageId asynchronously */

62

CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId);

63

64

/** Negative acknowledge message (triggers redelivery) */

65

void negativeAcknowledge(Message<?> message);

66

67

/** Negative acknowledge by MessageId */

68

void negativeAcknowledge(MessageId messageId);

69

70

/** Negative acknowledge batch of messages */

71

void negativeAcknowledge(Messages<?> messages);

72

73

/** Reconsume message later with delay */

74

void reconsumeLater(Message<?> message, long delay, TimeUnit unit) throws PulsarClientException;

75

76

/** Reconsume message later with delay and custom properties */

77

void reconsumeLater(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit) throws PulsarClientException;

78

79

/** Reconsume batch of messages later with delay */

80

void reconsumeLater(Messages<?> messages, long delay, TimeUnit unit) throws PulsarClientException;

81

82

/** Reconsume message later cumulatively with delay */

83

void reconsumeLaterCumulative(Message<?> message, long delay, TimeUnit unit) throws PulsarClientException;

84

85

/** Reconsume message later cumulatively with delay and custom properties */

86

void reconsumeLaterCumulative(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit) throws PulsarClientException;

87

88

/** Reconsume message later asynchronously */

89

CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delay, TimeUnit unit);

90

91

/** Reconsume message later asynchronously with custom properties */

92

CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit);

93

94

/** Reconsume batch of messages later asynchronously */

95

CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delay, TimeUnit unit);

96

97

/** Reconsume message later cumulatively asynchronously */

98

CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delay, TimeUnit unit);

99

100

/** Reconsume message later cumulatively asynchronously with custom properties */

101

CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit);

102

103

/** Get consumer statistics */

104

ConsumerStats getStats();

105

106

/** Unsubscribe from topic */

107

void unsubscribe() throws PulsarClientException;

108

109

/** Unsubscribe asynchronously */

110

CompletableFuture<Void> unsubscribeAsync();

111

112

/** Check if consumer is connected */

113

boolean isConnected();

114

115

/** Get timestamp of last disconnection */

116

long getLastDisconnectedTimestamp();

117

118

/** Pause message delivery */

119

void pause();

120

121

/** Resume message delivery */

122

void resume();

123

124

/** Check if consumer is paused */

125

boolean isPaused();

126

127

/** Seek to specific message ID */

128

void seek(MessageId messageId) throws PulsarClientException;

129

130

/** Seek using custom function */

131

void seek(Function<String, Object> function) throws PulsarClientException;

132

133

/** Seek to specific message ID asynchronously */

134

CompletableFuture<Void> seekAsync(MessageId messageId);

135

136

/** Seek to specific timestamp */

137

void seek(long timestamp) throws PulsarClientException;

138

139

/** Seek to specific timestamp asynchronously */

140

CompletableFuture<Void> seekAsync(long timestamp);

141

142

/** Seek using custom function asynchronously */

143

CompletableFuture<Void> seekAsync(Function<String, Object> function);

144

145

/** Get last MessageId (deprecated) */

146

@Deprecated

147

CompletableFuture<MessageId> getLastMessageIdAsync();

148

149

/** Get last message IDs for all partitions */

150

List<TopicMessageId> getLastMessageIds() throws PulsarClientException;

151

152

/** Get last message IDs for all partitions asynchronously */

153

CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync();

154

155

/** Close consumer */

156

void close() throws PulsarClientException;

157

158

/** Close consumer asynchronously */

159

CompletableFuture<Void> closeAsync();

160

}

161

```

162

163

**Usage Examples:**

164

165

```java

166

import org.apache.pulsar.client.api.*;

167

168

// Simple message consumption

169

Consumer<String> consumer = client.newConsumer(Schema.STRING)

170

.topic("my-topic")

171

.subscriptionName("my-subscription")

172

.subscribe();

173

174

while (true) {

175

Message<String> message = consumer.receive();

176

try {

177

System.out.println("Received: " + message.getValue());

178

consumer.acknowledge(message);

179

} catch (Exception e) {

180

consumer.negativeAcknowledge(message);

181

}

182

}

183

184

// Asynchronous consumption

185

consumer.receiveAsync()

186

.thenAccept(message -> {

187

System.out.println("Async received: " + message.getValue());

188

consumer.acknowledgeAsync(message);

189

})

190

.exceptionally(throwable -> {

191

System.err.println("Receive failed: " + throwable.getMessage());

192

return null;

193

});

194

195

// Batch consumption

196

Messages<String> messages = consumer.batchReceive();

197

for (Message<String> message : messages) {

198

System.out.println("Batch message: " + message.getValue());

199

}

200

consumer.acknowledge(messages);

201

```

202

203

### ConsumerBuilder Configuration

204

205

Builder interface for configuring and creating Consumer instances.

206

207

```java { .api }

208

/**

209

* Builder for configuring and creating Consumer instances

210

*/

211

interface ConsumerBuilder<T> extends Serializable, Cloneable {

212

/** Create and subscribe consumer synchronously */

213

Consumer<T> subscribe() throws PulsarClientException;

214

215

/** Create and subscribe consumer asynchronously */

216

CompletableFuture<Consumer<T>> subscribeAsync();

217

218

/** Clone the builder */

219

ConsumerBuilder<T> clone();

220

221

/** Set topic names to subscribe to */

222

ConsumerBuilder<T> topic(String... topicNames);

223

224

/** Set list of topic names */

225

ConsumerBuilder<T> topics(List<String> topicNames);

226

227

/** Set topic pattern for dynamic topic discovery */

228

ConsumerBuilder<T> topicsPattern(Pattern topicsPattern);

229

230

/** Set topic pattern with regex subscription mode */

231

ConsumerBuilder<T> topicsPattern(String topicsPattern, RegexSubscriptionMode regexSubscriptionMode);

232

233

/** Set subscription name (required) */

234

ConsumerBuilder<T> subscriptionName(String subscriptionName);

235

236

/** Set subscription type */

237

ConsumerBuilder<T> subscriptionType(SubscriptionType subscriptionType);

238

239

/** Set subscription mode */

240

ConsumerBuilder<T> subscriptionMode(SubscriptionMode subscriptionMode);

241

242

/** Set subscription initial position */

243

ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition);

244

245

/** Set key-shared policy for Key_Shared subscription */

246

ConsumerBuilder<T> keySharedPolicy(KeySharedPolicy keySharedPolicy);

247

248

/** Set message listener for push-style consumption */

249

ConsumerBuilder<T> messageListener(MessageListener<T> messageListener);

250

251

/** Set message listener executor */

252

ConsumerBuilder<T> messageListenerExecutor(Executor executor);

253

254

/** Set consumer event listener */

255

ConsumerBuilder<T> consumerEventListener(ConsumerEventListener consumerEventListener);

256

257

/** Set receiver queue size (default: 1000) */

258

ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize);

259

260

/** Set acknowledgment group time */

261

ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit);

262

263

/** Set replication clusters */

264

ConsumerBuilder<T> replicateSubscriptionState(boolean replicateSubscriptionState);

265

266

/** Set max total receiver queue size across partitions */

267

ConsumerBuilder<T> maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions);

268

269

/** Set consumer name */

270

ConsumerBuilder<T> consumerName(String consumerName);

271

272

/** Set acknowledgment timeout */

273

ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);

274

275

/** Set tick duration for acknowledgment timeout */

276

ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);

277

278

/** Set negative acknowledgment redelivery delay */

279

ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit);

280

281

/** Set default redelivery backoff */

282

ConsumerBuilder<T> defaultRedeliveryBackoff(RedeliveryBackoff redeliveryBackoff);

283

284

/** Set dead letter policy */

285

ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy);

286

287

/** Set retry enable */

288

ConsumerBuilder<T> enableRetry(boolean retryEnable);

289

290

/** Set batch receive policy */

291

ConsumerBuilder<T> batchReceivePolicy(BatchReceivePolicy batchReceivePolicy);

292

293

/** Enable batch index acknowledgment */

294

ConsumerBuilder<T> enableBatchIndexAcknowledgment(boolean batchIndexAcknowledgment);

295

296

/** Set max pending chunked messages */

297

ConsumerBuilder<T> maxPendingChunkedMessage(int maxPendingChunkedMessage);

298

299

/** Set auto acknowledge chunked messages timeout */

300

ConsumerBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOldestChunkedMessageOnQueueFull);

301

302

/** Set expire time of incomplete chunked messages */

303

ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit);

304

305

/** Set priority level */

306

ConsumerBuilder<T> priorityLevel(int priorityLevel);

307

308

/** Add property */

309

ConsumerBuilder<T> property(String key, String value);

310

311

/** Set properties */

312

ConsumerBuilder<T> properties(Map<String, String> properties);

313

314

/** Add consumer interceptor */

315

ConsumerBuilder<T> intercept(ConsumerInterceptor<T> interceptor);

316

317

/** Set start message ID inclusive */

318

ConsumerBuilder<T> startMessageIdInclusive();

319

320

/** Enable pooling messages */

321

ConsumerBuilder<T> poolMessages(boolean poolMessages);

322

323

/** Set start paused */

324

ConsumerBuilder<T> startPaused(boolean paused);

325

326

/** Set auto scale receiver queue size */

327

ConsumerBuilder<T> autoScaleReceiverQueueSizeEnabled(boolean enabled);

328

329

/** Set topic consumer builder */

330

ConsumerBuilder<T> topicConsumerBuilder(String topicName, TopicConsumerBuilder<T> topicConsumerBuilder);

331

}

332

```

333

334

### Encryption Configuration

335

336

Configure message decryption for consumers.

337

338

```java { .api }

339

interface ConsumerBuilder<T> {

340

/** Set crypto key reader */

341

ConsumerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);

342

343

/** Set default crypto key reader using private key path */

344

ConsumerBuilder<T> defaultCryptoKeyReader(String privateKeyPath);

345

346

/** Set default crypto key reader using key store */

347

ConsumerBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);

348

349

/** Set crypto failure action */

350

ConsumerBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);

351

}

352

```

353

354

**Consumer Configuration Examples:**

355

356

```java

357

// Basic consumer with Exclusive subscription

358

Consumer<String> consumer = client.newConsumer(Schema.STRING)

359

.topic("my-topic")

360

.subscriptionName("my-exclusive-sub")

361

.subscriptionType(SubscriptionType.Exclusive)

362

.subscribe();

363

364

// Shared subscription with multiple consumers

365

Consumer<String> consumer = client.newConsumer(Schema.STRING)

366

.topics(Arrays.asList("topic1", "topic2", "topic3"))

367

.subscriptionName("my-shared-sub")

368

.subscriptionType(SubscriptionType.Shared)

369

.receiverQueueSize(1000)

370

.consumerName("consumer-1")

371

.subscribe();

372

373

// Key-shared subscription with custom policy

374

Consumer<String> consumer = client.newConsumer(Schema.STRING)

375

.topic("partitioned-topic")

376

.subscriptionName("key-shared-sub")

377

.subscriptionType(SubscriptionType.Key_Shared)

378

.keySharedPolicy(KeySharedPolicy.stickyHashRange())

379

.subscribe();

380

381

// Pattern subscription

382

Consumer<String> consumer = client.newConsumer(Schema.STRING)

383

.topicsPattern("persistent://public/default/topic-.*")

384

.subscriptionName("pattern-sub")

385

.subscribe();

386

387

// Consumer with message listener

388

Consumer<String> consumer = client.newConsumer(Schema.STRING)

389

.topic("listener-topic")

390

.subscriptionName("listener-sub")

391

.messageListener((consumer, message) -> {

392

System.out.println("Received: " + message.getValue());

393

try {

394

consumer.acknowledge(message);

395

} catch (PulsarClientException e) {

396

consumer.negativeAcknowledge(message);

397

}

398

})

399

.subscribe();

400

```

401

402

### Batch Message Processing

403

404

Interface for handling batches of messages.

405

406

```java { .api }

407

/**

408

* Container for batch of messages

409

*/

410

interface Messages<T> extends Iterable<Message<T>> {

411

/** Get number of messages in batch */

412

int size();

413

414

/** Get list of message values */

415

List<T> stream();

416

417

/** Iterator over messages */

418

Iterator<Message<T>> iterator();

419

}

420

421

/**

422

* Batch receive policy configuration

423

*/

424

class BatchReceivePolicy {

425

/** Create builder for batch receive policy */

426

static BatchReceivePolicy.Builder builder();

427

428

/** Default batch receive policy */

429

static final BatchReceivePolicy DEFAULT_POLICY;

430

431

/** Get maximum number of messages in batch */

432

int getMaxNumMessages();

433

434

/** Get maximum number of bytes in batch */

435

long getMaxNumBytes();

436

437

/** Get batch timeout in milliseconds */

438

long getTimeoutMs();

439

440

interface Builder {

441

/** Set maximum messages in batch */

442

Builder maxNumMessages(int maxNumMessages);

443

444

/** Set maximum bytes in batch */

445

Builder maxNumBytes(long maxNumBytes);

446

447

/** Set batch timeout */

448

Builder timeout(long timeout, TimeUnit timeUnit);

449

450

/** Build the policy */

451

BatchReceivePolicy build();

452

}

453

}

454

```

455

456

**Batch Processing Examples:**

457

458

```java

459

// Configure batch receive policy

460

BatchReceivePolicy batchPolicy = BatchReceivePolicy.builder()

461

.maxNumMessages(100)

462

.maxNumBytes(1024 * 1024)

463

.timeout(100, TimeUnit.MILLISECONDS)

464

.build();

465

466

Consumer<String> consumer = client.newConsumer(Schema.STRING)

467

.topic("batch-topic")

468

.subscriptionName("batch-sub")

469

.batchReceivePolicy(batchPolicy)

470

.subscribe();

471

472

// Receive and process batch

473

Messages<String> messages = consumer.batchReceive();

474

for (Message<String> message : messages) {

475

System.out.println("Processing: " + message.getValue());

476

}

477

// Acknowledge entire batch

478

consumer.acknowledge(messages);

479

```

480

481

### Consumer Statistics

482

483

Interface for accessing consumer statistics and metrics.

484

485

```java { .api }

486

/**

487

* Consumer statistics interface

488

*/

489

interface ConsumerStats {

490

/** Number of messages received */

491

long getNumMsgsReceived();

492

493

/** Number of bytes received */

494

long getNumBytesReceived();

495

496

/** Receive rate in messages per second */

497

double getReceiveMsgsRate();

498

499

/** Receive rate in bytes per second */

500

double getReceiveBytesRate();

501

502

/** Number of acknowledgments sent */

503

long getNumAcksSent();

504

505

/** Number of failed acknowledgments */

506

long getNumAcksFailed();

507

508

/** Total messages received since creation */

509

long getTotalMsgsReceived();

510

511

/** Total bytes received since creation */

512

long getTotalBytesReceived();

513

514

/** Total receive failures since creation */

515

long getTotalReceivedFailed();

516

517

/** Total acknowledgments sent since creation */

518

long getTotalAcksSent();

519

520

/** Total failed acknowledgments since creation */

521

long getTotalAcksFailed();

522

523

/** Available permits for receiving */

524

int getAvailablePermits();

525

526

/** Number of unacknowledged messages */

527

int getNumUnackedMessages();

528

}

529

```

530

531

### Dead Letter Queue Configuration

532

533

Configuration for handling failed message processing.

534

535

```java { .api }

536

/**

537

* Dead letter queue policy configuration

538

*/

539

class DeadLetterPolicy {

540

/** Create builder for dead letter policy */

541

static DeadLetterPolicy.Builder builder();

542

543

/** Get maximum redelivery count */

544

int getMaxRedeliverCount();

545

546

/** Get retry letter topic name */

547

String getRetryLetterTopic();

548

549

/** Get dead letter topic name */

550

String getDeadLetterTopic();

551

552

/** Get initial subscription name */

553

String getInitialSubscriptionName();

554

555

interface Builder {

556

/** Set maximum redelivery count */

557

Builder maxRedeliverCount(int maxRedeliverCount);

558

559

/** Set retry letter topic */

560

Builder retryLetterTopic(String retryLetterTopic);

561

562

/** Set dead letter topic */

563

Builder deadLetterTopic(String deadLetterTopic);

564

565

/** Set initial subscription name */

566

Builder initialSubscriptionName(String initialSubscriptionName);

567

568

/** Build the policy */

569

DeadLetterPolicy build();

570

}

571

}

572

```

573

574

**Dead Letter Queue Example:**

575

576

```java

577

// Configure dead letter policy

578

DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()

579

.maxRedeliverCount(3)

580

.retryLetterTopic("my-topic-retry")

581

.deadLetterTopic("my-topic-dlq")

582

.build();

583

584

Consumer<String> consumer = client.newConsumer(Schema.STRING)

585

.topic("my-topic")

586

.subscriptionName("my-sub")

587

.subscriptionType(SubscriptionType.Shared)

588

.deadLetterPolicy(deadLetterPolicy)

589

.enableRetry(true)

590

.subscribe();

591

```

592

593

## Supporting Types and Enums

594

595

```java { .api }

596

enum SubscriptionType {

597

/** Single consumer */

598

Exclusive,

599

/** Multiple consumers, round-robin */

600

Shared,

601

/** Multiple consumers, active/standby */

602

Failover,

603

/** Multiple consumers, key-based routing */

604

Key_Shared

605

}

606

607

enum SubscriptionMode {

608

/** Persistent subscription */

609

Durable,

610

/** Ephemeral subscription */

611

NonDurable

612

}

613

614

enum SubscriptionInitialPosition {

615

/** Start from latest message */

616

Latest,

617

/** Start from earliest message */

618

Earliest

619

}

620

621

enum RegexSubscriptionMode {

622

/** Persistent topics only */

623

PersistentOnly,

624

/** Non-persistent topics only */

625

NonPersistentOnly,

626

/** All topic types */

627

AllTopics

628

}

629

630

enum ConsumerCryptoFailureAction {

631

/** Fail the receive operation */

632

FAIL,

633

/** Discard the message */

634

DISCARD,

635

/** Consume message as-is */

636

CONSUME

637

}

638

639

interface MessageListener<T> {

640

/** Handle received message */

641

void received(Consumer<T> consumer, Message<T> msg);

642

}

643

644

interface ConsumerEventListener {

645

/** Consumer became active */

646

void becameActive(Consumer<?> consumer, int partitionId);

647

648

/** Consumer became inactive */

649

void becameInactive(Consumer<?> consumer, int partitionId);

650

}

651

652

interface RedeliveryBackoff {

653

/** Get next backoff delay */

654

long next(int redeliveryCount);

655

}

656

```