tessl/maven-co-cask-cdap--cdap-tms

CDAP Transactional Messaging System provides reliable, ordered message delivery with transaction support for the CDAP platform.

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/co.cask.cdap/cdap-tms@5.1.x

To install, run

npx @tessl/cli install tessl/maven-co-cask-cdap--cdap-tms@5.1.0

CDAP Transactional Messaging System (TMS)

CDAP TMS is a Java library providing transactional messaging capabilities built on the Apache Hadoop ecosystem. It implements reliable, ordered message delivery with ACID transaction support, enabling applications to publish and consume messages with strong consistency guarantees in distributed data processing environments.

Package Information

  • Package Name: cdap-tms
  • Package Type: Maven
  • Language: Java
  • Installation: Add to your Maven pom.xml:
<dependency>
  <groupId>co.cask.cdap</groupId>
  <artifactId>cdap-tms</artifactId>
  <version>5.1.2</version>
</dependency>

Core Imports

import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.MessageFetcher;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.proto.id.TopicId;
import co.cask.cdap.proto.id.NamespaceId;

For client usage:

import co.cask.cdap.messaging.client.ClientMessagingService;
import co.cask.cdap.messaging.guice.MessagingClientModule;

For exception handling:

import co.cask.cdap.api.messaging.TopicAlreadyExistsException;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.common.ServiceUnavailableException;

Basic Usage

import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.MessageFetcher;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.messaging.data.RawMessage;
import co.cask.cdap.proto.id.TopicId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import java.nio.charset.StandardCharsets;

// Create a topic (namespace IDs may contain only alphanumeric characters and underscores)
NamespaceId namespace = new NamespaceId("my_namespace");
TopicId topicId = namespace.topic("my-topic");
TopicMetadata metadata = new TopicMetadata(topicId,
    TopicMetadata.TTL_KEY, "3600", // 1 hour TTL, in seconds
    TopicMetadata.GENERATION_KEY, "1"
);

// Obtain a MessagingService via dependency injection.
// `injector` is assumed to be a Guice Injector configured elsewhere in your application.
MessagingService messagingService = injector.getInstance(MessagingService.class);
messagingService.createTopic(metadata);

// Publish two messages in a single request
StoreRequest request = StoreRequestBuilder.of(topicId)
    .addPayload("Hello, World!")
    .addPayload("Another message")
    .build();

messagingService.publish(request);

// Consume up to 10 messages; the iterator must be closed, hence try-with-resources
MessageFetcher fetcher = messagingService.prepareFetch(topicId);
try (CloseableIterator<RawMessage> messages = fetcher.setLimit(10).fetch()) {
    while (messages.hasNext()) {
        RawMessage message = messages.next();
        // Decode with an explicit charset instead of the platform default
        String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
        System.out.println("Received: " + payload);
    }
}

Architecture

CDAP TMS is built around several key components:

  • MessagingService: Primary interface for all messaging operations including topic management and message publishing/consuming
  • Storage Layer: Abstracted storage with support for HBase and LevelDB backends via the TableFactory pattern
  • Client Layer: HTTP-based client implementation for distributed deployments with service discovery
  • Transaction Support: Integration with Apache Tephra for ACID transaction guarantees
  • Caching System: In-memory message caching with weight-based eviction for improved performance
  • Subscriber Framework: High-level consumer abstraction with failure handling and message ID persistence
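
The weight-based eviction mentioned for the caching system can be illustrated with a small stand-alone sketch. This is not the TMS cache implementation: the class name `WeightedCache`, the least-recently-used policy, and the choice of payload length as the weight are all assumptions made for illustration.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration: an LRU cache bounded by total payload weight, not entry count. */
final class WeightedCache {
    private final long maxWeight;
    private long currentWeight;
    // accessOrder=true keeps the least-recently-used entry at the head of the map
    private final LinkedHashMap<String, byte[]> entries = new LinkedHashMap<>(16, 0.75f, true);

    WeightedCache(long maxWeight) {
        this.maxWeight = maxWeight;
    }

    void put(String messageId, byte[] payload) {
        byte[] previous = entries.put(messageId, payload);
        if (previous != null) {
            currentWeight -= previous.length;
        }
        currentWeight += payload.length;
        // Evict from the least-recently-used end until the weight budget is met
        Iterator<Map.Entry<String, byte[]>> it = entries.entrySet().iterator();
        while (currentWeight > maxWeight && it.hasNext()) {
            Map.Entry<String, byte[]> eldest = it.next();
            currentWeight -= eldest.getValue().length;
            it.remove();
        }
    }

    byte[] get(String messageId) {
        return entries.get(messageId);
    }

    long weight() {
        return currentWeight;
    }

    public static void main(String[] args) {
        WeightedCache cache = new WeightedCache(10);
        cache.put("m1", new byte[4]);
        cache.put("m2", new byte[4]);
        cache.put("m3", new byte[4]); // total would be 12, so the oldest entry (m1) is evicted
        System.out.println("m1 cached: " + (cache.get("m1") != null));
        System.out.println("weight: " + cache.weight());
    }
}
```

Bounding by weight rather than entry count keeps memory use predictable when message sizes vary widely, which is the usual reason to prefer it for message payloads.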

Capabilities

Topic Management

Core topic lifecycle operations including creation, updates, deletion, and metadata retrieval. Essential for managing messaging channels in distributed systems.

void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException;
void updateTopic(TopicMetadata topicMetadata) throws TopicNotFoundException, IOException;
void deleteTopic(TopicId topicId) throws TopicNotFoundException, IOException;
TopicMetadata getTopic(TopicId topicId) throws TopicNotFoundException, IOException;
List<TopicId> listTopics(NamespaceId namespaceId) throws IOException;
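
To make the exception contract of these signatures concrete, here is a minimal in-memory sketch. Everything in it is a stand-in: `InMemoryTopics` and its nested exception classes are hypothetical, topics are identified by plain strings, and namespaces are omitted for brevity. It only mirrors which operation fails in which situation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in that mimics the exception contract listed above. */
final class InMemoryTopics {
    /** Stand-in for TopicAlreadyExistsException. */
    static final class TopicExistsException extends Exception {
        TopicExistsException(String topic) { super("Topic already exists: " + topic); }
    }

    /** Stand-in for TopicNotFoundException. */
    static final class TopicMissingException extends Exception {
        TopicMissingException(String topic) { super("Topic not found: " + topic); }
    }

    // topic name -> properties such as "ttl"
    private final Map<String, Map<String, String>> topics = new TreeMap<>();

    void createTopic(String topic, Map<String, String> properties) throws TopicExistsException {
        if (topics.containsKey(topic)) throw new TopicExistsException(topic);
        topics.put(topic, new HashMap<>(properties));
    }

    void updateTopic(String topic, Map<String, String> properties) throws TopicMissingException {
        if (!topics.containsKey(topic)) throw new TopicMissingException(topic);
        topics.put(topic, new HashMap<>(properties));
    }

    void deleteTopic(String topic) throws TopicMissingException {
        if (topics.remove(topic) == null) throw new TopicMissingException(topic);
    }

    Map<String, String> getTopic(String topic) throws TopicMissingException {
        Map<String, String> properties = topics.get(topic);
        if (properties == null) throw new TopicMissingException(topic);
        return properties;
    }

    List<String> listTopics() {
        return new ArrayList<>(topics.keySet());
    }

    public static void main(String[] args) throws Exception {
        InMemoryTopics registry = new InMemoryTopics();
        Map<String, String> properties = new HashMap<>();
        properties.put("ttl", "3600");
        registry.createTopic("events", properties);
        try {
            registry.createTopic("events", properties); // second create must fail
        } catch (TopicExistsException e) {
            System.out.println(e.getMessage());
        }
        registry.deleteTopic("events");
        System.out.println(registry.listTopics());
    }
}
```

Callers that want idempotent creation would typically catch the already-exists case and continue, as `main` does here.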

Message Publishing

Transactional message publishing with support for both immediate publishing and distributed transaction workflows via payload storage.

RollbackDetail publish(StoreRequest request) throws TopicNotFoundException, IOException;
void storePayload(StoreRequest request) throws TopicNotFoundException, IOException;
void rollback(TopicId topicId, RollbackDetail rollbackDetail) throws TopicNotFoundException, IOException;
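
The relationship between `publish` and `rollback` can be pictured with a toy in-memory log. This sketch is hypothetical throughout: `RollbackSketch` and its `Range` class are stand-ins, only sequence IDs are tracked (the timestamps carried by `RollbackDetail` are left out), and rolled-back entries are simply removed. How TMS records a rollback internally is not described here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory log illustrating publish followed by rollback of a sequence range. */
final class RollbackSketch {
    /** Stand-in for the range information a RollbackDetail carries. */
    static final class Range {
        final long txWritePointer;
        final int startSeq;
        final int endSeq;

        Range(long txWritePointer, int startSeq, int endSeq) {
            this.txWritePointer = txWritePointer;
            this.startSeq = startSeq;
            this.endSeq = endSeq;
        }
    }

    private static final class Entry {
        final int seq;
        final long txWritePointer;
        final String payload;

        Entry(int seq, long txWritePointer, String payload) {
            this.seq = seq;
            this.txWritePointer = txWritePointer;
            this.payload = payload;
        }
    }

    private final List<Entry> log = new ArrayList<>();
    private int nextSeq;

    /** Appends the payloads and returns the sequence range they occupy. */
    Range publish(long txWritePointer, List<String> payloads) {
        int start = nextSeq;
        for (String payload : payloads) {
            log.add(new Entry(nextSeq++, txWritePointer, payload));
        }
        return new Range(txWritePointer, start, nextSeq - 1);
    }

    /** Removes exactly the entries written by the given publish call. */
    void rollback(Range range) {
        log.removeIf(e -> e.txWritePointer == range.txWritePointer
            && e.seq >= range.startSeq && e.seq <= range.endSeq);
    }

    List<String> payloads() {
        List<String> result = new ArrayList<>();
        for (Entry e : log) {
            result.add(e.payload);
        }
        return result;
    }

    public static void main(String[] args) {
        RollbackSketch log = new RollbackSketch();
        log.publish(1L, List.of("kept"));
        Range range = log.publish(2L, List.of("undone-1", "undone-2"));
        log.rollback(range); // the transaction with write pointer 2 failed
        System.out.println(log.payloads());
    }
}
```

The point of returning range information from `publish` is that the caller can later undo precisely its own writes without scanning or affecting messages from other transactions.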

Message Consumption

Flexible message fetching with support for various starting positions, transaction isolation, and configurable limits for building robust consumers.

MessageFetcher prepareFetch(TopicId topicId) throws TopicNotFoundException, IOException;

interface MessageFetcher {
    MessageFetcher setStartMessage(byte[] startOffset, boolean inclusive);
    MessageFetcher setStartTime(long startTime);
    MessageFetcher setTransaction(Transaction transaction);
    MessageFetcher setLimit(int limit);
    CloseableIterator<RawMessage> fetch() throws TopicNotFoundException, IOException;
}
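
The fluent configuration style of `MessageFetcher` (each setter returns the fetcher, and `fetch()` applies the accumulated options) can be sketched without any CDAP dependency. `FetcherSketch` and `Msg` below are hypothetical stand-ins; treating the start time as inclusive is an assumption of this sketch, not a statement about TMS.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical fluent fetcher over an in-memory, time-ordered list of messages. */
final class FetcherSketch {
    static final class Msg {
        final long publishTime;
        final String payload;

        Msg(long publishTime, String payload) {
            this.publishTime = publishTime;
            this.payload = payload;
        }
    }

    private final List<Msg> topic;
    private long startTime = 0L;
    private int limit = Integer.MAX_VALUE;

    FetcherSketch(List<Msg> topic) {
        this.topic = topic;
    }

    FetcherSketch setStartTime(long startTime) {
        this.startTime = startTime;
        return this; // returning this enables call chaining
    }

    FetcherSketch setLimit(int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
        return this;
    }

    /** Applies the configured options and returns the matching messages in order. */
    Iterator<Msg> fetch() {
        List<Msg> result = new ArrayList<>();
        for (Msg m : topic) {
            if (result.size() >= limit) break;
            if (m.publishTime >= startTime) { // inclusive start: an assumption of this sketch
                result.add(m);
            }
        }
        return result.iterator();
    }

    public static void main(String[] args) {
        List<Msg> topic = new ArrayList<>();
        topic.add(new Msg(100L, "first"));
        topic.add(new Msg(200L, "second"));
        topic.add(new Msg(300L, "third"));
        Iterator<Msg> it = new FetcherSketch(topic).setStartTime(200L).setLimit(1).fetch();
        while (it.hasNext()) {
            System.out.println(it.next().payload);
        }
    }
}
```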

Client Services

HTTP-based messaging client for distributed environments with service discovery and comprehensive error handling.

class ClientMessagingService implements MessagingService {
    ClientMessagingService(DiscoveryServiceClient discoveryServiceClient);
}

class StoreRequestBuilder {
    static StoreRequestBuilder of(TopicId topicId);
    StoreRequestBuilder addPayload(String payload);
    StoreRequestBuilder addPayload(byte[] payload);
    StoreRequestBuilder setTransaction(Long txWritePointer);
    StoreRequest build();
}

High-Level Consumers

Abstract subscriber service providing automatic message processing, failure handling, transaction management, and message ID persistence for building robust consumers.

abstract class AbstractMessagingSubscriberService<T> extends AbstractRetryableScheduledService {
    protected AbstractMessagingSubscriberService(TopicId topicId, boolean transactionalFetch, 
        int fetchSize, int txTimeoutSeconds, int maxTxTimeoutSeconds, 
        long emptyFetchDelayMillis, RetryStrategy retryStrategy, MetricsContext metricsContext);
    
    protected abstract MessagingContext getMessagingContext();
    protected abstract String loadMessageId(DatasetContext datasetContext) throws Exception;
    protected abstract void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception;
    protected abstract T decodeMessage(Message message) throws Exception;
    protected abstract void processMessages(DatasetContext datasetContext, Iterator<ImmutablePair<String, T>> messages) throws Exception;
}
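
The abstract methods above suggest a load, fetch, process, store cycle. The following is a simplified sketch of one such cycle under stated assumptions: `SubscriberSketch` is hypothetical, the topic is a sorted in-memory map, and transactions, retries, and metrics are omitted. It is meant to show why the message ID is persisted only after processing succeeds, not how the real service is implemented.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical, simplified subscriber; not the AbstractMessagingSubscriberService implementation. */
abstract class SubscriberSketch<T> {
    private final NavigableMap<String, byte[]> topic; // message ID -> payload, sorted by ID
    private final int fetchSize;

    protected SubscriberSketch(NavigableMap<String, byte[]> topic, int fetchSize) {
        this.topic = topic;
        this.fetchSize = fetchSize;
    }

    protected abstract String loadMessageId() throws Exception;
    protected abstract void storeMessageId(String messageId) throws Exception;
    protected abstract T decodeMessage(byte[] payload) throws Exception;
    protected abstract void processMessages(List<T> messages) throws Exception;

    /** Runs one fetch-and-process cycle and returns the number of messages handled. */
    final int runOnce() throws Exception {
        String lastId = loadMessageId();
        // Resume strictly after the last persisted ID, or from the beginning if none exists
        NavigableMap<String, byte[]> pending = lastId == null ? topic : topic.tailMap(lastId, false);
        List<T> batch = new ArrayList<>();
        String newestId = null;
        for (Map.Entry<String, byte[]> entry : pending.entrySet()) {
            if (batch.size() == fetchSize) break;
            batch.add(decodeMessage(entry.getValue()));
            newestId = entry.getKey();
        }
        if (batch.isEmpty()) return 0;
        processMessages(batch);
        // Persist progress only after processing succeeded, so a failure replays the batch
        storeMessageId(newestId);
        return batch.size();
    }

    public static void main(String[] args) throws Exception {
        NavigableMap<String, byte[]> topic = new TreeMap<>();
        topic.put("0001", "a".getBytes(StandardCharsets.UTF_8));
        topic.put("0002", "b".getBytes(StandardCharsets.UTF_8));
        topic.put("0003", "c".getBytes(StandardCharsets.UTF_8));
        final String[] checkpoint = new String[1]; // stands in for durable storage
        SubscriberSketch<String> subscriber = new SubscriberSketch<String>(topic, 2) {
            @Override protected String loadMessageId() { return checkpoint[0]; }
            @Override protected void storeMessageId(String id) { checkpoint[0] = id; }
            @Override protected String decodeMessage(byte[] p) { return new String(p, StandardCharsets.UTF_8); }
            @Override protected void processMessages(List<String> m) { System.out.println("batch: " + m); }
        };
        while (subscriber.runOnce() > 0) {
            // keep polling until the topic is drained
        }
    }
}
```

Because the checkpoint advances only after `processMessages` returns, a crash or exception mid-batch leads to redelivery rather than message loss. Processing should therefore be idempotent or share a transaction with the checkpoint write.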

Types

Core Data Types

class TopicMetadata {
    TopicMetadata(TopicId topicId, Map<String, String> properties);
    TopicMetadata(TopicId topicId, Object... properties);
    
    TopicId getTopicId();
    Map<String, String> getProperties();
    int getGeneration();
    boolean exists();
    long getTTL();
    
    static final String GENERATION_KEY = "generation";
    static final String TTL_KEY = "ttl";
}

class RawMessage {
    RawMessage(byte[] id, byte[] payload);
    byte[] getId();
    byte[] getPayload();
}

class MessageId {
    MessageId(byte[] rawId);
    long getPublishTimestamp();
    short getSequenceId();
    long getPayloadWriteTimestamp();
    short getPayloadSequenceId();
    byte[] getRawId();
    
    static int putRawId(long publishTimestamp, short sequenceId, 
        long writeTimestamp, short payloadSequenceId, byte[] buffer, int offset);
    
    static final int RAW_ID_SIZE = 20; // 8 + 2 + 8 + 2 bytes for the four fields above
}
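
The four fields exposed by `MessageId` (two `long` timestamps and two `short` sequence IDs) occupy 8 + 2 + 8 + 2 = 20 bytes when packed. The sketch below shows one plausible encoding with `java.nio.ByteBuffer`. The field order (taken from the parameter order of `putRawId`), the big-endian byte order, and the returned end offset are assumptions of this sketch; `RawIdSketch` is a hypothetical class, not the library's code.

```java
import java.nio.ByteBuffer;

/** Hypothetical packing of a raw message ID into a fixed-size byte array. */
final class RawIdSketch {
    static final int RAW_ID_SIZE = Long.BYTES + Short.BYTES + Long.BYTES + Short.BYTES;

    /** Writes the four fields at the given offset and returns the offset just past them. */
    static int putRawId(long publishTimestamp, short sequenceId,
                        long writeTimestamp, short payloadSequenceId,
                        byte[] buffer, int offset) {
        // ByteBuffer is big-endian by default, so byte order matches numeric order
        ByteBuffer.wrap(buffer, offset, RAW_ID_SIZE)
            .putLong(publishTimestamp)
            .putShort(sequenceId)
            .putLong(writeTimestamp)
            .putShort(payloadSequenceId);
        return offset + RAW_ID_SIZE;
    }

    static long publishTimestamp(byte[] rawId) {
        return ByteBuffer.wrap(rawId).getLong(0);
    }

    static short sequenceId(byte[] rawId) {
        return ByteBuffer.wrap(rawId).getShort(Long.BYTES);
    }

    static long writeTimestamp(byte[] rawId) {
        return ByteBuffer.wrap(rawId).getLong(Long.BYTES + Short.BYTES);
    }

    static short payloadSequenceId(byte[] rawId) {
        return ByteBuffer.wrap(rawId).getShort(Long.BYTES + Short.BYTES + Long.BYTES);
    }

    public static void main(String[] args) {
        byte[] rawId = new byte[RAW_ID_SIZE];
        putRawId(1_500_000_000_000L, (short) 7, 0L, (short) 0, rawId, 0);
        System.out.println("size: " + rawId.length);
        System.out.println("publish timestamp: " + publishTimestamp(rawId));
        System.out.println("sequence id: " + sequenceId(rawId));
    }
}
```

With a fixed-width, big-endian layout and non-negative field values, comparing two raw IDs byte by byte yields the same order as comparing (timestamp, sequence) pairs. This makes such IDs usable directly as sorted storage keys.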

abstract class StoreRequest implements Iterable<byte[]> {
    TopicId getTopicId();
    boolean isTransactional();
    long getTransactionWritePointer();
    abstract boolean hasPayload();
}

interface RollbackDetail {
    long getTransactionWritePointer();
    long getStartTimestamp();
    int getStartSequenceId();
    long getEndTimestamp();
    int getEndSequenceId();
}

Exception Types

class TopicAlreadyExistsException extends Exception;
class TopicNotFoundException extends Exception;
class ServiceUnavailableException extends Exception;