# CDAP Transactional Messaging System (TMS)

CDAP TMS is a Java library providing transactional messaging capabilities built on the Apache Hadoop ecosystem. It implements reliable, ordered message delivery with ACID transaction support, enabling applications to publish and consume messages with strong consistency guarantees in distributed data processing environments.

## Package Information

- **Package Name**: cdap-tms
- **Package Type**: Maven
- **Language**: Java
- **Installation**: Add to your Maven `pom.xml`:

```xml
<dependency>
  <groupId>co.cask.cdap</groupId>
  <artifactId>cdap-tms</artifactId>
  <version>5.1.2</version>
</dependency>
```

## Core Imports

```java
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.MessageFetcher;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.proto.id.TopicId;
import co.cask.cdap.proto.id.NamespaceId;
```

For client usage:

```java
import co.cask.cdap.messaging.client.ClientMessagingService;
import co.cask.cdap.messaging.guice.MessagingClientModule;
```

For exception handling:

```java
import co.cask.cdap.api.messaging.TopicAlreadyExistsException;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.common.ServiceUnavailableException;
```

## Basic Usage

```java
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.messaging.MessageFetcher;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.messaging.data.RawMessage;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.TopicId;
import java.nio.charset.StandardCharsets;

public class BasicUsage {

  // The MessagingService instance is obtained via dependency injection
  static void run(MessagingService messagingService) throws Exception {
    // Create a topic (namespace IDs allow only letters, digits, and underscores)
    NamespaceId namespace = new NamespaceId("my_namespace");
    TopicId topicId = namespace.topic("my-topic");
    TopicMetadata metadata = new TopicMetadata(topicId,
        TopicMetadata.TTL_KEY, "3600",  // 1 hour TTL
        TopicMetadata.GENERATION_KEY, "1");
    messagingService.createTopic(metadata);

    // Publish messages
    StoreRequest request = StoreRequestBuilder.of(topicId)
        .addPayload("Hello, World!")
        .addPayload("Another message")
        .build();
    messagingService.publish(request);

    // Consume messages; the iterator is closed automatically
    MessageFetcher fetcher = messagingService.prepareFetch(topicId);
    try (CloseableIterator<RawMessage> messages = fetcher.setLimit(10).fetch()) {
      while (messages.hasNext()) {
        RawMessage message = messages.next();
        // Decode with an explicit charset instead of the platform default
        String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
        System.out.println("Received: " + payload);
      }
    }
  }
}
```

## Architecture

CDAP TMS is built around several key components:

- **MessagingService**: Primary interface for all messaging operations including topic management and message publishing/consuming
- **Storage Layer**: Abstracted storage with support for HBase and LevelDB backends via the TableFactory pattern
- **Client Layer**: HTTP-based client implementation for distributed deployments with service discovery
- **Transaction Support**: Integration with Apache Tephra for ACID transaction guarantees
- **Caching System**: In-memory message caching with weight-based eviction for improved performance
- **Subscriber Framework**: High-level consumer abstraction with failure handling and message ID persistence

## Capabilities

### Topic Management

Core topic lifecycle operations including creation, updates, deletion, and metadata retrieval. Essential for managing messaging channels in distributed systems.

```java { .api }
void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException;
void updateTopic(TopicMetadata topicMetadata) throws TopicNotFoundException, IOException;
void deleteTopic(TopicId topicId) throws TopicNotFoundException, IOException;
TopicMetadata getTopic(TopicId topicId) throws TopicNotFoundException, IOException;
List<TopicId> listTopics(NamespaceId namespaceId) throws IOException;
```

[Topic Management](./topic-management.md)

### Message Publishing

Transactional message publishing with support for both immediate publishing and distributed transaction workflows via payload storage.

```java { .api }
RollbackDetail publish(StoreRequest request) throws TopicNotFoundException, IOException;
void storePayload(StoreRequest request) throws TopicNotFoundException, IOException;
void rollback(TopicId topicId, RollbackDetail rollbackDetail) throws TopicNotFoundException, IOException;
```

[Message Publishing](./message-publishing.md)

### Message Consumption

Flexible message fetching with support for various starting positions, transaction isolation, and configurable limits for building robust consumers.

```java { .api }
MessageFetcher prepareFetch(TopicId topicId) throws TopicNotFoundException, IOException;

interface MessageFetcher {
  MessageFetcher setStartMessage(byte[] startOffset, boolean inclusive);
  MessageFetcher setStartTime(long startTime);
  MessageFetcher setTransaction(Transaction transaction);
  MessageFetcher setLimit(int limit);
  CloseableIterator<RawMessage> fetch() throws TopicNotFoundException, IOException;
}
```

[Message Consumption](./message-consumption.md)

### Client Services

HTTP-based messaging client for distributed environments with service discovery and comprehensive error handling.

```java { .api }
class ClientMessagingService implements MessagingService {
  ClientMessagingService(DiscoveryServiceClient discoveryServiceClient);
}

class StoreRequestBuilder {
  static StoreRequestBuilder of(TopicId topicId);
  StoreRequestBuilder addPayload(String payload);
  StoreRequestBuilder addPayload(byte[] payload);
  StoreRequestBuilder setTransaction(Long txWritePointer);
  StoreRequest build();
}
```

[Client Services](./client-services.md)

### High-Level Consumers

Abstract subscriber service providing automatic message processing, failure handling, transaction management, and message ID persistence for building robust consumers.

```java { .api }
abstract class AbstractMessagingSubscriberService<T> extends AbstractRetryableScheduledService {
  protected AbstractMessagingSubscriberService(TopicId topicId, boolean transactionalFetch,
      int fetchSize, int txTimeoutSeconds, int maxTxTimeoutSeconds,
      long emptyFetchDelayMillis, RetryStrategy retryStrategy, MetricsContext metricsContext);

  protected abstract MessagingContext getMessagingContext();
  protected abstract String loadMessageId(DatasetContext datasetContext) throws Exception;
  protected abstract void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception;
  protected abstract T decodeMessage(Message message) throws Exception;
  protected abstract void processMessages(DatasetContext datasetContext, Iterator<ImmutablePair<String, T>> messages) throws Exception;
}
```

[High-Level Consumers](./high-level-consumers.md)

## Types

### Core Data Types

```java { .api }
class TopicMetadata {
  TopicMetadata(TopicId topicId, Map<String, String> properties);
  TopicMetadata(TopicId topicId, Object... properties);

  TopicId getTopicId();
  Map<String, String> getProperties();
  int getGeneration();
  boolean exists();
  long getTTL();

  static final String GENERATION_KEY = "generation";
  static final String TTL_KEY = "ttl";
}

class RawMessage {
  RawMessage(byte[] id, byte[] payload);
  byte[] getId();
  byte[] getPayload();
}

class MessageId {
  MessageId(byte[] rawId);
  long getPublishTimestamp();
  short getSequenceId();
  long getPayloadWriteTimestamp();
  short getPayloadSequenceId();
  byte[] getRawId();

  static int putRawId(long publishTimestamp, short sequenceId,
      long writeTimestamp, short payloadSequenceId, byte[] buffer, int offset);

  // Two longs and two shorts: 8 + 2 + 8 + 2 bytes
  static final int RAW_ID_SIZE = 20;
}

abstract class StoreRequest implements Iterable<byte[]> {
  TopicId getTopicId();
  boolean isTransactional();
  long getTransactionWritePointer();
  abstract boolean hasPayload();
}

interface RollbackDetail {
  long getTransactionWritePointer();
  long getStartTimestamp();
  int getStartSequenceId();
  long getEndTimestamp();
  int getEndSequenceId();
}
```

## Exception Types

```java { .api }
class TopicAlreadyExistsException extends Exception;
class TopicNotFoundException extends Exception;
class ServiceUnavailableException extends Exception;
```