0
# Transaction Support
1
2
Transactional messaging for exactly-once semantics, multi-topic atomic operations, and coordinated message processing across producers and consumers.
3
4
## Capabilities
5
6
### Transaction Interface
7
8
Core interface for transactional operations providing exactly-once message processing guarantees.
9
10
```java { .api }
11
/**
12
* Transaction interface for atomic message operations
13
* Provides exactly-once semantics across multiple topics and operations
14
*/
15
interface Transaction {
16
/** Get transaction ID */
17
TxnID getTxnID();
18
19
/** Get transaction state */
20
TransactionState getState();
21
22
/** Commit transaction */
23
CompletableFuture<Void> commit();
24
25
/** Abort transaction */
26
CompletableFuture<Void> abort();
27
28
/** Get transaction timeout */
29
long getTransactionTimeout();
30
31
/** Get transaction start timestamp */
32
long getTransactionStartTime();
33
}
34
```
35
36
### TransactionBuilder Configuration
37
38
Builder interface for creating and configuring transactions.
39
40
```java { .api }
41
/**
42
* Builder for creating and configuring transactions
43
*/
44
interface TransactionBuilder {
45
/** Build the transaction asynchronously */
46
CompletableFuture<Transaction> build();
47
48
/** Set transaction timeout (default: coordinator configured timeout) */
49
TransactionBuilder withTransactionTimeout(long timeout, TimeUnit timeUnit);
50
}
51
```
52
53
**Basic Transaction Usage:**
54
55
```java
56
import org.apache.pulsar.client.api.*;
57
import org.apache.pulsar.client.api.transaction.*;
58
59
// Enable transactions in client
60
PulsarClient client = PulsarClient.builder()
61
.serviceUrl("pulsar://localhost:6650")
62
.enableTransaction(true)
63
.build();
64
65
// Create transaction
66
Transaction txn = client.newTransaction()
67
.withTransactionTimeout(1, TimeUnit.MINUTES)
68
.build()
69
.get();
70
71
try {
72
// Produce messages in transaction
73
Producer<String> producer = client.newProducer(Schema.STRING)
74
.topic("transactional-topic")
75
.create();
76
77
producer.newMessage(txn)
78
.value("Message 1")
79
.send();
80
81
producer.newMessage(txn)
82
.value("Message 2")
83
.send();
84
85
// Consume and acknowledge in transaction
86
Consumer<String> consumer = client.newConsumer(Schema.STRING)
87
.topic("input-topic")
88
.subscriptionName("txn-sub")
89
.subscribe();
90
91
Message<String> message = consumer.receive();
92
consumer.acknowledge(message.getMessageId(), txn);
93
94
// Commit transaction
95
txn.commit().get();
96
System.out.println("Transaction committed successfully");
97
98
} catch (Exception e) {
99
// Abort transaction on error
100
txn.abort().get();
101
System.err.println("Transaction aborted: " + e.getMessage());
102
}
103
```
104
105
### Transactional Producer Operations
106
107
Producer operations within transaction context for atomic message publishing.
108
109
```java { .api }
110
/**
111
* Extended TypedMessageBuilder for transactional operations
112
*/
113
interface TypedMessageBuilder<T> {
114
/** Send message within transaction context */
115
MessageId send(Transaction txn) throws PulsarClientException;
116
117
/** Send message within transaction context asynchronously */
118
CompletableFuture<MessageId> sendAsync(Transaction txn);
119
}
120
121
/**
122
* Producer interface with transaction support
123
*/
124
interface Producer<T> {
125
/** Create message builder for transaction */
126
TypedMessageBuilder<T> newMessage(Transaction txn);
127
128
/** Send message in transaction */
129
MessageId send(T message, Transaction txn) throws PulsarClientException;
130
131
/** Send message in transaction asynchronously */
132
CompletableFuture<MessageId> sendAsync(T message, Transaction txn);
133
}
134
```
135
136
**Transactional Producer Examples:**
137
138
```java
139
// Multi-topic atomic publishing
140
Transaction txn = client.newTransaction().build().get();
141
142
try {
143
Producer<String> orderProducer = client.newProducer(Schema.STRING)
144
.topic("orders")
145
.create();
146
147
Producer<String> inventoryProducer = client.newProducer(Schema.STRING)
148
.topic("inventory")
149
.create();
150
151
Producer<String> paymentProducer = client.newProducer(Schema.STRING)
152
.topic("payments")
153
.create();
154
155
// All messages sent atomically
156
orderProducer.send("order-123", txn);
157
inventoryProducer.send("reserve-item-456", txn);
158
paymentProducer.send("charge-user-789", txn);
159
160
txn.commit().get();
161
162
} catch (Exception e) {
163
txn.abort().get();
164
throw e;
165
}
166
167
// Conditional message publishing
168
Transaction txn = client.newTransaction().build().get();
169
170
try {
171
Producer<String> producer = client.newProducer(Schema.STRING)
172
.topic("conditional-topic")
173
.create();
174
175
// Business logic to determine if messages should be sent
176
if (shouldSendMessages()) {
177
for (String message : getMessagesToSend()) {
178
producer.send(message, txn);
179
}
180
txn.commit().get();
181
} else {
182
txn.abort().get();
183
}
184
} catch (Exception e) {
185
txn.abort().get();
186
}
187
```
188
189
### Transactional Consumer Operations
190
191
Consumer operations within transaction context for atomic message acknowledgment.
192
193
```java { .api }
194
/**
195
* Consumer interface with transaction support
196
*/
197
interface Consumer<T> {
198
/** Acknowledge message within transaction */
199
void acknowledge(MessageId messageId, Transaction txn) throws PulsarClientException;
200
201
/** Acknowledge message within transaction */
202
void acknowledge(Message<?> message, Transaction txn) throws PulsarClientException;
203
204
/** Acknowledge message within transaction asynchronously */
205
CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction txn);
206
207
/** Acknowledge message within transaction asynchronously */
208
CompletableFuture<Void> acknowledgeAsync(Message<?> message, Transaction txn);
209
210
/** Acknowledge cumulatively within transaction */
211
void acknowledgeCumulative(MessageId messageId, Transaction txn) throws PulsarClientException;
212
213
/** Acknowledge cumulatively within transaction */
214
void acknowledgeCumulative(Message<?> message, Transaction txn) throws PulsarClientException;
215
216
/** Acknowledge cumulatively within transaction asynchronously */
217
CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Transaction txn);
218
219
/** Acknowledge cumulatively within transaction asynchronously */
220
CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message, Transaction txn);
221
}
222
```
223
224
**Transactional Consumer Examples:**
225
226
```java
227
// Exactly-once message processing
228
Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
229
.topic("input-topic")
230
.subscriptionName("processor-sub")
231
.subscribe();
232
233
Producer<String> outputProducer = client.newProducer(Schema.STRING)
234
.topic("output-topic")
235
.create();
236
237
while (true) {
238
Transaction txn = client.newTransaction().build().get();
239
240
try {
241
// Receive message
242
Message<String> inputMessage = inputConsumer.receive();
243
244
// Process message
245
String processedData = processMessage(inputMessage.getValue());
246
247
// Send processed result
248
outputProducer.send(processedData, txn);
249
250
// Acknowledge input message
251
inputConsumer.acknowledge(inputMessage, txn);
252
253
// Commit transaction
254
txn.commit().get();
255
256
} catch (Exception e) {
257
txn.abort().get();
258
System.err.println("Processing failed, transaction aborted: " + e.getMessage());
259
}
260
}
261
262
// Batch processing with transactions
263
List<Message<String>> messageBatch = new ArrayList<>();
264
Transaction txn = client.newTransaction().build().get();
265
266
try {
267
// Collect batch of messages
268
for (int i = 0; i < BATCH_SIZE; i++) {
269
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
270
if (message != null) {
271
messageBatch.add(message);
272
} else {
273
break; // No more messages available
274
}
275
}
276
277
// Process batch
278
for (Message<String> message : messageBatch) {
279
String result = processMessage(message.getValue());
280
outputProducer.send(result, txn);
281
consumer.acknowledge(message, txn);
282
}
283
284
txn.commit().get();
285
286
} catch (Exception e) {
287
txn.abort().get();
288
}
289
```
290
291
### Transaction Isolation Levels
292
293
Configuration for transaction isolation behavior.
294
295
```java { .api }
296
/**
297
* Transaction isolation levels
298
*/
299
enum TransactionIsolationLevel {
300
/** Read committed isolation level */
301
READ_COMMITTED,
302
/** Read uncommitted isolation level */
303
READ_UNCOMMITTED
304
}
305
306
/**
307
* Consumer builder with transaction isolation configuration
308
*/
309
interface ConsumerBuilder<T> {
310
/** Set transaction isolation level */
311
ConsumerBuilder<T> transactionIsolationLevel(TransactionIsolationLevel isolationLevel);
312
}
313
```
314
315
**Isolation Level Examples:**
316
317
```java
318
// Consumer with read committed isolation (default)
319
Consumer<String> consumer = client.newConsumer(Schema.STRING)
320
.topic("transactional-topic")
321
.subscriptionName("committed-reader")
322
.transactionIsolationLevel(TransactionIsolationLevel.READ_COMMITTED)
323
.subscribe();
324
325
// Only reads messages from committed transactions
326
Message<String> message = consumer.receive();
327
328
// Consumer with read uncommitted isolation
329
Consumer<String> consumer = client.newConsumer(Schema.STRING)
330
.topic("transactional-topic")
331
.subscriptionName("uncommitted-reader")
332
.transactionIsolationLevel(TransactionIsolationLevel.READ_UNCOMMITTED)
333
.subscribe();
334
335
// Reads messages from both committed and uncommitted transactions
336
Message<String> message = consumer.receive();
337
```
338
339
### Transaction State Management
340
341
Monitoring and managing transaction states and lifecycle.
342
343
```java { .api }
344
/**
345
* Transaction states
346
*/
347
enum TransactionState {
348
/** Transaction is open and active */
349
OPEN,
350
/** Transaction is committing */
351
COMMITTING,
352
/** Transaction is aborting */
353
ABORTING,
354
/** Transaction has been committed */
355
COMMITTED,
356
/** Transaction has been aborted */
357
ABORTED,
358
/** Transaction has timed out */
359
TIMEOUT,
360
/** Transaction is in error state */
361
ERROR
362
}
363
364
/**
365
* Transaction ID for tracking and debugging
366
*/
367
interface TxnID {
368
/** Get most significant bits */
369
long getMostSigBits();
370
371
/** Get least significant bits */
372
long getLeastSigBits();
373
374
/** Convert to string representation */
375
String toString();
376
}
377
```
378
379
**Transaction State Examples:**
380
381
```java
382
// Monitor transaction state
383
Transaction txn = client.newTransaction().build().get();
384
385
System.out.println("Transaction ID: " + txn.getTxnID());
386
System.out.println("Initial state: " + txn.getState());
387
System.out.println("Timeout: " + txn.getTransactionTimeout() + "ms");
388
389
try {
390
// Perform operations
391
producer.send("data", txn);
392
393
// Check state before commit
394
if (txn.getState() == TransactionState.OPEN) {
395
txn.commit().get();
396
System.out.println("Final state: " + txn.getState());
397
}
398
399
} catch (Exception e) {
400
System.out.println("Error state: " + txn.getState());
401
if (txn.getState() == TransactionState.OPEN) {
402
txn.abort().get();
403
}
404
}
405
```
406
407
### Transaction Exception Handling
408
409
Comprehensive exception handling for transactional operations.
410
411
```java { .api }
412
/**
413
* Transaction-related exceptions
414
*/
415
class PulsarClientException {
416
/** Transaction conflict detected */
417
static class TransactionConflictException extends PulsarClientException {
418
TransactionConflictException(String msg);
419
}
420
421
/** Transaction has failed operations */
422
static class TransactionHasOperationFailedException extends PulsarClientException {
423
TransactionHasOperationFailedException(String msg);
424
}
425
426
/** Transaction coordinator not available */
427
static class TransactionCoordinatorNotAvailableException extends PulsarClientException {
428
TransactionCoordinatorNotAvailableException(String msg);
429
}
430
431
/** Transaction not found */
432
static class TransactionNotFoundException extends PulsarClientException {
433
TransactionNotFoundException(String msg);
434
}
435
}
436
```
437
438
### Advanced Transaction Patterns
439
440
Common patterns for using transactions in complex scenarios.
441
442
**Pattern 1: Message Forwarding with Exactly-Once Semantics**
443
444
```java
445
public class ExactlyOnceForwarder {
446
private final Consumer<String> inputConsumer;
447
private final Producer<String> outputProducer;
448
private final PulsarClient client;
449
450
public void processMessages() {
451
while (!Thread.currentThread().isInterrupted()) {
452
Transaction txn = null;
453
try {
454
txn = client.newTransaction()
455
.withTransactionTimeout(30, TimeUnit.SECONDS)
456
.build()
457
.get();
458
459
Message<String> inputMessage = inputConsumer.receive(5, TimeUnit.SECONDS);
460
if (inputMessage == null) {
461
txn.abort().get();
462
continue;
463
}
464
465
// Transform message
466
String transformedData = transform(inputMessage.getValue());
467
468
// Send to output topic
469
outputProducer.send(transformedData, txn);
470
471
// Acknowledge input
472
inputConsumer.acknowledge(inputMessage, txn);
473
474
// Commit transaction
475
txn.commit().get();
476
477
} catch (Exception e) {
478
if (txn != null) {
479
try {
480
txn.abort().get();
481
} catch (Exception abortException) {
482
logger.error("Failed to abort transaction", abortException);
483
}
484
}
485
logger.error("Message processing failed", e);
486
}
487
}
488
}
489
}
490
```
491
492
**Pattern 2: Multi-Consumer Coordinated Processing**
493
494
```java
495
public class CoordinatedProcessor {
496
public void processCoordinatedMessages() {
497
Transaction txn = client.newTransaction().build().get();
498
499
try {
500
// Read from multiple input topics
501
Message<String> orderMessage = orderConsumer.receive(1, TimeUnit.SECONDS);
502
Message<String> inventoryMessage = inventoryConsumer.receive(1, TimeUnit.SECONDS);
503
504
if (orderMessage != null && inventoryMessage != null) {
505
// Process both messages together
506
String result = processOrder(orderMessage.getValue(), inventoryMessage.getValue());
507
508
// Send result
509
resultProducer.send(result, txn);
510
511
// Acknowledge both inputs
512
orderConsumer.acknowledge(orderMessage, txn);
513
inventoryConsumer.acknowledge(inventoryMessage, txn);
514
515
txn.commit().get();
516
} else {
517
txn.abort().get();
518
}
519
520
} catch (Exception e) {
521
txn.abort().get();
522
throw e;
523
}
524
}
525
}
526
```
527
528
## Configuration and Best Practices
529
530
```java { .api }
531
/**
532
* Transaction configuration best practices
533
*/
534
class TransactionConfig {
535
/** Recommended timeout for short operations */
536
static final Duration SHORT_TIMEOUT = Duration.ofSeconds(30);
537
538
/** Recommended timeout for long operations */
539
static final Duration LONG_TIMEOUT = Duration.ofMinutes(5);
540
541
/** Maximum recommended timeout */
542
static final Duration MAX_TIMEOUT = Duration.ofMinutes(10);
543
}
544
```
545
546
**Best Practices Examples:**
547
548
```java
549
// Configure client for optimal transaction performance
550
PulsarClient client = PulsarClient.builder()
551
.serviceUrl("pulsar://localhost:6650")
552
.enableTransaction(true)
553
.operationTimeout(60, TimeUnit.SECONDS) // Longer timeout for transactions
554
.build();
555
556
// Use appropriate transaction timeouts
557
Transaction shortTxn = client.newTransaction()
558
.withTransactionTimeout(30, TimeUnit.SECONDS) // For quick operations
559
.build().get();
560
561
Transaction longTxn = client.newTransaction()
562
.withTransactionTimeout(5, TimeUnit.MINUTES) // For batch processing
563
.build().get();
564
565
// Always handle transaction lifecycle properly
566
try (AutoCloseable txnResource = () -> {
567
if (txn.getState() == TransactionState.OPEN) {
568
txn.abort().get();
569
}
570
}) {
571
// Perform transaction operations
572
txn.commit().get();
573
}
574
```