
tessl/maven-co-cask-cdap--cdap-tms

CDAP Transactional Messaging System provides reliable, ordered message delivery with transaction support for the CDAP platform.


High-Level Consumers

An abstract subscriber service that takes care of message fetching, failure handling, transaction management, and message ID persistence. It simplifies building robust, long-running message consumers by handling retries and consumer state for you.

Capabilities

Abstract Messaging Subscriber Service

Base class for implementing reliable message consumers with built-in failure handling, transaction support, and progress tracking.

abstract class AbstractMessagingSubscriberService<T> extends AbstractRetryableScheduledService {
    /**
     * Constructor for messaging subscriber service.
     * @param topicId the topic to consume from
     * @param transactionalFetch true to perform fetching inside transaction
     * @param fetchSize number of messages to fetch in each batch
     * @param txTimeoutSeconds transaction timeout in seconds
     * @param maxTxTimeoutSeconds max transaction timeout in seconds
     * @param emptyFetchDelayMillis milliseconds to sleep after empty fetch
     * @param retryStrategy strategy for retrying on failures
     * @param metricsContext metrics context for monitoring
     */
    protected AbstractMessagingSubscriberService(TopicId topicId, boolean transactionalFetch, 
        int fetchSize, int txTimeoutSeconds, int maxTxTimeoutSeconds, 
        long emptyFetchDelayMillis, RetryStrategy retryStrategy, MetricsContext metricsContext);
    
    /** Returns the TopicId that this service is fetching from */
    protected final TopicId getTopicId();
    
    /** Returns the MessagingContext for interacting with TMS */
    protected abstract MessagingContext getMessagingContext();
    
    /** Returns the Transactional for executing tasks in transaction */
    protected abstract Transactional getTransactional();
    
    /** Loads last persisted message id (called from transaction) */
    protected abstract String loadMessageId(DatasetContext datasetContext) throws Exception;
    
    /** Persists message id (called from same transaction as processMessages) */
    protected abstract void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception;
    
    /** Decodes raw Message into object of type T */
    protected abstract T decodeMessage(Message message) throws Exception;
    
    /** Processes decoded messages (called from transaction) */
    protected abstract void processMessages(DatasetContext datasetContext,
        Iterator<ImmutablePair<String, T>> messages) throws Exception;
    
    /** Whether message should run in separate transaction (expensive operations) */
    protected boolean shouldRunInSeparateTx(T message);
    
    /** Post processing after batch completion (outside transaction) */
    protected void postProcess();
}
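The key contract in the listing above is that the message ID is stored in the same transaction as the processing, so a failed batch leaves both untouched and is fetched again. The following is a minimal plain-Java simulation of that idea, not CDAP's implementation: `ConsumeLoopSketch`, `Handler`, and the integer message IDs are hypothetical stand-ins (real TMS message IDs are opaque strings), and the "transaction" is simply staged work that is applied only on success.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ConsumeLoopSketch {

    /** Hypothetical stand-in for processMessages on a single payload. */
    interface Handler {
        void handle(String payload) throws Exception;
    }

    // "Committed" state: processed payloads plus the last stored message ID.
    final List<String> committed = new ArrayList<>();
    int lastStoredId = -1;

    /**
     * Runs one fetch/process cycle. Returns true if a batch was committed,
     * false on an empty fetch or when processing failed and was rolled back.
     */
    boolean runOnce(List<String> topic, int fetchSize, Handler handler) {
        int from = lastStoredId + 1;                       // like loadMessageId
        int to = Math.min(from + fetchSize, topic.size());
        if (from >= to) {
            return false;                                  // empty fetch
        }
        List<String> staged = new ArrayList<>();
        try {
            for (int id = from; id < to; id++) {
                handler.handle(topic.get(id));
                staged.add(topic.get(id));
            }
        } catch (Exception e) {
            return false;                                  // rollback: nothing is applied
        }
        committed.addAll(staged);                          // commit the processed work
        lastStoredId = to - 1;                             // like storeMessageId, same commit
        return true;
    }

    public static void main(String[] args) {
        ConsumeLoopSketch loop = new ConsumeLoopSketch();
        List<String> topic = Arrays.asList("a", "b", "c");
        // First attempt fails on "b": no progress is recorded.
        loop.runOnce(topic, 2, p -> {
            if (p.equals("b")) {
                throw new IllegalStateException("boom");
            }
        });
        // The retry starts again from "a" because the message ID was not advanced.
        loop.runOnce(topic, 2, p -> { });
        System.out.println(loop.committed + " lastStoredId=" + loop.lastStoredId);
    }
}
```

Because a failed batch is replayed from the last stored ID, processing logic should tolerate seeing the same message more than once when it has side effects outside the transaction.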

Usage Examples:

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.messaging.Message;
import co.cask.cdap.api.messaging.MessagingContext;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.common.utils.ImmutablePair;
import co.cask.cdap.messaging.subscriber.AbstractMessagingSubscriberService;
import co.cask.cdap.proto.id.TopicId;
import com.google.gson.Gson;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;

// UserEvent and UserEventDataset are application classes, not part of CDAP
public class UserEventProcessor extends AbstractMessagingSubscriberService<UserEvent> {
    
    private final Gson gson = new Gson();
    private final MessagingContext messagingContext;
    private final Transactional transactional;
    private final UserEventDataset eventDataset;
    
    public UserEventProcessor(TopicId topicId, MessagingContext messagingContext,
                             Transactional transactional, UserEventDataset eventDataset,
                             MetricsContext metricsContext) {
        super(topicId, 
              true,  // transactional fetch
              100,   // fetch size
              30,    // tx timeout seconds
              300,   // max tx timeout seconds  
              1000,  // empty fetch delay ms
              RetryStrategies.exponentialDelay(1000, 60000, TimeUnit.MILLISECONDS), // 1s to 60s backoff
              metricsContext);
        
        this.messagingContext = messagingContext;
        this.transactional = transactional;
        this.eventDataset = eventDataset;
    }
    
    @Override
    protected MessagingContext getMessagingContext() {
        return messagingContext;
    }
    
    @Override
    protected Transactional getTransactional() {
        return transactional;
    }
    
    @Override
    protected String loadMessageId(DatasetContext datasetContext) throws Exception {
        return eventDataset.getLastProcessedMessageId();
    }
    
    @Override
    protected void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception {
        eventDataset.setLastProcessedMessageId(messageId);
    }
    
    @Override
    protected UserEvent decodeMessage(Message message) throws Exception {
        return gson.fromJson(message.getPayloadAsString(), UserEvent.class);
    }
    
    @Override
    protected void processMessages(DatasetContext datasetContext,
                                  Iterator<ImmutablePair<String, UserEvent>> messages) throws Exception {
        while (messages.hasNext()) {
            ImmutablePair<String, UserEvent> pair = messages.next();
            String messageId = pair.getFirst();
            UserEvent event = pair.getSecond();
            
            // Process the event
            processUserEvent(event);
            
            // No explicit commit is needed here: the service calls storeMessageId
            // in the same transaction as processMessages
        }
    }
    
    private void processUserEvent(UserEvent event) {
        // Business logic for processing user events
        System.out.println("Processing event: " + event.getType() + " for user " + event.getUserId());
    }
}

Service Lifecycle Management

The subscriber service extends AbstractRetryableScheduledService, so it is started and stopped like any other service:

// Create the subscriber service
UserEventProcessor processor = new UserEventProcessor(topicId, messagingContext, 
    transactional, eventDataset, metricsContext);

// Start the service and block until it is running (begins consuming messages).
// Assumes the Guava 13 Service API that co.cask CDAP builds against; on newer Guava
// versions the equivalents are startAsync().awaitRunning() and stopAsync().awaitTerminated().
processor.startAndWait();

// Stop the service gracefully and block until it has terminated
processor.stopAndWait();

Advanced Processing Patterns

Separate Transaction for Expensive Operations

Handle expensive operations in separate transactions to avoid timeouts:

public class HeavyProcessingSubscriber extends AbstractMessagingSubscriberService<HeavyTask> {
    
    @Override
    protected boolean shouldRunInSeparateTx(HeavyTask task) {
        // Run expensive tasks in separate transactions
        return task.getEstimatedProcessingTimeMs() > 10000;
    }
    
    @Override
    protected void processMessages(DatasetContext datasetContext,
                                  Iterator<ImmutablePair<String, HeavyTask>> messages) throws Exception {
        while (messages.hasNext()) {
            ImmutablePair<String, HeavyTask> pair = messages.next();
            HeavyTask task = pair.getSecond();
            
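            if (shouldRunInSeparateTx(task)) {
                // The base class consults shouldRunInSeparateTx when building batches,
                // so a task flagged as expensive reaches this method in its own transaction
                processHeavyTask(task);
            } else {
                // Light tasks share the current batch transaction
                processLightTask(task);
            }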
        }
    }
}
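To make the effect of `shouldRunInSeparateTx` concrete, here is a small standalone sketch of how a fetched list could be grouped so that every flagged message ends up alone in its own batch, with one transaction per batch. This illustrates the idea only and is not CDAP's code; `SeparateTxBatcher` and `split` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class SeparateTxBatcher {

    /**
     * Groups messages into batches, preserving order. A message matching
     * separateTx forms a batch by itself; neighbouring messages are grouped.
     */
    static <T> List<List<T>> split(List<T> messages, Predicate<T> separateTx) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T message : messages) {
            if (separateTx.test(message)) {
                // Close the batch collected so far, then isolate the flagged message.
                if (!current.isEmpty()) {
                    batches.add(current);
                    current = new ArrayList<>();
                }
                List<T> alone = new ArrayList<>();
                alone.add(message);
                batches.add(alone);
            } else {
                current.add(message);
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Integers stand in for estimated processing times; values above 10 are "expensive".
        List<Integer> fetched = Arrays.asList(1, 2, 50, 3);
        System.out.println(split(fetched, n -> n > 10)); // prints [[1, 2], [50], [3]]
    }
}
```

Isolating expensive messages keeps a slow task from pushing an otherwise cheap batch past the transaction timeout, at the cost of more transactions.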

Post-Processing Hook

Handle post-processing operations outside of transactions:

public class EventAggregatorSubscriber extends AbstractMessagingSubscriberService<Event> {
    
    private final List<Event> processedEvents = new ArrayList<>();
    
    @Override
    protected void processMessages(DatasetContext datasetContext,
                                  Iterator<ImmutablePair<String, Event>> messages) throws Exception {
        while (messages.hasNext()) {
            Event event = messages.next().getSecond();
            processedEvents.add(event);
            
            // Store event in dataset (transactional)
            eventDataset.addEvent(event);
        }
    }
    
    @Override
    protected void postProcess() {
        try {
            // Send aggregated metrics (non-transactional)
            if (!processedEvents.isEmpty()) {
                sendAggregatedMetrics(processedEvents);
                processedEvents.clear();
            }
        } catch (Exception e) {
            LOG.warn("Failed to send aggregated metrics", e);
        }
    }
}

Error Handling and Retry

Built-in error handling with configurable retry strategies:

import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.common.service.RetryStrategies;

import java.util.concurrent.TimeUnit;

public class RobustEventProcessor extends AbstractMessagingSubscriberService<Event> {
    
    public RobustEventProcessor(TopicId topicId, MetricsContext metricsContext) {
        super(topicId,
              true,  // transactional fetch
              50,    // smaller batch size for reliability
              60,    // 1 minute tx timeout
              600,   // 10 minute max tx timeout
              5000,  // 5 second delay on empty fetch
              RetryStrategies.exponentialDelay(1000, 300000, TimeUnit.MILLISECONDS), // 1s to 5min backoff
              metricsContext);
    }
    
    @Override
    protected Event decodeMessage(Message message) throws Exception {
        try {
            return gson.fromJson(message.getPayloadAsString(), Event.class);
        } catch (JsonSyntaxException e) {
            // Log and skip malformed messages
            LOG.warn("Skipping malformed message: {}", message.getId(), e);
            throw e; // Will cause message to be skipped
        }
    }
    
    @Override
    protected void processMessages(DatasetContext datasetContext,
                                  Iterator<ImmutablePair<String, Event>> messages) throws Exception {
        int processed = 0;
        while (messages.hasNext()) {
            try {
                Event event = messages.next().getSecond();
                processEvent(event);
                processed++;
            } catch (Exception e) {
                LOG.error("Failed to process event, aborting batch", e);
                throw e; // Will trigger retry of entire batch
            }
        }
        
        LOG.info("Successfully processed {} events", processed);
    }
}
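For intuition about what a "1s to 5min" exponential backoff means in practice, the sketch below computes the delay before each retry by doubling a base delay and capping it at a maximum. It is a generic illustration under that doubling assumption, not the implementation behind `RetryStrategies.exponentialDelay`; `BackoffSketch` and `delayMillis` are hypothetical names.

```java
final class BackoffSketch {

    /**
     * Delay before retry number failureCount (1-based):
     * baseMillis * 2^(failureCount - 1), capped at maxMillis.
     */
    static long delayMillis(int failureCount, long baseMillis, long maxMillis) {
        if (failureCount < 1 || baseMillis < 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("invalid backoff arguments");
        }
        // Limit the shift so the multiplier itself cannot overflow a long.
        long multiplier = 1L << Math.min(failureCount - 1, 62);
        // Compare by division first so baseMillis * multiplier cannot overflow.
        if (baseMillis > maxMillis / multiplier) {
            return maxMillis;
        }
        return Math.min(baseMillis * multiplier, maxMillis);
    }

    public static void main(String[] args) {
        // Base 1 second, cap 5 minutes: 1000, 2000, 4000, ... then 300000 from the 10th failure on.
        for (int failure = 1; failure <= 11; failure++) {
            System.out.println(failure + " -> " + delayMillis(failure, 1000, 300000) + " ms");
        }
    }
}
```

With these values the cap is reached at the tenth consecutive failure, so a persistently failing batch is retried roughly every five minutes after about eight and a half minutes of shorter waits.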

Metrics and Monitoring

Built-in metrics collection for monitoring subscriber health:

import co.cask.cdap.api.metrics.MetricsContext;

public class MonitoredSubscriber extends AbstractMessagingSubscriberService<Event> {
    
    // Kept locally for custom metrics; the parent receives the same context
    private final MetricsContext metricsContext;
    
    public MonitoredSubscriber(TopicId topicId, MetricsContext metricsContext) {
        super(topicId, true, 100, 30, 300, 1000,
              RetryStrategies.exponentialDelay(1000, 60000, TimeUnit.MILLISECONDS),
              metricsContext); // Metrics context provided to parent
        this.metricsContext = metricsContext;
    }
    
    @Override
    protected void processMessages(DatasetContext datasetContext,
                                  Iterator<ImmutablePair<String, Event>> messages) throws Exception {
        int processedCount = 0;
        long startTime = System.currentTimeMillis();
        
        while (messages.hasNext()) {
            Event event = messages.next().getSecond();
            processEvent(event);
            processedCount++;
        }
        
        // Custom metrics (parent also emits built-in metrics)
        long processingTime = System.currentTimeMillis() - startTime;
        metricsContext.increment("events.processed", processedCount);
        metricsContext.gauge("processing.time.ms", processingTime);
    }
}

Integration Patterns

Dataset Integration

Integrate with CDAP datasets for state persistence:

public class DatasetIntegratedSubscriber extends AbstractMessagingSubscriberService<Event> {
    
    private final String datasetName;
    
    @Override
    protected String loadMessageId(DatasetContext datasetContext) throws Exception {
        KeyValueTable stateTable = datasetContext.getDataset(datasetName);
        byte[] messageIdBytes = stateTable.read("last.message.id");
        return messageIdBytes != null ? Bytes.toString(messageIdBytes) : null;
    }
    
    @Override
    protected void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception {
        KeyValueTable stateTable = datasetContext.getDataset(datasetName);
        stateTable.write("last.message.id", Bytes.toBytes(messageId));
    }
    
    @Override
    protected void processMessages(DatasetContext datasetContext,
                                  Iterator<ImmutablePair<String, Event>> messages) throws Exception {
        Table eventTable = datasetContext.getDataset("events");
        
        while (messages.hasNext()) {
            Event event = messages.next().getSecond();
            
            // Store event in dataset (CDAP's Put takes a column name and a value)
            Put put = new Put(Bytes.toBytes(event.getId()));
            put.add("payload", gson.toJson(event));
            put.add("timestamp", System.currentTimeMillis());
            eventTable.put(put);
        }
    }
}

Service Discovery Integration

Wire the subscriber through dependency injection, with the topic bound by name:

public class ServiceDiscoverySubscriber extends AbstractMessagingSubscriberService<Event> {
    
    private final MessagingContext messagingContext;
    private final Transactional transactional;
    
    @Inject
    public ServiceDiscoverySubscriber(@Named("event.topic") TopicId topicId,
                                     MessagingContext messagingContext,
                                     Transactional transactional,
                                     MetricsContext metricsContext) {
        super(topicId, true, 100, 30, 300, 1000,
              RetryStrategies.exponentialDelay(1000, 60000, TimeUnit.MILLISECONDS),
              metricsContext);
        
        this.messagingContext = messagingContext;
        this.transactional = transactional;
    }
    
    // Implementation methods...
}

Multi-Topic Processing

Handle multiple topics with separate subscriber instances:

public class MultiTopicProcessor {
    
    private final List<AbstractMessagingSubscriberService<? extends Event>> subscribers;
    
    public MultiTopicProcessor() {
        this.subscribers = Arrays.asList(
            new UserEventProcessor(userEventTopic, messagingContext, transactional, datasets),
            new SystemEventProcessor(systemEventTopic, messagingContext, transactional, datasets),
            new AuditEventProcessor(auditEventTopic, messagingContext, transactional, datasets)
        );
    }
    
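    // Assumes the Guava 13 Service API that co.cask CDAP builds against, where start()
    // and stop() return futures. On newer Guava versions use startAsync()/awaitRunning()
    // and stopAsync()/awaitTerminated() instead.
    public void start() {
        List<ListenableFuture<Service.State>> starting = new ArrayList<>();
        for (AbstractMessagingSubscriberService<?> subscriber : subscribers) {
            starting.add(subscriber.start());
        }
        
        // Wait for all to be running
        for (ListenableFuture<Service.State> future : starting) {
            Futures.getUnchecked(future);
        }
    }
    
    public void stop() {
        List<ListenableFuture<Service.State>> stopping = new ArrayList<>();
        for (AbstractMessagingSubscriberService<?> subscriber : subscribers) {
            stopping.add(subscriber.stop());
        }
        
        // Wait for all to stop
        for (ListenableFuture<Service.State> future : stopping) {
            Futures.getUnchecked(future);
        }
    }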
}

Performance Tuning

Batch Size Optimization

// For high-throughput topics
public class HighThroughputSubscriber extends AbstractMessagingSubscriberService<Event> {
    public HighThroughputSubscriber(TopicId topicId, RetryStrategy retryStrategy,
                                    MetricsContext metricsContext) {
        super(topicId, 
              true,    // transactional
              1000,    // large batch size
              120,     // longer timeout (seconds)
              600,     // max timeout (seconds)
              100,     // short empty-fetch delay (ms)
              retryStrategy, metricsContext);
    }
}

// For low-latency processing
public class LowLatencySubscriber extends AbstractMessagingSubscriberService<Event> {
    public LowLatencySubscriber(TopicId topicId, RetryStrategy retryStrategy,
                                MetricsContext metricsContext) {
        super(topicId,
              false,   // non-transactional fetch for speed
              10,      // small batch size
              10,      // short timeout (seconds)
              30,      // short max timeout (seconds)
              100,     // poll again quickly after an empty fetch (ms)
              retryStrategy, metricsContext);
    }
}

Transaction Timeout Handling

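The service automatically increases the transaction timeout when a transaction times out. For example, with txTimeoutSeconds set to 30 and maxTxTimeoutSeconds set to 300: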

// Initial timeout: 30 seconds
// On TransactionNotInProgressException:
// - Retry with 60 seconds
// - Then 120 seconds  
// - Up to maxTxTimeoutSeconds (300)
// Then fails if still timing out

This automatic timeout scaling helps handle variable processing loads without manual intervention.
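The escalation described above can be written down as a small calculation. The sketch below assumes the timeout doubles on each timed-out attempt and is capped at the maximum, which is what the 30, 60, 120 sequence suggests; the exact rule used by the service is not shown in this document, and `TxTimeoutSchedule` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

final class TxTimeoutSchedule {

    /** Next timeout to try: double the current one, capped at maxSeconds. */
    static int next(int currentSeconds, int maxSeconds) {
        return (int) Math.min((long) currentSeconds * 2, (long) maxSeconds);
    }

    /** Every timeout that would be tried, from the initial value up to the cap. */
    static List<Integer> schedule(int initialSeconds, int maxSeconds) {
        if (initialSeconds <= 0 || maxSeconds <= 0) {
            throw new IllegalArgumentException("timeouts must be positive");
        }
        List<Integer> timeouts = new ArrayList<>();
        int current = Math.min(initialSeconds, maxSeconds);
        timeouts.add(current);
        while (current < maxSeconds) {
            current = next(current, maxSeconds);
            timeouts.add(current);
        }
        return timeouts;
    }

    public static void main(String[] args) {
        System.out.println(schedule(30, 300)); // prints [30, 60, 120, 240, 300]
    }
}
```

Under this assumption a consumer configured with 30 and 300 gets at most five attempts at increasing timeouts before the failure is surfaced, so maxTxTimeoutSeconds bounds both the longest transaction and the number of escalation steps.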

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-tms
