Apache Pulsar Java client library for distributed pub-sub messaging platform
—
Low-level message reading with manual positioning for replay scenarios, custom consumption patterns, and precise message access control.
Low-level interface for reading messages with manual positioning, without using subscriptions.
/**
 * Low-level reader for consuming a topic from a manually chosen position,
 * without using a subscription.
 *
 * <p>Suited to replay scenarios and custom consumption patterns.
 *
 * @param <T> type of the message payload
 */
interface Reader<T> extends Closeable {

    /**
     * @return the name of the topic
     */
    String getTopic();

    /**
     * Reads the next message, blocking until one is available.
     *
     * @return the next message
     * @throws PulsarClientException declared by the client API for a failed read
     */
    Message<T> readNext() throws PulsarClientException;

    /**
     * Asynchronous variant of {@link #readNext()}.
     *
     * @return a future that yields the next message
     */
    CompletableFuture<Message<T>> readNextAsync();

    /**
     * Reads the next message, bounded by a timeout.
     *
     * <p>NOTE(review): what is returned when the timeout elapses is not stated
     * on this page — confirm against the Pulsar Javadoc.
     *
     * @param timeout amount of time to wait, expressed in {@code unit}
     * @param unit    unit of {@code timeout}
     * @return the message that was read
     * @throws PulsarClientException declared by the client API for a failed read
     */
    Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientException;

    /**
     * Repositions the reader at a specific message ID.
     *
     * @param messageId position to seek to
     * @throws PulsarClientException declared by the client API for a failed seek
     */
    void seek(MessageId messageId) throws PulsarClientException;

    /**
     * Asynchronous variant of {@link #seek(MessageId)}.
     *
     * @param messageId position to seek to
     * @return a future tracking the seek
     */
    CompletableFuture<Void> seekAsync(MessageId messageId);

    /**
     * Repositions the reader at a specific timestamp.
     *
     * @param timestamp timestamp to seek to
     * @throws PulsarClientException declared by the client API for a failed seek
     */
    void seek(long timestamp) throws PulsarClientException;

    /**
     * Asynchronous variant of {@link #seek(long)}.
     *
     * @param timestamp timestamp to seek to
     * @return a future tracking the seek
     */
    CompletableFuture<Void> seekAsync(long timestamp);

    /**
     * Repositions the reader using a custom function.
     *
     * <p>NOTE(review): the meaning of the function's {@code String} argument and
     * {@code Object} result is not shown on this page — presumably topic name in,
     * timestamp or message ID out; confirm against the Pulsar Javadoc.
     *
     * @param function custom positioning function
     * @throws PulsarClientException declared by the client API for a failed seek
     */
    void seek(Function<String, Object> function) throws PulsarClientException;

    /**
     * Asynchronous variant of {@link #seek(Function)}.
     *
     * @param function custom positioning function
     * @return a future tracking the seek
     */
    CompletableFuture<Void> seekAsync(Function<String, Object> function);

    /**
     * @return whether messages are available to read
     * @throws PulsarClientException declared by the client API for a failed check
     */
    boolean hasMessageAvailable() throws PulsarClientException;

    /**
     * Asynchronous variant of {@link #hasMessageAvailable()}.
     *
     * @return a future that yields whether messages are available
     */
    CompletableFuture<Boolean> hasMessageAvailableAsync();

    /**
     * @return whether the reader is connected
     */
    boolean isConnected();

    /**
     * @return whether the reader has reached the end of the topic
     */
    boolean hasReachedEndOfTopic();

    /**
     * @return the last message IDs for all partitions
     * @throws PulsarClientException declared by the client API for a failed lookup
     */
    List<TopicMessageId> getLastMessageIds() throws PulsarClientException;

    /**
     * Asynchronous variant of {@link #getLastMessageIds()}.
     *
     * @return a future that yields the last message IDs for all partitions
     */
    CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync();

    /**
     * Closes the reader.
     *
     * @throws PulsarClientException declared by the client API for a failed close
     */
    void close() throws PulsarClientException;

    /**
     * Asynchronous variant of {@link #close()}.
     *
     * @return a future tracking the close
     */
    CompletableFuture<Void> closeAsync();
}

Usage Examples:
import org.apache.pulsar.client.api.*;
// Open a reader positioned at the earliest message
Reader<String> reader = client.newReader(Schema.STRING)
        .topic("my-topic")
        .startMessageId(MessageId.earliest)
        .create();

// Sequential reading; readers do not acknowledge messages
while (reader.hasMessageAvailable()) {
    Message<String> next = reader.readNext();
    System.out.println("Read: " + next.getValue());
}

// Asynchronous reading
CompletableFuture<Message<String>> pendingRead = reader.readNextAsync();
pendingRead
        .thenAccept(received -> {
            System.out.println("Async read: " + received.getValue());
        })
        .exceptionally(failure -> {
            System.err.println("Read failed: " + failure.getMessage());
            return null;
        });

// Seek to a stored message ID, then read the next message
MessageId specificMessageId = getStoredMessageId();
reader.seek(specificMessageId);
Message<String> message = reader.readNext();

// Seek to the timestamp one hour before now
long oneHourAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
reader.seek(oneHourAgo);

Builder interface for configuring and creating Reader instances.
/**
 * Fluent builder that configures and creates {@link Reader} instances.
 *
 * @param <T> type of the message payload
 */
interface ReaderBuilder<T> extends Serializable, Cloneable {

    /**
     * Creates the reader synchronously.
     *
     * @return the new reader
     * @throws PulsarClientException declared by the client API for a failed creation
     */
    Reader<T> create() throws PulsarClientException;

    /**
     * Creates the reader asynchronously.
     *
     * @return a future that yields the new reader
     */
    CompletableFuture<Reader<T>> createAsync();

    /**
     * @return a clone of this builder
     */
    ReaderBuilder<T> clone();

    /**
     * Sets the topic name (required).
     */
    ReaderBuilder<T> topic(String topicName);

    /**
     * Sets the message ID to start reading from.
     *
     * <p>NOTE(review): originally marked "required", yet the rollback-duration
     * example in this document creates a reader without calling it — presumably
     * either this or {@link #startMessageFromRollbackDuration(long, TimeUnit)}
     * is enough; confirm against the Pulsar Javadoc.
     */
    ReaderBuilder<T> startMessageId(MessageId startMessageId);

    /**
     * Starts reading from a position derived from a rollback duration.
     *
     * @param rollbackDuration how far to roll back, expressed in {@code timeunit}
     * @param timeunit         unit of {@code rollbackDuration}
     */
    ReaderBuilder<T> startMessageFromRollbackDuration(long rollbackDuration, TimeUnit timeunit);

    /**
     * Sets the reader name (optional; auto-generated if not set).
     */
    ReaderBuilder<T> readerName(String readerName);

    /**
     * Sets the subscription name.
     *
     * <p>NOTE(review): the original comment said this is "for position
     * persistence"; whether a reader's position is actually persisted is not
     * shown on this page — confirm against the Pulsar Javadoc.
     */
    ReaderBuilder<T> subscriptionName(String subscriptionName);

    /**
     * Sets the subscription role prefix.
     */
    ReaderBuilder<T> subscriptionRolePrefix(String subscriptionRolePrefix);

    /**
     * Sets the receiver queue size (default: 1000).
     */
    ReaderBuilder<T> receiverQueueSize(int receiverQueueSize);

    /**
     * Registers a listener for push-style reading.
     */
    ReaderBuilder<T> readerListener(ReaderListener<T> readerListener);

    /**
     * Sets the executor used for the reader listener.
     */
    ReaderBuilder<T> readerListenerExecutor(Executor executor);

    /**
     * Enables reading compacted messages only.
     */
    ReaderBuilder<T> readCompacted(boolean readCompacted);

    /**
     * Controls resetting the cursor to the start position on reconnection.
     */
    ReaderBuilder<T> resetIncludeHead(boolean resetIncludeHead);

    /**
     * Loads reader configuration from a map.
     */
    ReaderBuilder<T> loadConf(Map<String, Object> config);

    /**
     * Adds a reader interceptor.
     */
    ReaderBuilder<T> intercept(ReaderInterceptor<T> interceptor);

    /**
     * Sets the key hash ranges for the reader.
     *
     * <p>NOTE(review): the original comment said "for multi-topic readers";
     * nothing on this page ties the ranges to multi-topic use — confirm.
     */
    ReaderBuilder<T> keyHashRange(Range... ranges);

    /**
     * Enables or disables message pooling.
     */
    ReaderBuilder<T> poolMessages(boolean poolMessages);

    /**
     * Makes the start message ID inclusive.
     */
    ReaderBuilder<T> startMessageIdInclusive();
}

Configure message decryption for readers.
/**
 * Decryption-related options of the reader builder.
 *
 * @param <T> type of the message payload
 */
interface ReaderBuilder<T> {

    /**
     * Sets the crypto key reader.
     */
    ReaderBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);

    /**
     * Sets the default crypto key reader from a private key path.
     */
    ReaderBuilder<T> defaultCryptoKeyReader(String privateKeyPath);

    /**
     * Sets the default crypto key reader from a map of private keys.
     *
     * <p>NOTE(review): the meaning of the map's keys and values is not shown
     * on this page (the original comment called it a "key store") — confirm.
     */
    ReaderBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);

    /**
     * Sets the action to take on a crypto failure.
     */
    ReaderBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);
}

Reader Configuration Examples:
// Each example declares a distinctly named reader so that the examples can be
// pasted together into a single scope without duplicate-variable errors.

// Basic reader from earliest
Reader<String> earliestReader = client.newReader(Schema.STRING)
        .topic("my-topic")
        .startMessageId(MessageId.earliest)
        .readerName("my-reader")
        .create();

// Reader from specific message ID
MessageId lastProcessedId = getLastProcessedMessageId();
Reader<String> resumedReader = client.newReader(Schema.STRING)
        .topic("my-topic")
        .startMessageId(lastProcessedId)
        .receiverQueueSize(2000)
        .create();

// Reader with persistent position using subscription
Reader<String> namedSubscriptionReader = client.newReader(Schema.STRING)
        .topic("my-topic")
        .startMessageId(MessageId.latest)
        .subscriptionName("reader-position")
        .create();

// Reader from time-based position
Reader<String> rollbackReader = client.newReader(Schema.STRING)
        .topic("my-topic")
        .startMessageFromRollbackDuration(1, TimeUnit.HOURS)
        .create();

// Compacted topic reader
Reader<String> compactedReader = client.newReader(Schema.STRING)
        .topic("compacted-topic")
        .startMessageId(MessageId.earliest)
        .readCompacted(true)
        .create();

// Reader with push-style listener.
// The lambda's first parameter must not reuse the name of the variable being
// declared: a lambda parameter cannot redeclare a local that is in scope.
Reader<String> listenerReader = client.newReader(Schema.STRING)
        .topic("listener-topic")
        .startMessageId(MessageId.latest)
        .readerListener((source, message) -> {
            System.out.println("Listener received: " + message.getValue());
        })
        .create();

Key-value view of compacted topics, providing map-like access to the latest values.
/**
 * Key-value view of a compacted topic.
 *
 * <p>Offers a map-like interface to the latest value for each key; messages
 * without keys are ignored.
 *
 * @param <T> type of the values
 */
interface TableView<T> extends Closeable {

    /**
     * @return the number of entries
     */
    int size();

    /**
     * @return whether the table is empty
     */
    boolean isEmpty();

    /**
     * @param key key to look up
     * @return whether the key exists
     */
    boolean containsKey(String key);

    /**
     * @param key key to look up
     * @return the value stored for the key
     */
    T get(String key);

    /**
     * @return all keys
     */
    Set<String> keySet();

    /**
     * @return all values
     */
    Collection<T> values();

    /**
     * @return all entries
     */
    Set<Map.Entry<String, T>> entrySet();

    /**
     * Iterates over all entries.
     *
     * @param action callback receiving each key and value
     */
    void forEach(BiConsumer<String, T> action);

    /**
     * Refreshes the table view asynchronously.
     *
     * @return a future tracking the refresh
     */
    CompletableFuture<Void> refreshAsync();

    /**
     * Listens for table updates.
     *
     * @param action callback receiving the key and value of an update
     */
    void listen(BiConsumer<String, T> action);

    /**
     * @return the name of the topic
     */
    String getTopic();

    /**
     * Closes the table view.
     *
     * @throws PulsarClientException declared by the client API for a failed close
     */
    void close() throws PulsarClientException;

    /**
     * Closes the table view asynchronously.
     *
     * @return a future tracking the close
     */
    CompletableFuture<Void> closeAsync();
}

Builder interface for configuring and creating TableView instances.
/**
 * Fluent builder that configures and creates {@link TableView} instances.
 *
 * @param <T> type of the values
 */
interface TableViewBuilder<T> extends Serializable, Cloneable {

    /**
     * Creates the table view synchronously.
     *
     * @return the new table view
     * @throws PulsarClientException declared by the client API for a failed creation
     */
    TableView<T> create() throws PulsarClientException;

    /**
     * Creates the table view asynchronously.
     *
     * @return a future that yields the new table view
     */
    CompletableFuture<TableView<T>> createAsync();

    /**
     * Sets the topic name (required).
     */
    TableViewBuilder<T> topic(String topic);

    /**
     * Sets the partition update interval.
     *
     * @param interval length of the interval, expressed in {@code unit}
     * @param unit     unit of {@code interval}
     */
    TableViewBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);

    /**
     * Sets the subscription name for position persistence.
     */
    TableViewBuilder<T> subscriptionName(String subscriptionName);

    /**
     * Sets the crypto key reader.
     */
    TableViewBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);

    /**
     * Sets the default crypto key reader from a private key path.
     */
    TableViewBuilder<T> defaultCryptoKeyReader(String privateKeyPath);

    /**
     * Sets the default crypto key reader from a map of private keys.
     *
     * <p>NOTE(review): the meaning of the map's keys and values is not shown
     * on this page (the original comment called it a "key store") — confirm.
     */
    TableViewBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);

    /**
     * Loads configuration from a map.
     */
    TableViewBuilder<T> loadConf(Map<String, Object> config);
}

TableView Examples:
// Basic table view
TableView<String> tableView = client.newTableView(Schema.STRING)
        .topic("user-profiles")
        .create();

// Access data like a map
String userProfile = tableView.get("user-123");
boolean hasUser = tableView.containsKey("user-456");

// Iterate over all entries
tableView.forEach((key, value) -> {
    System.out.println("Key: " + key + ", Value: " + value);
});

// Listen for updates
tableView.listen((key, value) -> {
    System.out.println("Updated: " + key + " = " + value);
});

// Table view with partition updates.
// Declared under its own name so it does not redeclare tableView from above.
TableView<String> configView = client.newTableView(Schema.STRING)
        .topic("config-topic")
        .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
        .subscriptionName("config-reader")
        .create();

Advanced message positioning capabilities for precise reading control.
/**
 * Identifier of a message, used for positioning.
 *
 * <p>This is a signature summary: the static method body and the constant
 * initializers are omitted.
 */
interface MessageId extends Comparable<MessageId>, Serializable {

    /**
     * @return this message ID serialized to a byte array
     */
    byte[] toByteArray();

    /**
     * Deserializes a message ID from a byte array.
     *
     * @param data serialized message ID
     * @return the deserialized message ID
     * @throws IOException declared by the client API for a failed deserialization
     */
    static MessageId fromByteArray(byte[] data) throws IOException;

    /** Earliest message position. */
    static final MessageId earliest;

    /** Latest message position. */
    static final MessageId latest;
}
/**
 * Message ID that is specific to a topic.
 */
interface TopicMessageId extends MessageId {

    /**
     * @return the name of the topic
     */
    String getTopicName();

    /**
     * @return the inner message ID
     */
    MessageId getInnerMessageId();
}

Positioning Examples:
// Store and restore a reader position
Reader<String> reader = client.newReader(Schema.STRING)
        .topic("my-topic")
        .startMessageId(MessageId.earliest)
        .create();

// Read a message, take its ID as the position, and save it in serialized form
MessageId checkpoint = reader.readNext().getMessageId();
savePositionToStorage(checkpoint.toByteArray());

// Later: load the serialized position and start a new reader from it
MessageId restoredPosition = MessageId.fromByteArray(loadPositionFromStorage());
Reader<String> newReader = client.newReader(Schema.STRING)
        .topic("my-topic")
        .startMessageId(restoredPosition)
        .create();

// Seek operations
reader.seek(MessageId.latest);      // Jump to end
reader.seek(someOtherMessageId);    // Jump to specific message
reader.seek(System.currentTimeMillis() - 3600000); // Jump to 1 hour agointerface ReaderListener<T> {
/** Handle read message */
void received(Reader<T> reader, Message<T> msg);
}
/**
 * Interceptor hooked into a reader.
 *
 * @param <T> type of the message payload
 */
interface ReaderInterceptor<T> extends AutoCloseable {

    /**
     * Intercepts a message before it is read.
     *
     * @param reader  reader the message belongs to
     * @param message message being intercepted
     * @return the message to hand on
     */
    Message<T> beforeRead(Reader<T> reader, Message<T> message);

    /**
     * Handles partition changes.
     *
     * @param topicName  name of the topic
     * @param partitions partition count reported for the topic
     */
    void onPartitionsChange(String topicName, int partitions);

    /**
     * Closes the interceptor.
     */
    void close();
}
/**
 * Integer range with a start and an end.
 *
 * <p>This is a signature summary: method bodies are omitted.
 */
class Range {

    /**
     * Creates a range.
     *
     * @param start start of the range
     * @param end   end of the range
     * @return the new range
     */
    static Range of(int start, int end);

    /**
     * @return the start of the range
     */
    int getStart();

    /**
     * @return the end of the range
     */
    int getEnd();
}
/**
 * Metadata describing a topic.
 */
interface TopicMetadata {

    /**
     * @return the number of partitions
     */
    int getNumPartitions();
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-pulsar--pulsar-client