tessl/maven-co-cask-cdap--cdap-tms

CDAP Transactional Messaging System provides reliable, ordered message delivery with transaction support for the CDAP platform.

Topic Management

Topic management covers the lifecycle operations for messaging topics: create, update, delete, get metadata, and list. A topic is a named communication channel within a namespace, with configurable properties such as a time-to-live (TTL) and a generation identifier.

Capabilities

Create Topic

Creates a new messaging topic with the specified metadata and properties.

/**
 * Creates a topic with the given metadata.
 * @param topicMetadata topic to be created with properties
 * @throws TopicAlreadyExistsException if the topic already exists
 * @throws IOException if failed to create the topic
 * @throws ServiceUnavailableException if the messaging service is not available
 */
void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException;

Usage Examples:

import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.proto.id.TopicId;
import co.cask.cdap.proto.id.NamespaceId;
import java.util.HashMap;
import java.util.Map;

// Create topic with explicit TTL and generation, passed as key/value pairs
NamespaceId namespace = new NamespaceId("analytics");
TopicId topicId = namespace.topic("user-events");
TopicMetadata metadata = new TopicMetadata(topicId, 
    TopicMetadata.TTL_KEY, "7200", // 2 hours
    TopicMetadata.GENERATION_KEY, "1"
);

messagingService.createTopic(metadata);

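// Create a second topic using a properties map.
// A different topic id is used because creating "user-events" again
// would throw TopicAlreadyExistsException.
TopicId auditTopicId = namespace.topic("audit-events");
Map<String, String> properties = new HashMap<>();
properties.put(TopicMetadata.TTL_KEY, "3600"); // 1 hour
properties.put(TopicMetadata.GENERATION_KEY, "1");
TopicMetadata metadata2 = new TopicMetadata(auditTopicId, properties);
messagingService.createTopic(metadata2);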
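
The TTL values above are seconds written as string literals. A minimal sketch, using only the JDK, of deriving them from a `java.time.Duration` so the unit stays explicit. The class `TopicPropertiesSketch` and the helper `topicProperties` are hypothetical names; the keys are the literal values of `TopicMetadata.TTL_KEY` and `TopicMetadata.GENERATION_KEY` listed under "TopicMetadata Class".

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

class TopicPropertiesSketch {
    // Literal values of TopicMetadata.TTL_KEY and TopicMetadata.GENERATION_KEY.
    static final String TTL_KEY = "ttl";
    static final String GENERATION_KEY = "generation";

    // Hypothetical helper: builds a properties map with the TTL expressed in seconds.
    static Map<String, String> topicProperties(Duration ttl, int generation) {
        Map<String, String> properties = new HashMap<>();
        properties.put(TTL_KEY, Long.toString(ttl.getSeconds()));
        properties.put(GENERATION_KEY, Integer.toString(generation));
        return properties;
    }

    public static void main(String[] args) {
        Map<String, String> properties = topicProperties(Duration.ofHours(2), 1);
        System.out.println(properties.get(TTL_KEY)); // prints 7200
    }
}
```

The resulting map could be passed to the `TopicMetadata(TopicId, Map<String, String>)` constructor in place of a hand-built one.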

Update Topic

Updates the metadata and properties of an existing topic.

/**
 * Updates the metadata of a topic.
 * @param topicMetadata the topic metadata to be updated
 * @throws TopicNotFoundException if the topic doesn't exist
 * @throws IOException if failed to update the topic metadata
 * @throws ServiceUnavailableException if the messaging service is not available
 */
void updateTopic(TopicMetadata topicMetadata) throws TopicNotFoundException, IOException;

Usage Examples:

// Update topic TTL
TopicMetadata updatedMetadata = new TopicMetadata(topicId, 
    TopicMetadata.TTL_KEY, "10800", // 3 hours
    TopicMetadata.GENERATION_KEY, "2"
);

messagingService.updateTopic(updatedMetadata);
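
When only the TTL should change, one option is to start from the properties returned by `getTopic` and override a single key. A minimal sketch on plain maps, assuming `updateTopic` stores exactly the properties it is given (the source does not say whether omitted keys are preserved). `TtlUpdateSketch` and `withTtl` are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class TtlUpdateSketch {
    // Hypothetical helper: copies the current properties and overrides only the TTL.
    static Map<String, String> withTtl(Map<String, String> current, long ttlSeconds) {
        if (ttlSeconds <= 0) {
            throw new IllegalArgumentException("TTL must be positive: " + ttlSeconds);
        }
        Map<String, String> updated = new HashMap<>(current);
        updated.put("ttl", Long.toString(ttlSeconds)); // "ttl" is the value of TopicMetadata.TTL_KEY
        return Collections.unmodifiableMap(updated);
    }

    public static void main(String[] args) {
        Map<String, String> current = new HashMap<>();
        current.put("ttl", "7200");
        current.put("generation", "1");
        Map<String, String> updated = withTtl(current, 10800);
        System.out.println(updated.get("ttl"));        // prints 10800
        System.out.println(updated.get("generation")); // prints 1
    }
}
```

The input map is left untouched, so the previous properties remain available if the update fails.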

Delete Topic

Removes a topic and all its associated messages.

/**
 * Deletes a topic
 * @param topicId the topic to be deleted
 * @throws TopicNotFoundException if the topic doesn't exist
 * @throws IOException if failed to delete the topic
 * @throws ServiceUnavailableException if the messaging service is not available
 */
void deleteTopic(TopicId topicId) throws TopicNotFoundException, IOException;

Usage Examples:

TopicId topicToDelete = new NamespaceId("temp").topic("expired-data");
messagingService.deleteTopic(topicToDelete);

Get Topic Metadata

Retrieves the current metadata and properties of a topic.

/**
 * Returns the metadata of the given topic.
 * @param topicId message topic
 * @return the TopicMetadata of the given topic
 * @throws TopicNotFoundException if the topic doesn't exist
 * @throws IOException if failed to retrieve topic metadata
 * @throws ServiceUnavailableException if the messaging service is not available
 */
TopicMetadata getTopic(TopicId topicId) throws TopicNotFoundException, IOException;

Usage Examples:

TopicMetadata metadata = messagingService.getTopic(topicId);
System.out.println("Topic TTL: " + metadata.getTTL());
System.out.println("Topic Generation: " + metadata.getGeneration());
System.out.println("Topic exists: " + metadata.exists());

// Access raw properties
Map<String, String> properties = metadata.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
    System.out.println(entry.getKey() + ": " + entry.getValue());
}

List Topics

Returns all topics within a specified namespace.

/**
 * Returns the list of topics available under the given namespace.
 * @param namespaceId the namespace to list topics under
 * @return a List of TopicId
 * @throws IOException if failed to retrieve topics
 * @throws ServiceUnavailableException if the messaging service is not available
 */
List<TopicId> listTopics(NamespaceId namespaceId) throws IOException;

Usage Examples:

NamespaceId namespace = new NamespaceId("production");
List<TopicId> topics = messagingService.listTopics(namespace);

System.out.println("Topics in namespace " + namespace.getNamespace() + ":");
for (TopicId topic : topics) {
    System.out.println("  - " + topic.getTopic());
}

Topic Metadata Details

TopicMetadata Class

Represents metadata about a messaging topic with validation and property management.

class TopicMetadata {
    /**
     * Creates a new instance for the given topic with the associated properties.
     */
    TopicMetadata(TopicId topicId, Map<String, String> properties);
    
    /**
     * Creates a new instance with validation option.
     * @param validate if true, validates that properties contain valid values for all required properties
     * @throws IllegalArgumentException if validate is true and the provided properties are not valid
     */
    TopicMetadata(TopicId topicId, Map<String, String> properties, boolean validate);
    
    /**
     * Creates a new instance with varargs properties.
     * @param properties a list of key/value pairs that will get converted into a Map
     */
    TopicMetadata(TopicId topicId, Object... properties);
    
    /** Returns the topic id that this metadata is associated with */
    TopicId getTopicId();
    
    /** Returns the raw properties for the topic */
    Map<String, String> getProperties();
    
    /** Returns the generation id for the topic */
    int getGeneration();
    
    /** Returns whether the topic exists */
    boolean exists();
    
    /** Returns the time-to-live (TTL) of the topic, in seconds */
    long getTTL();
    
    static final String GENERATION_KEY = "generation";
    static final String TTL_KEY = "ttl";
}
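
The varargs constructor takes alternating keys and values. A minimal sketch of how such an argument list could be folded into a map. This is an illustration, not the library's implementation: `VarargsPropertiesSketch` and `toMap` are hypothetical names, and rejecting an odd number of arguments and converting each element with `String.valueOf` are both assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class VarargsPropertiesSketch {
    // Hypothetical helper: folds alternating key/value arguments into an ordered map.
    static Map<String, String> toMap(Object... keyValues) {
        if (keyValues.length % 2 != 0) {
            throw new IllegalArgumentException(
                "Expected key/value pairs, got " + keyValues.length + " arguments");
        }
        Map<String, String> properties = new LinkedHashMap<>();
        for (int i = 0; i < keyValues.length; i += 2) {
            properties.put(String.valueOf(keyValues[i]), String.valueOf(keyValues[i + 1]));
        }
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(toMap("ttl", 7200, "generation", 1)); // prints {ttl=7200, generation=1}
    }
}
```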

Property Validation

When validate is true, topic properties are checked to ensure that required values are present and valid:

  • TTL (Time-to-Live): Must be a positive integer representing seconds
  • Generation: Must be a non-zero integer for topic versioning

// This will throw IllegalArgumentException if properties are invalid
TopicMetadata metadata = new TopicMetadata(topicId, properties, true);
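
The two rules above can be sketched as a standalone check. This is not the library's validation code: `PropertyValidationSketch` and `validate` are hypothetical names, and treating a missing or non-numeric value as invalid is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

class PropertyValidationSketch {
    // Hypothetical helper: applies the TTL and generation rules described above.
    static void validate(Map<String, String> properties) {
        long ttl = parse(properties, "ttl");
        if (ttl <= 0) {
            throw new IllegalArgumentException("ttl must be positive: " + ttl);
        }
        long generation = parse(properties, "generation");
        if (generation == 0) {
            throw new IllegalArgumentException("generation must be non-zero");
        }
    }

    // Reads a required property and parses it as an integer value.
    private static long parse(Map<String, String> properties, String key) {
        String value = properties.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        try {
            return Long.parseLong(value);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Property " + key + " is not an integer: " + value, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("ttl", "3600");
        properties.put("generation", "1");
        validate(properties); // passes silently
        System.out.println("valid");
    }
}
```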

Error Handling

Common exceptions when working with topics:

try {
    messagingService.createTopic(metadata);
} catch (TopicAlreadyExistsException e) {
    System.out.println("Topic already exists: " + e.getMessage());
} catch (IOException e) {
    System.out.println("Failed to create topic: " + e.getMessage());
} catch (ServiceUnavailableException e) {
    System.out.println("Messaging service unavailable");
}

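try {
    TopicMetadata existing = messagingService.getTopic(topicId);
    System.out.println("Topic TTL: " + existing.getTTL());
} catch (TopicNotFoundException e) {
    System.out.println("Topic not found: " + e.getMessage());
} catch (IOException e) {
    // getTopic also declares IOException, which is checked and must be handled
    System.out.println("Failed to retrieve topic metadata: " + e.getMessage());
}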
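
One use of the "already exists" error is to make topic creation idempotent. A minimal sketch with in-memory stand-ins so it runs without CDAP: `StandInTopicAlreadyExistsException`, `TopicStore`, `InMemoryTopicStore`, and `createIfAbsent` are all hypothetical names defined here, and real code would call `messagingService.createTopic` and catch the library's `TopicAlreadyExistsException` instead.

```java
import java.util.HashSet;
import java.util.Set;

// Stand-in for the library exception so the sketch compiles on its own.
class StandInTopicAlreadyExistsException extends Exception {
    StandInTopicAlreadyExistsException(String topic) {
        super("Topic already exists: " + topic);
    }
}

// Hypothetical, reduced view of the create operation.
interface TopicStore {
    void createTopic(String topic) throws StandInTopicAlreadyExistsException;
}

// In-memory stand-in that rejects duplicate topic names.
class InMemoryTopicStore implements TopicStore {
    private final Set<String> topics = new HashSet<>();

    @Override
    public void createTopic(String topic) throws StandInTopicAlreadyExistsException {
        if (!topics.add(topic)) {
            throw new StandInTopicAlreadyExistsException(topic);
        }
    }
}

class CreateIfAbsentSketch {
    // Returns true if the topic was created, false if it already existed.
    static boolean createIfAbsent(TopicStore store, String topic) {
        try {
            store.createTopic(topic);
            return true;
        } catch (StandInTopicAlreadyExistsException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        TopicStore store = new InMemoryTopicStore();
        System.out.println(createIfAbsent(store, "user-events")); // prints true
        System.out.println(createIfAbsent(store, "user-events")); // prints false
    }
}
```

Returning a boolean rather than swallowing the exception lets the caller tell a fresh topic from an existing one, for example to decide whether its properties still need to be checked.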

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-tms
