0
# Message Reading
1
2
Low-level message reading with manual positioning for replay scenarios, custom consumption patterns, and precise message access control.
3
4
## Capabilities
5
6
### Reader Interface
7
8
Low-level interface for reading messages with manual positioning, without using subscriptions.
9
10
```java { .api }
11
/**
12
* Interface for reading messages with manual positioning
13
* Provides low-level abstraction for manual positioning in topics without subscriptions
14
* Suitable for replay scenarios and custom consumption patterns
15
*/
16
interface Reader<T> extends Closeable {
17
/** Get topic name */
18
String getTopic();
19
20
/** Read next message synchronously (blocks until message available) */
21
Message<T> readNext() throws PulsarClientException;
22
23
/** Read next message asynchronously */
24
CompletableFuture<Message<T>> readNextAsync();
25
26
/** Read next message with timeout */
27
Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientException;
28
29
/** Seek to specific message ID */
30
void seek(MessageId messageId) throws PulsarClientException;
31
32
/** Seek to specific message ID asynchronously */
33
CompletableFuture<Void> seekAsync(MessageId messageId);
34
35
/** Seek to specific timestamp */
36
void seek(long timestamp) throws PulsarClientException;
37
38
/** Seek to specific timestamp asynchronously */
39
CompletableFuture<Void> seekAsync(long timestamp);
40
41
/** Seek using custom function */
42
void seek(Function<String, Object> function) throws PulsarClientException;
43
44
/** Seek using custom function asynchronously */
45
CompletableFuture<Void> seekAsync(Function<String, Object> function);
46
47
/** Check if messages are available */
48
boolean hasMessageAvailable() throws PulsarClientException;
49
50
/** Check if messages are available asynchronously */
51
CompletableFuture<Boolean> hasMessageAvailableAsync();
52
53
/** Check if reader is connected */
54
boolean isConnected();
55
56
/** Check if reader has reached end of topic */
57
boolean hasReachedEndOfTopic();
58
59
/** Get last message IDs for all partitions */
60
List<TopicMessageId> getLastMessageIds() throws PulsarClientException;
61
62
/** Get last message IDs for all partitions asynchronously */
63
CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync();
64
65
/** Close reader */
66
void close() throws PulsarClientException;
67
68
/** Close reader asynchronously */
69
CompletableFuture<Void> closeAsync();
70
}
71
```
72
73
**Usage Examples:**
74
75
```java
76
import org.apache.pulsar.client.api.*;
77
78
// Create reader starting from earliest message
79
Reader<String> reader = client.newReader(Schema.STRING)
80
.topic("my-topic")
81
.startMessageId(MessageId.earliest)
82
.create();
83
84
// Read messages sequentially
85
while (reader.hasMessageAvailable()) {
86
Message<String> message = reader.readNext();
87
System.out.println("Read: " + message.getValue());
88
// Note: Readers don't need acknowledgments
89
}
90
91
// Async reading
92
reader.readNextAsync()
93
.thenAccept(message -> {
94
System.out.println("Async read: " + message.getValue());
95
})
96
.exceptionally(throwable -> {
97
System.err.println("Read failed: " + throwable.getMessage());
98
return null;
99
});
100
101
// Seek to specific position
102
MessageId specificMessageId = getStoredMessageId();
103
reader.seek(specificMessageId);
104
Message<String> message = reader.readNext();
105
106
// Seek to timestamp
107
long timestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
108
reader.seek(timestamp);
109
```
110
111
### ReaderBuilder Configuration
112
113
Builder interface for configuring and creating Reader instances.
114
115
```java { .api }
116
/**
117
* Builder for configuring and creating Reader instances
118
*/
119
interface ReaderBuilder<T> extends Serializable, Cloneable {
120
/** Create the reader synchronously */
121
Reader<T> create() throws PulsarClientException;
122
123
/** Create the reader asynchronously */
124
CompletableFuture<Reader<T>> createAsync();
125
126
/** Clone the builder */
127
ReaderBuilder<T> clone();
128
129
/** Set topic name (required) */
130
ReaderBuilder<T> topic(String topicName);
131
132
/** Set start message ID (required) */
133
ReaderBuilder<T> startMessageId(MessageId startMessageId);
134
135
/** Start from rollback duration */
136
ReaderBuilder<T> startMessageFromRollbackDuration(long rollbackDuration, TimeUnit timeunit);
137
138
/** Set reader name (optional, auto-generated if not set) */
139
ReaderBuilder<T> readerName(String readerName);
140
141
/** Set subscription name for position persistence */
142
ReaderBuilder<T> subscriptionName(String subscriptionName);
143
144
/** Set subscription role prefix */
145
ReaderBuilder<T> subscriptionRolePrefix(String subscriptionRolePrefix);
146
147
/** Set receiver queue size (default: 1000) */
148
ReaderBuilder<T> receiverQueueSize(int receiverQueueSize);
149
150
/** Set reader listener for push-style reading */
151
ReaderBuilder<T> readerListener(ReaderListener<T> readerListener);
152
153
/** Set reader listener executor */
154
ReaderBuilder<T> readerListenerExecutor(Executor executor);
155
156
/** Enable reading compacted messages only */
157
ReaderBuilder<T> readCompacted(boolean readCompacted);
158
159
/** Reset cursor to start position on reconnection */
160
ReaderBuilder<T> resetIncludeHead(boolean resetIncludeHead);
161
162
/** Set reader configuration */
163
ReaderBuilder<T> loadConf(Map<String, Object> config);
164
165
/** Add reader interceptor */
166
ReaderBuilder<T> intercept(ReaderInterceptor<T> interceptor);
167
168
/** Set key hash ranges for multi-topic readers */
169
ReaderBuilder<T> keyHashRange(Range... ranges);
170
171
/** Enable pooling messages */
172
ReaderBuilder<T> poolMessages(boolean poolMessages);
173
174
/** Start message ID inclusive */
175
ReaderBuilder<T> startMessageIdInclusive();
176
}
177
```
178
179
### Encryption Configuration
180
181
Configure message decryption for readers.
182
183
```java { .api }
184
interface ReaderBuilder<T> {
185
/** Set crypto key reader */
186
ReaderBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
187
188
/** Set default crypto key reader using private key path */
189
ReaderBuilder<T> defaultCryptoKeyReader(String privateKeyPath);
190
191
/** Set default crypto key reader using key store */
192
ReaderBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);
193
194
/** Set crypto failure action */
195
ReaderBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);
196
}
197
```
198
199
**Reader Configuration Examples:**
200
201
```java
202
// Basic reader from earliest
203
Reader<String> reader = client.newReader(Schema.STRING)
204
.topic("my-topic")
205
.startMessageId(MessageId.earliest)
206
.readerName("my-reader")
207
.create();
208
209
// Reader from specific message ID
210
MessageId lastProcessedId = getLastProcessedMessageId();
211
Reader<String> reader = client.newReader(Schema.STRING)
212
.topic("my-topic")
213
.startMessageId(lastProcessedId)
214
.receiverQueueSize(2000)
215
.create();
216
217
// Reader with persistent position using subscription
218
Reader<String> reader = client.newReader(Schema.STRING)
219
.topic("my-topic")
220
.startMessageId(MessageId.latest)
221
.subscriptionName("reader-position")
222
.create();
223
224
// Reader from time-based position
225
Reader<String> reader = client.newReader(Schema.STRING)
226
.topic("my-topic")
227
.startMessageFromRollbackDuration(1, TimeUnit.HOURS)
228
.create();
229
230
// Compacted topic reader
231
Reader<String> reader = client.newReader(Schema.STRING)
232
.topic("compacted-topic")
233
.startMessageId(MessageId.earliest)
234
.readCompacted(true)
235
.create();
236
237
// Reader with push-style listener
238
Reader<String> reader = client.newReader(Schema.STRING)
239
.topic("listener-topic")
240
.startMessageId(MessageId.latest)
241
.readerListener((reader, message) -> {
242
System.out.println("Listener received: " + message.getValue());
243
})
244
.create();
245
```
246
247
### TableView Interface
248
249
Key-value view of compacted topics, providing map-like access to the latest values.
250
251
```java { .api }
252
/**
253
* Key-value view of a compacted topic
254
* Provides map-like interface to latest values for each key
255
* Messages without keys are ignored
256
*/
257
interface TableView<T> extends Closeable {
258
/** Get number of entries */
259
int size();
260
261
/** Check if table is empty */
262
boolean isEmpty();
263
264
/** Check if key exists */
265
boolean containsKey(String key);
266
267
/** Get value by key */
268
T get(String key);
269
270
/** Get all keys */
271
Set<String> keySet();
272
273
/** Get all values */
274
Collection<T> values();
275
276
/** Get all entries */
277
Set<Map.Entry<String, T>> entrySet();
278
279
/** Iterate over all entries */
280
void forEach(BiConsumer<String, T> action);
281
282
/** Refresh table view asynchronously */
283
CompletableFuture<Void> refreshAsync();
284
285
/** Listen for table updates */
286
void listen(BiConsumer<String, T> action);
287
288
/** Get topic name */
289
String getTopic();
290
291
/** Close table view */
292
void close() throws PulsarClientException;
293
294
/** Close table view asynchronously */
295
CompletableFuture<Void> closeAsync();
296
}
297
```
298
299
### TableViewBuilder Configuration
300
301
Builder interface for configuring and creating TableView instances.
302
303
```java { .api }
304
/**
305
* Builder for configuring and creating TableView instances
306
*/
307
interface TableViewBuilder<T> extends Serializable, Cloneable {
308
/** Create the table view synchronously */
309
TableView<T> create() throws PulsarClientException;
310
311
/** Create the table view asynchronously */
312
CompletableFuture<TableView<T>> createAsync();
313
314
/** Set topic name (required) */
315
TableViewBuilder<T> topic(String topic);
316
317
/** Set partition update interval */
318
TableViewBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
319
320
/** Set subscription name for position persistence */
321
TableViewBuilder<T> subscriptionName(String subscriptionName);
322
323
/** Set crypto key reader */
324
TableViewBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
325
326
/** Set default crypto key reader */
327
TableViewBuilder<T> defaultCryptoKeyReader(String privateKeyPath);
328
329
/** Set default crypto key reader using key store */
330
TableViewBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);
331
332
/** Load configuration from map */
333
TableViewBuilder<T> loadConf(Map<String, Object> config);
334
}
335
```
336
337
**TableView Examples:**
338
339
```java
340
// Basic table view
341
TableView<String> tableView = client.newTableView(Schema.STRING)
342
.topic("user-profiles")
343
.create();
344
345
// Access data like a map
346
String userProfile = tableView.get("user-123");
347
boolean hasUser = tableView.containsKey("user-456");
348
349
// Iterate over all entries
350
tableView.forEach((key, value) -> {
351
System.out.println("Key: " + key + ", Value: " + value);
352
});
353
354
// Listen for updates
355
tableView.listen((key, value) -> {
356
System.out.println("Updated: " + key + " = " + value);
357
});
358
359
// Table view with partition updates
360
TableView<String> tableView = client.newTableView(Schema.STRING)
361
.topic("config-topic")
362
.autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
363
.subscriptionName("config-reader")
364
.create();
365
```
366
367
### Message Positioning
368
369
Advanced message positioning capabilities for precise reading control.
370
371
```java { .api }
372
/**
373
* Message ID interface for positioning
374
*/
375
interface MessageId extends Comparable<MessageId>, Serializable {
376
/** Serialize to byte array */
377
byte[] toByteArray();
378
379
/** Deserialize from byte array */
380
static MessageId fromByteArray(byte[] data) throws IOException;
381
382
/** Earliest message position */
383
static final MessageId earliest;
384
385
/** Latest message position */
386
static final MessageId latest;
387
}
388
389
/**
390
* Topic-specific message ID
391
*/
392
interface TopicMessageId extends MessageId {
393
/** Get topic name */
394
String getTopicName();
395
396
/** Get inner message ID */
397
MessageId getInnerMessageId();
398
}
399
```
400
401
**Positioning Examples:**
402
403
```java
404
// Store and restore reader position
405
Reader<String> reader = client.newReader(Schema.STRING)
406
.topic("my-topic")
407
.startMessageId(MessageId.earliest)
408
.create();
409
410
// Process some messages and save position
411
Message<String> lastMessage = reader.readNext();
412
MessageId position = lastMessage.getMessageId();
413
414
// Save position to storage
415
byte[] positionBytes = position.toByteArray();
416
savePositionToStorage(positionBytes);
417
418
// Later, restore position
419
byte[] storedPosition = loadPositionFromStorage();
420
MessageId restoredPosition = MessageId.fromByteArray(storedPosition);
421
422
Reader<String> newReader = client.newReader(Schema.STRING)
423
.topic("my-topic")
424
.startMessageId(restoredPosition)
425
.create();
426
427
// Seek operations
428
reader.seek(MessageId.latest); // Jump to end
429
reader.seek(someOtherMessageId); // Jump to specific message
430
reader.seek(System.currentTimeMillis() - 3600000); // Jump to 1 hour ago
431
```
432
433
## Supporting Types and Interfaces
434
435
```java { .api }
436
interface ReaderListener<T> {
437
/** Handle read message */
438
void received(Reader<T> reader, Message<T> msg);
439
}
440
441
interface ReaderInterceptor<T> extends AutoCloseable {
442
/** Intercept before read */
443
Message<T> beforeRead(Reader<T> reader, Message<T> message);
444
445
/** Handle partition changes */
446
void onPartitionsChange(String topicName, int partitions);
447
448
/** Close interceptor */
449
void close();
450
}
451
452
class Range {
453
/** Create range */
454
static Range of(int start, int end);
455
456
/** Get start of range */
457
int getStart();
458
459
/** Get end of range */
460
int getEnd();
461
}
462
463
interface TopicMetadata {
464
/** Get number of partitions */
465
int getNumPartitions();
466
}
467
```