CDAP Transactional Messaging System provides reliable, ordered message delivery with transaction support for the CDAP platform.
—
Abstract subscriber service providing automatic message processing, failure handling, transaction management, and message ID persistence. Simplifies building robust, long-running message consumers with automatic retry and state management.
Base class for implementing reliable message consumers with built-in failure handling, transaction support, and progress tracking.
abstract class AbstractMessagingSubscriberService<T> extends AbstractRetryableScheduledService {
/**
* Constructor for messaging subscriber service.
* @param topicId the topic to consume from
* @param transactionalFetch true to perform fetching inside transaction
* @param fetchSize number of messages to fetch in each batch
* @param txTimeoutSeconds transaction timeout in seconds
* @param maxTxTimeoutSeconds max transaction timeout in seconds
* @param emptyFetchDelayMillis milliseconds to sleep after empty fetch
* @param retryStrategy strategy for retrying on failures
* @param metricsContext metrics context for monitoring
*/
protected AbstractMessagingSubscriberService(TopicId topicId, boolean transactionalFetch,
int fetchSize, int txTimeoutSeconds, int maxTxTimeoutSeconds,
long emptyFetchDelayMillis, RetryStrategy retryStrategy, MetricsContext metricsContext);
/** Returns the TopicId that this service is fetching from */
protected final TopicId getTopicId();
/** Returns the MessagingContext for interacting with TMS */
protected abstract MessagingContext getMessagingContext();
/** Returns the Transactional for executing tasks in transaction */
protected abstract Transactional getTransactional();
/** Loads last persisted message id (called from transaction) */
protected abstract String loadMessageId(DatasetContext datasetContext) throws Exception;
/** Persists message id (called from same transaction as processMessages) */
protected abstract void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception;
/** Decodes raw Message into object of type T */
protected abstract T decodeMessage(Message message) throws Exception;
/** Processes decoded messages (called from transaction) */
protected abstract void processMessages(DatasetContext datasetContext,
Iterator<ImmutablePair<String, T>> messages) throws Exception;
/** Whether message should run in separate transaction (expensive operations) */
protected boolean shouldRunInSeparateTx(T message);
/** Post processing after batch completion (outside transaction) */
protected void postProcess();
}

Usage Examples:
import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.messaging.MessagingContext;
import co.cask.cdap.common.utils.ImmutablePair;
import co.cask.cdap.messaging.subscriber.AbstractMessagingSubscriberService;

import java.util.concurrent.TimeUnit;
public class UserEventProcessor extends AbstractMessagingSubscriberService<UserEvent> {
private final MessagingContext messagingContext;
private final Transactional transactional;
private final UserEventDataset eventDataset;
/**
 * Creates a processor that consumes user events from the given topic.
 *
 * @param topicId the topic to consume from
 * @param messagingContext context returned from {@code getMessagingContext()}
 * @param transactional returned from {@code getTransactional()}
 * @param eventDataset dataset used to load and store the last processed message id
 */
public UserEventProcessor(TopicId topicId, MessagingContext messagingContext,
    Transactional transactional, UserEventDataset eventDataset) {
  super(topicId,
      true,   // transactional fetch
      100,    // fetch size
      30,     // tx timeout seconds
      300,    // max tx timeout seconds
      1000,   // empty fetch delay ms
      // Backoff from 1s up to 60s; the delay unit has to be passed explicitly.
      RetryStrategies.exponentialDelay(1000, 60000, TimeUnit.MILLISECONDS),
      // NOTE(review): metricsContext is not declared anywhere in this snippet;
      // it must be supplied by the enclosing scope (or added as a constructor argument).
      metricsContext);
  this.messagingContext = messagingContext;
  this.transactional = transactional;
  this.eventDataset = eventDataset;
}
@Override
protected MessagingContext getMessagingContext() {
return messagingContext;
}
@Override
protected Transactional getTransactional() {
return transactional;
}
@Override
protected String loadMessageId(DatasetContext datasetContext) throws Exception {
return eventDataset.getLastProcessedMessageId();
}
@Override
protected void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception {
eventDataset.setLastProcessedMessageId(messageId);
}
@Override
protected UserEvent decodeMessage(Message message) throws Exception {
return gson.fromJson(message.getPayloadAsString(), UserEvent.class);
}
/**
 * Processes every decoded user event of the current batch.
 *
 * @param datasetContext dataset context of the transaction this batch is processed in
 * @param messages pairs of message id and decoded event
 * @throws Exception if processing of any event fails
 */
@Override
protected void processMessages(DatasetContext datasetContext,
    Iterator<ImmutablePair<String, UserEvent>> messages) throws Exception {
  while (messages.hasNext()) {
    // Only the decoded event is needed here. The message id does not have to be tracked
    // manually: it is persisted through storeMessageId in the same transaction.
    UserEvent event = messages.next().getSecond();
    processUserEvent(event);
  }
}
private void processUserEvent(UserEvent event) {
// Business logic for processing user events
System.out.println("Processing event: " + event.getType() + " for user " + event.getUserId());
}
}

The subscriber service extends AbstractRetryableScheduledService for robust lifecycle management:
// Start the subscriber service
UserEventProcessor processor = new UserEventProcessor(topicId, messagingContext,
transactional, eventDataset);
// Start service (begins consuming messages)
processor.startAsync();
// Wait for service to be running
processor.awaitRunning();
// Stop service gracefully
processor.stopAsync();
processor.awaitTerminated();

Handle expensive operations in separate transactions to avoid timeouts:
public class HeavyProcessingSubscriber extends AbstractMessagingSubscriberService<HeavyTask> {
@Override
protected boolean shouldRunInSeparateTx(HeavyTask task) {
// Run expensive tasks in separate transactions
return task.getEstimatedProcessingTimeMs() > 10000;
}
/**
 * Dispatches each task of the batch to the light or heavy handler, using the same
 * predicate that decides whether a task runs in a separate transaction.
 */
@Override
protected void processMessages(DatasetContext datasetContext,
    Iterator<ImmutablePair<String, HeavyTask>> messages) throws Exception {
  while (messages.hasNext()) {
    HeavyTask current = messages.next().getSecond();
    if (!shouldRunInSeparateTx(current)) {
      // Process normally in current transaction
      processLightTask(current);
      continue;
    }
    // This will be processed in its own transaction
    processHeavyTask(current);
  }
}
}

Handle post-processing operations outside of transactions:
public class EventAggregatorSubscriber extends AbstractMessagingSubscriberService<Event> {
private final List<Event> processedEvents = new ArrayList<>();
@Override
protected void processMessages(DatasetContext datasetContext,
Iterator<ImmutablePair<String, Event>> messages) throws Exception {
while (messages.hasNext()) {
Event event = messages.next().getSecond();
processedEvents.add(event);
// Store event in dataset (transactional)
eventDataset.addEvent(event);
}
}
@Override
protected void postProcess() {
try {
// Send aggregated metrics (non-transactional)
if (!processedEvents.isEmpty()) {
sendAggregatedMetrics(processedEvents);
processedEvents.clear();
}
} catch (Exception e) {
LOG.warn("Failed to send aggregated metrics", e);
}
}
}

Built-in error handling with configurable retry strategies:
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.common.service.RetryStrategies;
public class RobustEventProcessor extends AbstractMessagingSubscriberService<Event> {
/**
 * Creates a processor tuned for reliability: small batches, long transaction
 * timeouts and a retry backoff that grows from 1 second up to 5 minutes.
 */
public RobustEventProcessor() {
  // NOTE(review): topicId and metricsContext are not declared in this snippet;
  // they must be supplied by the enclosing scope.
  super(topicId,
      true,   // transactional fetch
      50,     // smaller batch size for reliability
      60,     // 1 minute tx timeout
      600,    // 10 minute max tx timeout
      5000,   // 5 second delay on empty fetch
      // 1s to 5min backoff; the delay unit has to be passed explicitly.
      RetryStrategies.exponentialDelay(1000, 300000, TimeUnit.MILLISECONDS),
      metricsContext);
}
@Override
protected Event decodeMessage(Message message) throws Exception {
try {
return gson.fromJson(message.getPayloadAsString(), Event.class);
} catch (JsonSyntaxException e) {
// Log and skip malformed messages
LOG.warn("Skipping malformed message: {}", message.getId(), e);
throw e; // Will cause message to be skipped
}
}
@Override
protected void processMessages(DatasetContext datasetContext,
Iterator<ImmutablePair<String, Event>> messages) throws Exception {
int processed = 0;
while (messages.hasNext()) {
try {
Event event = messages.next().getSecond();
processEvent(event);
processed++;
} catch (Exception e) {
LOG.error("Failed to process event, aborting batch", e);
throw e; // Will trigger retry of entire batch
}
}
LOG.info("Successfully processed {} events", processed);
}
}

Built-in metrics collection for monitoring subscriber health:
import co.cask.cdap.api.metrics.MetricsContext;
public class MonitoredSubscriber extends AbstractMessagingSubscriberService<Event> {
// Kept locally for custom metrics: the base class API shown above does not
// declare an accessor for the metrics context passed to its constructor.
private final MetricsContext metricsContext;

/**
 * Creates a subscriber that reports custom metrics in addition to those of the parent.
 *
 * @param metricsContext metrics context handed to the parent and reused for custom metrics
 */
public MonitoredSubscriber(MetricsContext metricsContext) {
  // NOTE(review): topicId is not declared in this snippet; it must come from the enclosing scope.
  super(topicId, true, 100, 30, 300, 1000,
      // Backoff from 1s up to 60s; the delay unit has to be passed explicitly.
      RetryStrategies.exponentialDelay(1000, 60000, TimeUnit.MILLISECONDS),
      metricsContext);   // Metrics context provided to parent
  this.metricsContext = metricsContext;
}

/**
 * Processes all events of the batch, then emits the number of processed events
 * and the elapsed wall-clock time as custom metrics.
 *
 * @param datasetContext dataset context of the transaction this batch is processed in
 * @param messages pairs of message id and decoded event
 * @throws Exception if processing of any event fails
 */
@Override
protected void processMessages(DatasetContext datasetContext,
    Iterator<ImmutablePair<String, Event>> messages) throws Exception {
  int processedCount = 0;
  long startTime = System.currentTimeMillis();
  while (messages.hasNext()) {
    Event event = messages.next().getSecond();
    processEvent(event);
    processedCount++;
  }
  // Custom metrics (parent also emits built-in metrics)
  long processingTime = System.currentTimeMillis() - startTime;
  metricsContext.increment("events.processed", processedCount);
  metricsContext.gauge("processing.time.ms", processingTime);
}
}

Integrate with CDAP datasets for state persistence:
public class DatasetIntegratedSubscriber extends AbstractMessagingSubscriberService<Event> {
private final String datasetName;
/**
 * Reads the last persisted message id from the state table.
 *
 * @return the stored message id, or {@code null} when nothing has been stored yet
 */
@Override
protected String loadMessageId(DatasetContext datasetContext) throws Exception {
  KeyValueTable table = datasetContext.getDataset(datasetName);
  byte[] stored = table.read("last.message.id");
  if (stored == null) {
    return null;
  }
  return Bytes.toString(stored);
}
@Override
protected void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception {
KeyValueTable stateTable = datasetContext.getDataset(datasetName);
stateTable.write("last.message.id", Bytes.toBytes(messageId));
}
/**
 * Stores every event of the batch as one row of the "events" table, keyed by event id.
 *
 * @param datasetContext dataset context used to obtain the table
 * @param messages pairs of message id and decoded event
 * @throws Exception if the table cannot be obtained or written
 */
@Override
protected void processMessages(DatasetContext datasetContext,
    Iterator<ImmutablePair<String, Event>> messages) throws Exception {
  Table eventTable = datasetContext.getDataset("events");
  while (messages.hasNext()) {
    Event event = messages.next().getSecond();
    // Store event in dataset. CDAP Table rows map a column directly to a value
    // (there is no column family), so Put.add takes a column name and a value.
    Put put = new Put(Bytes.toBytes(event.getId()));
    put.add("payload", Bytes.toBytes(gson.toJson(event)));
    put.add("timestamp", Bytes.toBytes(System.currentTimeMillis()));
    eventTable.put(put);
  }
}
}

Use with CDAP service discovery and dependency injection:
public class ServiceDiscoverySubscriber extends AbstractMessagingSubscriberService<Event> {
private final MessagingContext messagingContext;
private final Transactional transactional;
/**
 * Injection constructor; all collaborators are provided by the injector.
 *
 * @param topicId the topic to consume from, bound under the name "event.topic"
 * @param messagingContext context returned to the base class for interacting with TMS
 * @param transactional executes tasks in transactions on behalf of the base class
 * @param metricsContext metrics context passed through to the base class
 */
@Inject
public ServiceDiscoverySubscriber(@Named("event.topic") TopicId topicId,
    MessagingContext messagingContext,
    Transactional transactional,
    MetricsContext metricsContext) {
  super(topicId, true, 100, 30, 300, 1000,
      // Backoff from 1s up to 60s; the delay unit has to be passed explicitly.
      RetryStrategies.exponentialDelay(1000, 60000, TimeUnit.MILLISECONDS),
      metricsContext);
  this.messagingContext = messagingContext;
  this.transactional = transactional;
}
// Implementation methods...
}

Handle multiple topics with separate subscriber instances:
public class MultiTopicProcessor {
private final List<AbstractMessagingSubscriberService<? extends Event>> subscribers;
public MultiTopicProcessor() {
this.subscribers = Arrays.asList(
new UserEventProcessor(userEventTopic, messagingContext, transactional, datasets),
new SystemEventProcessor(systemEventTopic, messagingContext, transactional, datasets),
new AuditEventProcessor(auditEventTopic, messagingContext, transactional, datasets)
);
}
public void start() {
for (AbstractMessagingSubscriberService<?> subscriber : subscribers) {
subscriber.startAsync();
}
// Wait for all to be running
for (AbstractMessagingSubscriberService<?> subscriber : subscribers) {
subscriber.awaitRunning();
}
}
public void stop() {
for (AbstractMessagingSubscriberService<?> subscriber : subscribers) {
subscriber.stopAsync();
}
// Wait for all to stop
for (AbstractMessagingSubscriberService<?> subscriber : subscribers) {
subscriber.awaitTerminated();
}
}
}

// For high-throughput topics
public class HighThroughputSubscriber extends AbstractMessagingSubscriberService<Event> {
public HighThroughputSubscriber() {
super(topicId,
true, // transactional
1000, // large batch size
120, // longer timeout
600, // max timeout
100, // short empty delay
retryStrategy, metricsContext);
}
}
// For low-latency processing
public class LowLatencySubscriber extends AbstractMessagingSubscriberService<Event> {
public LowLatencySubscriber() {
super(topicId,
false, // non-transactional for speed
10, // small batch size
10, // short timeout
30, // short max timeout
100, // quick retry on empty
retryStrategy, metricsContext);
}
}

The service automatically increases transaction timeouts on failures:
// Initial timeout: 30 seconds
// On TransactionNotInProgressException:
// - Retry with 60 seconds
// - Then 120 seconds
// - Up to maxTxTimeoutSeconds (300)
// Then fails if still timing out

This automatic timeout scaling helps handle variable processing loads without manual intervention.
Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-tms