
tessl/maven-co-cask-cdap--cdap-tms

CDAP Transactional Messaging System provides reliable, ordered message delivery with transaction support for the CDAP platform.


Message Consumption

Message fetching supports several starting positions (a message ID or a publish time), transactional isolation, and a configurable message limit. Together these give consumers precise control over which messages they read and how many.

Capabilities

Prepare Message Fetcher

Creates a MessageFetcher for configuring and executing message consumption from a topic.

/**
 * Prepares to fetch messages from the given topic.
 * @param topicId the topic to fetch messages from
 * @return a MessageFetcher for setting up parameters for fetching messages
 * @throws TopicNotFoundException if the topic doesn't exist
 * @throws IOException if failed to fetch messages
 * @throws ServiceUnavailableException if the messaging service is not available
 */
MessageFetcher prepareFetch(TopicId topicId) throws TopicNotFoundException, IOException;

Usage Examples:

import co.cask.cdap.messaging.MessageFetcher;
import co.cask.cdap.messaging.data.RawMessage;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import java.nio.charset.StandardCharsets;

// Basic message consumption
MessageFetcher fetcher = messagingService.prepareFetch(topicId);
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
    while (messages.hasNext()) {
        RawMessage message = messages.next();
        // Decode with an explicit charset (assumes the publisher wrote UTF-8 text)
        String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
        System.out.println("Message: " + payload);
    }
}

Configure Fetching Parameters

MessageFetcher provides a fluent API for configuring consumption parameters.

abstract class MessageFetcher {
    /**
     * Setup the message fetching starting point based on message id.
     * @param startOffset the message id to start fetching from
     * @param inclusive if true, includes the message identified by the given message id
     * @return this instance
     */
    MessageFetcher setStartMessage(byte[] startOffset, boolean inclusive);
    
    /**
     * Setup the message fetching start time (publish time).
     * @param startTime timestamp in milliseconds
     * @return this instance
     * @throws IllegalArgumentException if startTime < 0
     */
    MessageFetcher setStartTime(long startTime);
    
    /**
     * Sets the transaction to use for fetching (transactional consumption).
     * @param transaction the transaction to use for reading messages
     * @return this instance
     */
    MessageFetcher setTransaction(Transaction transaction);
    
    /**
     * Sets the maximum limit on number of messages to be fetched.
     * @param limit maximum number of messages (default: Integer.MAX_VALUE)
     * @return this instance
     * @throws IllegalArgumentException if limit <= 0
     */
    MessageFetcher setLimit(int limit);
    
    /**
     * Returns a CloseableIterator that iterates over messages fetched from the messaging system.
     * @throws TopicNotFoundException if the topic does not exist
     * @throws IOException if it fails to create the iterator
     */
    abstract CloseableIterator<RawMessage> fetch() throws TopicNotFoundException, IOException;
}

Usage Examples:

// Fetch from specific message ID
byte[] lastProcessedId = loadLastProcessedId(); // hypothetical helper: read the ID from your own persistence
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
    .setStartMessage(lastProcessedId, false) // exclusive start
    .setLimit(100);

try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
    // Process up to 100 messages starting after lastProcessedId
}

// Fetch from timestamp
long oneHourAgo = System.currentTimeMillis() - 3600000;
MessageFetcher timeFetcher = messagingService.prepareFetch(topicId)
    .setStartTime(oneHourAgo)
    .setLimit(1000);

try (CloseableIterator<RawMessage> messages = timeFetcher.fetch()) {
    // Process messages from the last hour
}

// Transactional fetch
Transaction tx = obtainTransaction(); // hypothetical helper: start or look up the transaction to read under
MessageFetcher txFetcher = messagingService.prepareFetch(topicId)
    .setTransaction(tx)
    .setLimit(50);

try (CloseableIterator<RawMessage> messages = txFetcher.fetch()) {
    // Messages fetched within transaction context
}

Message Data Access

RawMessage provides access to message content and metadata.

class RawMessage {
    /**
     * Creates a message with unique ID and payload.
     */
    RawMessage(byte[] id, byte[] payload);
    
    /** Returns the unique id of this message */
    byte[] getId();
    
    /** Returns the published content of this message */
    byte[] getPayload();
}

Usage Examples:

try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
    while (messages.hasNext()) {
        RawMessage message = messages.next();
        
        // Access message content
        byte[] messageId = message.getId();
        byte[] payload = message.getPayload();
        
        // Convert to string if text content
        String content = new String(payload, StandardCharsets.UTF_8);
        
        // Parse message ID for timestamp info
        MessageId parsedId = new MessageId(messageId);
        long publishTime = parsedId.getPublishTimestamp();
        
        System.out.println("Message published at: " + new Date(publishTime));
        System.out.println("Content: " + content);
    }
}

Message ID Operations

MessageId Class

Provides detailed information about message identifiers and timestamps.

class MessageId {
    /** Size of raw ID in bytes: two longs and two shorts (8 + 2 + 8 + 2) */
    static final int RAW_ID_SIZE = 20;
    
    /**
     * Creates instance based on raw id bytes.
     */
    MessageId(byte[] rawId);
    
    /**
     * Computes the message raw ID and stores it in the given byte array.
     */
    static int putRawId(long publishTimestamp, short sequenceId,
                       long writeTimestamp, short payloadSequenceId, 
                       byte[] buffer, int offset);
    
    /** Returns the publish timestamp in milliseconds */
    long getPublishTimestamp();
    
    /** Returns the sequence id generated when the message was written */
    short getSequenceId();
    
    /** Returns the timestamp when message was written to Payload Table */
    long getPayloadWriteTimestamp();
    
    /** Returns the sequence id for Payload Table entry */
    short getPayloadSequenceId();
    
    /** Returns the raw bytes representation of the message id */
    byte[] getRawId();
}

Usage Examples:

// Parse message ID for detailed information
RawMessage message = messages.next(); // a message obtained from an iterator returned by fetch()
MessageId messageId = new MessageId(message.getId());

System.out.println("Published: " + new Date(messageId.getPublishTimestamp()));
System.out.println("Sequence: " + messageId.getSequenceId());
System.out.println("Payload timestamp: " + messageId.getPayloadWriteTimestamp());

// Create message ID for starting position
byte[] buffer = new byte[MessageId.RAW_ID_SIZE];
long startTime = System.currentTimeMillis() - 3600000; // 1 hour ago
MessageId.putRawId(startTime, (short) 0, 0L, (short) 0, buffer, 0);

MessageFetcher fetcher = messagingService.prepareFetch(topicId)
    .setStartMessage(buffer, true);
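
The raw ID layout implied by the `putRawId` parameters can be sketched with a plain `ByteBuffer`. This is an illustration only, not CDAP's implementation: the class name `RawIdLayout` is hypothetical, and the big-endian encoding and field order (publish timestamp, sequence id, write timestamp, payload sequence id) are assumptions taken from the parameter order shown above.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in used for illustration; not part of CDAP. */
class RawIdLayout {
    // long + short + long + short = 8 + 2 + 8 + 2 bytes
    static final int RAW_ID_SIZE = Long.BYTES + Short.BYTES + Long.BYTES + Short.BYTES;

    /** Packs the four fields in order; ByteBuffer is big-endian by default. */
    public static byte[] encode(long publishTimestamp, short sequenceId,
                                long writeTimestamp, short payloadSequenceId) {
        return ByteBuffer.allocate(RAW_ID_SIZE)
            .putLong(publishTimestamp)
            .putShort(sequenceId)
            .putLong(writeTimestamp)
            .putShort(payloadSequenceId)
            .array();
    }

    /** Reads the publish timestamp from the first 8 bytes. */
    public static long publishTimestamp(byte[] rawId) {
        return ByteBuffer.wrap(rawId).getLong(0);
    }

    /** Reads the sequence id stored right after the publish timestamp. */
    public static short sequenceId(byte[] rawId) {
        return ByteBuffer.wrap(rawId).getShort(Long.BYTES);
    }

    public static void main(String[] args) {
        byte[] id = encode(1_500_000_000_000L, (short) 7, 0L, (short) 0);
        System.out.println(id.length);             // 20
        System.out.println(publishTimestamp(id));  // 1500000000000
        System.out.println(sequenceId(id));        // 7
    }
}
```

Under these assumptions the publish timestamp occupies the leading bytes, which is why an ID built from a timestamp with the remaining fields set to zero can serve as a time-based starting position.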

Consumption Patterns

Sequential Processing

Process messages in order, tracking progress with message IDs.

String lastProcessedId = loadLastProcessedMessageId(); // from persistence
byte[] startOffset = lastProcessedId != null ? 
    Bytes.fromHexString(lastProcessedId) : null;

MessageFetcher fetcher = messagingService.prepareFetch(topicId)
    .setLimit(100);

if (startOffset != null) {
    fetcher.setStartMessage(startOffset, false); // exclusive
}

try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
    String lastId = null;
    while (messages.hasNext()) {
        RawMessage message = messages.next();
        
        // Process message
        processMessage(message);
        
        // Track progress
        lastId = Bytes.toHexString(message.getId());
    }
    
    // Persist last processed ID
    if (lastId != null) {
        saveLastProcessedMessageId(lastId);
    }
}
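
The `loadLastProcessedMessageId` and `saveLastProcessedMessageId` helpers are left undefined in the example. One possible shape is a small file-backed checkpoint, sketched below. The class name `FileCheckpoint`, the hex file format, and the write-then-rename approach are all assumptions made for this illustration; hex conversion is done with the JDK so the sketch does not depend on CDAP's `Bytes` class.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

/** Hypothetical checkpoint store; not part of CDAP. */
class FileCheckpoint {
    private final Path file;

    FileCheckpoint(Path file) {
        this.file = file;
    }

    /** Returns the last saved message ID, or null if nothing has been saved yet. */
    byte[] load() throws IOException {
        if (!Files.exists(file)) {
            return null;
        }
        String hex = new String(Files.readAllBytes(file), StandardCharsets.US_ASCII).trim();
        byte[] id = new byte[hex.length() / 2];
        for (int i = 0; i < id.length; i++) {
            id[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return id;
    }

    /** Writes to a sibling temp file first, then renames it over the checkpoint. */
    void save(byte[] messageId) throws IOException {
        StringBuilder hex = new StringBuilder();
        for (byte b : messageId) {
            hex.append(String.format("%02x", b));
        }
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, hex.toString().getBytes(StandardCharsets.US_ASCII));
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path path = Files.createTempDirectory("checkpoint").resolve("last-id");
        FileCheckpoint checkpoint = new FileCheckpoint(path);
        checkpoint.save(new byte[] {1, 2, (byte) 0xff});
        System.out.println(Arrays.toString(checkpoint.load())); // [1, 2, -1]
    }
}
```

Saving once per batch, as the example above does, means a failure mid-batch causes the whole batch to be re-read on restart, so message processing should tolerate duplicates.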

Time-Based Processing

Process messages from a specific time window.

long endTime = System.currentTimeMillis();
long startTime = endTime - 3600000; // example window: the last hour

MessageFetcher fetcher = messagingService.prepareFetch(topicId)
    .setStartTime(startTime)
    .setLimit(Integer.MAX_VALUE);

try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
    while (messages.hasNext()) {
        RawMessage message = messages.next();
        MessageId messageId = new MessageId(message.getId());
        
        // Stop if we've passed the end time
        if (messageId.getPublishTimestamp() > endTime) {
            break;
        }
        
        processMessage(message);
    }
}
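
The window bounds can be computed with `java.time` instead of raw millisecond arithmetic. The sketch below is a standalone helper, not a CDAP class: the name `TimeWindow` and its methods are hypothetical, and both bounds are treated as inclusive to match the `> endTime` check in the loop above.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for computing fetch window bounds; not part of CDAP. */
class TimeWindow {
    final long startMillis; // inclusive, suitable for setStartTime
    final long endMillis;   // inclusive

    TimeWindow(long startMillis, long endMillis) {
        if (startMillis < 0 || endMillis < startMillis) {
            throw new IllegalArgumentException("invalid window: " + startMillis + ".." + endMillis);
        }
        this.startMillis = startMillis;
        this.endMillis = endMillis;
    }

    /** Builds a window of the given length that ends at the given instant. */
    public static TimeWindow endingAt(Instant end, Duration length) {
        return new TimeWindow(end.minus(length).toEpochMilli(), end.toEpochMilli());
    }

    /** True once a publish timestamp lies beyond the window, i.e. iteration can stop. */
    public boolean isPast(long publishTimestamp) {
        return publishTimestamp > endMillis;
    }

    public static void main(String[] args) {
        TimeWindow window = endingAt(Instant.ofEpochMilli(10_000_000L), Duration.ofHours(1));
        System.out.println(window.startMillis);           // 6400000
        System.out.println(window.isPast(10_000_001L));   // true
    }
}
```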

Transactional Consumption

Consume messages within a transaction context for consistency.

Transaction tx = transactionManager.startLong();
try {
    MessageFetcher fetcher = messagingService.prepareFetch(topicId)
        .setTransaction(tx)
        .setLimit(50);
    
    try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
        while (messages.hasNext()) {
            RawMessage message = messages.next();
            
            // Process message and update state transactionally
            processMessageTransactionally(message, tx);
        }
    }
    
    transactionManager.commit(tx);
} catch (Exception e) {
    transactionManager.abort(tx);
    throw e;
}

Error Handling

Common consumption error scenarios:

try {
    MessageFetcher fetcher = messagingService.prepareFetch(topicId);
    try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
        while (messages.hasNext()) {
            RawMessage message = messages.next();
            try {
                processMessage(message);
            } catch (Exception e) {
                // Handle individual message processing errors
                logError("Failed to process message", message.getId(), e);
                // Continue with next message or implement retry logic
            }
        }
    }
} catch (TopicNotFoundException e) {
    System.out.println("Topic not found: " + e.getMessage());
} catch (IOException e) {
    System.out.println("Fetch failed: " + e.getMessage());
    // Implement retry with backoff
} catch (ServiceUnavailableException e) {
    System.out.println("Service unavailable, retrying later");
}

// Validate fetcher parameters
try {
    MessageFetcher fetcher = messagingService.prepareFetch(topicId)
        .setStartTime(-1); // This will throw IllegalArgumentException
} catch (IllegalArgumentException e) {
    System.out.println("Invalid parameter: " + e.getMessage());
}
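
The "retry with backoff" step mentioned in the `IOException` handler could look like the sketch below. It is a generic helper under stated assumptions, not something the messaging API provides: the class name `Retry`, the method `withBackoff`, and the doubling delay policy are all hypothetical. Only `IOException` is retried; any other exception propagates immediately.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical retry helper; not part of CDAP. */
class Retry {
    /**
     * Runs the action, retrying on IOException up to maxAttempts times in total.
     * The delay doubles after each failed attempt.
     */
    public static <T> T withBackoff(Callable<T> action, int maxAttempts,
                                    long initialDelayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (IOException e) {
                if (attempt >= maxAttempts) {
                    throw e; // out of attempts: surface the last failure
                }
                Thread.sleep(delay);
                delay *= 2;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        String result = withBackoff(() -> {
            if (++attempts[0] < 3) {
                throw new IOException("simulated transient failure");
            }
            return "ok";
        }, 5, 10L);
        System.out.println(result + " after " + attempts[0] + " attempts"); // ok after 3 attempts
    }
}
```

In a consumer, the action would wrap one fetch-and-process pass. Because a retried pass may re-read messages, it pairs naturally with starting each pass from the last persisted message ID.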

Performance Considerations

Optimal Batch Sizes

// Use appropriate limits for your use case
MessageFetcher fetcher = messagingService.prepareFetch(topicId)
    .setLimit(1000); // Batch size based on message size and processing time

// For high-throughput scenarios, process in chunks
int batchSize = 500;
byte[] lastMessageId = null;

while (true) {
    MessageFetcher chunkFetcher = messagingService.prepareFetch(topicId)
        .setLimit(batchSize);
    
    if (lastMessageId != null) {
        chunkFetcher.setStartMessage(lastMessageId, false);
    }
    
    int processedCount = 0;
    try (CloseableIterator<RawMessage> messages = chunkFetcher.fetch()) {
        while (messages.hasNext()) {
            RawMessage message = messages.next();
            processMessage(message);
            lastMessageId = message.getId();
            processedCount++;
        }
    }
    
    // Break if we got fewer messages than batch size
    if (processedCount < batchSize) {
        break;
    }
}
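
The termination rule of the chunked loop (stop when a batch comes back smaller than the requested size) can be tested in isolation against an in-memory stand-in. The sketch below is not CDAP code: `ChunkedDrain`, `drain`, and the `fetchAfter` function are hypothetical names, and the list-backed "topic" only mimics an exclusive start offset combined with a limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

/** Hypothetical illustration of the chunked loop; not part of CDAP. */
class ChunkedDrain {
    /**
     * fetchAfter.apply(last, limit) must return at most `limit` items that come
     * strictly after `last`, or from the beginning when `last` is null.
     * Returns the total number of items handled.
     */
    public static <T> int drain(BiFunction<T, Integer, List<T>> fetchAfter,
                                int batchSize, Consumer<T> handler) {
        T last = null;
        int total = 0;
        while (true) {
            List<T> batch = fetchAfter.apply(last, batchSize);
            for (T item : batch) {
                handler.accept(item);
                last = item; // becomes the exclusive start of the next batch
            }
            total += batch.size();
            if (batch.size() < batchSize) {
                return total; // a short batch means the source is drained
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> log = new ArrayList<>();
        for (int i = 1; i <= 7; i++) {
            log.add(i);
        }
        // In-memory stand-in for a topic. Values are 1-based, so the last seen
        // value equals the index of the next unread item.
        BiFunction<Integer, Integer, List<Integer>> fetch = (last, limit) -> {
            int from = last == null ? 0 : last;
            return new ArrayList<>(log.subList(from, Math.min(from + limit, log.size())));
        };
        List<Integer> seen = new ArrayList<>();
        int count = drain(fetch, 3, seen::add);
        System.out.println(count + " " + seen); // 7 [1, 2, 3, 4, 5, 6, 7]
    }
}
```

When the number of available items is an exact multiple of the batch size, this rule costs one extra fetch that returns an empty batch before the loop exits.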

Resource Management

Always use try-with-resources for proper cleanup:

// Correct - iterator is properly closed
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
    // Process messages
}

// Incorrect - potential resource leak
CloseableIterator<RawMessage> messages = fetcher.fetch();
// Process messages
// messages.close() might not be called if exception occurs
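
The guarantee that try-with-resources provides can be checked with a small stand-in. The sketch below does not use CDAP's `CloseableIterator`; `CloseDemo` and `TrackingIterator` are hypothetical names for an iterator that records whether `close()` was called.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical demonstration of try-with-resources cleanup; not part of CDAP. */
class CloseDemo {
    /** Stand-in for a closeable iterator that remembers whether it was closed. */
    static class TrackingIterator implements Iterator<String>, AutoCloseable {
        private final Iterator<String> delegate;
        boolean closed;

        TrackingIterator(List<String> items) {
            this.delegate = items.iterator();
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public String next() {
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Simulates a processing failure and reports whether the iterator was closed. */
    public static boolean closedAfterFailure() {
        TrackingIterator iterator = new TrackingIterator(Arrays.asList("a", "b"));
        try (TrackingIterator messages = iterator) {
            messages.next();
            throw new IllegalStateException("simulated processing failure");
        } catch (IllegalStateException e) {
            // close() has already run by the time the catch block executes
        }
        return iterator.closed;
    }

    public static void main(String[] args) {
        System.out.println(closedAfterFailure()); // true
    }
}
```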

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-tms
