or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-services.mdhigh-level-consumers.mdindex.mdmessage-consumption.mdmessage-publishing.mdtopic-management.md

message-consumption.mddocs/

0

# Message Consumption

1

2

Flexible message fetching capabilities with support for various starting positions, transaction isolation, and configurable limits. Enables building robust consumers with precise control over message consumption patterns.

3

4

## Capabilities

5

6

### Prepare Message Fetcher

7

8

Creates a MessageFetcher for configuring and executing message consumption from a topic.

9

10

```java { .api }

11

/**

12

* Prepares to fetch messages from the given topic.

13

* @param topicId the topic to fetch message from

14

* @return a MessageFetcher for setting up parameters for fetching messages

15

* @throws TopicNotFoundException if the topic doesn't exist

16

* @throws IOException if failed to fetch messages

17

* @throws ServiceUnavailableException if the messaging service is not available

18

*/

19

MessageFetcher prepareFetch(TopicId topicId) throws TopicNotFoundException, IOException;

20

```

21

22

**Usage Examples:**

23

24

```java

25

import co.cask.cdap.messaging.MessageFetcher;

26

import co.cask.cdap.messaging.data.RawMessage;

27

import co.cask.cdap.api.dataset.lib.CloseableIterator;

28

29

// Basic message consumption

30

MessageFetcher fetcher = messagingService.prepareFetch(topicId);

31

try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {

32

while (messages.hasNext()) {

33

RawMessage message = messages.next();

34

String payload = new String(message.getPayload());

35

System.out.println("Message: " + payload);

36

}

37

}

38

```

39

40

### Configure Fetching Parameters

41

42

MessageFetcher provides a fluent API for configuring consumption parameters.

43

44

```java { .api }

45

abstract class MessageFetcher {

46

/**

47

* Setup the message fetching starting point based on message id.

48

* @param startOffset the message id to start fetching from

49

* @param inclusive if true, includes the message identified by the given message id

50

* @return this instance

51

*/

52

MessageFetcher setStartMessage(byte[] startOffset, boolean inclusive);

53

54

/**

55

* Setup the message fetching start time (publish time).

56

* @param startTime timestamp in milliseconds

57

* @return this instance

58

* @throws IllegalArgumentException if startTime < 0

59

*/

60

MessageFetcher setStartTime(long startTime);

61

62

/**

63

* Sets the transaction to use for fetching (transactional consumption).

64

* @param transaction the transaction to use for reading messages

65

* @return this instance

66

*/

67

MessageFetcher setTransaction(Transaction transaction);

68

69

/**

70

* Sets the maximum limit on number of messages to be fetched.

71

* @param limit maximum number of messages (default: Integer.MAX_VALUE)

72

* @return this instance

73

* @throws IllegalArgumentException if limit <= 0

74

*/

75

MessageFetcher setLimit(int limit);

76

77

/**

78

* Returns a CloseableIterator that iterates over messages fetched from the messaging system.

79

* @throws TopicNotFoundException if the topic does not exist

80

* @throws IOException if it fails to create the iterator

81

*/

82

abstract CloseableIterator<RawMessage> fetch() throws TopicNotFoundException, IOException;

83

}

84

```

85

86

**Usage Examples:**

87

88

```java

89

// Fetch from specific message ID

90

byte[] lastProcessedId = // get from persistence

91

MessageFetcher fetcher = messagingService.prepareFetch(topicId)

92

.setStartMessage(lastProcessedId, false) // exclusive start

93

.setLimit(100);

94

95

try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {

96

// Process up to 100 messages starting after lastProcessedId

97

}

98

99

// Fetch from timestamp

100

long oneHourAgo = System.currentTimeMillis() - 3600000;

101

MessageFetcher timeFetcher = messagingService.prepareFetch(topicId)

102

.setStartTime(oneHourAgo)

103

.setLimit(1000);

104

105

try (CloseableIterator<RawMessage> messages = timeFetcher.fetch()) {

106

// Process messages from the last hour

107

}

108

109

// Transactional fetch

110

Transaction tx = // obtain transaction

111

MessageFetcher txFetcher = messagingService.prepareFetch(topicId)

112

.setTransaction(tx)

113

.setLimit(50);

114

115

try (CloseableIterator<RawMessage> messages = txFetcher.fetch()) {

116

// Messages fetched within transaction context

117

}

118

```

119

120

### Message Data Access

121

122

RawMessage provides access to message content and metadata.

123

124

```java { .api }

125

class RawMessage {

126

/**

127

* Creates a message with unique ID and payload.

128

*/

129

RawMessage(byte[] id, byte[] payload);

130

131

/** Returns the unique id of this message */

132

byte[] getId();

133

134

/** Returns the published content of this message */

135

byte[] getPayload();

136

}

137

```

138

139

**Usage Examples:**

140

141

```java

142

try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {

143

while (messages.hasNext()) {

144

RawMessage message = messages.next();

145

146

// Access message content

147

byte[] messageId = message.getId();

148

byte[] payload = message.getPayload();

149

150

// Convert to string if text content

151

String content = new String(payload, StandardCharsets.UTF_8);

152

153

// Parse message ID for timestamp info

154

MessageId parsedId = new MessageId(messageId);

155

long publishTime = parsedId.getPublishTimestamp();

156

157

System.out.println("Message published at: " + new Date(publishTime));

158

System.out.println("Content: " + content);

159

}

160

}

161

```

162

163

## Message ID Operations

164

165

### MessageId Class

166

167

Provides detailed information about message identifiers and timestamps.

168

169

```java { .api }

170

class MessageId {

171

/** Size of raw ID in bytes */

172

static final int RAW_ID_SIZE = 24;

173

174

/**

175

* Creates instance based on raw id bytes.

176

*/

177

MessageId(byte[] rawId);

178

179

/**

180

* Computes the message raw ID and stores it in the given byte array.

181

*/

182

static int putRawId(long publishTimestamp, short sequenceId,

183

long writeTimestamp, short payloadSequenceId,

184

byte[] buffer, int offset);

185

186

/** Returns the publish timestamp in milliseconds */

187

long getPublishTimestamp();

188

189

/** Returns the sequence id generated when the message was written */

190

short getSequenceId();

191

192

/** Returns the timestamp when message was written to Payload Table */

193

long getPayloadWriteTimestamp();

194

195

/** Returns the sequence id for Payload Table entry */

196

short getPayloadSequenceId();

197

198

/** Returns the raw bytes representation of the message id */

199

byte[] getRawId();

200

}

201

```

202

203

**Usage Examples:**

204

205

```java

206

// Parse message ID for detailed information

207

RawMessage message = // obtained from fetch

208

MessageId messageId = new MessageId(message.getId());

209

210

System.out.println("Published: " + new Date(messageId.getPublishTimestamp()));

211

System.out.println("Sequence: " + messageId.getSequenceId());

212

System.out.println("Payload timestamp: " + messageId.getPayloadWriteTimestamp());

213

214

// Create message ID for starting position

215

byte[] buffer = new byte[MessageId.RAW_ID_SIZE];

216

long startTime = System.currentTimeMillis() - 3600000; // 1 hour ago

217

MessageId.putRawId(startTime, (short) 0, 0L, (short) 0, buffer, 0);

218

219

MessageFetcher fetcher = messagingService.prepareFetch(topicId)

220

.setStartMessage(buffer, true);

221

```

222

223

## Consumption Patterns

224

225

### Sequential Processing

226

227

Process messages in order, tracking progress with message IDs.

228

229

```java

230

String lastProcessedId = loadLastProcessedMessageId(); // from persistence

231

byte[] startOffset = lastProcessedId != null ?

232

Bytes.fromHexString(lastProcessedId) : null;

233

234

MessageFetcher fetcher = messagingService.prepareFetch(topicId)

235

.setLimit(100);

236

237

if (startOffset != null) {

238

fetcher.setStartMessage(startOffset, false); // exclusive

239

}

240

241

try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {

242

String lastId = null;

243

while (messages.hasNext()) {

244

RawMessage message = messages.next();

245

246

// Process message

247

processMessage(message);

248

249

// Track progress

250

lastId = Bytes.toHexString(message.getId());

251

}

252

253

// Persist last processed ID

254

if (lastId != null) {

255

saveLastProcessedMessageId(lastId);

256

}

257

}

258

```

259

260

### Time-Based Processing

261

262

Process messages from a specific time window.

263

264

```java

265

long startTime = // calculate start time

266

long endTime = System.currentTimeMillis();

267

268

MessageFetcher fetcher = messagingService.prepareFetch(topicId)

269

.setStartTime(startTime)

270

.setLimit(Integer.MAX_VALUE);

271

272

try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {

273

while (messages.hasNext()) {

274

RawMessage message = messages.next();

275

MessageId messageId = new MessageId(message.getId());

276

277

// Stop if we've passed the end time

278

if (messageId.getPublishTimestamp() > endTime) {

279

break;

280

}

281

282

processMessage(message);

283

}

284

}

285

```

286

287

### Transactional Consumption

288

289

Consume messages within a transaction context for consistency.

290

291

```java

292

Transaction tx = transactionManager.startLong();

293

try {

294

MessageFetcher fetcher = messagingService.prepareFetch(topicId)

295

.setTransaction(tx)

296

.setLimit(50);

297

298

try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {

299

while (messages.hasNext()) {

300

RawMessage message = messages.next();

301

302

// Process message and update state transactionally

303

processMessageTransactionally(message, tx);

304

}

305

}

306

307

transactionManager.commit(tx);

308

} catch (Exception e) {

309

transactionManager.abort(tx);

310

throw e;

311

}

312

```

313

314

## Error Handling

315

316

Common consumption error scenarios:

317

318

```java

319

try {

320

MessageFetcher fetcher = messagingService.prepareFetch(topicId);

321

try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {

322

while (messages.hasNext()) {

323

RawMessage message = messages.next();

324

try {

325

processMessage(message);

326

} catch (Exception e) {

327

// Handle individual message processing errors

328

logError("Failed to process message", message.getId(), e);

329

// Continue with next message or implement retry logic

330

}

331

}

332

}

333

} catch (TopicNotFoundException e) {

334

System.out.println("Topic not found: " + e.getTopicName());

335

} catch (IOException e) {

336

System.out.println("Fetch failed: " + e.getMessage());

337

// Implement retry with backoff

338

} catch (ServiceUnavailableException e) {

339

System.out.println("Service unavailable, retrying later");

340

}

341

342

// Validate fetcher parameters

343

try {

344

MessageFetcher fetcher = messagingService.prepareFetch(topicId)

345

.setStartTime(-1); // This will throw IllegalArgumentException

346

} catch (IllegalArgumentException e) {

347

System.out.println("Invalid parameter: " + e.getMessage());

348

}

349

```

350

351

## Performance Considerations

352

353

### Optimal Batch Sizes

354

355

```java

356

// Use appropriate limits for your use case

357

MessageFetcher fetcher = messagingService.prepareFetch(topicId)

358

.setLimit(1000); // Batch size based on message size and processing time

359

360

// For high-throughput scenarios, process in chunks

361

int batchSize = 500;

362

byte[] lastMessageId = null;

363

364

while (true) {

365

MessageFetcher chunkFetcher = messagingService.prepareFetch(topicId)

366

.setLimit(batchSize);

367

368

if (lastMessageId != null) {

369

chunkFetcher.setStartMessage(lastMessageId, false);

370

}

371

372

int processedCount = 0;

373

try (CloseableIterator<RawMessage> messages = chunkFetcher.fetch()) {

374

while (messages.hasNext()) {

375

RawMessage message = messages.next();

376

processMessage(message);

377

lastMessageId = message.getId();

378

processedCount++;

379

}

380

}

381

382

// Break if we got fewer messages than batch size

383

if (processedCount < batchSize) {

384

break;

385

}

386

}

387

```

388

389

### Resource Management

390

391

Always use try-with-resources for proper cleanup:

392

393

```java

394

// Correct - iterator is properly closed

395

try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {

396

// Process messages

397

}

398

399

// Incorrect - potential resource leak

400

CloseableIterator<RawMessage> messages = fetcher.fetch();

401

// Process messages

402

// messages.close() might not be called if exception occurs

403

```