or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-services.md · high-level-consumers.md · index.md · message-consumption.md · message-publishing.md · topic-management.md

docs/message-publishing.md

0

# Message Publishing

1

2

Transactional message publishing capabilities supporting both immediate publishing and distributed transaction workflows. Provides ACID guarantees for message publishing with rollback support.

3

4

## Capabilities

5

6

### Publish Messages

7

8

Publishes messages to a topic, optionally within a transaction context. Returns rollback information for transactional publishes.

9

10

```java { .api }

11

/**

12

* Publishes a list of messages to the messaging system.

13

* @param request the StoreRequest containing messages to be published

14

* @return if the store request is transactional, returns a RollbackDetail containing

15

* information for rollback; otherwise null will be returned

16

* @throws TopicNotFoundException if the topic doesn't exist

17

* @throws IOException if failed to publish messages

18

* @throws ServiceUnavailableException if the messaging service is not available

19

*/

20

RollbackDetail publish(StoreRequest request) throws TopicNotFoundException, IOException;

21

```

22

23

**Usage Examples:**

24

25

```java

26

import co.cask.cdap.messaging.client.StoreRequestBuilder;

27

import co.cask.cdap.messaging.StoreRequest;

28

import co.cask.cdap.messaging.RollbackDetail;

29

30

// Non-transactional publish

31

StoreRequest request = StoreRequestBuilder.of(topicId)

32

.addPayload("Event: user login")

33

.addPayload("Event: page view")

34

.addPayload("Event: button click")

35

.build();

36

37

RollbackDetail rollback = messagingService.publish(request);

38

// rollback will be null for non-transactional publishes

39

40

// Transactional publish

41

Transaction tx = // obtain transaction

42

StoreRequest txRequest = StoreRequestBuilder.of(topicId)

43

.addPayload("Critical event")

44

.setTransaction(tx.getWritePointer())

45

.build();

46

47

RollbackDetail rollbackDetail = messagingService.publish(txRequest);

48

// Store rollback detail for potential rollback

49

```

50

51

### Store Payload

52

53

Stores message payloads for long/distributed transactional publishing scenarios. Used in multi-phase commit patterns.

54

55

```java { .api }

56

/**

57

* Stores a list of messages to the messaging system. For long/distributed transactional publishing use case.

58

* @param request the StoreRequest containing messages to be stored

59

* @throws TopicNotFoundException if the topic doesn't exist

60

* @throws IOException if failed to store messages

61

* @throws ServiceUnavailableException if the messaging service is not available

62

*/

63

void storePayload(StoreRequest request) throws TopicNotFoundException, IOException;

64

```

65

66

**Usage Examples:**

67

68

```java

69

// Store payloads for later commit in distributed transaction

70

StoreRequest storeRequest = StoreRequestBuilder.of(topicId)

71

.addPayload("Batch operation 1")

72

.addPayload("Batch operation 2")

73

.setTransaction(transactionWritePointer)

74

.build();

75

76

messagingService.storePayload(storeRequest);

77

// Payloads are stored but not yet visible to consumers

78

// Will be committed or rolled back later

79

```

80

81

### Rollback Messages

82

83

Rolls back transactionally published messages using rollback detail information.

84

85

```java { .api }

86

/**

87

* Rolls back messages published to the given topic with the given transaction.

88

* @param topicId the topic where the messages were published

89

* @param rollbackDetail the RollbackDetail as returned by the publish call,

90

* which contains information needed for the rollback

91

* @throws TopicNotFoundException if the topic doesn't exist

92

* @throws IOException if failed to rollback changes

93

* @throws ServiceUnavailableException if the messaging service is not available

94

*/

95

void rollback(TopicId topicId, RollbackDetail rollbackDetail) throws TopicNotFoundException, IOException;

96

```

97

98

**Usage Examples:**

99

100

```java

101

try {

102

// Perform transactional operations

103

RollbackDetail rollback = messagingService.publish(txRequest);

104

105

// ... other operations that might fail

106

107

// If we reach here, commit the transaction

108

// (transaction commit handled by transaction manager)

109

110

} catch (Exception e) {

111

// Something failed, rollback the messages

112

if (rollback != null) {

113

messagingService.rollback(topicId, rollback);

114

}

115

throw e;

116

}

117

```

118

119

## StoreRequest Building

120

121

### StoreRequestBuilder

122

123

Builder pattern for creating StoreRequest instances with fluent API.

124

125

```java { .api }

126

class StoreRequestBuilder {

127

/** Creates a new StoreRequestBuilder instance */

128

static StoreRequestBuilder of(TopicId topicId);

129

130

/** Adds a single payload string to the request (UTF-8 encoded) */

131

StoreRequestBuilder addPayload(String payload);

132

133

/** Adds a single payload, given as a byte array, to the request */

134

StoreRequestBuilder addPayload(byte[] payload);

135

136

/** Adds multiple payloads from iterator */

137

StoreRequestBuilder addPayloads(Iterator<byte[]> payloads);

138

139

/** Adds multiple payloads from iterable */

140

StoreRequestBuilder addPayloads(Iterable<byte[]> payloads);

141

142

/** Sets the transaction write pointer for transactional publish */

143

StoreRequestBuilder setTransaction(Long txWritePointer);

144

145

/** Returns true if there is some payload in this builder */

146

boolean hasPayload();

147

148

/** Creates a StoreRequest based on the builder settings */

149

StoreRequest build();

150

}

151

```

152

153

**Usage Examples:**

154

155

```java

156

// Build request with multiple payload types

157

List<byte[]> binaryData = Arrays.asList(

158

"data1".getBytes(),

159

"data2".getBytes()

160

);

161

162

StoreRequest request = StoreRequestBuilder.of(topicId)

163

.addPayload("String message")

164

.addPayload("Another string".getBytes())

165

.addPayloads(binaryData)

166

.setTransaction(txWritePointer)

167

.build();

168

169

// Check if builder has payloads before building

170

StoreRequestBuilder builder = StoreRequestBuilder.of(topicId);

171

if (!builder.hasPayload()) {

172

builder.addPayload("Default message");

173

}

174

StoreRequest request = builder.build();

175

```

176

177

### StoreRequest Properties

178

179

Abstract class representing messages to store with transaction context.

180

181

```java { .api }

182

abstract class StoreRequest implements Iterable<byte[]> {

183

/** Returns the topic ID for this request */

184

TopicId getTopicId();

185

186

/** Returns true if the message should be published transactionally */

187

boolean isTransactional();

188

189

/** Returns the transaction write pointer if transactional */

190

long getTransactionWritePointer();

191

192

/** Returns true if there is payload in this request */

193

abstract boolean hasPayload();

194

195

/** Iterates over message payloads as byte arrays */

196

Iterator<byte[]> iterator();

197

}

198

```

199

200

## RollbackDetail

201

202

Information needed to rollback transactionally published messages.

203

204

```java { .api }

205

interface RollbackDetail {

206

/** Returns the transaction write pointer used when the message was published */

207

long getTransactionWritePointer();

208

209

/** Returns the timestamp for the first payload published with the transaction */

210

long getStartTimestamp();

211

212

/** Returns the sequence id for the first payload published with the transaction */

213

int getStartSequenceId();

214

215

/** Returns the timestamp for the last payload published with the transaction */

216

long getEndTimestamp();

217

218

/** Returns the sequence id for the last payload published with the transaction */

219

int getEndSequenceId();

220

}

221

```

222

223

**Usage Examples:**

224

225

```java

226

RollbackDetail rollback = messagingService.publish(txRequest);

227

if (rollback != null) {

228

System.out.println("Transaction write pointer: " + rollback.getTransactionWritePointer());

229

System.out.println("Time range: " + rollback.getStartTimestamp() + " to " + rollback.getEndTimestamp());

230

System.out.println("Sequence range: " + rollback.getStartSequenceId() + " to " + rollback.getEndSequenceId());

231

}

232

```

233

234

## Transaction Integration

235

236

The CDAP Transactional Messaging System (TMS) integrates with Apache Tephra for transaction support:

237

238

```java

239

import org.apache.tephra.Transaction;

240

import org.apache.tephra.TransactionManager;

241

242

// In a transactional context

243

Transaction tx = transactionManager.startLong();

244

try {

245

StoreRequest request = StoreRequestBuilder.of(topicId)

246

.addPayload("Transactional message")

247

.setTransaction(tx.getWritePointer())

248

.build();

249

250

RollbackDetail rollback = messagingService.publish(request);

251

252

// Other transactional operations...

253

254

transactionManager.commit(tx);

255

} catch (Exception e) {

256

transactionManager.abort(tx);

257

if (rollback != null) {

258

messagingService.rollback(topicId, rollback);

259

}

260

throw e;

261

}

262

```

263

264

## Error Handling

265

266

Common publishing error scenarios:

267

268

```java

269

try {

270

RollbackDetail rollback = messagingService.publish(request);

271

} catch (TopicNotFoundException e) {

272

System.out.println("Topic does not exist: " + e.getTopicName());

273

// Consider creating topic or using different topic

274

} catch (IOException e) {

275

System.out.println("Publishing failed: " + e.getMessage());

276

// Retry logic or error reporting

277

} catch (ServiceUnavailableException e) {

278

System.out.println("Messaging service unavailable");

279

// Wait and retry or use circuit breaker pattern

280

}

281

282

// Validate request before publishing

283

StoreRequestBuilder builder = StoreRequestBuilder.of(topicId);

284

if (!builder.hasPayload()) {

285

throw new IllegalArgumentException("Cannot publish empty request");

286

}

287

```