or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

- authentication-security.md
- client-management.md
- index.md
- message-consumption.md
- message-production.md
- message-reading.md
- schema-serialization.md
- transaction-support.md

docs/transaction-support.md

0

# Transaction Support

1

2

Transactional messaging for exactly-once semantics, multi-topic atomic operations, and coordinated message processing across producers and consumers.

3

4

## Capabilities

5

6

### Transaction Interface

7

8

Core interface for transactional operations providing exactly-once message processing guarantees.

9

10

```java { .api }

11

/**

12

* Transaction interface for atomic message operations

13

* Provides exactly-once semantics across multiple topics and operations

14

*/

15

interface Transaction {

16

/** Get transaction ID */

17

TxnID getTxnID();

18

19

/** Get transaction state */

20

TransactionState getState();

21

22

/** Commit transaction */

23

CompletableFuture<Void> commit();

24

25

/** Abort transaction */

26

CompletableFuture<Void> abort();

27

28

/** Get transaction timeout */

29

long getTransactionTimeout();

30

31

/** Get transaction start timestamp */

32

long getTransactionStartTime();

33

}

34

```

35

36

### TransactionBuilder Configuration

37

38

Builder interface for creating and configuring transactions.

39

40

```java { .api }

41

/**

42

* Builder for creating and configuring transactions

43

*/

44

interface TransactionBuilder {

45

/** Build the transaction asynchronously */

46

CompletableFuture<Transaction> build();

47

48

/** Set transaction timeout (default: coordinator configured timeout) */

49

TransactionBuilder withTransactionTimeout(long timeout, TimeUnit timeUnit);

50

}

51

```

52

53

**Basic Transaction Usage:**

54

55

```java

56

import org.apache.pulsar.client.api.*;

57

import org.apache.pulsar.client.api.transaction.*;

58

59

// Enable transactions in client

60

PulsarClient client = PulsarClient.builder()

61

.serviceUrl("pulsar://localhost:6650")

62

.enableTransaction(true)

63

.build();

64

65

// Create transaction

66

Transaction txn = client.newTransaction()

67

.withTransactionTimeout(1, TimeUnit.MINUTES)

68

.build()

69

.get();

70

71

try {

72

// Produce messages in transaction

73

Producer<String> producer = client.newProducer(Schema.STRING)

74

.topic("transactional-topic")

75

.create();

76

77

producer.newMessage(txn)

78

.value("Message 1")

79

.send();

80

81

producer.newMessage(txn)

82

.value("Message 2")

83

.send();

84

85

// Consume and acknowledge in transaction

86

Consumer<String> consumer = client.newConsumer(Schema.STRING)

87

.topic("input-topic")

88

.subscriptionName("txn-sub")

89

.subscribe();

90

91

Message<String> message = consumer.receive();

92

consumer.acknowledge(message.getMessageId(), txn);

93

94

// Commit transaction

95

txn.commit().get();

96

System.out.println("Transaction committed successfully");

97

98

} catch (Exception e) {

99

// Abort transaction on error

100

txn.abort().get();

101

System.err.println("Transaction aborted: " + e.getMessage());

102

}

103

```

104

105

### Transactional Producer Operations

106

107

Producer operations within transaction context for atomic message publishing.

108

109

```java { .api }

110

/**

111

* Extended TypedMessageBuilder for transactional operations

112

*/

113

interface TypedMessageBuilder<T> {

114

/** Send message within transaction context */

115

MessageId send(Transaction txn) throws PulsarClientException;

116

117

/** Send message within transaction context asynchronously */

118

CompletableFuture<MessageId> sendAsync(Transaction txn);

119

}

120

121

/**

122

* Producer interface with transaction support

123

*/

124

interface Producer<T> {

125

/** Create message builder for transaction */

126

TypedMessageBuilder<T> newMessage(Transaction txn);

127

128

/** Send message in transaction */

129

MessageId send(T message, Transaction txn) throws PulsarClientException;

130

131

/** Send message in transaction asynchronously */

132

CompletableFuture<MessageId> sendAsync(T message, Transaction txn);

133

}

134

```

135

136

**Transactional Producer Examples:**

137

138

```java

139

// Multi-topic atomic publishing

140

Transaction txn = client.newTransaction().build().get();

141

142

try {

143

Producer<String> orderProducer = client.newProducer(Schema.STRING)

144

.topic("orders")

145

.create();

146

147

Producer<String> inventoryProducer = client.newProducer(Schema.STRING)

148

.topic("inventory")

149

.create();

150

151

Producer<String> paymentProducer = client.newProducer(Schema.STRING)

152

.topic("payments")

153

.create();

154

155

// All messages sent atomically

156

orderProducer.send("order-123", txn);

157

inventoryProducer.send("reserve-item-456", txn);

158

paymentProducer.send("charge-user-789", txn);

159

160

txn.commit().get();

161

162

} catch (Exception e) {

163

txn.abort().get();

164

throw e;

165

}

166

167

// Conditional message publishing

168

Transaction txn = client.newTransaction().build().get();

169

170

try {

171

Producer<String> producer = client.newProducer(Schema.STRING)

172

.topic("conditional-topic")

173

.create();

174

175

// Business logic to determine if messages should be sent

176

if (shouldSendMessages()) {

177

for (String message : getMessagesToSend()) {

178

producer.send(message, txn);

179

}

180

txn.commit().get();

181

} else {

182

txn.abort().get();

183

}

184

} catch (Exception e) {

185

txn.abort().get();

186

}

187

```

188

189

### Transactional Consumer Operations

190

191

Consumer operations within transaction context for atomic message acknowledgment.

192

193

```java { .api }

194

/**

195

* Consumer interface with transaction support

196

*/

197

interface Consumer<T> {

198

/** Acknowledge message within transaction */

199

void acknowledge(MessageId messageId, Transaction txn) throws PulsarClientException;

200

201

/** Acknowledge message within transaction */

202

void acknowledge(Message<?> message, Transaction txn) throws PulsarClientException;

203

204

/** Acknowledge message within transaction asynchronously */

205

CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction txn);

206

207

/** Acknowledge message within transaction asynchronously */

208

CompletableFuture<Void> acknowledgeAsync(Message<?> message, Transaction txn);

209

210

/** Acknowledge cumulatively within transaction */

211

void acknowledgeCumulative(MessageId messageId, Transaction txn) throws PulsarClientException;

212

213

/** Acknowledge cumulatively within transaction */

214

void acknowledgeCumulative(Message<?> message, Transaction txn) throws PulsarClientException;

215

216

/** Acknowledge cumulatively within transaction asynchronously */

217

CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Transaction txn);

218

219

/** Acknowledge cumulatively within transaction asynchronously */

220

CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message, Transaction txn);

221

}

222

```

223

224

**Transactional Consumer Examples:**

225

226

```java

227

// Exactly-once message processing

228

Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)

229

.topic("input-topic")

230

.subscriptionName("processor-sub")

231

.subscribe();

232

233

Producer<String> outputProducer = client.newProducer(Schema.STRING)

234

.topic("output-topic")

235

.create();

236

237

while (true) {

238

Transaction txn = client.newTransaction().build().get();

239

240

try {

241

// Receive message

242

Message<String> inputMessage = inputConsumer.receive();

243

244

// Process message

245

String processedData = processMessage(inputMessage.getValue());

246

247

// Send processed result

248

outputProducer.send(processedData, txn);

249

250

// Acknowledge input message

251

inputConsumer.acknowledge(inputMessage, txn);

252

253

// Commit transaction

254

txn.commit().get();

255

256

} catch (Exception e) {

257

txn.abort().get();

258

System.err.println("Processing failed, transaction aborted: " + e.getMessage());

259

}

260

}

261

262

// Batch processing with transactions

263

List<Message<String>> messageBatch = new ArrayList<>();

264

Transaction txn = client.newTransaction().build().get();

265

266

try {

267

// Collect batch of messages

268

for (int i = 0; i < BATCH_SIZE; i++) {

269

Message<String> message = consumer.receive(1, TimeUnit.SECONDS);

270

if (message != null) {

271

messageBatch.add(message);

272

} else {

273

break; // No more messages available

274

}

275

}

276

277

// Process batch

278

for (Message<String> message : messageBatch) {

279

String result = processMessage(message.getValue());

280

outputProducer.send(result, txn);

281

consumer.acknowledge(message, txn);

282

}

283

284

txn.commit().get();

285

286

} catch (Exception e) {

287

txn.abort().get();

288

}

289

```

290

291

### Transaction Isolation Levels

292

293

Configuration for transaction isolation behavior.

294

295

```java { .api }

296

/**

297

* Transaction isolation levels

298

*/

299

enum TransactionIsolationLevel {

300

/** Read committed isolation level */

301

READ_COMMITTED,

302

/** Read uncommitted isolation level */

303

READ_UNCOMMITTED

304

}

305

306

/**

307

* Consumer builder with transaction isolation configuration

308

*/

309

interface ConsumerBuilder<T> {

310

/** Set transaction isolation level */

311

ConsumerBuilder<T> transactionIsolationLevel(TransactionIsolationLevel isolationLevel);

312

}

313

```

314

315

**Isolation Level Examples:**

316

317

```java

318

// Consumer with read committed isolation (default)

319

Consumer<String> consumer = client.newConsumer(Schema.STRING)

320

.topic("transactional-topic")

321

.subscriptionName("committed-reader")

322

.transactionIsolationLevel(TransactionIsolationLevel.READ_COMMITTED)

323

.subscribe();

324

325

// Only reads messages from committed transactions

326

Message<String> message = consumer.receive();

327

328

// Consumer with read uncommitted isolation

329

Consumer<String> consumer = client.newConsumer(Schema.STRING)

330

.topic("transactional-topic")

331

.subscriptionName("uncommitted-reader")

332

.transactionIsolationLevel(TransactionIsolationLevel.READ_UNCOMMITTED)

333

.subscribe();

334

335

// Reads messages from both committed and uncommitted transactions

336

Message<String> message = consumer.receive();

337

```

338

339

### Transaction State Management

340

341

Monitoring and managing transaction states and lifecycle.

342

343

```java { .api }

344

/**

345

* Transaction states

346

*/

347

enum TransactionState {

348

/** Transaction is open and active */

349

OPEN,

350

/** Transaction is committing */

351

COMMITTING,

352

/** Transaction is aborting */

353

ABORTING,

354

/** Transaction has been committed */

355

COMMITTED,

356

/** Transaction has been aborted */

357

ABORTED,

358

/** Transaction has timed out */

359

TIMEOUT,

360

/** Transaction is in error state */

361

ERROR

362

}

363

364

/**

365

* Transaction ID for tracking and debugging

366

*/

367

interface TxnID {

368

/** Get most significant bits */

369

long getMostSigBits();

370

371

/** Get least significant bits */

372

long getLeastSigBits();

373

374

/** Convert to string representation */

375

String toString();

376

}

377

```

378

379

**Transaction State Examples:**

380

381

```java

382

// Monitor transaction state

383

Transaction txn = client.newTransaction().build().get();

384

385

System.out.println("Transaction ID: " + txn.getTxnID());

386

System.out.println("Initial state: " + txn.getState());

387

System.out.println("Timeout: " + txn.getTransactionTimeout() + "ms");

388

389

try {

390

// Perform operations

391

producer.send("data", txn);

392

393

// Check state before commit

394

if (txn.getState() == TransactionState.OPEN) {

395

txn.commit().get();

396

System.out.println("Final state: " + txn.getState());

397

}

398

399

} catch (Exception e) {

400

System.out.println("Error state: " + txn.getState());

401

if (txn.getState() == TransactionState.OPEN) {

402

txn.abort().get();

403

}

404

}

405

```

406

407

### Transaction Exception Handling

408

409

Exception types raised by transactional operations, declared as nested classes of `PulsarClientException`.

410

411

```java { .api }

412

/**

413

* Transaction-related exceptions

414

*/

415

class PulsarClientException {

416

/** Transaction conflict detected */

417

static class TransactionConflictException extends PulsarClientException {

418

TransactionConflictException(String msg);

419

}

420

421

/** Transaction has failed operations */

422

static class TransactionHasOperationFailedException extends PulsarClientException {

423

TransactionHasOperationFailedException(String msg);

424

}

425

426

/** Transaction coordinator not available */

427

static class TransactionCoordinatorNotAvailableException extends PulsarClientException {

428

TransactionCoordinatorNotAvailableException(String msg);

429

}

430

431

/** Transaction not found */

432

static class TransactionNotFoundException extends PulsarClientException {

433

TransactionNotFoundException(String msg);

434

}

435

}

436

```

437

438

### Advanced Transaction Patterns

439

440

Common patterns for using transactions in complex scenarios.

441

442

**Pattern 1: Message Forwarding with Exactly-Once Semantics**

443

444

```java

445

public class ExactlyOnceForwarder {

446

private final Consumer<String> inputConsumer;

447

private final Producer<String> outputProducer;

448

private final PulsarClient client;

449

450

public void processMessages() {

451

while (!Thread.currentThread().isInterrupted()) {

452

Transaction txn = null;

453

try {

454

txn = client.newTransaction()

455

.withTransactionTimeout(30, TimeUnit.SECONDS)

456

.build()

457

.get();

458

459

Message<String> inputMessage = inputConsumer.receive(5, TimeUnit.SECONDS);

460

if (inputMessage == null) {

461

txn.abort().get();

462

continue;

463

}

464

465

// Transform message

466

String transformedData = transform(inputMessage.getValue());

467

468

// Send to output topic

469

outputProducer.send(transformedData, txn);

470

471

// Acknowledge input

472

inputConsumer.acknowledge(inputMessage, txn);

473

474

// Commit transaction

475

txn.commit().get();

476

477

} catch (Exception e) {

478

if (txn != null) {

479

try {

480

txn.abort().get();

481

} catch (Exception abortException) {

482

logger.error("Failed to abort transaction", abortException);

483

}

484

}

485

logger.error("Message processing failed", e);

486

}

487

}

488

}

489

}

490

```

491

492

**Pattern 2: Multi-Consumer Coordinated Processing**

493

494

```java

495

public class CoordinatedProcessor {

496

public void processCoordinatedMessages() {

497

Transaction txn = client.newTransaction().build().get();

498

499

try {

500

// Read from multiple input topics

501

Message<String> orderMessage = orderConsumer.receive(1, TimeUnit.SECONDS);

502

Message<String> inventoryMessage = inventoryConsumer.receive(1, TimeUnit.SECONDS);

503

504

if (orderMessage != null && inventoryMessage != null) {

505

// Process both messages together

506

String result = processOrder(orderMessage.getValue(), inventoryMessage.getValue());

507

508

// Send result

509

resultProducer.send(result, txn);

510

511

// Acknowledge both inputs

512

orderConsumer.acknowledge(orderMessage, txn);

513

inventoryConsumer.acknowledge(inventoryMessage, txn);

514

515

txn.commit().get();

516

} else {

517

txn.abort().get();

518

}

519

520

} catch (Exception e) {

521

txn.abort().get();

522

throw e;

523

}

524

}

525

}

526

```

527

528

## Configuration and Best Practices

529

530

```java { .api }

531

/**

532

* Transaction configuration best practices

533

*/

534

class TransactionConfig {

535

/** Recommended timeout for short operations */

536

static final Duration SHORT_TIMEOUT = Duration.ofSeconds(30);

537

538

/** Recommended timeout for long operations */

539

static final Duration LONG_TIMEOUT = Duration.ofMinutes(5);

540

541

/** Maximum recommended timeout */

542

static final Duration MAX_TIMEOUT = Duration.ofMinutes(10);

543

}

544

```

545

546

**Best Practices Examples:**

547

548

```java

549

// Configure client for optimal transaction performance

550

PulsarClient client = PulsarClient.builder()

551

.serviceUrl("pulsar://localhost:6650")

552

.enableTransaction(true)

553

.operationTimeout(60, TimeUnit.SECONDS) // Longer timeout for transactions

554

.build();

555

556

// Use appropriate transaction timeouts

557

Transaction shortTxn = client.newTransaction()

558

.withTransactionTimeout(30, TimeUnit.SECONDS) // For quick operations

559

.build().get();

560

561

Transaction longTxn = client.newTransaction()

562

.withTransactionTimeout(5, TimeUnit.MINUTES) // For batch processing

563

.build().get();

564

565

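// Always handle the transaction lifecycle properly: the close hook below aborts the
// transaction if it is still OPEN when the block exits (`txn` stands for any
// Transaction created as shown above, e.g. shortTxn)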

566

try (AutoCloseable txnResource = () -> {

567

if (txn.getState() == TransactionState.OPEN) {

568

txn.abort().get();

569

}

570

}) {

571

// Perform transaction operations

572

txn.commit().get();

573

}

574

```