tessl/maven-co-cask-cdap--cdap-tms

CDAP Transactional Messaging System provides reliable, ordered message delivery with transaction support for the CDAP platform.

- Workspace: tessl
- Visibility: Public
- Describes: mavenpkg:maven/co.cask.cdap/cdap-tms@5.1.x

To install, run

npx @tessl/cli install tessl/maven-co-cask-cdap--cdap-tms@5.1.0

# CDAP Transactional Messaging System (TMS)

CDAP TMS is a Java library providing transactional messaging capabilities built on the Apache Hadoop ecosystem. It implements reliable, ordered message delivery with ACID transaction support, enabling applications to publish and consume messages with strong consistency guarantees in distributed data processing environments.

## Package Information

- **Package Name**: cdap-tms
- **Package Type**: Maven
- **Language**: Java
- **Installation**: Add to your Maven `pom.xml`:

```xml
<dependency>
    <groupId>co.cask.cdap</groupId>
    <artifactId>cdap-tms</artifactId>
    <version>5.1.2</version>
</dependency>
```

## Core Imports

```java
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.MessageFetcher;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.proto.id.TopicId;
import co.cask.cdap.proto.id.NamespaceId;
```

For client usage:

```java
import co.cask.cdap.messaging.client.ClientMessagingService;
import co.cask.cdap.messaging.guice.MessagingClientModule;
```

For exception handling:

```java
import co.cask.cdap.api.messaging.TopicAlreadyExistsException;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.common.ServiceUnavailableException;
```

## Basic Usage

```java
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.messaging.MessageFetcher;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.messaging.data.RawMessage;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.TopicId;

import java.nio.charset.StandardCharsets;

public class BasicUsage {

  // The MessagingService is typically obtained via dependency injection
  public static void run(MessagingService messagingService) throws Exception {
    // Create a topic (namespace IDs may contain only letters, digits, and underscores)
    NamespaceId namespace = new NamespaceId("my_namespace");
    TopicId topicId = namespace.topic("my-topic");
    TopicMetadata metadata = new TopicMetadata(topicId,
        TopicMetadata.TTL_KEY, "3600",       // TTL in seconds (1 hour)
        TopicMetadata.GENERATION_KEY, "1"
    );
    messagingService.createTopic(metadata);

    // Publish messages
    StoreRequest request = StoreRequestBuilder.of(topicId)
        .addPayload("Hello, World!")
        .addPayload("Another message")
        .build();
    messagingService.publish(request);

    // Consume messages; try-with-resources closes the iterator when done
    MessageFetcher fetcher = messagingService.prepareFetch(topicId);
    try (CloseableIterator<RawMessage> messages = fetcher.setLimit(10).fetch()) {
      while (messages.hasNext()) {
        RawMessage message = messages.next();
        // Decode with an explicit charset rather than the platform default
        String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
        System.out.println("Received: " + payload);
      }
    }
  }
}
```
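
The varargs `TopicMetadata` constructor used above takes alternating keys and values. As a rough illustration of that calling convention, the sketch below folds such pairs into a `Map<String, String>` using only the JDK. `TopicProperties` and `fromPairs` are hypothetical names invented for this example, not part of cdap-tms, and the library's own validation may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of cdap-tms: folds key/value varargs into a property map. */
final class TopicProperties {

  static Map<String, String> fromPairs(Object... pairs) {
    if (pairs.length % 2 != 0) {
      throw new IllegalArgumentException("Expected an even number of arguments (key, value, ...)");
    }
    Map<String, String> properties = new LinkedHashMap<>();
    for (int i = 0; i < pairs.length; i += 2) {
      // Non-string values such as a numeric TTL are stored in their string form
      properties.put(String.valueOf(pairs[i]), String.valueOf(pairs[i + 1]));
    }
    return properties;
  }

  public static void main(String[] args) {
    Map<String, String> props = fromPairs("ttl", 3600, "generation", 1);
    System.out.println(props); // prints {ttl=3600, generation=1}
  }
}
```

Rejecting an odd number of arguments up front catches a forgotten value at construction time instead of silently dropping a key.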

## Architecture

CDAP TMS is built around several key components:

- **MessagingService**: Primary interface for all messaging operations, including topic management and message publishing and consumption
- **Storage Layer**: Abstracted storage with support for HBase and LevelDB backends via the TableFactory pattern
- **Client Layer**: HTTP-based client implementation for distributed deployments with service discovery
- **Transaction Support**: Integration with Apache Tephra for ACID transaction guarantees
- **Caching System**: In-memory message caching with weight-based eviction for improved performance
- **Subscriber Framework**: High-level consumer abstraction with failure handling and message ID persistence
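
To make "weight-based eviction" concrete: each cached entry carries a weight (for messages, plausibly its size in bytes), and entries are evicted once the total weight exceeds a budget. The following is a minimal JDK-only sketch of that idea that evicts the oldest entries first. `WeightedCache` and its methods are hypothetical and do not reflect how cdap-tms implements its cache.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of weight-based eviction; not the cdap-tms cache. */
final class WeightedCache {
  private final Deque<byte[]> entries = new ArrayDeque<>();
  private final long maxWeight;
  private long currentWeight;

  WeightedCache(long maxWeight) {
    this.maxWeight = maxWeight;
  }

  /** Adds a payload, then evicts the oldest entries until the total weight fits the budget. */
  void add(byte[] payload) {
    entries.addLast(payload);
    currentWeight += payload.length; // weight = payload size in bytes
    while (currentWeight > maxWeight && !entries.isEmpty()) {
      currentWeight -= entries.removeFirst().length;
    }
  }

  int size() {
    return entries.size();
  }

  long weight() {
    return currentWeight;
  }
}
```

A plain entry-count limit would treat a 10-byte message and a 10 MB message alike, whereas a weight budget bounds memory use directly.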

## Capabilities

### Topic Management

Core topic lifecycle operations: creation, update, deletion, metadata retrieval, and listing the topics in a namespace.

```java { .api }
void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException;
void updateTopic(TopicMetadata topicMetadata) throws TopicNotFoundException, IOException;
void deleteTopic(TopicId topicId) throws TopicNotFoundException, IOException;
TopicMetadata getTopic(TopicId topicId) throws TopicNotFoundException, IOException;
List<TopicId> listTopics(NamespaceId namespaceId) throws IOException;
```

[Topic Management](./topic-management.md)

### Message Publishing

Transactional message publishing. `publish` supports immediate publishing, `storePayload` stores payloads for distributed transaction workflows, and `rollback` takes the `RollbackDetail` returned by `publish`.

```java { .api }
RollbackDetail publish(StoreRequest request) throws TopicNotFoundException, IOException;
void storePayload(StoreRequest request) throws TopicNotFoundException, IOException;
void rollback(TopicId topicId, RollbackDetail rollbackDetail) throws TopicNotFoundException, IOException;
```

[Message Publishing](./message-publishing.md)

### Message Consumption

Message fetching can start from a message ID or from a timestamp, can run under a transaction for isolation, and can be capped with a limit.

```java { .api }
MessageFetcher prepareFetch(TopicId topicId) throws TopicNotFoundException, IOException;

interface MessageFetcher {
    MessageFetcher setStartMessage(byte[] startOffset, boolean inclusive);
    MessageFetcher setStartTime(long startTime);
    MessageFetcher setTransaction(Transaction transaction);
    MessageFetcher setLimit(int limit);
    CloseableIterator<RawMessage> fetch() throws TopicNotFoundException, IOException;
}
```

[Message Consumption](./message-consumption.md)
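
A common consumer pattern is to page through a topic: fetch up to `limit` messages, remember the ID of the last one, and start the next fetch just after it (the `inclusive = false` case of `setStartMessage`). The sketch below models that loop against an in-memory list so it runs without a CDAP cluster. `InMemoryTopic`, `fetchAfter`, and `drain` are hypothetical stand-ins, and message IDs are simplified to list positions.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for a topic; IDs are simply positions in the list. */
final class InMemoryTopic {
  private final List<String> messages = new ArrayList<>();

  void publish(String payload) {
    messages.add(payload);
  }

  /** Returns up to {@code limit} payloads whose ID is strictly greater than {@code lastId}. */
  List<String> fetchAfter(int lastId, int limit) {
    int from = Math.min(lastId + 1, messages.size());
    int to = Math.min(from + limit, messages.size());
    return new ArrayList<>(messages.subList(from, to));
  }

  /** Drains the topic page by page, resuming each fetch after the last consumed ID. */
  static List<String> drain(InMemoryTopic topic, int pageSize) {
    List<String> consumed = new ArrayList<>();
    int lastId = -1; // -1 means nothing has been consumed yet
    while (true) {
      List<String> page = topic.fetchAfter(lastId, pageSize);
      if (page.isEmpty()) {
        return consumed;
      }
      consumed.addAll(page);
      lastId += page.size(); // exclusive start: the next page begins after this ID
    }
  }
}
```

Using an exclusive start position means the last message of one page is not delivered again as the first message of the next.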

### Client Services

HTTP-based messaging client for distributed environments, with service discovery and error handling.

```java { .api }
class ClientMessagingService implements MessagingService {
    ClientMessagingService(DiscoveryServiceClient discoveryServiceClient);
}

class StoreRequestBuilder {
    static StoreRequestBuilder of(TopicId topicId);
    StoreRequestBuilder addPayload(String payload);
    StoreRequestBuilder addPayload(byte[] payload);
    StoreRequestBuilder setTransaction(Long txWritePointer);
    StoreRequest build();
}
```

[Client Services](./client-services.md)

### High-Level Consumers

Abstract subscriber service that provides automatic message processing, failure handling, transaction management, and message ID persistence for building robust consumers.

```java { .api }
abstract class AbstractMessagingSubscriberService<T> extends AbstractRetryableScheduledService {
    protected AbstractMessagingSubscriberService(TopicId topicId, boolean transactionalFetch,
        int fetchSize, int txTimeoutSeconds, int maxTxTimeoutSeconds,
        long emptyFetchDelayMillis, RetryStrategy retryStrategy, MetricsContext metricsContext);

    protected abstract MessagingContext getMessagingContext();
    protected abstract String loadMessageId(DatasetContext datasetContext) throws Exception;
    protected abstract void storeMessageId(DatasetContext datasetContext, String messageId) throws Exception;
    protected abstract T decodeMessage(Message message) throws Exception;
    protected abstract void processMessages(DatasetContext datasetContext, Iterator<ImmutablePair<String, T>> messages) throws Exception;
}
```

[High-Level Consumers](./high-level-consumers.md)
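
The combination of failure handling and message ID persistence suggests a checkpointing loop: the stored ID advances only when a batch has been processed successfully, so a failed batch is fetched again on the next run. The sketch below is a minimal JDK-only model of that loop, under the assumption that processing and ID storage succeed or fail together. `CheckpointingSubscriber`, `runOnce`, and the `Consumer` callback are hypothetical and greatly simplified relative to `AbstractMessagingSubscriberService`.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified model of a subscriber that checkpoints its position. */
final class CheckpointingSubscriber {
  private final List<String> topic; // stand-in for the messages in a topic
  private final int fetchSize;
  private int storedId = -1;        // stand-in for the persisted message ID

  CheckpointingSubscriber(List<String> topic, int fetchSize) {
    this.topic = topic;
    this.fetchSize = fetchSize;
  }

  /** Fetches one batch after the stored ID; returns true if the batch was processed. */
  boolean runOnce(Consumer<List<String>> processor) {
    int from = storedId + 1;
    int to = Math.min(from + fetchSize, topic.size());
    if (from >= to) {
      return false; // nothing new to process
    }
    try {
      processor.accept(topic.subList(from, to));
      storedId = to - 1; // advance the checkpoint only after processing succeeded
      return true;
    } catch (RuntimeException e) {
      return false;      // checkpoint unchanged: the same batch is fetched again
    }
  }

  int storedId() {
    return storedId;
  }
}
```

Because a failed batch is retried, the processing callback in such a design should tolerate seeing the same messages more than once.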

## Types

### Core Data Types

```java { .api }
class TopicMetadata {
    TopicMetadata(TopicId topicId, Map<String, String> properties);
    TopicMetadata(TopicId topicId, Object... properties);

    TopicId getTopicId();
    Map<String, String> getProperties();
    int getGeneration();
    boolean exists();
    long getTTL();

    static final String GENERATION_KEY = "generation";
    static final String TTL_KEY = "ttl";
}

class RawMessage {
    RawMessage(byte[] id, byte[] payload);
    byte[] getId();
    byte[] getPayload();
}

class MessageId {
    MessageId(byte[] rawId);
    long getPublishTimestamp();
    short getSequenceId();
    long getPayloadWriteTimestamp();
    short getPayloadSequenceId();
    byte[] getRawId();

    static int putRawId(long publishTimestamp, short sequenceId,
        long writeTimestamp, short payloadSequenceId, byte[] buffer, int offset);

    // long + short + long + short = 8 + 2 + 8 + 2 bytes
    static final int RAW_ID_SIZE = 20;
}

abstract class StoreRequest implements Iterable<byte[]> {
    TopicId getTopicId();
    boolean isTransactional();
    long getTransactionWritePointer();
    abstract boolean hasPayload();
}

interface RollbackDetail {
    long getTransactionWritePointer();
    long getStartTimestamp();
    int getStartSequenceId();
    long getEndTimestamp();
    int getEndSequenceId();
}
```
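
The `putRawId` signature suggests that a raw message ID is the four fields written back to back: two `long` values and two `short` values, which add up to 20 bytes. The sketch below reproduces that layout with `java.nio.ByteBuffer`, assuming big-endian byte order and the field order of the signature. `RawIdLayout` is a hypothetical stand-in for illustration, not the library's `MessageId` class.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in that mirrors the field order of MessageId.putRawId. */
final class RawIdLayout {
  // 8-byte publish timestamp + 2-byte sequence ID + 8-byte write timestamp + 2-byte payload sequence ID
  static final int SIZE = Long.BYTES + Short.BYTES + Long.BYTES + Short.BYTES;

  static byte[] encode(long publishTimestamp, short sequenceId,
                       long writeTimestamp, short payloadSequenceId) {
    return ByteBuffer.allocate(SIZE) // ByteBuffer is big-endian by default
        .putLong(publishTimestamp)
        .putShort(sequenceId)
        .putLong(writeTimestamp)
        .putShort(payloadSequenceId)
        .array();
  }

  // Absolute reads at the byte offset of each field
  static long publishTimestamp(byte[] rawId) {
    return ByteBuffer.wrap(rawId).getLong(0);
  }

  static short sequenceId(byte[] rawId) {
    return ByteBuffer.wrap(rawId).getShort(8);
  }

  static long writeTimestamp(byte[] rawId) {
    return ByteBuffer.wrap(rawId).getLong(10);
  }

  static short payloadSequenceId(byte[] rawId) {
    return ByteBuffer.wrap(rawId).getShort(18);
  }
}
```

Under these assumptions, comparing two raw IDs byte by byte as unsigned values orders them by publish timestamp first and sequence ID second, as long as both fields are non-negative.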

## Exception Types

```java { .api }
class TopicAlreadyExistsException extends Exception;
class TopicNotFoundException extends Exception;
class ServiceUnavailableException extends Exception;
```