CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-pulsar--pulsar-client

Apache Pulsar Java client library for distributed pub-sub messaging platform

Pending
Overview
Eval results
Files

message-consumption.mddocs/

Message Consumption

Subscribing to topics with various subscription types, acknowledgment patterns, message processing strategies, and advanced consumption features.

Capabilities

Consumer Interface

Core interface for consuming messages from Pulsar topics.

/**
 * Interface for consuming messages from topics
 * Thread-safe and supports various subscription types and acknowledgment patterns
 */
interface Consumer<T> extends Closeable {
    /** Get topic name */
    String getTopic();
    
    /** Get subscription name */
    String getSubscription();
    
    /** Get consumer name */
    String getConsumerName();
    
    /** Receive message synchronously (blocks until message available) */
    Message<T> receive() throws PulsarClientException;
    
    /** Receive message asynchronously */
    CompletableFuture<Message<T>> receiveAsync();
    
    /** Receive message with timeout */
    Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException;
    
    /** Batch receive messages synchronously */
    Messages<T> batchReceive() throws PulsarClientException;
    
    /** Batch receive messages asynchronously */
    CompletableFuture<Messages<T>> batchReceiveAsync();
    
    /** Acknowledge message receipt */
    void acknowledge(Message<?> message) throws PulsarClientException;
    
    /** Acknowledge message by MessageId */
    void acknowledge(MessageId messageId) throws PulsarClientException;
    
    /** Acknowledge message asynchronously */
    CompletableFuture<Void> acknowledgeAsync(Message<?> message);
    
    /** Acknowledge message by MessageId asynchronously */
    CompletableFuture<Void> acknowledgeAsync(MessageId messageId);
    
    /** Acknowledge all messages up to and including specified message */
    void acknowledgeCumulative(Message<?> message) throws PulsarClientException;
    
    /** Acknowledge all messages up to and including specified MessageId */
    void acknowledgeCumulative(MessageId messageId) throws PulsarClientException;
    
    /** Acknowledge cumulatively asynchronously */
    CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message);
    
    /** Acknowledge cumulatively by MessageId asynchronously */
    CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId);
    
    /** Negative acknowledge message (triggers redelivery) */
    void negativeAcknowledge(Message<?> message);
    
    /** Negative acknowledge by MessageId */
    void negativeAcknowledge(MessageId messageId);
    
    /** Negative acknowledge batch of messages */
    void negativeAcknowledge(Messages<?> messages);
    
    /** Reconsume message later with delay */
    void reconsumeLater(Message<?> message, long delay, TimeUnit unit) throws PulsarClientException;
    
    /** Reconsume message later with delay and custom properties */
    void reconsumeLater(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit) throws PulsarClientException;
    
    /** Reconsume batch of messages later with delay */
    void reconsumeLater(Messages<?> messages, long delay, TimeUnit unit) throws PulsarClientException;
    
    /** Reconsume message later cumulatively with delay */
    void reconsumeLaterCumulative(Message<?> message, long delay, TimeUnit unit) throws PulsarClientException;
    
    /** Reconsume message later cumulatively with delay and custom properties */
    void reconsumeLaterCumulative(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit) throws PulsarClientException;
    
    /** Reconsume message later asynchronously */
    CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delay, TimeUnit unit);
    
    /** Reconsume message later asynchronously with custom properties */
    CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit);
    
    /** Reconsume batch of messages later asynchronously */
    CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delay, TimeUnit unit);
    
    /** Reconsume message later cumulatively asynchronously */
    CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delay, TimeUnit unit);
    
    /** Reconsume message later cumulatively asynchronously with custom properties */
    CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, Map<String, String> customProperties, long delay, TimeUnit unit);
    
    /** Get consumer statistics */
    ConsumerStats getStats();
    
    /** Unsubscribe from topic */
    void unsubscribe() throws PulsarClientException;
    
    /** Unsubscribe asynchronously */
    CompletableFuture<Void> unsubscribeAsync();
    
    /** Check if consumer is connected */
    boolean isConnected();
    
    /** Get timestamp of last disconnection */
    long getLastDisconnectedTimestamp();
    
    /** Pause message delivery */
    void pause();
    
    /** Resume message delivery */
    void resume();
    
    /** Check if consumer is paused */
    boolean isPaused();
    
    /** Seek to specific message ID */
    void seek(MessageId messageId) throws PulsarClientException;
    
    /** Seek using custom function */
    void seek(Function<String, Object> function) throws PulsarClientException;
    
    /** Seek to specific message ID asynchronously */
    CompletableFuture<Void> seekAsync(MessageId messageId);
    
    /** Seek to specific timestamp */
    void seek(long timestamp) throws PulsarClientException;
    
    /** Seek to specific timestamp asynchronously */
    CompletableFuture<Void> seekAsync(long timestamp);
    
    /** Seek using custom function asynchronously */
    CompletableFuture<Void> seekAsync(Function<String, Object> function);
    
    /** Get last MessageId (deprecated) */
    @Deprecated
    CompletableFuture<MessageId> getLastMessageIdAsync();
    
    /** Get last message IDs for all partitions */
    List<TopicMessageId> getLastMessageIds() throws PulsarClientException;
    
    /** Get last message IDs for all partitions asynchronously */
    CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync();
    
    /** Close consumer */
    void close() throws PulsarClientException;
    
    /** Close consumer asynchronously */
    CompletableFuture<Void> closeAsync();
}

Usage Examples:

import org.apache.pulsar.client.api.*;

// Simple message consumption
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscribe();

while (true) {
    Message<String> message = consumer.receive();
    try {
        System.out.println("Received: " + message.getValue());
        consumer.acknowledge(message);
    } catch (Exception e) {
        consumer.negativeAcknowledge(message);
    }
}

// Asynchronous consumption
consumer.receiveAsync()
    .thenAccept(message -> {
        System.out.println("Async received: " + message.getValue());
        consumer.acknowledgeAsync(message);
    })
    .exceptionally(throwable -> {
        System.err.println("Receive failed: " + throwable.getMessage());
        return null;
    });

// Batch consumption
Messages<String> messages = consumer.batchReceive();
for (Message<String> message : messages) {
    System.out.println("Batch message: " + message.getValue());
}
consumer.acknowledge(messages);

ConsumerBuilder Configuration

Builder interface for configuring and creating Consumer instances.

/**
 * Builder for configuring and creating Consumer instances
 */
interface ConsumerBuilder<T> extends Serializable, Cloneable {
    /** Create and subscribe consumer synchronously */
    Consumer<T> subscribe() throws PulsarClientException;
    
    /** Create and subscribe consumer asynchronously */
    CompletableFuture<Consumer<T>> subscribeAsync();
    
    /** Clone the builder */
    ConsumerBuilder<T> clone();
    
    /** Set topic names to subscribe to */
    ConsumerBuilder<T> topic(String... topicNames);
    
    /** Set list of topic names */
    ConsumerBuilder<T> topics(List<String> topicNames);
    
    /** Set topic pattern for dynamic topic discovery */
    ConsumerBuilder<T> topicsPattern(Pattern topicsPattern);
    
    /** Set topic pattern with regex subscription mode */
    ConsumerBuilder<T> topicsPattern(String topicsPattern, RegexSubscriptionMode regexSubscriptionMode);
    
    /** Set subscription name (required) */
    ConsumerBuilder<T> subscriptionName(String subscriptionName);
    
    /** Set subscription type */
    ConsumerBuilder<T> subscriptionType(SubscriptionType subscriptionType);
    
    /** Set subscription mode */
    ConsumerBuilder<T> subscriptionMode(SubscriptionMode subscriptionMode);
    
    /** Set subscription initial position */
    ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition);
    
    /** Set key-shared policy for Key_Shared subscription */
    ConsumerBuilder<T> keySharedPolicy(KeySharedPolicy keySharedPolicy);
    
    /** Set message listener for push-style consumption */
    ConsumerBuilder<T> messageListener(MessageListener<T> messageListener);
    
    /** Set message listener executor */
    ConsumerBuilder<T> messageListenerExecutor(Executor executor);
    
    /** Set consumer event listener */
    ConsumerBuilder<T> consumerEventListener(ConsumerEventListener consumerEventListener);
    
    /** Set receiver queue size (default: 1000) */
    ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize);
    
    /** Set acknowledgment group time */
    ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit);
    
    /** Set replication clusters */
    ConsumerBuilder<T> replicateSubscriptionState(boolean replicateSubscriptionState);
    
    /** Set max total receiver queue size across partitions */
    ConsumerBuilder<T> maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions);
    
    /** Set consumer name */
    ConsumerBuilder<T> consumerName(String consumerName);
    
    /** Set acknowledgment timeout */
    ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
    
    /** Set tick duration for acknowledgment timeout */
    ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);
    
    /** Set negative acknowledgment redelivery delay */
    ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit);
    
    /** Set default redelivery backoff */
    ConsumerBuilder<T> defaultRedeliveryBackoff(RedeliveryBackoff redeliveryBackoff);
    
    /** Set dead letter policy */
    ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy);
    
    /** Set retry enable */
    ConsumerBuilder<T> enableRetry(boolean retryEnable);
    
    /** Set batch receive policy */
    ConsumerBuilder<T> batchReceivePolicy(BatchReceivePolicy batchReceivePolicy);
    
    /** Enable batch index acknowledgment */
    ConsumerBuilder<T> enableBatchIndexAcknowledgment(boolean batchIndexAcknowledgment);
    
    /** Set max pending chunked messages */
    ConsumerBuilder<T> maxPendingChunkedMessage(int maxPendingChunkedMessage);
    
    /** Set auto acknowledge chunked messages timeout */
    ConsumerBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOldestChunkedMessageOnQueueFull);
    
    /** Set expire time of incomplete chunked messages */
    ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit);
    
    /** Set priority level */
    ConsumerBuilder<T> priorityLevel(int priorityLevel);
    
    /** Add property */
    ConsumerBuilder<T> property(String key, String value);
    
    /** Set properties */
    ConsumerBuilder<T> properties(Map<String, String> properties);
    
    /** Add consumer interceptor */
    ConsumerBuilder<T> intercept(ConsumerInterceptor<T> interceptor);
    
    /** Set start message ID inclusive */
    ConsumerBuilder<T> startMessageIdInclusive();
    
    /** Enable pooling messages */
    ConsumerBuilder<T> poolMessages(boolean poolMessages);
    
    /** Set start paused */
    ConsumerBuilder<T> startPaused(boolean paused);
    
    /** Set auto scale receiver queue size */
    ConsumerBuilder<T> autoScaleReceiverQueueSizeEnabled(boolean enabled);
    
    /** Set topic consumer builder */
    ConsumerBuilder<T> topicConsumerBuilder(String topicName, TopicConsumerBuilder<T> topicConsumerBuilder);
}

Encryption Configuration

Configure message decryption for consumers.

interface ConsumerBuilder<T> {
    /** Set crypto key reader */
    ConsumerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
    
    /** Set default crypto key reader using private key path */
    ConsumerBuilder<T> defaultCryptoKeyReader(String privateKeyPath);
    
    /** Set default crypto key reader using key store */
    ConsumerBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);
    
    /** Set crypto failure action */
    ConsumerBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);
}

Consumer Configuration Examples:

// Basic consumer with Exclusive subscription
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-exclusive-sub")
    .subscriptionType(SubscriptionType.Exclusive)
    .subscribe();

// Shared subscription with multiple consumers
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topics(Arrays.asList("topic1", "topic2", "topic3"))
    .subscriptionName("my-shared-sub")
    .subscriptionType(SubscriptionType.Shared)
    .receiverQueueSize(1000)
    .consumerName("consumer-1")
    .subscribe();

// Key-shared subscription with custom policy
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("partitioned-topic")
    .subscriptionName("key-shared-sub")
    .subscriptionType(SubscriptionType.Key_Shared)
    .keySharedPolicy(KeySharedPolicy.stickyHashRange())
    .subscribe();

// Pattern subscription
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topicsPattern("persistent://public/default/topic-.*")
    .subscriptionName("pattern-sub")
    .subscribe();

// Consumer with message listener
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("listener-topic")
    .subscriptionName("listener-sub")
    .messageListener((consumer, message) -> {
        System.out.println("Received: " + message.getValue());
        try {
            consumer.acknowledge(message);
        } catch (PulsarClientException e) {
            consumer.negativeAcknowledge(message);
        }
    })
    .subscribe();

Batch Message Processing

Interface for handling batches of messages.

/**
 * Container for batch of messages
 */
interface Messages<T> extends Iterable<Message<T>> {
    /** Get number of messages in batch */
    int size();
    
    /** Get list of message values */
    List<T> stream();
    
    /** Iterator over messages */
    Iterator<Message<T>> iterator();
}

/**
 * Batch receive policy configuration
 */
class BatchReceivePolicy {
    /** Create builder for batch receive policy */
    static BatchReceivePolicy.Builder builder();
    
    /** Default batch receive policy */
    static final BatchReceivePolicy DEFAULT_POLICY;
    
    /** Get maximum number of messages in batch */
    int getMaxNumMessages();
    
    /** Get maximum number of bytes in batch */
    long getMaxNumBytes();
    
    /** Get batch timeout in milliseconds */
    long getTimeoutMs();
    
    interface Builder {
        /** Set maximum messages in batch */
        Builder maxNumMessages(int maxNumMessages);
        
        /** Set maximum bytes in batch */
        Builder maxNumBytes(long maxNumBytes);
        
        /** Set batch timeout */
        Builder timeout(long timeout, TimeUnit timeUnit);
        
        /** Build the policy */
        BatchReceivePolicy build();
    }
}

Batch Processing Examples:

// Configure batch receive policy
BatchReceivePolicy batchPolicy = BatchReceivePolicy.builder()
    .maxNumMessages(100)
    .maxNumBytes(1024 * 1024)
    .timeout(100, TimeUnit.MILLISECONDS)
    .build();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("batch-topic")
    .subscriptionName("batch-sub")
    .batchReceivePolicy(batchPolicy)
    .subscribe();

// Receive and process batch
Messages<String> messages = consumer.batchReceive();
for (Message<String> message : messages) {
    System.out.println("Processing: " + message.getValue());
}
// Acknowledge entire batch
consumer.acknowledge(messages);

Consumer Statistics

Interface for accessing consumer statistics and metrics.

/**
 * Consumer statistics interface
 */
interface ConsumerStats {
    /** Number of messages received */
    long getNumMsgsReceived();
    
    /** Number of bytes received */
    long getNumBytesReceived();
    
    /** Receive rate in messages per second */
    double getReceiveMsgsRate();
    
    /** Receive rate in bytes per second */
    double getReceiveBytesRate();
    
    /** Number of acknowledgments sent */
    long getNumAcksSent();
    
    /** Number of failed acknowledgments */
    long getNumAcksFailed();
    
    /** Total messages received since creation */
    long getTotalMsgsReceived();
    
    /** Total bytes received since creation */
    long getTotalBytesReceived();
    
    /** Total receive failures since creation */
    long getTotalReceivedFailed();
    
    /** Total acknowledgments sent since creation */
    long getTotalAcksSent();
    
    /** Total failed acknowledgments since creation */
    long getTotalAcksFailed();
    
    /** Available permits for receiving */
    int getAvailablePermits();
    
    /** Number of unacknowledged messages */
    int getNumUnackedMessages();
}

Dead Letter Queue Configuration

Configuration for handling failed message processing.

/**
 * Dead letter queue policy configuration
 */
class DeadLetterPolicy {
    /** Create builder for dead letter policy */
    static DeadLetterPolicy.Builder builder();
    
    /** Get maximum redelivery count */
    int getMaxRedeliverCount();
    
    /** Get retry letter topic name */
    String getRetryLetterTopic();
    
    /** Get dead letter topic name */
    String getDeadLetterTopic();
    
    /** Get initial subscription name */
    String getInitialSubscriptionName();
    
    interface Builder {
        /** Set maximum redelivery count */
        Builder maxRedeliverCount(int maxRedeliverCount);
        
        /** Set retry letter topic */
        Builder retryLetterTopic(String retryLetterTopic);
        
        /** Set dead letter topic */
        Builder deadLetterTopic(String deadLetterTopic);
        
        /** Set initial subscription name */
        Builder initialSubscriptionName(String initialSubscriptionName);
        
        /** Build the policy */
        DeadLetterPolicy build();
    }
}

Dead Letter Queue Example:

// Configure dead letter policy
DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()
    .maxRedeliverCount(3)
    .retryLetterTopic("my-topic-retry")
    .deadLetterTopic("my-topic-dlq")
    .build();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-sub")
    .subscriptionType(SubscriptionType.Shared)
    .deadLetterPolicy(deadLetterPolicy)
    .enableRetry(true)
    .subscribe();

Supporting Types and Enums

enum SubscriptionType {
    /** Single consumer */
    Exclusive,
    /** Multiple consumers, round-robin */
    Shared,
    /** Multiple consumers, active/standby */
    Failover,
    /** Multiple consumers, key-based routing */
    Key_Shared
}

enum SubscriptionMode {
    /** Persistent subscription */
    Durable,
    /** Ephemeral subscription */
    NonDurable
}

enum SubscriptionInitialPosition {
    /** Start from latest message */
    Latest,
    /** Start from earliest message */
    Earliest
}

enum RegexSubscriptionMode {
    /** Persistent topics only */
    PersistentOnly,
    /** Non-persistent topics only */
    NonPersistentOnly,
    /** All topic types */
    AllTopics
}

enum ConsumerCryptoFailureAction {
    /** Fail the receive operation */
    FAIL,
    /** Discard the message */
    DISCARD,
    /** Consume message as-is */
    CONSUME
}

interface MessageListener<T> {
    /** Handle received message */
    void received(Consumer<T> consumer, Message<T> msg);
}

interface ConsumerEventListener {
    /** Consumer became active */
    void becameActive(Consumer<?> consumer, int partitionId);
    
    /** Consumer became inactive */
    void becameInactive(Consumer<?> consumer, int partitionId);
}

interface RedeliveryBackoff {
    /** Get next backoff delay */
    long next(int redeliveryCount);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--pulsar-client

docs

authentication-security.md

client-management.md

index.md

message-consumption.md

message-production.md

message-reading.md

schema-serialization.md

transaction-support.md

tile.json