0
# Message Consumption
1
2
The message consumption API provides flexible message fetching with support for various starting positions, transaction isolation, and configurable limits. It enables building robust consumers with precise control over message consumption patterns.
3
4
## Capabilities
5
6
### Prepare Message Fetcher
7
8
Creates a MessageFetcher for configuring and executing message consumption from a topic.
9
10
```java { .api }
11
/**
12
* Prepares to fetch messages from the given topic.
13
* @param topicId the topic to fetch messages from
14
* @return a MessageFetcher for setting up parameters for fetching messages
15
* @throws TopicNotFoundException if the topic doesn't exist
16
* @throws IOException if it fails to fetch messages
17
* @throws ServiceUnavailableException if the messaging service is not available
18
*/
19
MessageFetcher prepareFetch(TopicId topicId) throws TopicNotFoundException, IOException;
20
```
21
22
**Usage Examples:**
23
24
```java
25
import co.cask.cdap.messaging.MessageFetcher;
26
import co.cask.cdap.messaging.data.RawMessage;
27
import co.cask.cdap.api.dataset.lib.CloseableIterator;
28
29
// Basic message consumption
30
MessageFetcher fetcher = messagingService.prepareFetch(topicId);
31
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
32
while (messages.hasNext()) {
33
RawMessage message = messages.next();
34
String payload = new String(message.getPayload());
35
System.out.println("Message: " + payload);
36
}
37
}
38
```
39
40
### Configure Fetching Parameters
41
42
MessageFetcher provides a fluent API for configuring consumption parameters.
43
44
```java { .api }
45
abstract class MessageFetcher {
46
/**
47
* Setup the message fetching starting point based on message id.
48
* @param startOffset the message id to start fetching from
49
* @param inclusive if true, includes the message identified by the given message id
50
* @return this instance
51
*/
52
MessageFetcher setStartMessage(byte[] startOffset, boolean inclusive);
53
54
/**
55
* Setup the message fetching start time (publish time).
56
* @param startTime timestamp in milliseconds
57
* @return this instance
58
* @throws IllegalArgumentException if startTime < 0
59
*/
60
MessageFetcher setStartTime(long startTime);
61
62
/**
63
* Sets the transaction to use for fetching (transactional consumption).
64
* @param transaction the transaction to use for reading messages
65
* @return this instance
66
*/
67
MessageFetcher setTransaction(Transaction transaction);
68
69
/**
70
* Sets the maximum limit on number of messages to be fetched.
71
* @param limit maximum number of messages (default: Integer.MAX_VALUE)
72
* @return this instance
73
* @throws IllegalArgumentException if limit <= 0
74
*/
75
MessageFetcher setLimit(int limit);
76
77
/**
78
* Returns a CloseableIterator that iterates over messages fetched from the messaging system.
79
* @throws TopicNotFoundException if the topic does not exist
80
* @throws IOException if it fails to create the iterator
81
*/
82
abstract CloseableIterator<RawMessage> fetch() throws TopicNotFoundException, IOException;
83
}
84
```
85
86
**Usage Examples:**
87
88
```java
89
// Fetch from specific message ID
90
byte[] lastProcessedId = // get from persistence
91
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
92
.setStartMessage(lastProcessedId, false) // exclusive start
93
.setLimit(100);
94
95
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
96
// Process up to 100 messages starting after lastProcessedId
97
}
98
99
// Fetch from timestamp
100
long oneHourAgo = System.currentTimeMillis() - 3600000;
101
MessageFetcher timeFetcher = messagingService.prepareFetch(topicId)
102
.setStartTime(oneHourAgo)
103
.setLimit(1000);
104
105
try (CloseableIterator<RawMessage> messages = timeFetcher.fetch()) {
106
// Process messages from the last hour
107
}
108
109
// Transactional fetch
110
Transaction tx = // obtain transaction
111
MessageFetcher txFetcher = messagingService.prepareFetch(topicId)
112
.setTransaction(tx)
113
.setLimit(50);
114
115
try (CloseableIterator<RawMessage> messages = txFetcher.fetch()) {
116
// Messages fetched within transaction context
117
}
118
```
119
120
### Message Data Access
121
122
RawMessage provides access to message content and metadata.
123
124
```java { .api }
125
class RawMessage {
126
/**
127
* Creates a message with unique ID and payload.
128
*/
129
RawMessage(byte[] id, byte[] payload);
130
131
/** Returns the unique id of this message */
132
byte[] getId();
133
134
/** Returns the published content of this message */
135
byte[] getPayload();
136
}
137
```
138
139
**Usage Examples:**
140
141
```java
142
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
143
while (messages.hasNext()) {
144
RawMessage message = messages.next();
145
146
// Access message content
147
byte[] messageId = message.getId();
148
byte[] payload = message.getPayload();
149
150
// Convert to string if text content
151
String content = new String(payload, StandardCharsets.UTF_8);
152
153
// Parse message ID for timestamp info
154
MessageId parsedId = new MessageId(messageId);
155
long publishTime = parsedId.getPublishTimestamp();
156
157
System.out.println("Message published at: " + new Date(publishTime));
158
System.out.println("Content: " + content);
159
}
160
}
161
```
162
163
## Message ID Operations
164
165
### MessageId Class
166
167
Provides detailed information about message identifiers and timestamps.
168
169
```java { .api }
170
class MessageId {
171
/** Size of raw ID in bytes */
172
static final int RAW_ID_SIZE = 24;
173
174
/**
175
* Creates instance based on raw id bytes.
176
*/
177
MessageId(byte[] rawId);
178
179
/**
180
* Computes the message raw ID and stores it in the given byte array.
181
*/
182
static int putRawId(long publishTimestamp, short sequenceId,
183
long writeTimestamp, short payloadSequenceId,
184
byte[] buffer, int offset);
185
186
/** Returns the publish timestamp in milliseconds */
187
long getPublishTimestamp();
188
189
/** Returns the sequence id generated when the message was written */
190
short getSequenceId();
191
192
/** Returns the timestamp when the message was written to the Payload Table */
193
long getPayloadWriteTimestamp();
194
195
/** Returns the sequence id for the Payload Table entry */
196
short getPayloadSequenceId();
197
198
/** Returns the raw bytes representation of the message id */
199
byte[] getRawId();
200
}
201
```
202
203
**Usage Examples:**
204
205
```java
206
// Parse message ID for detailed information
207
RawMessage message = // obtained from fetch
208
MessageId messageId = new MessageId(message.getId());
209
210
System.out.println("Published: " + new Date(messageId.getPublishTimestamp()));
211
System.out.println("Sequence: " + messageId.getSequenceId());
212
System.out.println("Payload timestamp: " + messageId.getPayloadWriteTimestamp());
213
214
// Create message ID for starting position
215
byte[] buffer = new byte[MessageId.RAW_ID_SIZE];
216
long startTime = System.currentTimeMillis() - 3600000; // 1 hour ago
217
MessageId.putRawId(startTime, (short) 0, 0L, (short) 0, buffer, 0);
218
219
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
220
.setStartMessage(buffer, true);
221
```
222
223
## Consumption Patterns
224
225
### Sequential Processing
226
227
Process messages in order, tracking progress with message IDs.
228
229
```java
230
String lastProcessedId = loadLastProcessedMessageId(); // from persistence
231
byte[] startOffset = lastProcessedId != null ?
232
Bytes.fromHexString(lastProcessedId) : null;
233
234
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
235
.setLimit(100);
236
237
if (startOffset != null) {
238
fetcher.setStartMessage(startOffset, false); // exclusive
239
}
240
241
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
242
String lastId = null;
243
while (messages.hasNext()) {
244
RawMessage message = messages.next();
245
246
// Process message
247
processMessage(message);
248
249
// Track progress
250
lastId = Bytes.toHexString(message.getId());
251
}
252
253
// Persist last processed ID
254
if (lastId != null) {
255
saveLastProcessedMessageId(lastId);
256
}
257
}
258
```
259
260
### Time-Based Processing
261
262
Process messages from a specific time window.
263
264
```java
265
long startTime = // calculate start time
266
long endTime = System.currentTimeMillis();
267
268
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
269
.setStartTime(startTime)
270
.setLimit(Integer.MAX_VALUE);
271
272
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
273
while (messages.hasNext()) {
274
RawMessage message = messages.next();
275
MessageId messageId = new MessageId(message.getId());
276
277
// Stop if we've passed the end time
278
if (messageId.getPublishTimestamp() > endTime) {
279
break;
280
}
281
282
processMessage(message);
283
}
284
}
285
```
286
287
### Transactional Consumption
288
289
Consume messages within a transaction context for consistency.
290
291
```java
292
Transaction tx = transactionManager.startLong();
293
try {
294
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
295
.setTransaction(tx)
296
.setLimit(50);
297
298
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
299
while (messages.hasNext()) {
300
RawMessage message = messages.next();
301
302
// Process message and update state transactionally
303
processMessageTransactionally(message, tx);
304
}
305
}
306
307
transactionManager.commit(tx);
308
} catch (Exception e) {
309
transactionManager.abort(tx);
310
throw e;
311
}
312
```
313
314
## Error Handling
315
316
Common consumption error scenarios:
317
318
```java
319
try {
320
MessageFetcher fetcher = messagingService.prepareFetch(topicId);
321
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
322
while (messages.hasNext()) {
323
RawMessage message = messages.next();
324
try {
325
processMessage(message);
326
} catch (Exception e) {
327
// Handle individual message processing errors
328
logError("Failed to process message", message.getId(), e);
329
// Continue with next message or implement retry logic
330
}
331
}
332
}
333
} catch (TopicNotFoundException e) {
334
System.out.println("Topic not found: " + e.getTopicName());
335
} catch (IOException e) {
336
System.out.println("Fetch failed: " + e.getMessage());
337
// Implement retry with backoff
338
} catch (ServiceUnavailableException e) {
339
System.out.println("Service unavailable, retrying later");
340
}
341
342
// Validate fetcher parameters
343
try {
344
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
345
.setStartTime(-1); // This will throw IllegalArgumentException
346
} catch (IllegalArgumentException e) {
347
System.out.println("Invalid parameter: " + e.getMessage());
348
}
349
```
350
351
## Performance Considerations
352
353
### Optimal Batch Sizes
354
355
```java
356
// Use appropriate limits for your use case
357
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
358
.setLimit(1000); // Batch size based on message size and processing time
359
360
// For high-throughput scenarios, process in chunks
361
int batchSize = 500;
362
byte[] lastMessageId = null;
363
364
while (true) {
365
MessageFetcher chunkFetcher = messagingService.prepareFetch(topicId)
366
.setLimit(batchSize);
367
368
if (lastMessageId != null) {
369
chunkFetcher.setStartMessage(lastMessageId, false);
370
}
371
372
int processedCount = 0;
373
try (CloseableIterator<RawMessage> messages = chunkFetcher.fetch()) {
374
while (messages.hasNext()) {
375
RawMessage message = messages.next();
376
processMessage(message);
377
lastMessageId = message.getId();
378
processedCount++;
379
}
380
}
381
382
// Break if we got fewer messages than batch size
383
if (processedCount < batchSize) {
384
break;
385
}
386
}
387
```
388
389
### Resource Management
390
391
Always use try-with-resources for proper cleanup:
392
393
```java
394
// Correct - iterator is properly closed
395
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
396
// Process messages
397
}
398
399
// Incorrect - potential resource leak
400
CloseableIterator<RawMessage> messages = fetcher.fetch();
401
// Process messages
402
// messages.close() might not be called if an exception occurs
403
```