or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdexceptions.mdexecution.mdexpressions.mdindex.mdjdbc.mdmapreduce.mdmonitoring.mdquery-compilation.mdschema-metadata.mdserver.mdtransactions.mdtypes.md

transactions.mddocs/

0

# Transaction Support

1

2

Phoenix provides comprehensive ACID transaction support through integration with transaction managers and distributed transaction protocols. The transaction framework enables consistent data operations across multiple tables and region servers while maintaining Phoenix's SQL semantics.

3

4

## Core Imports

5

6

```java

7

import org.apache.phoenix.transaction.*;

8

import org.apache.phoenix.execute.MutationState;

9

import org.apache.phoenix.jdbc.PhoenixConnection;

10

import java.sql.*;

11

```

12

13

## Transaction Framework

14

15

### PhoenixTransactionClient

16

17

Interface for Phoenix transaction clients providing transaction lifecycle management.

18

19

```java{ .api }

20

public interface PhoenixTransactionClient {

21

// Transaction lifecycle

22

TransactionContext newTransactionContext() throws SQLException

23

void beginTransaction(TransactionContext context) throws SQLException

24

void commitTransaction(TransactionContext context) throws SQLException

25

void rollbackTransaction(TransactionContext context) throws SQLException

26

27

// Transaction state

28

boolean isTransactionRunning(TransactionContext context)

29

long getTransactionId(TransactionContext context)

30

TransactionStatus getTransactionStatus(TransactionContext context)

31

32

// Transaction configuration

33

void setTimeout(long timeoutInSeconds)

34

long getTimeout()

35

36

// Cleanup

37

void close() throws SQLException

38

}

39

```

40

41

### TransactionContext

42

43

Represents a transaction context with state and metadata information.

44

45

```java{ .api }

46

public interface TransactionContext {

47

// Transaction identification

48

long getTransactionId()

49

long getStartTime()

50

TransactionStatus getStatus()

51

52

// Transaction properties

53

IsolationLevel getIsolationLevel()

54

long getTimeoutMs()

55

boolean isReadOnly()

56

57

// Transaction operations

58

void setReadOnly(boolean readOnly)

59

void setTimeout(long timeoutMs)

60

void addCheckpoint() throws SQLException

61

62

// Transaction metadata

63

Map<String, Object> getProperties()

64

void setProperty(String key, Object value)

65

}

66

```

67

68

### TransactionFactory

69

70

Factory for creating transaction-related objects and clients.

71

72

```java{ .api }

73

public class TransactionFactory {

74

// Transaction client creation

75

public static PhoenixTransactionClient getTransactionClient(Configuration config)

76

public static PhoenixTransactionClient getTransactionClient(ConnectionQueryServices services)

77

78

// Transaction provider detection

79

public static TransactionProcessor.Builder getTransactionProcessor(Configuration config,

80

String tableName)

81

public static boolean isTransactionEnabled(Configuration config)

82

83

// Transaction configuration

84

public static void configureTransactionManager(Configuration config, String provider)

85

public static String getConfiguredTransactionManager(Configuration config)

86

}

87

```

88

89

**Usage:**

90

```java

91

// Basic transaction setup

92

Configuration config = HBaseConfiguration.create();

93

config.set("phoenix.transactions.enabled", "true");

94

config.set("data.tx.snapshot.dir", "/tmp/phoenix-tx");

95

96

// Create transaction client

97

PhoenixTransactionClient txClient = TransactionFactory.getTransactionClient(config);

98

99

// Create transaction context

100

TransactionContext txContext = txClient.newTransactionContext();

101

102

// Configure transaction properties

103

txContext.setTimeout(300000); // 5 minutes

104

txContext.setReadOnly(false);

105

106

try {

107

// Begin transaction

108

txClient.beginTransaction(txContext);

109

110

// Perform transactional operations

111

Connection connection = getConnection();

112

connection.setAutoCommit(false);

113

114

Statement stmt = connection.createStatement();

115

stmt.executeUpdate("UPSERT INTO accounts (id, balance) VALUES (1, 1000)");

116

stmt.executeUpdate("UPSERT INTO accounts (id, balance) VALUES (2, 2000)");

117

118

// Transfer money between accounts

119

stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE id = 1");

120

stmt.executeUpdate("UPDATE accounts SET balance = balance + 100 WHERE id = 2");

121

122

// Commit transaction

123

connection.commit();

124

txClient.commitTransaction(txContext);

125

126

System.out.println("Transaction completed successfully");

127

System.out.println("Transaction ID: " + txContext.getTransactionId());

128

129

} catch (SQLException e) {

130

// Rollback on error

131

try {

132

connection.rollback();

133

txClient.rollbackTransaction(txContext);

134

System.out.println("Transaction rolled back due to error: " + e.getMessage());

135

} catch (SQLException rollbackError) {

136

System.err.println("Error during rollback: " + rollbackError.getMessage());

137

}

138

throw e;

139

} finally {

140

txClient.close();

141

}

142

```

143

144

## Transactional Tables

145

146

### Creating Transactional Tables

147

148

```java

149

// Create transactional tables with Phoenix DDL

150

public class TransactionalTableManager {

151

public void createTransactionalTable(Connection connection, String tableName) throws SQLException {

152

String createTableSQL = String.format("""

153

CREATE TABLE %s (

154

account_id BIGINT NOT NULL,

155

account_name VARCHAR(100),

156

balance DECIMAL(15,2),

157

last_updated TIMESTAMP,

158

CONSTRAINT pk PRIMARY KEY (account_id)

159

) TRANSACTIONAL=true

160

""", tableName);

161

162

try (Statement stmt = connection.createStatement()) {

163

stmt.execute(createTableSQL);

164

System.out.println("Created transactional table: " + tableName);

165

}

166

}

167

168

public void alterTableToTransactional(Connection connection, String tableName) throws SQLException {

169

String alterTableSQL = String.format("ALTER TABLE %s SET TRANSACTIONAL=true", tableName);

170

171

try (Statement stmt = connection.createStatement()) {

172

stmt.execute(alterTableSQL);

173

System.out.println("Altered table to transactional: " + tableName);

174

}

175

}

176

177

public boolean isTableTransactional(Connection connection, String tableName) throws SQLException {

178

String sql = """

179

SELECT TRANSACTIONAL FROM SYSTEM.CATALOG

180

WHERE TABLE_NAME = ? AND TABLE_TYPE = 'TABLE'

181

""";

182

183

try (PreparedStatement stmt = connection.prepareStatement(sql)) {

184

stmt.setString(1, tableName.toUpperCase());

185

ResultSet rs = stmt.executeQuery();

186

187

if (rs.next()) {

188

Boolean transactional = rs.getBoolean("TRANSACTIONAL");

189

return !rs.wasNull() && transactional;

190

}

191

return false;

192

}

193

}

194

}

195

196

// Usage

197

TransactionalTableManager tableManager = new TransactionalTableManager();

198

199

// Create transactional table

200

tableManager.createTransactionalTable(connection, "accounts");

201

tableManager.createTransactionalTable(connection, "transactions");

202

203

// Check if table is transactional

204

boolean isTransactional = tableManager.isTableTransactional(connection, "accounts");

205

System.out.println("Accounts table is transactional: " + isTransactional);

206

207

// Convert existing table to transactional

208

tableManager.alterTableToTransactional(connection, "orders");

209

```

210

211

## Transaction Isolation and Consistency

212

213

### Isolation Levels

214

215

```java

216

// Working with transaction isolation levels

217

public class TransactionIsolationManager {

218

public enum IsolationLevel {

219

SERIALIZABLE(Connection.TRANSACTION_SERIALIZABLE),

220

REPEATABLE_READ(Connection.TRANSACTION_REPEATABLE_READ),

221

READ_COMMITTED(Connection.TRANSACTION_READ_COMMITTED),

222

READ_UNCOMMITTED(Connection.TRANSACTION_READ_UNCOMMITTED);

223

224

private final int jdbcLevel;

225

226

IsolationLevel(int jdbcLevel) {

227

this.jdbcLevel = jdbcLevel;

228

}

229

230

public int getJdbcLevel() { return jdbcLevel; }

231

}

232

233

public void setTransactionIsolation(Connection connection, IsolationLevel level) throws SQLException {

234

connection.setTransactionIsolation(level.getJdbcLevel());

235

System.out.println("Set transaction isolation to: " + level);

236

}

237

238

public IsolationLevel getTransactionIsolation(Connection connection) throws SQLException {

239

int jdbcLevel = connection.getTransactionIsolation();

240

for (IsolationLevel level : IsolationLevel.values()) {

241

if (level.getJdbcLevel() == jdbcLevel) {

242

return level;

243

}

244

}

245

return IsolationLevel.READ_COMMITTED; // Default

246

}

247

248

public void demonstrateIsolationLevels(Connection connection) throws SQLException {

249

// Test different isolation levels

250

for (IsolationLevel level : IsolationLevel.values()) {

251

System.out.println("\n=== Testing Isolation Level: " + level + " ===");

252

253

setTransactionIsolation(connection, level);

254

connection.setAutoCommit(false);

255

256

try {

257

// Perform test operations

258

testIsolationLevel(connection, level);

259

connection.commit();

260

System.out.println("Transaction committed successfully");

261

262

} catch (SQLException e) {

263

connection.rollback();

264

System.out.println("Transaction rolled back: " + e.getMessage());

265

}

266

}

267

}

268

269

private void testIsolationLevel(Connection connection, IsolationLevel level) throws SQLException {

270

try (Statement stmt = connection.createStatement()) {

271

// Read initial state

272

ResultSet rs = stmt.executeQuery("SELECT balance FROM accounts WHERE account_id = 1");

273

if (rs.next()) {

274

BigDecimal initialBalance = rs.getBigDecimal("balance");

275

System.out.println("Initial balance: " + initialBalance);

276

}

277

278

// Perform update

279

int updatedRows = stmt.executeUpdate(

280

"UPDATE accounts SET balance = balance + 100 WHERE account_id = 1"

281

);

282

System.out.println("Updated " + updatedRows + " rows");

283

284

// Read updated state

285

rs = stmt.executeQuery("SELECT balance FROM accounts WHERE account_id = 1");

286

if (rs.next()) {

287

BigDecimal newBalance = rs.getBigDecimal("balance");

288

System.out.println("New balance: " + newBalance);

289

}

290

}

291

}

292

}

293

294

// Usage

295

TransactionIsolationManager isolationManager = new TransactionIsolationManager();

296

297

// Set specific isolation level

298

isolationManager.setTransactionIsolation(connection,

299

TransactionIsolationManager.IsolationLevel.SERIALIZABLE);

300

301

// Test all isolation levels

302

isolationManager.demonstrateIsolationLevels(connection);

303

```

304

305

## Advanced Transaction Patterns

306

307

### Distributed Transactions

308

309

```java

310

// Managing distributed transactions across multiple Phoenix connections

311

public class DistributedTransactionManager {

312

private final List<Connection> connections;

313

private final PhoenixTransactionClient txClient;

314

315

public DistributedTransactionManager(List<String> jdbcUrls) throws SQLException {

316

this.connections = new ArrayList<>();

317

for (String url : jdbcUrls) {

318

connections.add(DriverManager.getConnection(url));

319

}

320

321

Configuration config = HBaseConfiguration.create();

322

this.txClient = TransactionFactory.getTransactionClient(config);

323

}

324

325

public void executeDistributedTransaction(DistributedTransactionCallback callback) throws SQLException {

326

TransactionContext txContext = txClient.newTransactionContext();

327

boolean allConnectionsReady = false;

328

329

try {

330

// Begin transaction

331

txClient.beginTransaction(txContext);

332

333

// Prepare all connections

334

for (Connection conn : connections) {

335

conn.setAutoCommit(false);

336

}

337

allConnectionsReady = true;

338

339

// Execute business logic

340

callback.execute(connections, txContext);

341

342

// Commit on all connections

343

for (Connection conn : connections) {

344

conn.commit();

345

}

346

347

// Commit distributed transaction

348

txClient.commitTransaction(txContext);

349

350

System.out.println("Distributed transaction completed successfully");

351

352

} catch (Exception e) {

353

System.err.println("Distributed transaction failed: " + e.getMessage());

354

355

// Rollback all connections

356

if (allConnectionsReady) {

357

for (Connection conn : connections) {

358

try {

359

conn.rollback();

360

} catch (SQLException rollbackError) {

361

System.err.println("Error rolling back connection: " + rollbackError.getMessage());

362

}

363

}

364

}

365

366

// Rollback distributed transaction

367

try {

368

txClient.rollbackTransaction(txContext);

369

} catch (SQLException txRollbackError) {

370

System.err.println("Error rolling back transaction: " + txRollbackError.getMessage());

371

}

372

373

if (e instanceof SQLException) {

374

throw (SQLException) e;

375

} else {

376

throw new SQLException("Distributed transaction failed", e);

377

}

378

}

379

}

380

381

public interface DistributedTransactionCallback {

382

void execute(List<Connection> connections, TransactionContext txContext) throws SQLException;

383

}

384

385

public void close() throws SQLException {

386

for (Connection conn : connections) {

387

conn.close();

388

}

389

txClient.close();

390

}

391

}

392

393

// Usage

394

List<String> jdbcUrls = Arrays.asList(

395

"jdbc:phoenix:cluster1:2181",

396

"jdbc:phoenix:cluster2:2181",

397

"jdbc:phoenix:cluster3:2181"

398

);

399

400

DistributedTransactionManager dtm = new DistributedTransactionManager(jdbcUrls);

401

402

try {

403

dtm.executeDistributedTransaction((connections, txContext) -> {

404

// Business logic spanning multiple Phoenix clusters

405

Connection conn1 = connections.get(0);

406

Connection conn2 = connections.get(1);

407

Connection conn3 = connections.get(2);

408

409

// Update data across clusters

410

try (Statement stmt1 = conn1.createStatement();

411

Statement stmt2 = conn2.createStatement();

412

Statement stmt3 = conn3.createStatement()) {

413

414

stmt1.executeUpdate("UPDATE accounts SET balance = balance - 1000 WHERE account_id = 'ACC1'");

415

stmt2.executeUpdate("UPDATE accounts SET balance = balance + 500 WHERE account_id = 'ACC2'");

416

stmt3.executeUpdate("UPDATE accounts SET balance = balance + 500 WHERE account_id = 'ACC3'");

417

418

System.out.println("Distributed operations completed");

419

}

420

});

421

} finally {

422

dtm.close();

423

}

424

```

425

426

### Long-Running Transactions

427

428

```java

429

// Managing long-running transactions with checkpointing

430

public class LongRunningTransactionManager {

431

private final PhoenixTransactionClient txClient;

432

private final Connection connection;

433

434

public LongRunningTransactionManager(Connection connection) throws SQLException {

435

this.connection = connection;

436

Configuration config = HBaseConfiguration.create();

437

this.txClient = TransactionFactory.getTransactionClient(config);

438

}

439

440

public void executeLongRunningTransaction(LongRunningCallback callback) throws SQLException {

441

TransactionContext txContext = txClient.newTransactionContext();

442

443

// Set longer timeout for long-running transaction

444

txContext.setTimeout(1800000); // 30 minutes

445

446

try {

447

txClient.beginTransaction(txContext);

448

connection.setAutoCommit(false);

449

450

// Execute with periodic checkpointing

451

callback.execute(connection, txContext, this::createCheckpoint);

452

453

// Final commit

454

connection.commit();

455

txClient.commitTransaction(txContext);

456

457

System.out.println("Long-running transaction completed successfully");

458

System.out.println("Transaction ID: " + txContext.getTransactionId());

459

System.out.println("Duration: " + (System.currentTimeMillis() - txContext.getStartTime()) + "ms");

460

461

} catch (Exception e) {

462

System.err.println("Long-running transaction failed: " + e.getMessage());

463

464

try {

465

connection.rollback();

466

txClient.rollbackTransaction(txContext);

467

} catch (SQLException rollbackError) {

468

System.err.println("Error during rollback: " + rollbackError.getMessage());

469

}

470

471

if (e instanceof SQLException) {

472

throw (SQLException) e;

473

} else {

474

throw new SQLException("Long-running transaction failed", e);

475

}

476

}

477

}

478

479

private void createCheckpoint(TransactionContext txContext) throws SQLException {

480

System.out.println("Creating transaction checkpoint...");

481

txContext.addCheckpoint();

482

483

// Log checkpoint creation

484

System.out.println("Checkpoint created for transaction: " + txContext.getTransactionId());

485

System.out.println("Elapsed time: " + (System.currentTimeMillis() - txContext.getStartTime()) + "ms");

486

}

487

488

public interface LongRunningCallback {

489

void execute(Connection connection, TransactionContext txContext,

490

CheckpointCallback checkpointCallback) throws SQLException;

491

}

492

493

public interface CheckpointCallback {

494

void createCheckpoint(TransactionContext txContext) throws SQLException;

495

}

496

497

public void close() throws SQLException {

498

txClient.close();

499

}

500

}

501

502

// Usage example: Batch processing with checkpoints

503

LongRunningTransactionManager lrtm = new LongRunningTransactionManager(connection);

504

505

try {

506

lrtm.executeLongRunningTransaction((conn, txContext, checkpoint) -> {

507

// Process large batch of records

508

String selectSQL = "SELECT * FROM large_table WHERE processed = false ORDER BY id";

509

String updateSQL = "UPDATE large_table SET processed = true, processed_date = ? WHERE id = ?";

510

511

try (Statement selectStmt = conn.createStatement();

512

PreparedStatement updateStmt = conn.prepareStatement(updateSQL);

513

ResultSet rs = selectStmt.executeQuery(selectSQL)) {

514

515

int processedCount = 0;

516

Timestamp processedDate = new Timestamp(System.currentTimeMillis());

517

518

while (rs.next()) {

519

long id = rs.getLong("id");

520

521

// Process the record (business logic)

522

processRecord(rs);

523

524

// Update processed status

525

updateStmt.setTimestamp(1, processedDate);

526

updateStmt.setLong(2, id);

527

updateStmt.executeUpdate();

528

529

processedCount++;

530

531

// Create checkpoint every 1000 records

532

if (processedCount % 1000 == 0) {

533

checkpoint.createCheckpoint(txContext);

534

System.out.println("Processed " + processedCount + " records");

535

}

536

}

537

538

System.out.println("Total records processed: " + processedCount);

539

}

540

});

541

} finally {

542

lrtm.close();

543

}

544

```

545

546

### Transaction Retry Logic

547

548

```java

549

// Implementing retry logic for transient transaction failures

550

public class TransactionRetryManager {

551

private final int maxRetries;

552

private final long baseRetryDelayMs;

553

private final double backoffMultiplier;

554

555

public TransactionRetryManager(int maxRetries, long baseRetryDelayMs, double backoffMultiplier) {

556

this.maxRetries = maxRetries;

557

this.baseRetryDelayMs = baseRetryDelayMs;

558

this.backoffMultiplier = backoffMultiplier;

559

}

560

561

public <T> T executeWithRetry(Connection connection, RetryableTransactionCallback<T> callback) throws SQLException {

562

SQLException lastException = null;

563

long retryDelay = baseRetryDelayMs;

564

565

for (int attempt = 1; attempt <= maxRetries; attempt++) {

566

try {

567

return executeTransaction(connection, callback);

568

569

} catch (SQLException e) {

570

lastException = e;

571

572

if (!isRetryableException(e) || attempt == maxRetries) {

573

throw e;

574

}

575

576

System.out.println("Transaction failed (attempt " + attempt + "/" + maxRetries + "): " + e.getMessage());

577

System.out.println("Retrying in " + retryDelay + "ms...");

578

579

try {

580

Thread.sleep(retryDelay);

581

} catch (InterruptedException ie) {

582

Thread.currentThread().interrupt();

583

throw new SQLException("Transaction retry interrupted", ie);

584

}

585

586

retryDelay = Math.round(retryDelay * backoffMultiplier);

587

}

588

}

589

590

throw new SQLException("Transaction failed after " + maxRetries + " attempts", lastException);

591

}

592

593

private <T> T executeTransaction(Connection connection, RetryableTransactionCallback<T> callback) throws SQLException {

594

connection.setAutoCommit(false);

595

boolean committed = false;

596

597

try {

598

T result = callback.execute(connection);

599

connection.commit();

600

committed = true;

601

return result;

602

603

} catch (SQLException e) {

604

if (!committed) {

605

try {

606

connection.rollback();

607

} catch (SQLException rollbackError) {

608

System.err.println("Error during rollback: " + rollbackError.getMessage());

609

}

610

}

611

throw e;

612

}

613

}

614

615

private boolean isRetryableException(SQLException e) {

616

// Check for retryable error conditions

617

String sqlState = e.getSQLState();

618

int errorCode = e.getErrorCode();

619

620

// Common retryable conditions

621

return sqlState != null && (

622

sqlState.startsWith("08") || // Connection exceptions

623

sqlState.equals("40001") || // Serialization failure

624

sqlState.equals("40P01") || // Deadlock detected

625

"CONNECTION_THROTTLED".equals(e.getMessage()) ||

626

"REGION_TOO_BUSY".equals(e.getMessage())

627

);

628

}

629

630

public interface RetryableTransactionCallback<T> {

631

T execute(Connection connection) throws SQLException;

632

}

633

}

634

635

// Usage

636

TransactionRetryManager retryManager = new TransactionRetryManager(3, 1000, 2.0);

637

638

try {

639

String result = retryManager.executeWithRetry(connection, (conn) -> {

640

// Transactional business logic that might fail transiently

641

try (Statement stmt = conn.createStatement()) {

642

stmt.executeUpdate("UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 'PROD123'");

643

644

ResultSet rs = stmt.executeQuery("SELECT quantity FROM inventory WHERE product_id = 'PROD123'");

645

if (rs.next()) {

646

int remainingQuantity = rs.getInt("quantity");

647

if (remainingQuantity < 0) {

648

throw new SQLException("Insufficient inventory");

649

}

650

651

stmt.executeUpdate(

652

"INSERT INTO order_items (order_id, product_id, quantity) VALUES ('ORD456', 'PROD123', 1)"

653

);

654

655

return "Order item created successfully, remaining quantity: " + remainingQuantity;

656

} else {

657

throw new SQLException("Product not found");

658

}

659

}

660

});

661

662

System.out.println("Transaction result: " + result);

663

664

} catch (SQLException e) {

665

System.err.println("Transaction failed permanently: " + e.getMessage());

666

}

667

```

668

669

### Transaction Monitoring and Debugging

670

671

```java

672

// Transaction monitoring and debugging utilities

673

public class TransactionMonitor {

674

private final Map<Long, TransactionInfo> activeTransactions = new ConcurrentHashMap<>();

675

676

public void startMonitoring(TransactionContext txContext) {

677

TransactionInfo info = new TransactionInfo(

678

txContext.getTransactionId(),

679

System.currentTimeMillis(),

680

Thread.currentThread().getName()

681

);

682

activeTransactions.put(txContext.getTransactionId(), info);

683

684

System.out.println("Started monitoring transaction: " + txContext.getTransactionId());

685

}

686

687

public void recordOperation(long transactionId, String operation, String details) {

688

TransactionInfo info = activeTransactions.get(transactionId);

689

if (info != null) {

690

info.addOperation(operation, details);

691

}

692

}

693

694

public void stopMonitoring(long transactionId, boolean committed) {

695

TransactionInfo info = activeTransactions.remove(transactionId);

696

if (info != null) {

697

info.setEndTime(System.currentTimeMillis());

698

info.setCommitted(committed);

699

700

// Log transaction summary

701

System.out.println("\n=== Transaction Summary ===");

702

System.out.println("Transaction ID: " + transactionId);

703

System.out.println("Thread: " + info.getThreadName());

704

System.out.println("Duration: " + info.getDuration() + "ms");

705

System.out.println("Status: " + (committed ? "COMMITTED" : "ROLLED BACK"));

706

System.out.println("Operations: " + info.getOperations().size());

707

708

for (TransactionOperation op : info.getOperations()) {

709

System.out.println(" " + op.getTimestamp() + " - " + op.getOperation() + ": " + op.getDetails());

710

}

711

System.out.println("=========================\n");

712

}

713

}

714

715

public void reportActiveTransactions() {

716

System.out.println("\n=== Active Transactions ===");

717

long currentTime = System.currentTimeMillis();

718

719

if (activeTransactions.isEmpty()) {

720

System.out.println("No active transactions");

721

} else {

722

for (TransactionInfo info : activeTransactions.values()) {

723

long duration = currentTime - info.getStartTime();

724

System.out.println("Transaction ID: " + info.getTransactionId());

725

System.out.println(" Thread: " + info.getThreadName());

726

System.out.println(" Duration: " + duration + "ms");

727

System.out.println(" Operations: " + info.getOperations().size());

728

}

729

}

730

System.out.println("==========================\n");

731

}

732

733

// Supporting classes

734

private static class TransactionInfo {

735

private final long transactionId;

736

private final long startTime;

737

private final String threadName;

738

private final List<TransactionOperation> operations = new ArrayList<>();

739

private long endTime;

740

private boolean committed;

741

742

public TransactionInfo(long transactionId, long startTime, String threadName) {

743

this.transactionId = transactionId;

744

this.startTime = startTime;

745

this.threadName = threadName;

746

}

747

748

public void addOperation(String operation, String details) {

749

operations.add(new TransactionOperation(System.currentTimeMillis(), operation, details));

750

}

751

752

// Getters and setters

753

public long getTransactionId() { return transactionId; }

754

public long getStartTime() { return startTime; }

755

public String getThreadName() { return threadName; }

756

public List<TransactionOperation> getOperations() { return operations; }

757

public long getEndTime() { return endTime; }

758

public void setEndTime(long endTime) { this.endTime = endTime; }

759

public boolean isCommitted() { return committed; }

760

public void setCommitted(boolean committed) { this.committed = committed; }

761

762

public long getDuration() {

763

return endTime > 0 ? endTime - startTime : System.currentTimeMillis() - startTime;

764

}

765

}

766

767

private static class TransactionOperation {

768

private final long timestamp;

769

private final String operation;

770

private final String details;

771

772

public TransactionOperation(long timestamp, String operation, String details) {

773

this.timestamp = timestamp;

774

this.operation = operation;

775

this.details = details;

776

}

777

778

public long getTimestamp() { return timestamp; }

779

public String getOperation() { return operation; }

780

public String getDetails() { return details; }

781

}

782

}

783

784

// Usage with monitored transactions

785

public class MonitoredTransactionExample {

786

private final TransactionMonitor monitor = new TransactionMonitor();

787

788

public void executeMonitoredTransaction(Connection connection) throws SQLException {

789

Configuration config = HBaseConfiguration.create();

790

PhoenixTransactionClient txClient = TransactionFactory.getTransactionClient(config);

791

TransactionContext txContext = txClient.newTransactionContext();

792

793

try {

794

// Start monitoring

795

monitor.startMonitoring(txContext);

796

797

txClient.beginTransaction(txContext);

798

connection.setAutoCommit(false);

799

800

// Record operations

801

monitor.recordOperation(txContext.getTransactionId(), "BEGIN", "Transaction started");

802

803

try (Statement stmt = connection.createStatement()) {

804

monitor.recordOperation(txContext.getTransactionId(), "UPDATE",

805

"UPDATE accounts SET balance = balance - 100 WHERE account_id = 1");

806

stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1");

807

808

monitor.recordOperation(txContext.getTransactionId(), "UPDATE",

809

"UPDATE accounts SET balance = balance + 100 WHERE account_id = 2");

810

stmt.executeUpdate("UPDATE accounts SET balance = balance + 100 WHERE account_id = 2");

811

812

monitor.recordOperation(txContext.getTransactionId(), "INSERT",

813

"INSERT INTO transaction_log (from_account, to_account, amount) VALUES (1, 2, 100)");

814

stmt.executeUpdate("INSERT INTO transaction_log (from_account, to_account, amount) VALUES (1, 2, 100)");

815

}

816

817

connection.commit();

818

txClient.commitTransaction(txContext);

819

820

monitor.recordOperation(txContext.getTransactionId(), "COMMIT", "Transaction committed");

821

monitor.stopMonitoring(txContext.getTransactionId(), true);

822

823

} catch (SQLException e) {

824

monitor.recordOperation(txContext.getTransactionId(), "ERROR", "Error: " + e.getMessage());

825

826

try {

827

connection.rollback();

828

txClient.rollbackTransaction(txContext);

829

monitor.recordOperation(txContext.getTransactionId(), "ROLLBACK", "Transaction rolled back");

830

} catch (SQLException rollbackError) {

831

monitor.recordOperation(txContext.getTransactionId(), "ROLLBACK_ERROR", "Rollback failed: " + rollbackError.getMessage());

832

}

833

834

monitor.stopMonitoring(txContext.getTransactionId(), false);

835

throw e;

836

} finally {

837

txClient.close();

838

}

839

}

840

841

public static void main(String[] args) throws SQLException {

842

MonitoredTransactionExample example = new MonitoredTransactionExample();

843

Connection connection = DriverManager.getConnection("jdbc:phoenix:localhost:2181");

844

845

try {

846

// Execute monitored transaction

847

example.executeMonitoredTransaction(connection);

848

849

// Report active transactions

850

example.monitor.reportActiveTransactions();

851

852

} finally {

853

connection.close();

854

}

855

}

856

}

857

```

858

859

This completes the comprehensive documentation for Phoenix Core's transaction support, covering transaction clients, contexts, distributed transactions, long-running operations, retry logic, and monitoring capabilities.