or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication-security.mdclient-management.mdindex.mdmessage-consumption.mdmessage-production.mdmessage-reading.mdschema-serialization.mdtransaction-support.md

message-reading.mddocs/

0

# Message Reading

1

2

Low-level message reading with manual positioning for replay scenarios, custom consumption patterns, and precise message access control.

3

4

## Capabilities

5

6

### Reader Interface

7

8

Low-level interface for reading messages with manual positioning, without using subscriptions.

9

10

```java { .api }

11

/**

12

* Interface for reading messages with manual positioning

13

* Provides low-level abstraction for manual positioning in topics without subscriptions

14

* Suitable for replay scenarios and custom consumption patterns

15

*/

16

interface Reader<T> extends Closeable {

17

/** Get topic name */

18

String getTopic();

19

20

/** Read next message synchronously (blocks until message available) */

21

Message<T> readNext() throws PulsarClientException;

22

23

/** Read next message asynchronously */

24

CompletableFuture<Message<T>> readNextAsync();

25

26

/** Read next message with timeout */

27

Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientException;

28

29

/** Seek to specific message ID */

30

void seek(MessageId messageId) throws PulsarClientException;

31

32

/** Seek to specific message ID asynchronously */

33

CompletableFuture<Void> seekAsync(MessageId messageId);

34

35

/** Seek to specific timestamp */

36

void seek(long timestamp) throws PulsarClientException;

37

38

/** Seek to specific timestamp asynchronously */

39

CompletableFuture<Void> seekAsync(long timestamp);

40

41

/** Seek using custom function */

42

void seek(Function<String, Object> function) throws PulsarClientException;

43

44

/** Seek using custom function asynchronously */

45

CompletableFuture<Void> seekAsync(Function<String, Object> function);

46

47

/** Check if messages are available */

48

boolean hasMessageAvailable() throws PulsarClientException;

49

50

/** Check if messages are available asynchronously */

51

CompletableFuture<Boolean> hasMessageAvailableAsync();

52

53

/** Check if reader is connected */

54

boolean isConnected();

55

56

/** Check if reader has reached end of topic */

57

boolean hasReachedEndOfTopic();

58

59

/** Get last message IDs for all partitions */

60

List<TopicMessageId> getLastMessageIds() throws PulsarClientException;

61

62

/** Get last message IDs for all partitions asynchronously */

63

CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync();

64

65

/** Close reader */

66

void close() throws PulsarClientException;

67

68

/** Close reader asynchronously */

69

CompletableFuture<Void> closeAsync();

70

}

71

```

72

73

**Usage Examples:**

74

75

```java

76

import org.apache.pulsar.client.api.*;

77

78

// Create reader starting from earliest message

79

Reader<String> reader = client.newReader(Schema.STRING)

80

.topic("my-topic")

81

.startMessageId(MessageId.earliest)

82

.create();

83

84

// Read messages sequentially

85

while (reader.hasMessageAvailable()) {

86

Message<String> message = reader.readNext();

87

System.out.println("Read: " + message.getValue());

88

// Note: Readers don't need acknowledgments

89

}

90

91

// Async reading

92

reader.readNextAsync()

93

.thenAccept(message -> {

94

System.out.println("Async read: " + message.getValue());

95

})

96

.exceptionally(throwable -> {

97

System.err.println("Read failed: " + throwable.getMessage());

98

return null;

99

});

100

101

// Seek to specific position

102

MessageId specificMessageId = getStoredMessageId();

103

reader.seek(specificMessageId);

104

Message<String> message = reader.readNext();

105

106

// Seek to timestamp

107

long timestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);

108

reader.seek(timestamp);

109

```

110

111

### ReaderBuilder Configuration

112

113

Builder interface for configuring and creating Reader instances.

114

115

```java { .api }

116

/**

117

* Builder for configuring and creating Reader instances

118

*/

119

interface ReaderBuilder<T> extends Serializable, Cloneable {

120

/** Create the reader synchronously */

121

Reader<T> create() throws PulsarClientException;

122

123

/** Create the reader asynchronously */

124

CompletableFuture<Reader<T>> createAsync();

125

126

/** Clone the builder */

127

ReaderBuilder<T> clone();

128

129

/** Set topic name (required) */

130

ReaderBuilder<T> topic(String topicName);

131

132

/** Set start message ID (required) */

133

ReaderBuilder<T> startMessageId(MessageId startMessageId);

134

135

/** Start from rollback duration */

136

ReaderBuilder<T> startMessageFromRollbackDuration(long rollbackDuration, TimeUnit timeunit);

137

138

/** Set reader name (optional, auto-generated if not set) */

139

ReaderBuilder<T> readerName(String readerName);

140

141

/** Set subscription name for position persistence */

142

ReaderBuilder<T> subscriptionName(String subscriptionName);

143

144

/** Set subscription role prefix */

145

ReaderBuilder<T> subscriptionRolePrefix(String subscriptionRolePrefix);

146

147

/** Set receiver queue size (default: 1000) */

148

ReaderBuilder<T> receiverQueueSize(int receiverQueueSize);

149

150

/** Set reader listener for push-style reading */

151

ReaderBuilder<T> readerListener(ReaderListener<T> readerListener);

152

153

/** Set reader listener executor */

154

ReaderBuilder<T> readerListenerExecutor(Executor executor);

155

156

/** Enable reading compacted messages only */

157

ReaderBuilder<T> readCompacted(boolean readCompacted);

158

159

/** Reset cursor to start position on reconnection */

160

ReaderBuilder<T> resetIncludeHead(boolean resetIncludeHead);

161

162

/** Set reader configuration */

163

ReaderBuilder<T> loadConf(Map<String, Object> config);

164

165

/** Add reader interceptor */

166

ReaderBuilder<T> intercept(ReaderInterceptor<T> interceptor);

167

168

/** Set key hash ranges for multi-topic readers */

169

ReaderBuilder<T> keyHashRange(Range... ranges);

170

171

/** Enable pooling messages */

172

ReaderBuilder<T> poolMessages(boolean poolMessages);

173

174

/** Start message ID inclusive */

175

ReaderBuilder<T> startMessageIdInclusive();

176

}

177

```

178

179

### Encryption Configuration

180

181

Configure message decryption for readers.

182

183

```java { .api }

184

interface ReaderBuilder<T> {

185

/** Set crypto key reader */

186

ReaderBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);

187

188

/** Set default crypto key reader using private key path */

189

ReaderBuilder<T> defaultCryptoKeyReader(String privateKeyPath);

190

191

/** Set default crypto key reader using key store */

192

ReaderBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);

193

194

/** Set crypto failure action */

195

ReaderBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);

196

}

197

```

198

199

**Reader Configuration Examples:**

200

201

```java

202

// Basic reader from earliest

203

Reader<String> reader = client.newReader(Schema.STRING)

204

.topic("my-topic")

205

.startMessageId(MessageId.earliest)

206

.readerName("my-reader")

207

.create();

208

209

// Reader from specific message ID

210

MessageId lastProcessedId = getLastProcessedMessageId();

211

Reader<String> reader = client.newReader(Schema.STRING)

212

.topic("my-topic")

213

.startMessageId(lastProcessedId)

214

.receiverQueueSize(2000)

215

.create();

216

217

// Reader with persistent position using subscription

218

Reader<String> reader = client.newReader(Schema.STRING)

219

.topic("my-topic")

220

.startMessageId(MessageId.latest)

221

.subscriptionName("reader-position")

222

.create();

223

224

// Reader from time-based position

225

Reader<String> reader = client.newReader(Schema.STRING)

226

.topic("my-topic")

227

.startMessageFromRollbackDuration(1, TimeUnit.HOURS)

228

.create();

229

230

// Compacted topic reader

231

Reader<String> reader = client.newReader(Schema.STRING)

232

.topic("compacted-topic")

233

.startMessageId(MessageId.earliest)

234

.readCompacted(true)

235

.create();

236

237

// Reader with push-style listener

238

Reader<String> reader = client.newReader(Schema.STRING)

239

.topic("listener-topic")

240

.startMessageId(MessageId.latest)

241

.readerListener((reader, message) -> {

242

System.out.println("Listener received: " + message.getValue());

243

})

244

.create();

245

```

246

247

### TableView Interface

248

249

Key-value view of compacted topics, providing map-like access to the latest values.

250

251

```java { .api }

252

/**

253

* Key-value view of a compacted topic

254

* Provides map-like interface to latest values for each key

255

* Messages without keys are ignored

256

*/

257

interface TableView<T> extends Closeable {

258

/** Get number of entries */

259

int size();

260

261

/** Check if table is empty */

262

boolean isEmpty();

263

264

/** Check if key exists */

265

boolean containsKey(String key);

266

267

/** Get value by key */

268

T get(String key);

269

270

/** Get all keys */

271

Set<String> keySet();

272

273

/** Get all values */

274

Collection<T> values();

275

276

/** Get all entries */

277

Set<Map.Entry<String, T>> entrySet();

278

279

/** Iterate over all entries */

280

void forEach(BiConsumer<String, T> action);

281

282

/** Refresh table view asynchronously */

283

CompletableFuture<Void> refreshAsync();

284

285

/** Listen for table updates */

286

void listen(BiConsumer<String, T> action);

287

288

/** Get topic name */

289

String getTopic();

290

291

/** Close table view */

292

void close() throws PulsarClientException;

293

294

/** Close table view asynchronously */

295

CompletableFuture<Void> closeAsync();

296

}

297

```

298

299

### TableViewBuilder Configuration

300

301

Builder interface for configuring and creating TableView instances.

302

303

```java { .api }

304

/**

305

* Builder for configuring and creating TableView instances

306

*/

307

interface TableViewBuilder<T> extends Serializable, Cloneable {

308

/** Create the table view synchronously */

309

TableView<T> create() throws PulsarClientException;

310

311

/** Create the table view asynchronously */

312

CompletableFuture<TableView<T>> createAsync();

313

314

/** Set topic name (required) */

315

TableViewBuilder<T> topic(String topic);

316

317

/** Set partition update interval */

318

TableViewBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);

319

320

/** Set subscription name for position persistence */

321

TableViewBuilder<T> subscriptionName(String subscriptionName);

322

323

/** Set crypto key reader */

324

TableViewBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);

325

326

/** Set default crypto key reader */

327

TableViewBuilder<T> defaultCryptoKeyReader(String privateKeyPath);

328

329

/** Set default crypto key reader using key store */

330

TableViewBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);

331

332

/** Load configuration from map */

333

TableViewBuilder<T> loadConf(Map<String, Object> config);

334

}

335

```

336

337

**TableView Examples:**

338

339

```java

340

// Basic table view

341

TableView<String> tableView = client.newTableView(Schema.STRING)

342

.topic("user-profiles")

343

.create();

344

345

// Access data like a map

346

String userProfile = tableView.get("user-123");

347

boolean hasUser = tableView.containsKey("user-456");

348

349

// Iterate over all entries

350

tableView.forEach((key, value) -> {

351

System.out.println("Key: " + key + ", Value: " + value);

352

});

353

354

// Listen for updates

355

tableView.listen((key, value) -> {

356

System.out.println("Updated: " + key + " = " + value);

357

});

358

359

// Table view with partition updates

360

TableView<String> tableView = client.newTableView(Schema.STRING)

361

.topic("config-topic")

362

.autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)

363

.subscriptionName("config-reader")

364

.create();

365

```

366

367

### Message Positioning

368

369

Advanced message positioning capabilities for precise reading control.

370

371

```java { .api }

372

/**

373

* Message ID interface for positioning

374

*/

375

interface MessageId extends Comparable<MessageId>, Serializable {

376

/** Serialize to byte array */

377

byte[] toByteArray();

378

379

/** Deserialize from byte array */

380

static MessageId fromByteArray(byte[] data) throws IOException;

381

382

/** Earliest message position */

383

static final MessageId earliest;

384

385

/** Latest message position */

386

static final MessageId latest;

387

}

388

389

/**

390

* Topic-specific message ID

391

*/

392

interface TopicMessageId extends MessageId {

393

/** Get topic name */

394

String getTopicName();

395

396

/** Get inner message ID */

397

MessageId getInnerMessageId();

398

}

399

```

400

401

**Positioning Examples:**

402

403

```java

404

// Store and restore reader position

405

Reader<String> reader = client.newReader(Schema.STRING)

406

.topic("my-topic")

407

.startMessageId(MessageId.earliest)

408

.create();

409

410

// Process some messages and save position

411

Message<String> lastMessage = reader.readNext();

412

MessageId position = lastMessage.getMessageId();

413

414

// Save position to storage

415

byte[] positionBytes = position.toByteArray();

416

savePositionToStorage(positionBytes);

417

418

// Later, restore position

419

byte[] storedPosition = loadPositionFromStorage();

420

MessageId restoredPosition = MessageId.fromByteArray(storedPosition);

421

422

Reader<String> newReader = client.newReader(Schema.STRING)

423

.topic("my-topic")

424

.startMessageId(restoredPosition)

425

.create();

426

427

// Seek operations

428

reader.seek(MessageId.latest); // Jump to end

429

reader.seek(someOtherMessageId); // Jump to specific message

430

reader.seek(System.currentTimeMillis() - 3600000); // Jump to 1 hour ago

431

```

432

433

## Supporting Types and Interfaces

434

435

```java { .api }

436

interface ReaderListener<T> {

437

/** Handle read message */

438

void received(Reader<T> reader, Message<T> msg);

439

}

440

441

interface ReaderInterceptor<T> extends AutoCloseable {

442

/** Intercept before read */

443

Message<T> beforeRead(Reader<T> reader, Message<T> message);

444

445

/** Handle partition changes */

446

void onPartitionsChange(String topicName, int partitions);

447

448

/** Close interceptor */

449

void close();

450

}

451

452

class Range {

453

/** Create range */

454

static Range of(int start, int end);

455

456

/** Get start of range */

457

int getStart();

458

459

/** Get end of range */

460

int getEnd();

461

}

462

463

interface TopicMetadata {

464

/** Get number of partitions */

465

int getNumPartitions();

466

}

467

```