0
# Message Consumption
1
2
Subscribing to topics with various subscription types, acknowledgment patterns, message processing strategies, and advanced consumption features.
3
4
## Capabilities
5
6
### Consumer Interface
7
8
Core interface for consuming messages from Pulsar topics.
9
10
```java { .api }
11
/**
12
* Interface for consuming messages from topics
13
* Thread-safe and supports various subscription types and acknowledgment patterns
14
*/
15
interface Consumer<T> extends Closeable {
16
/** Get topic name */
17
String getTopic();
18
19
/** Get subscription name */
20
String getSubscription();
21
22
/** Get consumer name */
23
String getConsumerName();
24
25
/** Receive message synchronously (blocks until message available) */
26
Message<T> receive() throws PulsarClientException;
27
28
/** Receive message asynchronously */
29
CompletableFuture<Message<T>> receiveAsync();
30
31
/** Receive message with timeout */
32
Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException;
33
34
/** Batch receive messages synchronously */
35
Messages<T> batchReceive() throws PulsarClientException;
36
37
/** Batch receive messages asynchronously */
38
CompletableFuture<Messages<T>> batchReceiveAsync();
39
40
/** Acknowledge message receipt */
41
void acknowledge(Message<?> message) throws PulsarClientException;
42
43
/** Acknowledge message by MessageId */
44
void acknowledge(MessageId messageId) throws PulsarClientException;
45
46
/** Acknowledge message asynchronously */
47
CompletableFuture<Void> acknowledgeAsync(Message<?> message);
48
49
/** Acknowledge message by MessageId asynchronously */
50
CompletableFuture<Void> acknowledgeAsync(MessageId messageId);
51
52
/** Acknowledge all messages up to and including specified message */
53
void acknowledgeCumulative(Message<?> message) throws PulsarClientException;
54
55
/** Acknowledge all messages up to and including specified MessageId */
56
void acknowledgeCumulative(MessageId messageId) throws PulsarClientException;
57
58
/** Acknowledge cumulatively asynchronously */
59
CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message);
60
61
/** Acknowledge cumulatively by MessageId asynchronously */
62
CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId);
63
64
/** Negative acknowledge message (triggers redelivery) */
65
void negativeAcknowledge(Message<?> message);
66
67
/** Negative acknowledge by MessageId */
68
void negativeAcknowledge(MessageId messageId);
69
70
/** Negative acknowledge batch of messages */
71
void negativeAcknowledge(Messages<?> messages);
72
73
/** Reconsume message later with delay */
74
void reconsumeLater(Message<?> message, long delay, TimeUnit unit) throws PulsarClientException;
75
76
/** Reconsume message later with delay and custom properties */
77
void reconsumeLater(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit) throws PulsarClientException;
78
79
/** Reconsume batch of messages later with delay */
80
void reconsumeLater(Messages<?> messages, long delay, TimeUnit unit) throws PulsarClientException;
81
82
/** Reconsume message later cumulatively with delay */
83
void reconsumeLaterCumulative(Message<?> message, long delay, TimeUnit unit) throws PulsarClientException;
84
85
/** Reconsume message later cumulatively with delay and custom properties */
86
void reconsumeLaterCumulative(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit) throws PulsarClientException;
87
88
/** Reconsume message later asynchronously */
89
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delay, TimeUnit unit);
90
91
/** Reconsume message later asynchronously with custom properties */
92
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit);
93
94
/** Reconsume batch of messages later asynchronously */
95
CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delay, TimeUnit unit);
96
97
/** Reconsume message later cumulatively asynchronously */
98
CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delay, TimeUnit unit);
99
100
/** Reconsume message later cumulatively asynchronously with custom properties */
101
CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit);
102
103
/** Get consumer statistics */
104
ConsumerStats getStats();
105
106
/** Unsubscribe from topic */
107
void unsubscribe() throws PulsarClientException;
108
109
/** Unsubscribe asynchronously */
110
CompletableFuture<Void> unsubscribeAsync();
111
112
/** Check if consumer is connected */
113
boolean isConnected();
114
115
/** Get timestamp of last disconnection */
116
long getLastDisconnectedTimestamp();
117
118
/** Pause message delivery */
119
void pause();
120
121
/** Resume message delivery */
122
void resume();
123
124
/** Check if consumer is paused */
125
boolean isPaused();
126
127
/** Seek to specific message ID */
128
void seek(MessageId messageId) throws PulsarClientException;
129
130
/** Seek using custom function */
131
void seek(Function<String, Object> function) throws PulsarClientException;
132
133
/** Seek to specific message ID asynchronously */
134
CompletableFuture<Void> seekAsync(MessageId messageId);
135
136
/** Seek to specific timestamp */
137
void seek(long timestamp) throws PulsarClientException;
138
139
/** Seek to specific timestamp asynchronously */
140
CompletableFuture<Void> seekAsync(long timestamp);
141
142
/** Seek using custom function asynchronously */
143
CompletableFuture<Void> seekAsync(Function<String, Object> function);
144
145
/** Get last MessageId (deprecated) */
146
@Deprecated
147
CompletableFuture<MessageId> getLastMessageIdAsync();
148
149
/** Get last message IDs for all partitions */
150
List<TopicMessageId> getLastMessageIds() throws PulsarClientException;
151
152
/** Get last message IDs for all partitions asynchronously */
153
CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync();
154
155
/** Close consumer */
156
void close() throws PulsarClientException;
157
158
/** Close consumer asynchronously */
159
CompletableFuture<Void> closeAsync();
160
}
161
```
162
163
**Usage Examples:**
164
165
```java
166
import org.apache.pulsar.client.api.*;
167
168
// Simple message consumption
169
Consumer<String> consumer = client.newConsumer(Schema.STRING)
170
.topic("my-topic")
171
.subscriptionName("my-subscription")
172
.subscribe();
173
174
while (true) {
175
Message<String> message = consumer.receive();
176
try {
177
System.out.println("Received: " + message.getValue());
178
consumer.acknowledge(message);
179
} catch (Exception e) {
180
consumer.negativeAcknowledge(message);
181
}
182
}
183
184
// Asynchronous consumption
185
consumer.receiveAsync()
186
.thenAccept(message -> {
187
System.out.println("Async received: " + message.getValue());
188
consumer.acknowledgeAsync(message);
189
})
190
.exceptionally(throwable -> {
191
System.err.println("Receive failed: " + throwable.getMessage());
192
return null;
193
});
194
195
// Batch consumption
196
Messages<String> messages = consumer.batchReceive();
197
for (Message<String> message : messages) {
198
System.out.println("Batch message: " + message.getValue());
199
}
200
consumer.acknowledge(messages);
201
```
202
203
### ConsumerBuilder Configuration
204
205
Builder interface for configuring and creating Consumer instances.
206
207
```java { .api }
208
/**
209
* Builder for configuring and creating Consumer instances
210
*/
211
interface ConsumerBuilder<T> extends Serializable, Cloneable {
212
/** Create and subscribe consumer synchronously */
213
Consumer<T> subscribe() throws PulsarClientException;
214
215
/** Create and subscribe consumer asynchronously */
216
CompletableFuture<Consumer<T>> subscribeAsync();
217
218
/** Clone the builder */
219
ConsumerBuilder<T> clone();
220
221
/** Set topic names to subscribe to */
222
ConsumerBuilder<T> topic(String... topicNames);
223
224
/** Set list of topic names */
225
ConsumerBuilder<T> topics(List<String> topicNames);
226
227
/** Set topic pattern for dynamic topic discovery */
228
ConsumerBuilder<T> topicsPattern(Pattern topicsPattern);
229
230
/** Set topic pattern with regex subscription mode */
231
ConsumerBuilder<T> topicsPattern(String topicsPattern, RegexSubscriptionMode regexSubscriptionMode);
232
233
/** Set subscription name (required) */
234
ConsumerBuilder<T> subscriptionName(String subscriptionName);
235
236
/** Set subscription type */
237
ConsumerBuilder<T> subscriptionType(SubscriptionType subscriptionType);
238
239
/** Set subscription mode */
240
ConsumerBuilder<T> subscriptionMode(SubscriptionMode subscriptionMode);
241
242
/** Set subscription initial position */
243
ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition);
244
245
/** Set key-shared policy for Key_Shared subscription */
246
ConsumerBuilder<T> keySharedPolicy(KeySharedPolicy keySharedPolicy);
247
248
/** Set message listener for push-style consumption */
249
ConsumerBuilder<T> messageListener(MessageListener<T> messageListener);
250
251
/** Set message listener executor */
252
ConsumerBuilder<T> messageListenerExecutor(Executor executor);
253
254
/** Set consumer event listener */
255
ConsumerBuilder<T> consumerEventListener(ConsumerEventListener consumerEventListener);
256
257
/** Set receiver queue size (default: 1000) */
258
ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize);
259
260
/** Set acknowledgment group time */
261
ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit);
262
263
/** Set replication clusters */
264
ConsumerBuilder<T> replicateSubscriptionState(boolean replicateSubscriptionState);
265
266
/** Set max total receiver queue size across partitions */
267
ConsumerBuilder<T> maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions);
268
269
/** Set consumer name */
270
ConsumerBuilder<T> consumerName(String consumerName);
271
272
/** Set acknowledgment timeout */
273
ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
274
275
/** Set tick duration for acknowledgment timeout */
276
ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);
277
278
/** Set negative acknowledgment redelivery delay */
279
ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit);
280
281
/** Set default redelivery backoff */
282
ConsumerBuilder<T> defaultRedeliveryBackoff(RedeliveryBackoff redeliveryBackoff);
283
284
/** Set dead letter policy */
285
ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy);
286
287
/** Set retry enable */
288
ConsumerBuilder<T> enableRetry(boolean retryEnable);
289
290
/** Set batch receive policy */
291
ConsumerBuilder<T> batchReceivePolicy(BatchReceivePolicy batchReceivePolicy);
292
293
/** Enable batch index acknowledgment */
294
ConsumerBuilder<T> enableBatchIndexAcknowledgment(boolean batchIndexAcknowledgment);
295
296
/** Set max pending chunked messages */
297
ConsumerBuilder<T> maxPendingChunkedMessage(int maxPendingChunkedMessage);
298
299
/** Set auto acknowledge chunked messages timeout */
300
ConsumerBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOldestChunkedMessageOnQueueFull);
301
302
/** Set expire time of incomplete chunked messages */
303
ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit);
304
305
/** Set priority level */
306
ConsumerBuilder<T> priorityLevel(int priorityLevel);
307
308
/** Add property */
309
ConsumerBuilder<T> property(String key, String value);
310
311
/** Set properties */
312
ConsumerBuilder<T> properties(Map<String, String> properties);
313
314
/** Add consumer interceptor */
315
ConsumerBuilder<T> intercept(ConsumerInterceptor<T> interceptor);
316
317
/** Set start message ID inclusive */
318
ConsumerBuilder<T> startMessageIdInclusive();
319
320
/** Enable pooling messages */
321
ConsumerBuilder<T> poolMessages(boolean poolMessages);
322
323
/** Set start paused */
324
ConsumerBuilder<T> startPaused(boolean paused);
325
326
/** Set auto scale receiver queue size */
327
ConsumerBuilder<T> autoScaleReceiverQueueSizeEnabled(boolean enabled);
328
329
/** Set topic consumer builder */
330
ConsumerBuilder<T> topicConsumerBuilder(String topicName, TopicConsumerBuilder<T> topicConsumerBuilder);
331
}
332
```
333
334
### Encryption Configuration
335
336
Configure message decryption for consumers.
337
338
```java { .api }
339
interface ConsumerBuilder<T> {
340
/** Set crypto key reader */
341
ConsumerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
342
343
/** Set default crypto key reader using private key path */
344
ConsumerBuilder<T> defaultCryptoKeyReader(String privateKeyPath);
345
346
/** Set default crypto key reader using key store */
347
ConsumerBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);
348
349
/** Set crypto failure action */
350
ConsumerBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);
351
}
352
```
353
354
**Consumer Configuration Examples:**
355
356
```java
357
// Basic consumer with Exclusive subscription
358
Consumer<String> consumer = client.newConsumer(Schema.STRING)
359
.topic("my-topic")
360
.subscriptionName("my-exclusive-sub")
361
.subscriptionType(SubscriptionType.Exclusive)
362
.subscribe();
363
364
// Shared subscription with multiple consumers
365
Consumer<String> consumer = client.newConsumer(Schema.STRING)
366
.topics(Arrays.asList("topic1", "topic2", "topic3"))
367
.subscriptionName("my-shared-sub")
368
.subscriptionType(SubscriptionType.Shared)
369
.receiverQueueSize(1000)
370
.consumerName("consumer-1")
371
.subscribe();
372
373
// Key-shared subscription with custom policy
374
Consumer<String> consumer = client.newConsumer(Schema.STRING)
375
.topic("partitioned-topic")
376
.subscriptionName("key-shared-sub")
377
.subscriptionType(SubscriptionType.Key_Shared)
378
.keySharedPolicy(KeySharedPolicy.stickyHashRange())
379
.subscribe();
380
381
// Pattern subscription
382
Consumer<String> consumer = client.newConsumer(Schema.STRING)
383
.topicsPattern("persistent://public/default/topic-.*")
384
.subscriptionName("pattern-sub")
385
.subscribe();
386
387
// Consumer with message listener
388
Consumer<String> consumer = client.newConsumer(Schema.STRING)
389
.topic("listener-topic")
390
.subscriptionName("listener-sub")
391
.messageListener((consumer, message) -> {
392
System.out.println("Received: " + message.getValue());
393
try {
394
consumer.acknowledge(message);
395
} catch (PulsarClientException e) {
396
consumer.negativeAcknowledge(message);
397
}
398
})
399
.subscribe();
400
```
401
402
### Batch Message Processing
403
404
Interface for handling batches of messages.
405
406
```java { .api }
407
/**
408
* Container for batch of messages
409
*/
410
interface Messages<T> extends Iterable<Message<T>> {
411
/** Get number of messages in batch */
412
int size();
413
414
/** Get list of message values */
415
List<T> stream();
416
417
/** Iterator over messages */
418
Iterator<Message<T>> iterator();
419
}
420
421
/**
422
* Batch receive policy configuration
423
*/
424
class BatchReceivePolicy {
425
/** Create builder for batch receive policy */
426
static BatchReceivePolicy.Builder builder();
427
428
/** Default batch receive policy */
429
static final BatchReceivePolicy DEFAULT_POLICY;
430
431
/** Get maximum number of messages in batch */
432
int getMaxNumMessages();
433
434
/** Get maximum number of bytes in batch */
435
long getMaxNumBytes();
436
437
/** Get batch timeout in milliseconds */
438
long getTimeoutMs();
439
440
interface Builder {
441
/** Set maximum messages in batch */
442
Builder maxNumMessages(int maxNumMessages);
443
444
/** Set maximum bytes in batch */
445
Builder maxNumBytes(long maxNumBytes);
446
447
/** Set batch timeout */
448
Builder timeout(long timeout, TimeUnit timeUnit);
449
450
/** Build the policy */
451
BatchReceivePolicy build();
452
}
453
}
454
```
455
456
**Batch Processing Examples:**
457
458
```java
459
// Configure batch receive policy
460
BatchReceivePolicy batchPolicy = BatchReceivePolicy.builder()
461
.maxNumMessages(100)
462
.maxNumBytes(1024 * 1024)
463
.timeout(100, TimeUnit.MILLISECONDS)
464
.build();
465
466
Consumer<String> consumer = client.newConsumer(Schema.STRING)
467
.topic("batch-topic")
468
.subscriptionName("batch-sub")
469
.batchReceivePolicy(batchPolicy)
470
.subscribe();
471
472
// Receive and process batch
473
Messages<String> messages = consumer.batchReceive();
474
for (Message<String> message : messages) {
475
System.out.println("Processing: " + message.getValue());
476
}
477
// Acknowledge entire batch
478
consumer.acknowledge(messages);
479
```
480
481
### Consumer Statistics
482
483
Interface for accessing consumer statistics and metrics.
484
485
```java { .api }
486
/**
487
* Consumer statistics interface
488
*/
489
interface ConsumerStats {
490
/** Number of messages received */
491
long getNumMsgsReceived();
492
493
/** Number of bytes received */
494
long getNumBytesReceived();
495
496
/** Receive rate in messages per second */
497
double getReceiveMsgsRate();
498
499
/** Receive rate in bytes per second */
500
double getReceiveBytesRate();
501
502
/** Number of acknowledgments sent */
503
long getNumAcksSent();
504
505
/** Number of failed acknowledgments */
506
long getNumAcksFailed();
507
508
/** Total messages received since creation */
509
long getTotalMsgsReceived();
510
511
/** Total bytes received since creation */
512
long getTotalBytesReceived();
513
514
/** Total receive failures since creation */
515
long getTotalReceivedFailed();
516
517
/** Total acknowledgments sent since creation */
518
long getTotalAcksSent();
519
520
/** Total failed acknowledgments since creation */
521
long getTotalAcksFailed();
522
523
/** Available permits for receiving */
524
int getAvailablePermits();
525
526
/** Number of unacknowledged messages */
527
int getNumUnackedMessages();
528
}
529
```
530
531
### Dead Letter Queue Configuration
532
533
Configuration for handling failed message processing.
534
535
```java { .api }
536
/**
537
* Dead letter queue policy configuration
538
*/
539
class DeadLetterPolicy {
540
/** Create builder for dead letter policy */
541
static DeadLetterPolicy.Builder builder();
542
543
/** Get maximum redelivery count */
544
int getMaxRedeliverCount();
545
546
/** Get retry letter topic name */
547
String getRetryLetterTopic();
548
549
/** Get dead letter topic name */
550
String getDeadLetterTopic();
551
552
/** Get initial subscription name */
553
String getInitialSubscriptionName();
554
555
interface Builder {
556
/** Set maximum redelivery count */
557
Builder maxRedeliverCount(int maxRedeliverCount);
558
559
/** Set retry letter topic */
560
Builder retryLetterTopic(String retryLetterTopic);
561
562
/** Set dead letter topic */
563
Builder deadLetterTopic(String deadLetterTopic);
564
565
/** Set initial subscription name */
566
Builder initialSubscriptionName(String initialSubscriptionName);
567
568
/** Build the policy */
569
DeadLetterPolicy build();
570
}
571
}
572
```
573
574
**Dead Letter Queue Example:**
575
576
```java
577
// Configure dead letter policy
578
DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()
579
.maxRedeliverCount(3)
580
.retryLetterTopic("my-topic-retry")
581
.deadLetterTopic("my-topic-dlq")
582
.build();
583
584
Consumer<String> consumer = client.newConsumer(Schema.STRING)
585
.topic("my-topic")
586
.subscriptionName("my-sub")
587
.subscriptionType(SubscriptionType.Shared)
588
.deadLetterPolicy(deadLetterPolicy)
589
.enableRetry(true)
590
.subscribe();
591
```
592
593
## Supporting Types and Enums
594
595
```java { .api }
596
enum SubscriptionType {
597
/** Single consumer */
598
Exclusive,
599
/** Multiple consumers, round-robin */
600
Shared,
601
/** Multiple consumers, active/standby */
602
Failover,
603
/** Multiple consumers, key-based routing */
604
Key_Shared
605
}
606
607
enum SubscriptionMode {
608
/** Persistent subscription */
609
Durable,
610
/** Ephemeral subscription */
611
NonDurable
612
}
613
614
enum SubscriptionInitialPosition {
615
/** Start from latest message */
616
Latest,
617
/** Start from earliest message */
618
Earliest
619
}
620
621
enum RegexSubscriptionMode {
622
/** Persistent topics only */
623
PersistentOnly,
624
/** Non-persistent topics only */
625
NonPersistentOnly,
626
/** All topic types */
627
AllTopics
628
}
629
630
enum ConsumerCryptoFailureAction {
631
/** Fail the receive operation */
632
FAIL,
633
/** Discard the message */
634
DISCARD,
635
/** Consume message as-is */
636
CONSUME
637
}
638
639
interface MessageListener<T> {
640
/** Handle received message */
641
void received(Consumer<T> consumer, Message<T> msg);
642
}
643
644
interface ConsumerEventListener {
645
/** Consumer became active */
646
void becameActive(Consumer<?> consumer, int partitionId);
647
648
/** Consumer became inactive */
649
void becameInactive(Consumer<?> consumer, int partitionId);
650
}
651
652
interface RedeliveryBackoff {
653
/** Get next backoff delay */
654
long next(int redeliveryCount);
655
}
656
```