Apache Pulsar Java client library for distributed pub-sub messaging platform
—
Subscribing to topics with various subscription types, acknowledgment patterns, message processing strategies, and advanced consumption features.
Core interface for consuming messages from Pulsar topics.
/**
* Interface for consuming messages from topics
* Thread-safe and supports various subscription types and acknowledgment patterns
*/
interface Consumer<T> extends Closeable {
/** Get topic name */
String getTopic();
/** Get subscription name */
String getSubscription();
/** Get consumer name */
String getConsumerName();
/** Receive message synchronously (blocks until message available) */
Message<T> receive() throws PulsarClientException;
/** Receive message asynchronously */
CompletableFuture<Message<T>> receiveAsync();
/** Receive message with timeout */
Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException;
/** Batch receive messages synchronously */
Messages<T> batchReceive() throws PulsarClientException;
/** Batch receive messages asynchronously */
CompletableFuture<Messages<T>> batchReceiveAsync();
/** Acknowledge message receipt */
void acknowledge(Message<?> message) throws PulsarClientException;
/** Acknowledge message by MessageId */
void acknowledge(MessageId messageId) throws PulsarClientException;
/** Acknowledge message asynchronously */
CompletableFuture<Void> acknowledgeAsync(Message<?> message);
/** Acknowledge message by MessageId asynchronously */
CompletableFuture<Void> acknowledgeAsync(MessageId messageId);
/** Acknowledge all messages up to and including specified message */
void acknowledgeCumulative(Message<?> message) throws PulsarClientException;
/** Acknowledge all messages up to and including specified MessageId */
void acknowledgeCumulative(MessageId messageId) throws PulsarClientException;
/** Acknowledge cumulatively asynchronously */
CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message);
/** Acknowledge cumulatively by MessageId asynchronously */
CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId);
/** Negative acknowledge message (triggers redelivery) */
void negativeAcknowledge(Message<?> message);
/** Negative acknowledge by MessageId */
void negativeAcknowledge(MessageId messageId);
/** Negative acknowledge batch of messages */
void negativeAcknowledge(Messages<?> messages);
/** Reconsume message later with delay */
void reconsumeLater(Message<?> message, long delay, TimeUnit unit) throws PulsarClientException;
/** Reconsume message later with delay and custom properties */
void reconsumeLater(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit) throws PulsarClientException;
/** Reconsume batch of messages later with delay */
void reconsumeLater(Messages<?> messages, long delay, TimeUnit unit) throws PulsarClientException;
/** Reconsume message later cumulatively with delay */
void reconsumeLaterCumulative(Message<?> message, long delay, TimeUnit unit) throws PulsarClientException;
/** Reconsume message later cumulatively with delay and custom properties */
void reconsumeLaterCumulative(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit) throws PulsarClientException;
/** Reconsume message later asynchronously */
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delay, TimeUnit unit);
/** Reconsume message later asynchronously with custom properties */
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit);
/** Reconsume batch of messages later asynchronously */
CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delay, TimeUnit unit);
/** Reconsume message later cumulatively asynchronously */
CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delay, TimeUnit unit);
/** Reconsume message later cumulatively asynchronously with custom properties */
CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit);
/** Get consumer statistics */
ConsumerStats getStats();
/** Unsubscribe from topic */
void unsubscribe() throws PulsarClientException;
/** Unsubscribe asynchronously */
CompletableFuture<Void> unsubscribeAsync();
/** Check if consumer is connected */
boolean isConnected();
/** Get timestamp of last disconnection */
long getLastDisconnectedTimestamp();
/** Pause message delivery */
void pause();
/** Resume message delivery */
void resume();
/** Check if consumer is paused */
boolean isPaused();
/** Seek to specific message ID */
void seek(MessageId messageId) throws PulsarClientException;
/** Seek using custom function */
void seek(Function<String, Object> function) throws PulsarClientException;
/** Seek to specific message ID asynchronously */
CompletableFuture<Void> seekAsync(MessageId messageId);
/** Seek to specific timestamp */
void seek(long timestamp) throws PulsarClientException;
/** Seek to specific timestamp asynchronously */
CompletableFuture<Void> seekAsync(long timestamp);
/** Seek using custom function asynchronously */
CompletableFuture<Void> seekAsync(Function<String, Object> function);
/** Get last MessageId (deprecated) */
@Deprecated
CompletableFuture<MessageId> getLastMessageIdAsync();
/** Get last message IDs for all partitions */
List<TopicMessageId> getLastMessageIds() throws PulsarClientException;
/** Get last message IDs for all partitions asynchronously */
CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync();
/** Close consumer */
void close() throws PulsarClientException;
/** Close consumer asynchronously */
CompletableFuture<Void> closeAsync();
}Usage Examples:
import org.apache.pulsar.client.api.*;
// Simple message consumption
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscribe();
while (true) {
Message<String> message = consumer.receive();
try {
System.out.println("Received: " + message.getValue());
consumer.acknowledge(message);
} catch (Exception e) {
consumer.negativeAcknowledge(message);
}
}
// Asynchronous consumption
consumer.receiveAsync()
.thenAccept(message -> {
System.out.println("Async received: " + message.getValue());
consumer.acknowledgeAsync(message);
})
.exceptionally(throwable -> {
System.err.println("Receive failed: " + throwable.getMessage());
return null;
});
// Batch consumption
Messages<String> messages = consumer.batchReceive();
for (Message<String> message : messages) {
System.out.println("Batch message: " + message.getValue());
}
consumer.acknowledge(messages);Builder interface for configuring and creating Consumer instances.
/**
* Builder for configuring and creating Consumer instances
*/
interface ConsumerBuilder<T> extends Serializable, Cloneable {
/** Create and subscribe consumer synchronously */
Consumer<T> subscribe() throws PulsarClientException;
/** Create and subscribe consumer asynchronously */
CompletableFuture<Consumer<T>> subscribeAsync();
/** Clone the builder */
ConsumerBuilder<T> clone();
/** Set topic names to subscribe to */
ConsumerBuilder<T> topic(String... topicNames);
/** Set list of topic names */
ConsumerBuilder<T> topics(List<String> topicNames);
/** Set topic pattern for dynamic topic discovery */
ConsumerBuilder<T> topicsPattern(Pattern topicsPattern);
/** Set topic pattern with regex subscription mode */
ConsumerBuilder<T> topicsPattern(String topicsPattern, RegexSubscriptionMode regexSubscriptionMode);
/** Set subscription name (required) */
ConsumerBuilder<T> subscriptionName(String subscriptionName);
/** Set subscription type */
ConsumerBuilder<T> subscriptionType(SubscriptionType subscriptionType);
/** Set subscription mode */
ConsumerBuilder<T> subscriptionMode(SubscriptionMode subscriptionMode);
/** Set subscription initial position */
ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition);
/** Set key-shared policy for Key_Shared subscription */
ConsumerBuilder<T> keySharedPolicy(KeySharedPolicy keySharedPolicy);
/** Set message listener for push-style consumption */
ConsumerBuilder<T> messageListener(MessageListener<T> messageListener);
/** Set message listener executor */
ConsumerBuilder<T> messageListenerExecutor(Executor executor);
/** Set consumer event listener */
ConsumerBuilder<T> consumerEventListener(ConsumerEventListener consumerEventListener);
/** Set receiver queue size (default: 1000) */
ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize);
/** Set acknowledgment group time */
ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit);
/** Set replication clusters */
ConsumerBuilder<T> replicateSubscriptionState(boolean replicateSubscriptionState);
/** Set max total receiver queue size across partitions */
ConsumerBuilder<T> maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions);
/** Set consumer name */
ConsumerBuilder<T> consumerName(String consumerName);
/** Set acknowledgment timeout */
ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
/** Set tick duration for acknowledgment timeout */
ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);
/** Set negative acknowledgment redelivery delay */
ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit);
/** Set default redelivery backoff */
ConsumerBuilder<T> defaultRedeliveryBackoff(RedeliveryBackoff redeliveryBackoff);
/** Set dead letter policy */
ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy);
/** Set retry enable */
ConsumerBuilder<T> enableRetry(boolean retryEnable);
/** Set batch receive policy */
ConsumerBuilder<T> batchReceivePolicy(BatchReceivePolicy batchReceivePolicy);
/** Enable batch index acknowledgment */
ConsumerBuilder<T> enableBatchIndexAcknowledgment(boolean batchIndexAcknowledgment);
/** Set max pending chunked messages */
ConsumerBuilder<T> maxPendingChunkedMessage(int maxPendingChunkedMessage);
/** Set auto acknowledge chunked messages timeout */
ConsumerBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOldestChunkedMessageOnQueueFull);
/** Set expire time of incomplete chunked messages */
ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit);
/** Set priority level */
ConsumerBuilder<T> priorityLevel(int priorityLevel);
/** Add property */
ConsumerBuilder<T> property(String key, String value);
/** Set properties */
ConsumerBuilder<T> properties(Map<String, String> properties);
/** Add consumer interceptor */
ConsumerBuilder<T> intercept(ConsumerInterceptor<T> interceptor);
/** Set start message ID inclusive */
ConsumerBuilder<T> startMessageIdInclusive();
/** Enable pooling messages */
ConsumerBuilder<T> poolMessages(boolean poolMessages);
/** Set start paused */
ConsumerBuilder<T> startPaused(boolean paused);
/** Set auto scale receiver queue size */
ConsumerBuilder<T> autoScaleReceiverQueueSizeEnabled(boolean enabled);
/** Set topic consumer builder */
ConsumerBuilder<T> topicConsumerBuilder(String topicName, TopicConsumerBuilder<T> topicConsumerBuilder);
}Configure message decryption for consumers.
interface ConsumerBuilder<T> {
/** Set crypto key reader */
ConsumerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
/** Set default crypto key reader using private key path */
ConsumerBuilder<T> defaultCryptoKeyReader(String privateKeyPath);
/** Set default crypto key reader using key store */
ConsumerBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);
/** Set crypto failure action */
ConsumerBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);
}Consumer Configuration Examples:
// Basic consumer with Exclusive subscription
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-exclusive-sub")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
// Shared subscription with multiple consumers
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topics(Arrays.asList("topic1", "topic2", "topic3"))
.subscriptionName("my-shared-sub")
.subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(1000)
.consumerName("consumer-1")
.subscribe();
// Key-shared subscription with custom policy
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("partitioned-topic")
.subscriptionName("key-shared-sub")
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(KeySharedPolicy.stickyHashRange())
.subscribe();
// Pattern subscription
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topicsPattern("persistent://public/default/topic-.*")
.subscriptionName("pattern-sub")
.subscribe();
// Consumer with message listener
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("listener-topic")
.subscriptionName("listener-sub")
.messageListener((consumer, message) -> {
System.out.println("Received: " + message.getValue());
try {
consumer.acknowledge(message);
} catch (PulsarClientException e) {
consumer.negativeAcknowledge(message);
}
})
.subscribe();Interface for handling batches of messages.
/**
* Container for batch of messages
*/
interface Messages<T> extends Iterable<Message<T>> {
/** Get number of messages in batch */
int size();
/** Get list of message values */
List<T> stream();
/** Iterator over messages */
Iterator<Message<T>> iterator();
}
/**
* Batch receive policy configuration
*/
class BatchReceivePolicy {
/** Create builder for batch receive policy */
static BatchReceivePolicy.Builder builder();
/** Default batch receive policy */
static final BatchReceivePolicy DEFAULT_POLICY;
/** Get maximum number of messages in batch */
int getMaxNumMessages();
/** Get maximum number of bytes in batch */
long getMaxNumBytes();
/** Get batch timeout in milliseconds */
long getTimeoutMs();
interface Builder {
/** Set maximum messages in batch */
Builder maxNumMessages(int maxNumMessages);
/** Set maximum bytes in batch */
Builder maxNumBytes(long maxNumBytes);
/** Set batch timeout */
Builder timeout(long timeout, TimeUnit timeUnit);
/** Build the policy */
BatchReceivePolicy build();
}
}Batch Processing Examples:
// Configure batch receive policy
BatchReceivePolicy batchPolicy = BatchReceivePolicy.builder()
.maxNumMessages(100)
.maxNumBytes(1024 * 1024)
.timeout(100, TimeUnit.MILLISECONDS)
.build();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("batch-topic")
.subscriptionName("batch-sub")
.batchReceivePolicy(batchPolicy)
.subscribe();
// Receive and process batch
Messages<String> messages = consumer.batchReceive();
for (Message<String> message : messages) {
System.out.println("Processing: " + message.getValue());
}
// Acknowledge entire batch
consumer.acknowledge(messages);Interface for accessing consumer statistics and metrics.
/**
* Consumer statistics interface
*/
interface ConsumerStats {
/** Number of messages received */
long getNumMsgsReceived();
/** Number of bytes received */
long getNumBytesReceived();
/** Receive rate in messages per second */
double getReceiveMsgsRate();
/** Receive rate in bytes per second */
double getReceiveBytesRate();
/** Number of acknowledgments sent */
long getNumAcksSent();
/** Number of failed acknowledgments */
long getNumAcksFailed();
/** Total messages received since creation */
long getTotalMsgsReceived();
/** Total bytes received since creation */
long getTotalBytesReceived();
/** Total receive failures since creation */
long getTotalReceivedFailed();
/** Total acknowledgments sent since creation */
long getTotalAcksSent();
/** Total failed acknowledgments since creation */
long getTotalAcksFailed();
/** Available permits for receiving */
int getAvailablePermits();
/** Number of unacknowledged messages */
int getNumUnackedMessages();
}Configuration for handling failed message processing.
/**
* Dead letter queue policy configuration
*/
class DeadLetterPolicy {
/** Create builder for dead letter policy */
static DeadLetterPolicy.Builder builder();
/** Get maximum redelivery count */
int getMaxRedeliverCount();
/** Get retry letter topic name */
String getRetryLetterTopic();
/** Get dead letter topic name */
String getDeadLetterTopic();
/** Get initial subscription name */
String getInitialSubscriptionName();
interface Builder {
/** Set maximum redelivery count */
Builder maxRedeliverCount(int maxRedeliverCount);
/** Set retry letter topic */
Builder retryLetterTopic(String retryLetterTopic);
/** Set dead letter topic */
Builder deadLetterTopic(String deadLetterTopic);
/** Set initial subscription name */
Builder initialSubscriptionName(String initialSubscriptionName);
/** Build the policy */
DeadLetterPolicy build();
}
}Dead Letter Queue Example:
// Configure dead letter policy
DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()
.maxRedeliverCount(3)
.retryLetterTopic("my-topic-retry")
.deadLetterTopic("my-topic-dlq")
.build();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(deadLetterPolicy)
.enableRetry(true)
.subscribe();enum SubscriptionType {
/** Single consumer */
Exclusive,
/** Multiple consumers, round-robin */
Shared,
/** Multiple consumers, active/standby */
Failover,
/** Multiple consumers, key-based routing */
Key_Shared
}
enum SubscriptionMode {
/** Persistent subscription */
Durable,
/** Ephemeral subscription */
NonDurable
}
enum SubscriptionInitialPosition {
/** Start from latest message */
Latest,
/** Start from earliest message */
Earliest
}
enum RegexSubscriptionMode {
/** Persistent topics only */
PersistentOnly,
/** Non-persistent topics only */
NonPersistentOnly,
/** All topic types */
AllTopics
}
enum ConsumerCryptoFailureAction {
/** Fail the receive operation */
FAIL,
/** Discard the message */
DISCARD,
/** Consume message as-is */
CONSUME
}
interface MessageListener<T> {
/** Handle received message */
void received(Consumer<T> consumer, Message<T> msg);
}
interface ConsumerEventListener {
/** Consumer became active */
void becameActive(Consumer<?> consumer, int partitionId);
/** Consumer became inactive */
void becameInactive(Consumer<?> consumer, int partitionId);
}
interface RedeliveryBackoff {
/** Get next backoff delay */
long next(int redeliveryCount);
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-pulsar--pulsar-client