# Topic Management

Topic lifecycle operations for the messaging service: create, update, delete, inspect, and list. A topic is a named communication channel scoped to a namespace, with configurable properties such as a time-to-live (TTL) and a generation identifier.

## Capabilities

### Create Topic

Creates a new messaging topic with the specified metadata and properties.

```java { .api }
/**
 * Creates a topic with the given metadata.
 * @param topicMetadata topic to be created with properties
 * @throws TopicAlreadyExistsException if the topic already exists
 * @throws IOException if failed to create the topic
 * @throws ServiceUnavailableException if the messaging service is not available
 */
void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException;
```

**Usage Examples:**

```java
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.TopicId;

import java.util.HashMap;
import java.util.Map;

// Create a topic, passing properties as alternating key/value varargs
NamespaceId namespace = new NamespaceId("analytics");
TopicId topicId = namespace.topic("user-events");
TopicMetadata metadata = new TopicMetadata(topicId,
    TopicMetadata.TTL_KEY, "7200",        // 2 hours, in seconds
    TopicMetadata.GENERATION_KEY, "1"
);

// messagingService is an already-obtained messaging service instance
messagingService.createTopic(metadata);

// Create a second topic using a properties map.
// A different topic id is used because re-creating "user-events"
// would throw TopicAlreadyExistsException.
TopicId auditTopicId = namespace.topic("audit-events");
Map<String, String> properties = new HashMap<>();
properties.put(TopicMetadata.TTL_KEY, "3600");  // 1 hour, in seconds
properties.put(TopicMetadata.GENERATION_KEY, "1");
TopicMetadata metadata2 = new TopicMetadata(auditTopicId, properties);
messagingService.createTopic(metadata2);
```

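The TTL values in the examples above are second counts written as strings (`"7200"` for two hours). A minimal sketch of deriving such a string from a `java.time.Duration` instead of computing it by hand follows. `TtlValues` and `ofDuration` are hypothetical helper names, not part of the messaging API.

```java
import java.time.Duration;

// Hypothetical helper (not part of the messaging API): renders a Duration
// as the string-of-seconds form used for the TTL property above.
final class TtlValues {
  private TtlValues() { }

  static String ofDuration(Duration ttl) {
    // getSeconds() drops any sub-second part, so durations shorter than
    // one second (as well as zero and negative ones) are rejected here.
    long seconds = ttl.getSeconds();
    if (seconds <= 0) {
      throw new IllegalArgumentException("TTL must be at least one second: " + ttl);
    }
    return Long.toString(seconds);
  }

  public static void main(String[] args) {
    System.out.println(ofDuration(Duration.ofHours(2)));    // prints 7200
    System.out.println(ofDuration(Duration.ofMinutes(90))); // prints 5400
  }
}
```

The result can be passed wherever the examples use a literal, for instance as the value paired with `TopicMetadata.TTL_KEY`.
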
### Update Topic

Updates the metadata and properties of an existing topic.

```java { .api }
/**
 * Updates the metadata of a topic.
 * @param topicMetadata the topic metadata to be updated
 * @throws TopicNotFoundException if the topic doesn't exist
 * @throws IOException if failed to update the topic metadata
 * @throws ServiceUnavailableException if the messaging service is not available
 */
void updateTopic(TopicMetadata topicMetadata) throws TopicNotFoundException, IOException;
```

**Usage Examples:**

```java
// Update topic TTL
TopicMetadata updatedMetadata = new TopicMetadata(topicId,
    TopicMetadata.TTL_KEY, "10800",       // 3 hours
    TopicMetadata.GENERATION_KEY, "2"
);

messagingService.updateTopic(updatedMetadata);
```

### Delete Topic

Removes a topic and all its associated messages.

```java { .api }
/**
 * Deletes a topic
 * @param topicId the topic to be deleted
 * @throws TopicNotFoundException if the topic doesn't exist
 * @throws IOException if failed to delete the topic
 * @throws ServiceUnavailableException if the messaging service is not available
 */
void deleteTopic(TopicId topicId) throws TopicNotFoundException, IOException;
```

**Usage Examples:**

```java
TopicId topicToDelete = new NamespaceId("temp").topic("expired-data");
messagingService.deleteTopic(topicToDelete);
```

### Get Topic Metadata

Retrieves the current metadata and properties of a topic.

```java { .api }
/**
 * Returns the metadata of the given topic.
 * @param topicId message topic
 * @return the TopicMetadata of the given topic
 * @throws TopicNotFoundException if the topic doesn't exist
 * @throws IOException if failed to retrieve topic metadata
 * @throws ServiceUnavailableException if the messaging service is not available
 */
TopicMetadata getTopic(TopicId topicId) throws TopicNotFoundException, IOException;
```

**Usage Examples:**

```java
TopicMetadata metadata = messagingService.getTopic(topicId);
System.out.println("Topic TTL: " + metadata.getTTL());
System.out.println("Topic Generation: " + metadata.getGeneration());
System.out.println("Topic exists: " + metadata.exists());

// Access raw properties
Map<String, String> properties = metadata.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
  System.out.println(entry.getKey() + ": " + entry.getValue());
}
```

### List Topics

Returns all topics within a specified namespace.

```java { .api }
/**
 * Returns the list of topics available under the given namespace.
 * @param namespaceId the namespace to list topics under
 * @return a List of TopicId
 * @throws IOException if failed to retrieve topics
 * @throws ServiceUnavailableException if the messaging service is not available
 */
List<TopicId> listTopics(NamespaceId namespaceId) throws IOException;
```

**Usage Examples:**

```java
NamespaceId namespace = new NamespaceId("production");
List<TopicId> topics = messagingService.listTopics(namespace);

System.out.println("Topics in namespace " + namespace.getNamespace() + ":");
for (TopicId topic : topics) {
  System.out.println("  - " + topic.getTopic());
}
```

## Topic Metadata Details

### TopicMetadata Class

Represents metadata about a messaging topic with validation and property management.

```java { .api }
class TopicMetadata {
  /**
   * Creates a new instance for the given topic with the associated properties.
   */
  TopicMetadata(TopicId topicId, Map<String, String> properties);

  /**
   * Creates a new instance with validation option.
   * @param validate if true, validates that properties contain valid values for all required properties
   * @throws IllegalArgumentException if validate is true and the provided properties are not valid
   */
  TopicMetadata(TopicId topicId, Map<String, String> properties, boolean validate);

  /**
   * Creates a new instance with varargs properties.
   * @param properties a list of key/value pairs that will get converted into a Map
   */
  TopicMetadata(TopicId topicId, Object... properties);

  /** Returns the topic id that this metadata is associated with */
  TopicId getTopicId();

  /** Returns the raw properties for the topic */
  Map<String, String> getProperties();

  /** Returns the generation id for the topic */
  int getGeneration();

  /** Checks whether the topic exists */
  boolean exists();

  /** Returns the time-to-live (TTL) property of the topic, in seconds */
  long getTTL();

  static final String GENERATION_KEY = "generation";
  static final String TTL_KEY = "ttl";
}
```

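The varargs constructor takes alternating keys and values. The sketch below shows one way such a list could be folded into a `Map<String, String>`. It illustrates the idea only and is not the library's implementation: `PropertyPairs.toMap` is a hypothetical name, and rejecting an odd number of arguments is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of turning key/value varargs into a Map.
final class PropertyPairs {
  private PropertyPairs() { }

  static Map<String, String> toMap(Object... keyValues) {
    // Assumption: a dangling key without a value is treated as an error.
    if (keyValues.length % 2 != 0) {
      throw new IllegalArgumentException(
          "Expected key/value pairs, got " + keyValues.length + " arguments");
    }
    // LinkedHashMap keeps the pairs in the order they were passed.
    Map<String, String> properties = new LinkedHashMap<>();
    for (int i = 0; i < keyValues.length; i += 2) {
      properties.put(String.valueOf(keyValues[i]), String.valueOf(keyValues[i + 1]));
    }
    return properties;
  }

  public static void main(String[] args) {
    System.out.println(toMap("ttl", 7200, "generation", 1)); // prints {ttl=7200, generation=1}
  }
}
```
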
### Property Validation

Topic properties are validated to ensure required values are present and valid:

- **TTL (Time-to-Live)**: Must be a positive integer representing seconds
- **Generation**: Must be a non-zero integer for topic versioning

```java
// This will throw IllegalArgumentException if properties are invalid
TopicMetadata metadata = new TopicMetadata(topicId, properties, true);
```

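The two rules above can be written as a standalone check. This is a sketch of the stated rules only, not the library's validation code: `TopicPropertyRules` is a hypothetical class, and its key names copy the `"ttl"` and `"generation"` constants listed for `TopicMetadata`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone check mirroring the documented rules.
final class TopicPropertyRules {
  static final String TTL_KEY = "ttl";
  static final String GENERATION_KEY = "generation";

  private TopicPropertyRules() { }

  static void validate(Map<String, String> properties) {
    long ttl = parseRequired(properties, TTL_KEY);
    if (ttl <= 0) {
      throw new IllegalArgumentException("ttl must be a positive number of seconds, got " + ttl);
    }
    long generation = parseRequired(properties, GENERATION_KEY);
    if (generation == 0) {
      throw new IllegalArgumentException("generation must be non-zero");
    }
  }

  // Fails if the property is missing or is not an integer.
  private static long parseRequired(Map<String, String> properties, String key) {
    String value = properties.get(key);
    if (value == null) {
      throw new IllegalArgumentException("Missing required property: " + key);
    }
    try {
      return Long.parseLong(value.trim());
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Property " + key + " is not an integer: " + value, e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> properties = new HashMap<>();
    properties.put(TTL_KEY, "3600");
    properties.put(GENERATION_KEY, "1");
    validate(properties); // valid: returns without throwing

    properties.put(TTL_KEY, "0");
    try {
      validate(properties);
    } catch (IllegalArgumentException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```
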
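## Error Handling

Each operation declares its checked exceptions (`TopicAlreadyExistsException`, `TopicNotFoundException`, `IOException`) in its signature. `ServiceUnavailableException` is documented on every operation but absent from the `throws` clauses, which suggests it is unchecked. Common handling looks like this:

```java
try {
  messagingService.createTopic(metadata);
} catch (TopicAlreadyExistsException e) {
  System.out.println("Topic already exists: " + e.getTopicName());
} catch (IOException e) {
  System.out.println("Failed to create topic: " + e.getMessage());
} catch (ServiceUnavailableException e) {
  System.out.println("Messaging service unavailable");
}

try {
  // Named "existing" so it does not clash with the "metadata" variable above
  TopicMetadata existing = messagingService.getTopic(topicId);
  System.out.println("Topic TTL: " + existing.getTTL());
} catch (TopicNotFoundException e) {
  System.out.println("Topic not found: " + e.getTopicName());
} catch (IOException e) {
  // getTopic also declares IOException, so it must be handled here
  System.out.println("Failed to retrieve topic metadata: " + e.getMessage());
}
```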
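
When topic creation should be idempotent, one option is to treat "already exists" as success rather than as a failure. The sketch below shows that pattern under stated assumptions: the nested exception and the `TopicCreator` interface are local stand-ins so the example compiles on its own, and `ensureTopic` is a hypothetical helper. In real code, the lambda would wrap a call such as `messagingService.createTopic(metadata)`.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Sketch only: the nested types are local stand-ins, not the library's classes.
final class EnsureTopicSketch {
  private EnsureTopicSketch() { }

  /** Stand-in for the exception thrown when a topic already exists. */
  static final class TopicAlreadyExistsException extends Exception {
    TopicAlreadyExistsException(String topic) {
      super("Topic already exists: " + topic);
    }
  }

  /** Stand-in for a create call on the messaging service. */
  interface TopicCreator {
    void create(String topic) throws TopicAlreadyExistsException, IOException;
  }

  /**
   * Creates the topic if it is absent.
   * Returns true if this call created it, false if it already existed.
   * I/O failures still propagate to the caller.
   */
  static boolean ensureTopic(TopicCreator creator, String topic) throws IOException {
    try {
      creator.create(topic);
      return true;
    } catch (TopicAlreadyExistsException e) {
      return false; // already present: the desired end state is reached
    }
  }

  public static void main(String[] args) throws IOException {
    // In-memory stand-in for the service, used only for this demonstration
    Set<String> existing = new HashSet<>();
    TopicCreator inMemory = topic -> {
      if (!existing.add(topic)) {
        throw new TopicAlreadyExistsException(topic);
      }
    };
    System.out.println(ensureTopic(inMemory, "user-events")); // prints true
    System.out.println(ensureTopic(inMemory, "user-events")); // prints false
  }
}
```
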