0
# Message Publishing
1
2
Transactional message publishing capabilities supporting both immediate publishing and distributed transaction workflows. Provides ACID guarantees for message publishing with rollback support.
3
4
## Capabilities
5
6
### Publish Messages
7
8
Publishes messages to a topic, optionally within a transaction context. Returns rollback information for transactional publishes.
9
10
```java { .api }
11
/**
12
* Publishes a list of messages to the messaging system.
13
* @param request the StoreRequest containing messages to be published
14
* @return if the store request is transactional, returns a RollbackDetail containing
15
* information for rollback; otherwise null will be returned
16
* @throws TopicNotFoundException if the topic doesn't exist
17
* @throws IOException if failed to publish messages
18
* @throws ServiceUnavailableException if the messaging service is not available
19
*/
20
RollbackDetail publish(StoreRequest request) throws TopicNotFoundException, IOException;
21
```
22
23
**Usage Examples:**
24
25
```java
26
import co.cask.cdap.messaging.client.StoreRequestBuilder;
27
import co.cask.cdap.messaging.StoreRequest;
28
import co.cask.cdap.messaging.RollbackDetail;
29
30
// Non-transactional publish
31
StoreRequest request = StoreRequestBuilder.of(topicId)
32
.addPayload("Event: user login")
33
.addPayload("Event: page view")
34
.addPayload("Event: button click")
35
.build();
36
37
RollbackDetail rollback = messagingService.publish(request);
38
// rollback will be null for non-transactional publishes
39
40
// Transactional publish
41
Transaction tx = // obtain transaction
42
StoreRequest txRequest = StoreRequestBuilder.of(topicId)
43
.addPayload("Critical event")
44
.setTransaction(tx.getWritePointer())
45
.build();
46
47
RollbackDetail rollbackDetail = messagingService.publish(txRequest);
48
// Store rollback detail for potential rollback
49
```
50
51
### Store Payload
52
53
Stores message payloads for long/distributed transactional publishing scenarios. Used in multi-phase commit patterns.
54
55
```java { .api }
56
/**
57
* Stores a list of messages to the messaging system. For long/distributed transactional publishing use case.
58
* @param request the StoreRequest containing messages to be stored
59
* @throws TopicNotFoundException if the topic doesn't exist
60
* @throws IOException if failed to store messages
61
* @throws ServiceUnavailableException if the messaging service is not available
62
*/
63
void storePayload(StoreRequest request) throws TopicNotFoundException, IOException;
64
```
65
66
**Usage Examples:**
67
68
```java
69
// Store payloads for later commit in distributed transaction
70
StoreRequest storeRequest = StoreRequestBuilder.of(topicId)
71
.addPayload("Batch operation 1")
72
.addPayload("Batch operation 2")
73
.setTransaction(transactionWritePointer)
74
.build();
75
76
messagingService.storePayload(storeRequest);
77
// Payloads are stored but not yet visible to consumers
78
// Will be committed or rolled back later
79
```
80
81
### Rollback Messages
82
83
Rolls back transactionally published messages using rollback detail information.
84
85
```java { .api }
86
/**
87
* Rollbacks messages published to the given topic with the given transaction.
88
* @param topicId the topic where the messages were published
89
* @param rollbackDetail the RollbackDetail as returned by the publish call,
90
* which contains information needed for the rollback
91
* @throws TopicNotFoundException if the topic doesn't exist
92
* @throws IOException if failed to rollback changes
93
* @throws ServiceUnavailableException if the messaging service is not available
94
*/
95
void rollback(TopicId topicId, RollbackDetail rollbackDetail) throws TopicNotFoundException, IOException;
96
```
97
98
**Usage Examples:**
99
100
```java
101
try {
102
// Perform transactional operations
103
RollbackDetail rollback = messagingService.publish(txRequest);
104
105
// ... other operations that might fail
106
107
// If we reach here, commit the transaction
108
// (transaction commit handled by transaction manager)
109
110
} catch (Exception e) {
111
// Something failed, rollback the messages
112
if (rollback != null) {
113
messagingService.rollback(topicId, rollback);
114
}
115
throw e;
116
}
117
```
118
119
## StoreRequest Building
120
121
### StoreRequestBuilder
122
123
Builder pattern for creating StoreRequest instances with fluent API.
124
125
```java { .api }
126
class StoreRequestBuilder {
127
/** Creates a new StoreRequestBuilder instance */
128
static StoreRequestBuilder of(TopicId topicId);
129
130
/** Adds a single payload string to the request (UTF-8 encoded) */
131
StoreRequestBuilder addPayload(String payload);
132
133
/** Adds a single byte array to the payload */
134
StoreRequestBuilder addPayload(byte[] payload);
135
136
/** Adds multiple payloads from iterator */
137
StoreRequestBuilder addPayloads(Iterator<byte[]> payloads);
138
139
/** Adds multiple payloads from iterable */
140
StoreRequestBuilder addPayloads(Iterable<byte[]> payloads);
141
142
/** Sets the transaction write pointer for transactional publish */
143
StoreRequestBuilder setTransaction(Long txWritePointer);
144
145
/** Returns true if there is some payload in this builder */
146
boolean hasPayload();
147
148
/** Creates a StoreRequest based on the builder settings */
149
StoreRequest build();
150
}
151
```
152
153
**Usage Examples:**
154
155
```java
156
// Build request with multiple payload types
157
List<byte[]> binaryData = Arrays.asList(
158
"data1".getBytes(),
159
"data2".getBytes()
160
);
161
162
StoreRequest request = StoreRequestBuilder.of(topicId)
163
.addPayload("String message")
164
.addPayload("Another string".getBytes())
165
.addPayloads(binaryData)
166
.setTransaction(txWritePointer)
167
.build();
168
169
// Check if builder has payloads before building
170
StoreRequestBuilder builder = StoreRequestBuilder.of(topicId);
171
if (!builder.hasPayload()) {
172
builder.addPayload("Default message");
173
}
174
StoreRequest request = builder.build();
175
```
176
177
### StoreRequest Properties
178
179
Abstract class representing messages to store with transaction context.
180
181
```java { .api }
182
abstract class StoreRequest implements Iterable<byte[]> {
183
/** Returns the topic ID for this request */
184
TopicId getTopicId();
185
186
/** Returns true if the message should be published transactionally */
187
boolean isTransactional();
188
189
/** Returns the transaction write pointer if transactional */
190
long getTransactionWritePointer();
191
192
/** Returns true if there is payload in this request */
193
abstract boolean hasPayload();
194
195
/** Iterates over message payloads as byte arrays */
196
Iterator<byte[]> iterator();
197
}
198
```
199
200
## RollbackDetail
201
202
Information needed to rollback transactionally published messages.
203
204
```java { .api }
205
interface RollbackDetail {
206
/** Returns the transaction write pointer used when the message was published */
207
long getTransactionWritePointer();
208
209
/** Returns the timestamp for the first payload published with the transaction */
210
long getStartTimestamp();
211
212
/** Returns the sequence id for the first payload published with the transaction */
213
int getStartSequenceId();
214
215
/** Returns the timestamp for the last payload published with the transaction */
216
long getEndTimestamp();
217
218
/** Returns the sequence id for the last payload published with the transaction */
219
int getEndSequenceId();
220
}
221
```
222
223
**Usage Examples:**
224
225
```java
226
RollbackDetail rollback = messagingService.publish(txRequest);
227
if (rollback != null) {
228
System.out.println("Transaction write pointer: " + rollback.getTransactionWritePointer());
229
System.out.println("Time range: " + rollback.getStartTimestamp() + " to " + rollback.getEndTimestamp());
230
System.out.println("Sequence range: " + rollback.getStartSequenceId() + " to " + rollback.getEndSequenceId());
231
}
232
```
233
234
## Transaction Integration
235
236
CDAP TMS integrates with Apache Tephra for transaction support:
237
238
```java
239
import org.apache.tephra.Transaction;
240
import org.apache.tephra.TransactionManager;
241
242
// In a transactional context
243
Transaction tx = transactionManager.startLong();
244
try {
245
StoreRequest request = StoreRequestBuilder.of(topicId)
246
.addPayload("Transactional message")
247
.setTransaction(tx.getWritePointer())
248
.build();
249
250
RollbackDetail rollback = messagingService.publish(request);
251
252
// Other transactional operations...
253
254
transactionManager.commit(tx);
255
} catch (Exception e) {
256
transactionManager.abort(tx);
257
if (rollback != null) {
258
messagingService.rollback(topicId, rollback);
259
}
260
throw e;
261
}
262
```
263
264
## Error Handling
265
266
Common publishing error scenarios:
267
268
```java
269
try {
270
RollbackDetail rollback = messagingService.publish(request);
271
} catch (TopicNotFoundException e) {
272
System.out.println("Topic does not exist: " + e.getTopicName());
273
// Consider creating topic or using different topic
274
} catch (IOException e) {
275
System.out.println("Publishing failed: " + e.getMessage());
276
// Retry logic or error reporting
277
} catch (ServiceUnavailableException e) {
278
System.out.println("Messaging service unavailable");
279
// Wait and retry or use circuit breaker pattern
280
}
281
282
// Validate request before publishing
283
StoreRequestBuilder builder = StoreRequestBuilder.of(topicId);
284
if (!builder.hasPayload()) {
285
throw new IllegalArgumentException("Cannot publish empty request");
286
}
287
```