
tessl/maven-co-cask-cdap--cdap-tms

CDAP Transactional Messaging System provides reliable, ordered message delivery with transaction support for the CDAP platform.


Message Publishing

Transactional message publishing capabilities supporting both immediate publishing and distributed transaction workflows. Provides ACID guarantees for message publishing with rollback support.

Capabilities

Publish Messages

Publishes messages to a topic, optionally within a transaction context. Returns rollback information for transactional publishes.

/**
 * Publishes a list of messages to the messaging system.
 * @param request the StoreRequest containing messages to be published
 * @return if the store request is transactional, returns a RollbackDetail containing
 *         information for rollback; otherwise null will be returned
 * @throws TopicNotFoundException if the topic doesn't exist
 * @throws IOException if failed to publish messages
 * @throws ServiceUnavailableException if the messaging service is not available
 */
RollbackDetail publish(StoreRequest request) throws TopicNotFoundException, IOException;

Usage Examples:

import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.RollbackDetail;

// Non-transactional publish
StoreRequest request = StoreRequestBuilder.of(topicId)
    .addPayload("Event: user login")
    .addPayload("Event: page view")
    .addPayload("Event: button click")
    .build();

RollbackDetail rollback = messagingService.publish(request);
// rollback will be null for non-transactional publishes

// Transactional publish
// tx is an org.apache.tephra.Transaction obtained from the transaction system
StoreRequest txRequest = StoreRequestBuilder.of(topicId)
    .addPayload("Critical event")
    .setTransaction(tx.getWritePointer())
    .build();

RollbackDetail rollbackDetail = messagingService.publish(txRequest);
// Store rollback detail for potential rollback

Store Payload

Stores message payloads for long/distributed transactional publishing scenarios. Used in multi-phase commit patterns.

/**
 * Stores a list of messages to the messaging system. For long/distributed transactional publishing use case.
 * @param request the StoreRequest containing messages to be stored
 * @throws TopicNotFoundException if the topic doesn't exist
 * @throws IOException if failed to store messages
 * @throws ServiceUnavailableException if the messaging service is not available
 */
void storePayload(StoreRequest request) throws TopicNotFoundException, IOException;

Usage Examples:

// Store payloads for later commit in distributed transaction
StoreRequest storeRequest = StoreRequestBuilder.of(topicId)
    .addPayload("Batch operation 1")
    .addPayload("Batch operation 2")
    .setTransaction(transactionWritePointer)
    .build();

messagingService.storePayload(storeRequest);
// Payloads are stored but not yet visible to consumers
// Will be committed or rolled back later
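
The visibility rule described in the comments above can be pictured with a toy in-memory model. This is only a sketch: `StagedTopic` and its methods are hypothetical names invented for illustration and are not part of the CDAP API. The real service persists payloads and resolves visibility through the transaction system, not through a map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model: payloads are staged per write pointer and become visible only on commit. */
final class StagedTopic {
    // Payloads stored under a transaction write pointer, not yet visible to consumers
    private final Map<Long, List<String>> staged = new HashMap<>();
    // Payloads whose transaction has committed
    private final List<String> visible = new ArrayList<>();

    /** Stages a payload under the given write pointer. */
    void storePayload(long writePointer, String payload) {
        staged.computeIfAbsent(writePointer, k -> new ArrayList<>()).add(payload);
    }

    /** Makes every payload staged under the write pointer visible, in insertion order. */
    void commit(long writePointer) {
        List<String> payloads = staged.remove(writePointer);
        if (payloads != null) {
            visible.addAll(payloads);
        }
    }

    /** Discards every payload staged under the write pointer. */
    void rollback(long writePointer) {
        staged.remove(writePointer);
    }

    /** Returns a copy of the messages a consumer would see in this model. */
    List<String> visibleMessages() {
        return new ArrayList<>(visible);
    }
}
```

In this model, nothing stored under a write pointer shows up in `visibleMessages()` until `commit` is called for that pointer, and a `rollback` leaves no trace.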

Rollback Messages

Rolls back transactionally published messages using rollback detail information.

/**
 * Rolls back messages published to the given topic with the given transaction.
 * @param topicId the topic where the messages were published
 * @param rollbackDetail the RollbackDetail as returned by the publish call,
 *                      which contains information needed for the rollback
 * @throws TopicNotFoundException if the topic doesn't exist
 * @throws IOException if failed to roll back changes
 * @throws ServiceUnavailableException if the messaging service is not available
 */
void rollback(TopicId topicId, RollbackDetail rollbackDetail) throws TopicNotFoundException, IOException;

Usage Examples:

// Declared outside the try block so it is in scope in the catch block
RollbackDetail rollback = null;
try {
    // Perform transactional operations
    rollback = messagingService.publish(txRequest);
    
    // ... other operations that might fail
    
    // If we reach here, commit the transaction
    // (transaction commit handled by transaction manager)
    
} catch (Exception e) {
    // Something failed, rollback the messages
    if (rollback != null) {
        messagingService.rollback(topicId, rollback);
    }
    throw e;
}

StoreRequest Building

StoreRequestBuilder

Builder for creating StoreRequest instances through a fluent API.

class StoreRequestBuilder {
    /** Creates a new StoreRequestBuilder instance */
    static StoreRequestBuilder of(TopicId topicId);
    
    /** Adds a single payload string to the request (UTF-8 encoded) */
    StoreRequestBuilder addPayload(String payload);
    
    /** Adds a single byte array as a payload */
    StoreRequestBuilder addPayload(byte[] payload);
    
    /** Adds multiple payloads from iterator */
    StoreRequestBuilder addPayloads(Iterator<byte[]> payloads);
    
    /** Adds multiple payloads from iterable */
    StoreRequestBuilder addPayloads(Iterable<byte[]> payloads);
    
    /** Sets the transaction write pointer for transactional publish */
    StoreRequestBuilder setTransaction(Long txWritePointer);
    
    /** Returns true if there is some payload in this builder */
    boolean hasPayload();
    
    /** Creates a StoreRequest based on the builder settings */
    StoreRequest build();
}

Usage Examples:

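import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Build request with multiple payload types
// An explicit charset avoids depending on the platform default encoding
List<byte[]> binaryData = Arrays.asList(
    "data1".getBytes(StandardCharsets.UTF_8),
    "data2".getBytes(StandardCharsets.UTF_8)
);

StoreRequest request = StoreRequestBuilder.of(topicId)
    .addPayload("String message")
    .addPayload("Another string".getBytes(StandardCharsets.UTF_8))
    .addPayloads(binaryData)
    .setTransaction(txWritePointer)
    .build();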

// Check if builder has payloads before building
StoreRequestBuilder builder = StoreRequestBuilder.of(topicId);
if (!builder.hasPayload()) {
    builder.addPayload("Default message");
}
StoreRequest defaultRequest = builder.build();
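
Since `addPayloads` accepts an `Iterable<byte[]>`, a batch of strings has to be encoded before it can be added in one call. The helper below is a minimal sketch of that step using only the JDK. `PayloadEncoding` and `utf8` are hypothetical names, not part of CDAP, and UTF-8 is chosen to match the encoding documented for `addPayload(String)`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for preparing string messages as byte-array payloads. */
final class PayloadEncoding {
    private PayloadEncoding() {}

    /** Encodes each string as UTF-8, preserving the input order. */
    static List<byte[]> utf8(List<String> messages) {
        List<byte[]> encoded = new ArrayList<>(messages.size());
        for (String message : messages) {
            encoded.add(message.getBytes(StandardCharsets.UTF_8));
        }
        return encoded;
    }
}
```

The returned list could then be passed to `addPayloads(...)` in place of a hand-built list such as `binaryData` above.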

StoreRequest Properties

Abstract class representing messages to store with transaction context.

abstract class StoreRequest implements Iterable<byte[]> {
    /** Returns the topic ID for this request */
    TopicId getTopicId();
    
    /** Returns true if the message should be published transactionally */
    boolean isTransactional();
    
    /** Returns the transaction write pointer if transactional */
    long getTransactionWritePointer();
    
    /** Returns true if there is payload in this request */
    abstract boolean hasPayload();
    
    /** Iterates over message payloads as byte arrays */
    Iterator<byte[]> iterator();
}
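
Because `StoreRequest` is declared as an `Iterable<byte[]>`, any helper that accepts `Iterable<byte[]>` can inspect its payloads without depending on CDAP types. The sketch below counts payloads and sums their sizes. `PayloadStats` is a hypothetical name, and the sketch assumes the request can safely be iterated more than once, which this document does not state.

```java
/** Hypothetical helper that inspects any Iterable of byte-array payloads. */
final class PayloadStats {
    private PayloadStats() {}

    /** Returns the number of payloads. */
    static int count(Iterable<byte[]> payloads) {
        int count = 0;
        for (byte[] ignored : payloads) {
            count++;
        }
        return count;
    }

    /** Returns the combined length in bytes of all payloads. */
    static long totalBytes(Iterable<byte[]> payloads) {
        long total = 0;
        for (byte[] payload : payloads) {
            total += payload.length;
        }
        return total;
    }
}
```

A caller could, for example, log `PayloadStats.totalBytes(request)` before publishing to keep an eye on batch sizes.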

RollbackDetail

Information needed to roll back transactionally published messages.

interface RollbackDetail {
    /** Returns the transaction write pointer used when the message was published */
    long getTransactionWritePointer();
    
    /** Returns the timestamp for the first payload published with the transaction */
    long getStartTimestamp();
    
    /** Returns the sequence id for the first payload published with the transaction */
    int getStartSequenceId();
    
    /** Returns the timestamp for the last payload published with the transaction */
    long getEndTimestamp();
    
    /** Returns the sequence id for the last payload published with the transaction */
    int getEndSequenceId();
}

Usage Examples:

RollbackDetail rollback = messagingService.publish(txRequest);
if (rollback != null) {
    System.out.println("Transaction write pointer: " + rollback.getTransactionWritePointer());
    System.out.println("Time range: " + rollback.getStartTimestamp() + " to " + rollback.getEndTimestamp());
    System.out.println("Sequence range: " + rollback.getStartSequenceId() + " to " + rollback.getEndSequenceId());
}
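
The start and end fields describe an inclusive range of message positions. The sketch below shows one way to test whether a given position falls inside such a range. It assumes positions are ordered by timestamp first and sequence id second, which is an assumption about TMS ordering and is not confirmed by this document. `RollbackRange` is a hypothetical stand-in that mirrors the four range getters of `RollbackDetail`.

```java
/** Hypothetical stand-in mirroring the range fields exposed by RollbackDetail. */
final class RollbackRange {
    private final long startTimestamp;
    private final int startSequenceId;
    private final long endTimestamp;
    private final int endSequenceId;

    RollbackRange(long startTimestamp, int startSequenceId, long endTimestamp, int endSequenceId) {
        this.startTimestamp = startTimestamp;
        this.startSequenceId = startSequenceId;
        this.endTimestamp = endTimestamp;
        this.endSequenceId = endSequenceId;
    }

    /** Returns true if (timestamp, sequenceId) lies within the inclusive range. */
    boolean contains(long timestamp, int sequenceId) {
        return compare(timestamp, sequenceId, startTimestamp, startSequenceId) >= 0
            && compare(timestamp, sequenceId, endTimestamp, endSequenceId) <= 0;
    }

    /** Orders positions by timestamp, then by sequence id (assumed ordering). */
    private static int compare(long ts1, int seq1, long ts2, int seq2) {
        int byTimestamp = Long.compare(ts1, ts2);
        return byTimestamp != 0 ? byTimestamp : Integer.compare(seq1, seq2);
    }
}
```

With a real `RollbackDetail`, the constructor arguments would come from `getStartTimestamp()`, `getStartSequenceId()`, `getEndTimestamp()`, and `getEndSequenceId()`.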

Transaction Integration

CDAP TMS integrates with Apache Tephra for transaction support:

import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionManager;

// In a transactional context
Transaction tx = transactionManager.startLong();
// Declared outside the try block so it is in scope in the catch block
RollbackDetail rollback = null;
try {
    StoreRequest request = StoreRequestBuilder.of(topicId)
        .addPayload("Transactional message")
        .setTransaction(tx.getWritePointer())
        .build();
    
    rollback = messagingService.publish(request);
    
    // Other transactional operations...
    
    transactionManager.commit(tx);
} catch (Exception e) {
    transactionManager.abort(tx);
    if (rollback != null) {
        messagingService.rollback(topicId, rollback);
    }
    throw e;
}

Error Handling

Common publishing error scenarios:

try {
    RollbackDetail rollback = messagingService.publish(request);
} catch (TopicNotFoundException e) {
    System.out.println("Topic does not exist: " + e.getMessage());
    // Consider creating topic or using different topic
} catch (IOException e) {
    System.out.println("Publishing failed: " + e.getMessage());
    // Retry logic or error reporting
} catch (ServiceUnavailableException e) {
    System.out.println("Messaging service unavailable");
    // Wait and retry or use circuit breaker pattern
}

// Validate request before publishing
StoreRequestBuilder builder = StoreRequestBuilder.of(topicId);
// ... add payloads to the builder ...
if (!builder.hasPayload()) {
    throw new IllegalArgumentException("Cannot publish empty request");
}
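
The retry comments above can be made concrete with a small helper. This is a minimal sketch that uses only the JDK: `PublishRetry` and `callWithRetry` are hypothetical names, and retrying only on `IOException` with a doubling delay is an illustrative policy, not something CDAP prescribes.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical helper that retries an action on IOException with exponential backoff. */
final class PublishRetry {
    private PublishRetry() {}

    /**
     * Runs the action, making at most maxAttempts attempts in total.
     * After each IOException it sleeps, then doubles the delay.
     * Any other exception propagates immediately.
     */
    static <T> T callWithRetry(Callable<T> action, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (IOException e) {
                if (attempt >= maxAttempts) {
                    throw e; // out of attempts, surface the last failure
                }
                Thread.sleep(delay);
                delay *= 2;
            }
        }
    }
}
```

A publish call could be wrapped as `PublishRetry.callWithRetry(() -> messagingService.publish(request), 3, 100L)`. Whether retrying is safe depends on whether a failed publish may have partially succeeded, so a transactional publish may need a rollback before the next attempt.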
