CDAP Transactional Messaging System provides reliable, ordered message delivery with transaction support for the CDAP platform.
—
Transactional message publishing capabilities supporting both immediate publishing and distributed transaction workflows. Provides ACID guarantees for message publishing with rollback support.
Publishes messages to a topic, optionally within a transaction context. Returns rollback information for transactional publishes.
/**
* Publishes a list of messages to the messaging system.
* @param request the StoreRequest containing messages to be published
* @return if the store request is transactional, returns a RollbackDetail containing
* information for rollback; otherwise null will be returned
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to publish messages
* @throws ServiceUnavailableException if the messaging service is not available
*/
RollbackDetail publish(StoreRequest request) throws TopicNotFoundException, IOException;

Usage Examples:
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.RollbackDetail;
// Non-transactional publish
StoreRequest request = StoreRequestBuilder.of(topicId)
.addPayload("Event: user login")
.addPayload("Event: page view")
.addPayload("Event: button click")
.build();
RollbackDetail rollback = messagingService.publish(request);
// rollback will be null for non-transactional publishes
// Transactional publish
Transaction tx = // obtain transaction
StoreRequest txRequest = StoreRequestBuilder.of(topicId)
.addPayload("Critical event")
.setTransaction(tx.getWritePointer())
.build();
RollbackDetail rollbackDetail = messagingService.publish(txRequest);
// Store rollback detail for potential rollback

Stores message payloads for long/distributed transactional publishing scenarios. Used in multi-phase commit patterns.
/**
* Stores a list of messages to the messaging system. For long/distributed transactional publishing use case.
* @param request the StoreRequest containing messages to be stored
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to store messages
* @throws ServiceUnavailableException if the messaging service is not available
*/
void storePayload(StoreRequest request) throws TopicNotFoundException, IOException;

Usage Examples:
// Store payloads for later commit in distributed transaction
StoreRequest storeRequest = StoreRequestBuilder.of(topicId)
.addPayload("Batch operation 1")
.addPayload("Batch operation 2")
.setTransaction(transactionWritePointer)
.build();
messagingService.storePayload(storeRequest);
// Payloads are stored but not yet visible to consumers
// Will be committed or rolled back later

Rolls back transactionally published messages using rollback detail information.
/**
* Rolls back messages published to the given topic with the given transaction.
* @param topicId the topic where the messages were published
* @param rollbackDetail the RollbackDetail as returned by the publish call,
* which contains information needed for the rollback
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to rollback changes
* @throws ServiceUnavailableException if the messaging service is not available
*/
void rollback(TopicId topicId, RollbackDetail rollbackDetail) throws TopicNotFoundException, IOException;

Usage Examples:
// Declared outside the try block so it is still in scope in the catch block
RollbackDetail rollback = null;
try {
// Perform transactional operations
rollback = messagingService.publish(txRequest);
// ... other operations that might fail
// If we reach here, commit the transaction
// (transaction commit handled by transaction manager)
} catch (Exception e) {
// Something failed, roll back the messages
if (rollback != null) {
messagingService.rollback(topicId, rollback);
}
throw e;
}

Builder pattern for creating StoreRequest instances with a fluent API.
class StoreRequestBuilder {
/** Creates a new StoreRequestBuilder instance */
static StoreRequestBuilder of(TopicId topicId);
/** Adds a single payload string to the request (UTF-8 encoded) */
StoreRequestBuilder addPayload(String payload);
/** Adds a single byte array to the payload */
StoreRequestBuilder addPayload(byte[] payload);
/** Adds multiple payloads from iterator */
StoreRequestBuilder addPayloads(Iterator<byte[]> payloads);
/** Adds multiple payloads from iterable */
StoreRequestBuilder addPayloads(Iterable<byte[]> payloads);
/** Sets the transaction write pointer for transactional publish */
StoreRequestBuilder setTransaction(Long txWritePointer);
/** Returns true if there is some payload in this builder */
boolean hasPayload();
/** Creates a StoreRequest based on the builder settings */
StoreRequest build();
}

Usage Examples:
// Build request with multiple payload types
List<byte[]> binaryData = Arrays.asList(
"data1".getBytes(),
"data2".getBytes()
);
StoreRequest request = StoreRequestBuilder.of(topicId)
.addPayload("String message")
.addPayload("Another string".getBytes())
.addPayloads(binaryData)
.setTransaction(txWritePointer)
.build();
// Check if builder has payloads before building
StoreRequestBuilder builder = StoreRequestBuilder.of(topicId);
if (!builder.hasPayload()) {
builder.addPayload("Default message");
}
StoreRequest request = builder.build();

Abstract class representing messages to store with transaction context.
abstract class StoreRequest implements Iterable<byte[]> {
/** Returns the topic ID for this request */
TopicId getTopicId();
/** Returns true if the message should be published transactionally */
boolean isTransactional();
/** Returns the transaction write pointer if transactional */
long getTransactionWritePointer();
/** Returns true if there is payload in this request */
abstract boolean hasPayload();
/** Iterates over message payloads as byte arrays */
Iterator<byte[]> iterator();
}

Information needed to roll back transactionally published messages.
interface RollbackDetail {
/** Returns the transaction write pointer used when the message was published */
long getTransactionWritePointer();
/** Returns the timestamp for the first payload published with the transaction */
long getStartTimestamp();
/** Returns the sequence id for the first payload published with the transaction */
int getStartSequenceId();
/** Returns the timestamp for the last payload published with the transaction */
long getEndTimestamp();
/** Returns the sequence id for the last payload published with the transaction */
int getEndSequenceId();
}

Usage Examples:
RollbackDetail rollback = messagingService.publish(txRequest);
if (rollback != null) {
System.out.println("Transaction write pointer: " + rollback.getTransactionWritePointer());
System.out.println("Time range: " + rollback.getStartTimestamp() + " to " + rollback.getEndTimestamp());
System.out.println("Sequence range: " + rollback.getStartSequenceId() + " to " + rollback.getEndSequenceId());
}

CDAP TMS integrates with Apache Tephra for transaction support:
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionManager;
// In a transactional context
Transaction tx = transactionManager.startLong();
// Declared outside the try block so it is still in scope in the catch block
RollbackDetail rollback = null;
try {
StoreRequest request = StoreRequestBuilder.of(topicId)
.addPayload("Transactional message")
.setTransaction(tx.getWritePointer())
.build();
rollback = messagingService.publish(request);
// Other transactional operations...
transactionManager.commit(tx);
} catch (Exception e) {
transactionManager.abort(tx);
if (rollback != null) {
messagingService.rollback(topicId, rollback);
}
throw e;
}

Common publishing error scenarios:
try {
RollbackDetail rollback = messagingService.publish(request);
} catch (TopicNotFoundException e) {
System.out.println("Topic does not exist: " + e.getTopicName());
// Consider creating topic or using different topic
} catch (IOException e) {
System.out.println("Publishing failed: " + e.getMessage());
// Retry logic or error reporting
} catch (ServiceUnavailableException e) {
System.out.println("Messaging service unavailable");
// Wait and retry or use circuit breaker pattern
}
// Validate request before publishing
StoreRequestBuilder builder = StoreRequestBuilder.of(topicId);
if (!builder.hasPayload()) {
throw new IllegalArgumentException("Cannot publish empty request");
}

Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-tms