CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-pulsar--pulsar-client

Apache Pulsar Java client library for distributed pub-sub messaging platform

Pending
Overview
Eval results
Files

message-reading.mddocs/

Message Reading

Low-level message reading with manual positioning for replay scenarios, custom consumption patterns, and precise message access control.

Capabilities

Reader Interface

Low-level interface for reading messages with manual positioning, without using subscriptions.

/**
 * Interface for reading messages with manual positioning
 * Provides low-level abstraction for manual positioning in topics without subscriptions
 * Suitable for replay scenarios and custom consumption patterns
 */
interface Reader<T> extends Closeable {
    /** Get topic name */
    String getTopic();
    
    /** Read next message synchronously (blocks until message available) */
    Message<T> readNext() throws PulsarClientException;
    
    /** Read next message asynchronously */
    CompletableFuture<Message<T>> readNextAsync();
    
    /** Read next message with timeout */
    Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientException;
    
    /** Seek to specific message ID */
    void seek(MessageId messageId) throws PulsarClientException;
    
    /** Seek to specific message ID asynchronously */
    CompletableFuture<Void> seekAsync(MessageId messageId);
    
    /** Seek to specific timestamp */
    void seek(long timestamp) throws PulsarClientException;
    
    /** Seek to specific timestamp asynchronously */
    CompletableFuture<Void> seekAsync(long timestamp);
    
    /** Seek using custom function */
    void seek(Function<String, Object> function) throws PulsarClientException;
    
    /** Seek using custom function asynchronously */
    CompletableFuture<Void> seekAsync(Function<String, Object> function);
    
    /** Check if messages are available */
    boolean hasMessageAvailable() throws PulsarClientException;
    
    /** Check if messages are available asynchronously */
    CompletableFuture<Boolean> hasMessageAvailableAsync();
    
    /** Check if reader is connected */
    boolean isConnected();
    
    /** Check if reader has reached end of topic */
    boolean hasReachedEndOfTopic();
    
    /** Get last message IDs for all partitions */
    List<TopicMessageId> getLastMessageIds() throws PulsarClientException;
    
    /** Get last message IDs for all partitions asynchronously */
    CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync();
    
    /** Close reader */
    void close() throws PulsarClientException;
    
    /** Close reader asynchronously */
    CompletableFuture<Void> closeAsync();
}

Usage Examples:

import org.apache.pulsar.client.api.*;

// Create reader starting from earliest message
Reader<String> reader = client.newReader(Schema.STRING)
    .topic("my-topic")
    .startMessageId(MessageId.earliest)
    .create();

// Read messages sequentially
while (reader.hasMessageAvailable()) {
    Message<String> message = reader.readNext();
    System.out.println("Read: " + message.getValue());
    // Note: Readers don't need acknowledgments
}

// Async reading
reader.readNextAsync()
    .thenAccept(message -> {
        System.out.println("Async read: " + message.getValue());
    })
    .exceptionally(throwable -> {
        System.err.println("Read failed: " + throwable.getMessage());
        return null;
    });

// Seek to specific position
MessageId specificMessageId = getStoredMessageId();
reader.seek(specificMessageId);
Message<String> message = reader.readNext();

// Seek to timestamp
long timestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
reader.seek(timestamp);

ReaderBuilder Configuration

Builder interface for configuring and creating Reader instances.

/**
 * Builder for configuring and creating Reader instances
 */
interface ReaderBuilder<T> extends Serializable, Cloneable {
    /** Create the reader synchronously */
    Reader<T> create() throws PulsarClientException;
    
    /** Create the reader asynchronously */
    CompletableFuture<Reader<T>> createAsync();
    
    /** Clone the builder */
    ReaderBuilder<T> clone();
    
    /** Set topic name (required) */
    ReaderBuilder<T> topic(String topicName);
    
    /** Set start message ID (required) */
    ReaderBuilder<T> startMessageId(MessageId startMessageId);
    
    /** Start from rollback duration */
    ReaderBuilder<T> startMessageFromRollbackDuration(long rollbackDuration, TimeUnit timeunit);
    
    /** Set reader name (optional, auto-generated if not set) */
    ReaderBuilder<T> readerName(String readerName);
    
    /** Set subscription name for position persistence */
    ReaderBuilder<T> subscriptionName(String subscriptionName);
    
    /** Set subscription role prefix */
    ReaderBuilder<T> subscriptionRolePrefix(String subscriptionRolePrefix);
    
    /** Set receiver queue size (default: 1000) */
    ReaderBuilder<T> receiverQueueSize(int receiverQueueSize);
    
    /** Set reader listener for push-style reading */
    ReaderBuilder<T> readerListener(ReaderListener<T> readerListener);
    
    /** Set reader listener executor */
    ReaderBuilder<T> readerListenerExecutor(Executor executor);
    
    /** Enable reading compacted messages only */
    ReaderBuilder<T> readCompacted(boolean readCompacted);
    
    /** Reset cursor to start position on reconnection */
    ReaderBuilder<T> resetIncludeHead(boolean resetIncludeHead);
    
    /** Set reader configuration */
    ReaderBuilder<T> loadConf(Map<String, Object> config);
    
    /** Add reader interceptor */
    ReaderBuilder<T> intercept(ReaderInterceptor<T> interceptor);
    
    /** Set key hash ranges for multi-topic readers */
    ReaderBuilder<T> keyHashRange(Range... ranges);
    
    /** Enable pooling messages */
    ReaderBuilder<T> poolMessages(boolean poolMessages);
    
    /** Start message ID inclusive */
    ReaderBuilder<T> startMessageIdInclusive();
}

Encryption Configuration

Configure message decryption for readers.

interface ReaderBuilder<T> {
    /** Set crypto key reader */
    ReaderBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
    
    /** Set default crypto key reader using private key path */
    ReaderBuilder<T> defaultCryptoKeyReader(String privateKeyPath);
    
    /** Set default crypto key reader using key store */
    ReaderBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);
    
    /** Set crypto failure action */
    ReaderBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);
}

Reader Configuration Examples:

// Basic reader from earliest
Reader<String> reader = client.newReader(Schema.STRING)
    .topic("my-topic")
    .startMessageId(MessageId.earliest)
    .readerName("my-reader")
    .create();

// Reader from specific message ID
MessageId lastProcessedId = getLastProcessedMessageId();
Reader<String> reader = client.newReader(Schema.STRING)
    .topic("my-topic")
    .startMessageId(lastProcessedId)
    .receiverQueueSize(2000)
    .create();

// Reader with persistent position using subscription
Reader<String> reader = client.newReader(Schema.STRING)
    .topic("my-topic")
    .startMessageId(MessageId.latest)
    .subscriptionName("reader-position")
    .create();

// Reader from time-based position
Reader<String> reader = client.newReader(Schema.STRING)
    .topic("my-topic")
    .startMessageFromRollbackDuration(1, TimeUnit.HOURS)
    .create();

// Compacted topic reader
Reader<String> reader = client.newReader(Schema.STRING)
    .topic("compacted-topic")
    .startMessageId(MessageId.earliest)
    .readCompacted(true)
    .create();

// Reader with push-style listener
Reader<String> reader = client.newReader(Schema.STRING)
    .topic("listener-topic")
    .startMessageId(MessageId.latest)
    .readerListener((reader, message) -> {
        System.out.println("Listener received: " + message.getValue());
    })
    .create();

TableView Interface

Key-value view of compacted topics, providing map-like access to the latest values.

/**
 * Key-value view of a compacted topic
 * Provides map-like interface to latest values for each key
 * Messages without keys are ignored
 */
interface TableView<T> extends Closeable {
    /** Get number of entries */
    int size();
    
    /** Check if table is empty */
    boolean isEmpty();
    
    /** Check if key exists */
    boolean containsKey(String key);
    
    /** Get value by key */
    T get(String key);
    
    /** Get all keys */
    Set<String> keySet();
    
    /** Get all values */
    Collection<T> values();
    
    /** Get all entries */
    Set<Map.Entry<String, T>> entrySet();
    
    /** Iterate over all entries */
    void forEach(BiConsumer<String, T> action);
    
    /** Refresh table view asynchronously */
    CompletableFuture<Void> refreshAsync();
    
    /** Listen for table updates */
    void listen(BiConsumer<String, T> action);
    
    /** Get topic name */
    String getTopic();
    
    /** Close table view */
    void close() throws PulsarClientException;
    
    /** Close table view asynchronously */
    CompletableFuture<Void> closeAsync();
}

TableViewBuilder Configuration

Builder interface for configuring and creating TableView instances.

/**
 * Builder for configuring and creating TableView instances
 */
interface TableViewBuilder<T> extends Serializable, Cloneable {
    /** Create the table view synchronously */
    TableView<T> create() throws PulsarClientException;
    
    /** Create the table view asynchronously */
    CompletableFuture<TableView<T>> createAsync();
    
    /** Set topic name (required) */
    TableViewBuilder<T> topic(String topic);
    
    /** Set partition update interval */
    TableViewBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
    
    /** Set subscription name for position persistence */
    TableViewBuilder<T> subscriptionName(String subscriptionName);
    
    /** Set crypto key reader */
    TableViewBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
    
    /** Set default crypto key reader */
    TableViewBuilder<T> defaultCryptoKeyReader(String privateKeyPath);
    
    /** Set default crypto key reader using key store */
    TableViewBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);
    
    /** Load configuration from map */
    TableViewBuilder<T> loadConf(Map<String, Object> config);
}

TableView Examples:

// Basic table view
TableView<String> tableView = client.newTableView(Schema.STRING)
    .topic("user-profiles")
    .create();

// Access data like a map
String userProfile = tableView.get("user-123");
boolean hasUser = tableView.containsKey("user-456");

// Iterate over all entries
tableView.forEach((key, value) -> {
    System.out.println("Key: " + key + ", Value: " + value);
});

// Listen for updates
tableView.listen((key, value) -> {
    System.out.println("Updated: " + key + " = " + value);
});

// Table view with partition updates
TableView<String> tableView = client.newTableView(Schema.STRING)
    .topic("config-topic")
    .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
    .subscriptionName("config-reader")
    .create();

Message Positioning

Advanced message positioning capabilities for precise reading control.

/**
 * Message ID interface for positioning
 */
interface MessageId extends Comparable<MessageId>, Serializable {
    /** Serialize to byte array */
    byte[] toByteArray();
    
    /** Deserialize from byte array */
    static MessageId fromByteArray(byte[] data) throws IOException;
    
    /** Earliest message position */
    static final MessageId earliest;
    
    /** Latest message position */
    static final MessageId latest;
}

/**
 * Topic-specific message ID
 */
interface TopicMessageId extends MessageId {
    /** Get topic name */
    String getTopicName();
    
    /** Get inner message ID */
    MessageId getInnerMessageId();
}

Positioning Examples:

// Store and restore reader position
Reader<String> reader = client.newReader(Schema.STRING)
    .topic("my-topic")
    .startMessageId(MessageId.earliest)
    .create();

// Process some messages and save position
Message<String> lastMessage = reader.readNext();
MessageId position = lastMessage.getMessageId();

// Save position to storage
byte[] positionBytes = position.toByteArray();
savePositionToStorage(positionBytes);

// Later, restore position
byte[] storedPosition = loadPositionFromStorage();
MessageId restoredPosition = MessageId.fromByteArray(storedPosition);

Reader<String> newReader = client.newReader(Schema.STRING)
    .topic("my-topic")
    .startMessageId(restoredPosition)
    .create();

// Seek operations
reader.seek(MessageId.latest); // Jump to end
reader.seek(someOtherMessageId); // Jump to specific message
reader.seek(System.currentTimeMillis() - 3600000); // Jump to 1 hour ago

Supporting Types and Interfaces

interface ReaderListener<T> {
    /** Handle read message */
    void received(Reader<T> reader, Message<T> msg);
}

interface ReaderInterceptor<T> extends AutoCloseable {
    /** Intercept before read */
    Message<T> beforeRead(Reader<T> reader, Message<T> message);
    
    /** Handle partition changes */
    void onPartitionsChange(String topicName, int partitions);
    
    /** Close interceptor */
    void close();
}

class Range {
    /** Create range */
    static Range of(int start, int end);
    
    /** Get start of range */
    int getStart();
    
    /** Get end of range */
    int getEnd();
}

interface TopicMetadata {
    /** Get number of partitions */
    int getNumPartitions();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--pulsar-client

docs

authentication-security.md

client-management.md

index.md

message-consumption.md

message-production.md

message-reading.md

schema-serialization.md

transaction-support.md

tile.json