CDAP Transactional Messaging System provides reliable, ordered message delivery with transaction support for the CDAP platform.
—
Core topic lifecycle operations for managing messaging channels in distributed systems. Topics represent named communication channels with configurable properties like TTL and generation identifiers.
Creates a new messaging topic with the specified metadata and properties.
/**
* Creates a topic with the given metadata.
* @param topicMetadata topic to be created with properties
* @throws TopicAlreadyExistsException if the topic already exists
* @throws IOException if failed to create the topic
* @throws ServiceUnavailableException if the messaging service is not available
*/
void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException;

Usage Examples:
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.proto.id.TopicId;
import co.cask.cdap.proto.id.NamespaceId;
// Create topic with default properties
NamespaceId namespace = new NamespaceId("analytics");
TopicId topicId = namespace.topic("user-events");
TopicMetadata metadata = new TopicMetadata(topicId,
TopicMetadata.TTL_KEY, "7200", // 2 hours
TopicMetadata.GENERATION_KEY, "1"
);
messagingService.createTopic(metadata);
// Create topic using properties map
Map<String, String> properties = new HashMap<>();
properties.put(TopicMetadata.TTL_KEY, "3600");
properties.put(TopicMetadata.GENERATION_KEY, "1");
TopicMetadata metadata2 = new TopicMetadata(topicId, properties);
messagingService.createTopic(metadata2);

Updates the metadata and properties of an existing topic.
/**
* Updates the metadata of a topic.
* @param topicMetadata the topic metadata to be updated
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to update the topic metadata
* @throws ServiceUnavailableException if the messaging service is not available
*/
void updateTopic(TopicMetadata topicMetadata) throws TopicNotFoundException, IOException;

Usage Examples:
// Update topic TTL
TopicMetadata updatedMetadata = new TopicMetadata(topicId,
TopicMetadata.TTL_KEY, "10800", // 3 hours
TopicMetadata.GENERATION_KEY, "2"
);
messagingService.updateTopic(updatedMetadata);

Removes a topic and all its associated messages.
/**
* Deletes a topic.
* @param topicId the topic to be deleted
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to delete the topic
* @throws ServiceUnavailableException if the messaging service is not available
*/
void deleteTopic(TopicId topicId) throws TopicNotFoundException, IOException;

Usage Examples:
TopicId topicToDelete = new NamespaceId("temp").topic("expired-data");
messagingService.deleteTopic(topicToDelete);

Retrieves the current metadata and properties of a topic.
/**
* Returns the metadata of the given topic.
* @param topicId message topic
* @return the TopicMetadata of the given topic
* @throws TopicNotFoundException if the topic doesn't exist
* @throws IOException if failed to retrieve topic metadata
* @throws ServiceUnavailableException if the messaging service is not available
*/
TopicMetadata getTopic(TopicId topicId) throws TopicNotFoundException, IOException;

Usage Examples:
TopicMetadata metadata = messagingService.getTopic(topicId);
System.out.println("Topic TTL: " + metadata.getTTL());
System.out.println("Topic Generation: " + metadata.getGeneration());
System.out.println("Topic exists: " + metadata.exists());
// Access raw properties
Map<String, String> properties = metadata.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
System.out.println(entry.getKey() + ": " + entry.getValue());
}

Returns all topics within a specified namespace.
/**
* Returns the list of topics available under the given namespace.
* @param namespaceId the namespace to list topics under
* @return a List of TopicId
* @throws IOException if failed to retrieve topics
* @throws ServiceUnavailableException if the messaging service is not available
*/
List<TopicId> listTopics(NamespaceId namespaceId) throws IOException;

Usage Examples:
NamespaceId namespace = new NamespaceId("production");
List<TopicId> topics = messagingService.listTopics(namespace);
System.out.println("Topics in namespace " + namespace.getNamespace() + ":");
for (TopicId topic : topics) {
System.out.println(" - " + topic.getTopic());
}

Represents metadata about a messaging topic with validation and property management.
class TopicMetadata {
/**
* Creates a new instance for the given topic with the associated properties.
*/
TopicMetadata(TopicId topicId, Map<String, String> properties);
/**
* Creates a new instance with validation option.
* @param validate if true, validates that properties contain valid values for all required properties
* @throws IllegalArgumentException if validate is true and the provided properties are not valid
*/
TopicMetadata(TopicId topicId, Map<String, String> properties, boolean validate);
/**
* Creates a new instance with varargs properties.
* @param properties a list of key/value pairs that will get converted into a Map
*/
TopicMetadata(TopicId topicId, Object... properties);
/** Returns the topic id that this metadata is associated with */
TopicId getTopicId();
/** Returns the raw properties for the topic */
Map<String, String> getProperties();
/** Returns the generation id for the topic */
int getGeneration();
/** Checks whether the topic exists */
boolean exists();
/** Returns the time-to-live property of the topic, in seconds */
long getTTL();
static final String GENERATION_KEY = "generation";
static final String TTL_KEY = "ttl";
}

Topic properties are validated to ensure required values are present and valid:
// This will throw IllegalArgumentException if properties are invalid
TopicMetadata metadata = new TopicMetadata(topicId, properties, true);

Common exceptions when working with topics:
try {
messagingService.createTopic(metadata);
} catch (TopicAlreadyExistsException e) {
System.out.println("Topic already exists: " + e.getTopicName());
} catch (IOException e) {
System.out.println("Failed to create topic: " + e.getMessage());
} catch (ServiceUnavailableException e) {
System.out.println("Messaging service unavailable");
}
try {
TopicMetadata metadata = messagingService.getTopic(topicId);
} catch (TopicNotFoundException e) {
System.out.println("Topic not found: " + e.getTopicName());
}

Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-tms