CDAP Transactional Messaging System provides reliable, ordered message delivery with transaction support for the CDAP platform.
—
Flexible message fetching capabilities with support for various starting positions, transaction isolation, and configurable limits. Enables building robust consumers with precise control over message consumption patterns.
Creates a MessageFetcher for configuring and executing message consumption from a topic.
/**
* Prepares to fetch messages from the given topic.
* <p>
* The returned fetcher is used to set up the fetch parameters; messages are
* read through the iterator that the fetcher's {@code fetch()} method returns.
* @param topicId the topic to fetch messages from
* @return a MessageFetcher for setting up parameters for fetching messages
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to fetch messages
* @throws ServiceUnavailableException if the messaging service is not available
* (NOTE(review): not listed in the throws clause below, so presumably unchecked; confirm)
*/
MessageFetcher prepareFetch(TopicId topicId) throws TopicNotFoundException, IOException;Usage Examples:
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.messaging.MessageFetcher;
import co.cask.cdap.messaging.data.RawMessage;
import java.nio.charset.StandardCharsets;
// Basic message consumption
MessageFetcher fetcher = messagingService.prepareFetch(topicId);
// try-with-resources closes the iterator even if processing throws
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
  while (messages.hasNext()) {
    RawMessage message = messages.next();
    // Decode with an explicit charset: new String(byte[]) alone uses the platform
    // default, which can differ between the publishing and the consuming JVM.
    // This assumes the payload is UTF-8 text.
    String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
    System.out.println("Message: " + payload);
  }
}
MessageFetcher provides a fluent API for configuring consumption parameters.
/**
* Configures and executes a fetch of messages from a topic.
* <p>
* Each setter returns this instance so that calls can be chained; {@code fetch()}
* then returns an iterator over the messages.
*/
abstract class MessageFetcher {
/**
* Setup the message fetching starting point based on message id.
* @param startOffset the message id to start fetching from
* @param inclusive if true, includes the message identified by the given message id
* @return this instance
*/
MessageFetcher setStartMessage(byte[] startOffset, boolean inclusive);
/**
* Setup the message fetching start time (publish time).
* @param startTime timestamp in milliseconds
* @return this instance
* @throws IllegalArgumentException if startTime < 0
*/
MessageFetcher setStartTime(long startTime);
/**
* Sets the transaction to use for fetching (transactional consumption).
* @param transaction the transaction to use for reading messages
* @return this instance
*/
MessageFetcher setTransaction(Transaction transaction);
/**
* Sets the maximum limit on number of messages to be fetched.
* @param limit maximum number of messages (default: Integer.MAX_VALUE)
* @return this instance
* @throws IllegalArgumentException if limit <= 0
*/
MessageFetcher setLimit(int limit);
/**
* Returns a CloseableIterator that iterates over messages fetched from the messaging system.
* @return an iterator over the fetched messages; the caller should close it,
* typically with try-with-resources
* @throws TopicNotFoundException if the topic does not exist
* @throws IOException if it fails to create the iterator
*/
abstract CloseableIterator<RawMessage> fetch() throws TopicNotFoundException, IOException;
}Usage Examples:
// Fetch from specific message ID
// (lastProcessedId is a raw message id, in the form returned by RawMessage.getId())
byte[] lastProcessedId = // get from persistence
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
.setStartMessage(lastProcessedId, false) // exclusive start
.setLimit(100);
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
// Process up to 100 messages starting after lastProcessedId
}
// Fetch from timestamp
// (start time is a publish time in milliseconds; 3600000 ms = 1 hour)
long oneHourAgo = System.currentTimeMillis() - 3600000;
MessageFetcher timeFetcher = messagingService.prepareFetch(topicId)
.setStartTime(oneHourAgo)
.setLimit(1000);
try (CloseableIterator<RawMessage> messages = timeFetcher.fetch()) {
// Process messages from the last hour
}
// Transactional fetch
// (the given transaction is the one used for reading the messages)
Transaction tx = // obtain transaction
MessageFetcher txFetcher = messagingService.prepareFetch(topicId)
.setTransaction(tx)
.setLimit(50);
try (CloseableIterator<RawMessage> messages = txFetcher.fetch()) {
// Messages fetched within transaction context
}RawMessage provides access to message content and metadata.
/** A message fetched from the messaging system: a unique id plus the published content. */
class RawMessage {
/**
* Creates a message with unique ID and payload.
* @param id the unique id of this message
* @param payload the published content of this message
*/
RawMessage(byte[] id, byte[] payload);
/** Returns the unique id of this message */
byte[] getId();
/** Returns the published content of this message */
byte[] getPayload();
}Usage Examples:
try (CloseableIterator<RawMessage> iterator = fetcher.fetch()) {
  while (iterator.hasNext()) {
    RawMessage current = iterator.next();
    // Raw id and raw published content of the message
    byte[] rawId = current.getId();
    byte[] body = current.getPayload();
    // Decode the payload as text (only meaningful for text content)
    String text = new String(body, StandardCharsets.UTF_8);
    // The raw id can be parsed to recover the publish timestamp
    long publishedAt = new MessageId(rawId).getPublishTimestamp();
    System.out.println("Message published at: " + new Date(publishedAt));
    System.out.println("Content: " + text);
  }
}
Provides detailed information about message identifiers and timestamps.
/**
 * Parsed view of a raw message id, exposing the timestamps and sequence ids
 * that make up the id.
 */
class MessageId {
  /**
   * Size of raw ID in bytes: publish timestamp (8, long) + sequence id (2, short)
   * + payload write timestamp (8, long) + payload sequence id (2, short) = 20.
   */
  static final int RAW_ID_SIZE = 20;
  /**
   * Creates instance based on raw id bytes.
   * @param rawId the raw bytes of a message id, e.g. as returned by RawMessage.getId()
   */
  MessageId(byte[] rawId);
  /**
   * Computes the message raw ID and stores it in the given byte array.
   * @param publishTimestamp the publish timestamp in milliseconds
   * @param sequenceId the sequence id generated when the message was written
   * @param writeTimestamp the timestamp when the message was written to the Payload Table
   * @param payloadSequenceId the sequence id for the Payload Table entry
   * @param buffer the byte array to write the raw id into
   * @param offset the position in buffer at which to start writing
   * @return NOTE(review): presumably the offset just past the written id; confirm
   */
  static int putRawId(long publishTimestamp, short sequenceId,
                      long writeTimestamp, short payloadSequenceId,
                      byte[] buffer, int offset);
  /** Returns the publish timestamp in milliseconds */
  long getPublishTimestamp();
  /** Returns the sequence id generated when the message was written */
  short getSequenceId();
  /** Returns the timestamp when message was written to Payload Table */
  long getPayloadWriteTimestamp();
  /** Returns the sequence id for Payload Table entry */
  short getPayloadSequenceId();
  /** Returns the raw bytes representation of the message id */
  byte[] getRawId();
}
Usage Examples:
// Parse message ID for detailed information
RawMessage message = // obtained from fetch
MessageId messageId = new MessageId(message.getId());
System.out.println("Published: " + new Date(messageId.getPublishTimestamp()));
System.out.println("Sequence: " + messageId.getSequenceId());
System.out.println("Payload timestamp: " + messageId.getPayloadWriteTimestamp());
// Create message ID for starting position
byte[] buffer = new byte[MessageId.RAW_ID_SIZE];
long startTime = System.currentTimeMillis() - 3600000; // 1 hour ago
// Arguments: publish timestamp, sequence id, payload write timestamp,
// payload sequence id, destination buffer, offset into the buffer
MessageId.putRawId(startTime, (short) 0, 0L, (short) 0, buffer, 0);
// true => a message whose id equals the constructed id would be included
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
.setStartMessage(buffer, true);Process messages in order, tracking progress with message IDs.
// Resume from the position recorded by a previous run, if there is one
String persistedId = loadLastProcessedMessageId(); // from persistence
byte[] resumeFrom = null;
if (persistedId != null) {
  resumeFrom = Bytes.fromHexString(persistedId);
}
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
    .setLimit(100);
if (resumeFrom != null) {
  // exclusive: the message with this id was already processed
  fetcher.setStartMessage(resumeFrom, false);
}
try (CloseableIterator<RawMessage> iterator = fetcher.fetch()) {
  String newestId = null;
  while (iterator.hasNext()) {
    RawMessage current = iterator.next();
    // Process message
    processMessage(current);
    // Remember the id of the most recently processed message
    newestId = Bytes.toHexString(current.getId());
  }
  // Persist progress only when at least one message was processed
  if (newestId != null) {
    saveLastProcessedMessageId(newestId);
  }
}
Process messages from a specific time window.
long startTime = // calculate start time
long endTime = System.currentTimeMillis();
// Integer.MAX_VALUE is the documented default limit; it is set explicitly here
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
.setStartTime(startTime)
.setLimit(Integer.MAX_VALUE);
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
while (messages.hasNext()) {
RawMessage message = messages.next();
// The publish timestamp is recovered by parsing the raw message id
MessageId messageId = new MessageId(message.getId());
// Stop if we've passed the end time
// NOTE(review): breaking here assumes messages arrive in publish-time order; confirm
if (messageId.getPublishTimestamp() > endTime) {
break;
}
processMessage(message);
}
}Consume messages within a transaction context for consistency.
// Start the transaction before fetching so the fetch can read under it
Transaction tx = transactionManager.startLong();
try {
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
.setTransaction(tx)
.setLimit(50);
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
while (messages.hasNext()) {
RawMessage message = messages.next();
// Process message and update state transactionally
processMessageTransactionally(message, tx);
}
}
// Commit only after the iterator is closed and every message was processed
transactionManager.commit(tx);
} catch (Exception e) {
// Any failure (fetch, processing or commit) aborts the transaction, then is rethrown
transactionManager.abort(tx);
throw e;
}Common consumption error scenarios:
// Outer try: failures of prepareFetch/fetch themselves.
// Inner try: failures while processing a single message.
try {
MessageFetcher fetcher = messagingService.prepareFetch(topicId);
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
while (messages.hasNext()) {
RawMessage message = messages.next();
try {
processMessage(message);
} catch (Exception e) {
// Handle individual message processing errors
logError("Failed to process message", message.getId(), e);
// Continue with next message or implement retry logic
}
}
}
} catch (TopicNotFoundException e) {
// NOTE(review): getTopicName() is not shown in this document; confirm the accessor name
System.out.println("Topic not found: " + e.getTopicName());
} catch (IOException e) {
System.out.println("Fetch failed: " + e.getMessage());
// Implement retry with backoff
} catch (ServiceUnavailableException e) {
System.out.println("Service unavailable, retrying later");
}
// Validate fetcher parameters
try {
// setStartTime is documented to reject start times below zero
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
.setStartTime(-1); // This will throw IllegalArgumentException
} catch (IllegalArgumentException e) {
System.out.println("Invalid parameter: " + e.getMessage());
}// Use appropriate limits for your use case
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
    .setLimit(1000); // Batch size based on message size and processing time
// For high-throughput scenarios, process in chunks
int batchSize = 500;
byte[] resumeAfter = null;
int handled;
do {
  MessageFetcher chunkFetcher = messagingService.prepareFetch(topicId)
      .setLimit(batchSize);
  if (resumeAfter != null) {
    // Exclusive start: continue after the last message of the previous chunk
    chunkFetcher.setStartMessage(resumeAfter, false);
  }
  handled = 0;
  try (CloseableIterator<RawMessage> chunk = chunkFetcher.fetch()) {
    while (chunk.hasNext()) {
      RawMessage current = chunk.next();
      processMessage(current);
      resumeAfter = current.getId();
      handled++;
    }
  }
  // A chunk smaller than the batch size means there is nothing left to fetch
} while (handled >= batchSize);
Always use try-with-resources for proper cleanup:
// Correct - iterator is properly closed
// (try-with-resources closes it when the block exits, normally or by exception)
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
// Process messages
}
// Incorrect - potential resource leak
// (nothing here guarantees that close() is ever called)
CloseableIterator<RawMessage> messages = fetcher.fetch();
// Process messages
// messages.close() might not be called if an exception occurs
Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-tms