0
# Transaction Support
1
2
Phoenix provides comprehensive ACID transaction support through integration with transaction managers and distributed transaction protocols. The transaction framework enables consistent data operations across multiple tables and region servers while maintaining Phoenix's SQL semantics.
3
4
## Core Imports
5
6
```java
7
import org.apache.phoenix.transaction.*;
8
import org.apache.phoenix.execute.MutationState;
9
import org.apache.phoenix.jdbc.PhoenixConnection;
10
import java.sql.*;
11
```
12
13
## Transaction Framework
14
15
### PhoenixTransactionClient
16
17
Interface for Phoenix transaction clients providing transaction lifecycle management.
18
19
```java{ .api }
20
public interface PhoenixTransactionClient {
21
// Transaction lifecycle
22
TransactionContext newTransactionContext() throws SQLException
23
void beginTransaction(TransactionContext context) throws SQLException
24
void commitTransaction(TransactionContext context) throws SQLException
25
void rollbackTransaction(TransactionContext context) throws SQLException
26
27
// Transaction state
28
boolean isTransactionRunning(TransactionContext context)
29
long getTransactionId(TransactionContext context)
30
TransactionStatus getTransactionStatus(TransactionContext context)
31
32
// Transaction configuration
33
void setTimeout(long timeoutInSeconds)
34
long getTimeout()
35
36
// Cleanup
37
void close() throws SQLException
38
}
39
```
40
41
### TransactionContext
42
43
Represents a transaction context with state and metadata information.
44
45
```java{ .api }
46
public interface TransactionContext {
47
// Transaction identification
48
long getTransactionId()
49
long getStartTime()
50
TransactionStatus getStatus()
51
52
// Transaction properties
53
IsolationLevel getIsolationLevel()
54
long getTimeoutMs()
55
boolean isReadOnly()
56
57
// Transaction operations
58
void setReadOnly(boolean readOnly)
59
void setTimeout(long timeoutMs)
60
void addCheckpoint() throws SQLException
61
62
// Transaction metadata
63
Map<String, Object> getProperties()
64
void setProperty(String key, Object value)
65
}
66
```
67
68
### TransactionFactory
69
70
Factory for creating transaction-related objects and clients.
71
72
```java{ .api }
73
public class TransactionFactory {
74
// Transaction client creation
75
public static PhoenixTransactionClient getTransactionClient(Configuration config)
76
public static PhoenixTransactionClient getTransactionClient(ConnectionQueryServices services)
77
78
// Transaction provider detection
79
public static TransactionProcessor.Builder getTransactionProcessor(Configuration config,
80
String tableName)
81
public static boolean isTransactionEnabled(Configuration config)
82
83
// Transaction configuration
84
public static void configureTransactionManager(Configuration config, String provider)
85
public static String getConfiguredTransactionManager(Configuration config)
86
}
87
```
88
89
**Usage:**
90
```java
91
// Basic transaction setup
92
Configuration config = HBaseConfiguration.create();
93
config.set("phoenix.transactions.enabled", "true");
94
config.set("data.tx.snapshot.dir", "/tmp/phoenix-tx");
95
96
// Create transaction client
97
PhoenixTransactionClient txClient = TransactionFactory.getTransactionClient(config);
98
99
// Create transaction context
100
TransactionContext txContext = txClient.newTransactionContext();
101
102
// Configure transaction properties
103
txContext.setTimeout(300000); // 5 minutes
104
txContext.setReadOnly(false);
105
106
try {
107
// Begin transaction
108
txClient.beginTransaction(txContext);
109
110
// Perform transactional operations
111
Connection connection = getConnection();
112
connection.setAutoCommit(false);
113
114
Statement stmt = connection.createStatement();
115
stmt.executeUpdate("UPSERT INTO accounts (id, balance) VALUES (1, 1000)");
116
stmt.executeUpdate("UPSERT INTO accounts (id, balance) VALUES (2, 2000)");
117
118
// Transfer money between accounts
119
stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE id = 1");
120
stmt.executeUpdate("UPDATE accounts SET balance = balance + 100 WHERE id = 2");
121
122
// Commit transaction
123
connection.commit();
124
txClient.commitTransaction(txContext);
125
126
System.out.println("Transaction completed successfully");
127
System.out.println("Transaction ID: " + txContext.getTransactionId());
128
129
} catch (SQLException e) {
130
// Rollback on error
131
try {
132
connection.rollback();
133
txClient.rollbackTransaction(txContext);
134
System.out.println("Transaction rolled back due to error: " + e.getMessage());
135
} catch (SQLException rollbackError) {
136
System.err.println("Error during rollback: " + rollbackError.getMessage());
137
}
138
throw e;
139
} finally {
140
txClient.close();
141
}
142
```
143
144
## Transactional Tables
145
146
### Creating Transactional Tables
147
148
```java
149
// Create transactional tables with Phoenix DDL
150
public class TransactionalTableManager {
151
public void createTransactionalTable(Connection connection, String tableName) throws SQLException {
152
String createTableSQL = String.format("""
153
CREATE TABLE %s (
154
account_id BIGINT NOT NULL,
155
account_name VARCHAR(100),
156
balance DECIMAL(15,2),
157
last_updated TIMESTAMP,
158
CONSTRAINT pk PRIMARY KEY (account_id)
159
) TRANSACTIONAL=true
160
""", tableName);
161
162
try (Statement stmt = connection.createStatement()) {
163
stmt.execute(createTableSQL);
164
System.out.println("Created transactional table: " + tableName);
165
}
166
}
167
168
public void alterTableToTransactional(Connection connection, String tableName) throws SQLException {
169
String alterTableSQL = String.format("ALTER TABLE %s SET TRANSACTIONAL=true", tableName);
170
171
try (Statement stmt = connection.createStatement()) {
172
stmt.execute(alterTableSQL);
173
System.out.println("Altered table to transactional: " + tableName);
174
}
175
}
176
177
public boolean isTableTransactional(Connection connection, String tableName) throws SQLException {
178
String sql = """
179
SELECT TRANSACTIONAL FROM SYSTEM.CATALOG
180
WHERE TABLE_NAME = ? AND TABLE_TYPE = 'TABLE'
181
""";
182
183
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
184
stmt.setString(1, tableName.toUpperCase());
185
ResultSet rs = stmt.executeQuery();
186
187
if (rs.next()) {
188
Boolean transactional = rs.getBoolean("TRANSACTIONAL");
189
return !rs.wasNull() && transactional;
190
}
191
return false;
192
}
193
}
194
}
195
196
// Usage
197
TransactionalTableManager tableManager = new TransactionalTableManager();
198
199
// Create transactional table
200
tableManager.createTransactionalTable(connection, "accounts");
201
tableManager.createTransactionalTable(connection, "transactions");
202
203
// Check if table is transactional
204
boolean isTransactional = tableManager.isTableTransactional(connection, "accounts");
205
System.out.println("Accounts table is transactional: " + isTransactional);
206
207
// Convert existing table to transactional
208
tableManager.alterTableToTransactional(connection, "orders");
209
```
210
211
## Transaction Isolation and Consistency
212
213
### Isolation Levels
214
215
```java
216
// Working with transaction isolation levels
217
public class TransactionIsolationManager {
218
public enum IsolationLevel {
219
SERIALIZABLE(Connection.TRANSACTION_SERIALIZABLE),
220
REPEATABLE_READ(Connection.TRANSACTION_REPEATABLE_READ),
221
READ_COMMITTED(Connection.TRANSACTION_READ_COMMITTED),
222
READ_UNCOMMITTED(Connection.TRANSACTION_READ_UNCOMMITTED);
223
224
private final int jdbcLevel;
225
226
IsolationLevel(int jdbcLevel) {
227
this.jdbcLevel = jdbcLevel;
228
}
229
230
public int getJdbcLevel() { return jdbcLevel; }
231
}
232
233
public void setTransactionIsolation(Connection connection, IsolationLevel level) throws SQLException {
234
connection.setTransactionIsolation(level.getJdbcLevel());
235
System.out.println("Set transaction isolation to: " + level);
236
}
237
238
public IsolationLevel getTransactionIsolation(Connection connection) throws SQLException {
239
int jdbcLevel = connection.getTransactionIsolation();
240
for (IsolationLevel level : IsolationLevel.values()) {
241
if (level.getJdbcLevel() == jdbcLevel) {
242
return level;
243
}
244
}
245
return IsolationLevel.READ_COMMITTED; // Default
246
}
247
248
public void demonstrateIsolationLevels(Connection connection) throws SQLException {
249
// Test different isolation levels
250
for (IsolationLevel level : IsolationLevel.values()) {
251
System.out.println("\n=== Testing Isolation Level: " + level + " ===");
252
253
setTransactionIsolation(connection, level);
254
connection.setAutoCommit(false);
255
256
try {
257
// Perform test operations
258
testIsolationLevel(connection, level);
259
connection.commit();
260
System.out.println("Transaction committed successfully");
261
262
} catch (SQLException e) {
263
connection.rollback();
264
System.out.println("Transaction rolled back: " + e.getMessage());
265
}
266
}
267
}
268
269
private void testIsolationLevel(Connection connection, IsolationLevel level) throws SQLException {
270
try (Statement stmt = connection.createStatement()) {
271
// Read initial state
272
ResultSet rs = stmt.executeQuery("SELECT balance FROM accounts WHERE account_id = 1");
273
if (rs.next()) {
274
BigDecimal initialBalance = rs.getBigDecimal("balance");
275
System.out.println("Initial balance: " + initialBalance);
276
}
277
278
// Perform update
279
int updatedRows = stmt.executeUpdate(
280
"UPDATE accounts SET balance = balance + 100 WHERE account_id = 1"
281
);
282
System.out.println("Updated " + updatedRows + " rows");
283
284
// Read updated state
285
rs = stmt.executeQuery("SELECT balance FROM accounts WHERE account_id = 1");
286
if (rs.next()) {
287
BigDecimal newBalance = rs.getBigDecimal("balance");
288
System.out.println("New balance: " + newBalance);
289
}
290
}
291
}
292
}
293
294
// Usage
295
TransactionIsolationManager isolationManager = new TransactionIsolationManager();
296
297
// Set specific isolation level
298
isolationManager.setTransactionIsolation(connection,
299
TransactionIsolationManager.IsolationLevel.SERIALIZABLE);
300
301
// Test all isolation levels
302
isolationManager.demonstrateIsolationLevels(connection);
303
```
304
305
## Advanced Transaction Patterns
306
307
### Distributed Transactions
308
309
```java
310
// Managing distributed transactions across multiple Phoenix connections
311
public class DistributedTransactionManager {
312
private final List<Connection> connections;
313
private final PhoenixTransactionClient txClient;
314
315
public DistributedTransactionManager(List<String> jdbcUrls) throws SQLException {
316
this.connections = new ArrayList<>();
317
for (String url : jdbcUrls) {
318
connections.add(DriverManager.getConnection(url));
319
}
320
321
Configuration config = HBaseConfiguration.create();
322
this.txClient = TransactionFactory.getTransactionClient(config);
323
}
324
325
public void executeDistributedTransaction(DistributedTransactionCallback callback) throws SQLException {
326
TransactionContext txContext = txClient.newTransactionContext();
327
boolean allConnectionsReady = false;
328
329
try {
330
// Begin transaction
331
txClient.beginTransaction(txContext);
332
333
// Prepare all connections
334
for (Connection conn : connections) {
335
conn.setAutoCommit(false);
336
}
337
allConnectionsReady = true;
338
339
// Execute business logic
340
callback.execute(connections, txContext);
341
342
// Commit on all connections
343
for (Connection conn : connections) {
344
conn.commit();
345
}
346
347
// Commit distributed transaction
348
txClient.commitTransaction(txContext);
349
350
System.out.println("Distributed transaction completed successfully");
351
352
} catch (Exception e) {
353
System.err.println("Distributed transaction failed: " + e.getMessage());
354
355
// Rollback all connections
356
if (allConnectionsReady) {
357
for (Connection conn : connections) {
358
try {
359
conn.rollback();
360
} catch (SQLException rollbackError) {
361
System.err.println("Error rolling back connection: " + rollbackError.getMessage());
362
}
363
}
364
}
365
366
// Rollback distributed transaction
367
try {
368
txClient.rollbackTransaction(txContext);
369
} catch (SQLException txRollbackError) {
370
System.err.println("Error rolling back transaction: " + txRollbackError.getMessage());
371
}
372
373
if (e instanceof SQLException) {
374
throw (SQLException) e;
375
} else {
376
throw new SQLException("Distributed transaction failed", e);
377
}
378
}
379
}
380
381
public interface DistributedTransactionCallback {
382
void execute(List<Connection> connections, TransactionContext txContext) throws SQLException;
383
}
384
385
public void close() throws SQLException {
386
for (Connection conn : connections) {
387
conn.close();
388
}
389
txClient.close();
390
}
391
}
392
393
// Usage
394
List<String> jdbcUrls = Arrays.asList(
395
"jdbc:phoenix:cluster1:2181",
396
"jdbc:phoenix:cluster2:2181",
397
"jdbc:phoenix:cluster3:2181"
398
);
399
400
DistributedTransactionManager dtm = new DistributedTransactionManager(jdbcUrls);
401
402
try {
403
dtm.executeDistributedTransaction((connections, txContext) -> {
404
// Business logic spanning multiple Phoenix clusters
405
Connection conn1 = connections.get(0);
406
Connection conn2 = connections.get(1);
407
Connection conn3 = connections.get(2);
408
409
// Update data across clusters
410
try (Statement stmt1 = conn1.createStatement();
411
Statement stmt2 = conn2.createStatement();
412
Statement stmt3 = conn3.createStatement()) {
413
414
stmt1.executeUpdate("UPDATE accounts SET balance = balance - 1000 WHERE account_id = 'ACC1'");
415
stmt2.executeUpdate("UPDATE accounts SET balance = balance + 500 WHERE account_id = 'ACC2'");
416
stmt3.executeUpdate("UPDATE accounts SET balance = balance + 500 WHERE account_id = 'ACC3'");
417
418
System.out.println("Distributed operations completed");
419
}
420
});
421
} finally {
422
dtm.close();
423
}
424
```
425
426
### Long-Running Transactions
427
428
```java
429
// Managing long-running transactions with checkpointing
430
public class LongRunningTransactionManager {
431
private final PhoenixTransactionClient txClient;
432
private final Connection connection;
433
434
public LongRunningTransactionManager(Connection connection) throws SQLException {
435
this.connection = connection;
436
Configuration config = HBaseConfiguration.create();
437
this.txClient = TransactionFactory.getTransactionClient(config);
438
}
439
440
public void executeLongRunningTransaction(LongRunningCallback callback) throws SQLException {
441
TransactionContext txContext = txClient.newTransactionContext();
442
443
// Set longer timeout for long-running transaction
444
txContext.setTimeout(1800000); // 30 minutes
445
446
try {
447
txClient.beginTransaction(txContext);
448
connection.setAutoCommit(false);
449
450
// Execute with periodic checkpointing
451
callback.execute(connection, txContext, this::createCheckpoint);
452
453
// Final commit
454
connection.commit();
455
txClient.commitTransaction(txContext);
456
457
System.out.println("Long-running transaction completed successfully");
458
System.out.println("Transaction ID: " + txContext.getTransactionId());
459
System.out.println("Duration: " + (System.currentTimeMillis() - txContext.getStartTime()) + "ms");
460
461
} catch (Exception e) {
462
System.err.println("Long-running transaction failed: " + e.getMessage());
463
464
try {
465
connection.rollback();
466
txClient.rollbackTransaction(txContext);
467
} catch (SQLException rollbackError) {
468
System.err.println("Error during rollback: " + rollbackError.getMessage());
469
}
470
471
if (e instanceof SQLException) {
472
throw (SQLException) e;
473
} else {
474
throw new SQLException("Long-running transaction failed", e);
475
}
476
}
477
}
478
479
private void createCheckpoint(TransactionContext txContext) throws SQLException {
480
System.out.println("Creating transaction checkpoint...");
481
txContext.addCheckpoint();
482
483
// Log checkpoint creation
484
System.out.println("Checkpoint created for transaction: " + txContext.getTransactionId());
485
System.out.println("Elapsed time: " + (System.currentTimeMillis() - txContext.getStartTime()) + "ms");
486
}
487
488
public interface LongRunningCallback {
489
void execute(Connection connection, TransactionContext txContext,
490
CheckpointCallback checkpointCallback) throws SQLException;
491
}
492
493
public interface CheckpointCallback {
494
void createCheckpoint(TransactionContext txContext) throws SQLException;
495
}
496
497
public void close() throws SQLException {
498
txClient.close();
499
}
500
}
501
502
// Usage example: Batch processing with checkpoints
503
LongRunningTransactionManager lrtm = new LongRunningTransactionManager(connection);
504
505
try {
506
lrtm.executeLongRunningTransaction((conn, txContext, checkpoint) -> {
507
// Process large batch of records
508
String selectSQL = "SELECT * FROM large_table WHERE processed = false ORDER BY id";
509
String updateSQL = "UPDATE large_table SET processed = true, processed_date = ? WHERE id = ?";
510
511
try (Statement selectStmt = conn.createStatement();
512
PreparedStatement updateStmt = conn.prepareStatement(updateSQL);
513
ResultSet rs = selectStmt.executeQuery(selectSQL)) {
514
515
int processedCount = 0;
516
Timestamp processedDate = new Timestamp(System.currentTimeMillis());
517
518
while (rs.next()) {
519
long id = rs.getLong("id");
520
521
// Process the record (business logic)
522
processRecord(rs);
523
524
// Update processed status
525
updateStmt.setTimestamp(1, processedDate);
526
updateStmt.setLong(2, id);
527
updateStmt.executeUpdate();
528
529
processedCount++;
530
531
// Create checkpoint every 1000 records
532
if (processedCount % 1000 == 0) {
533
checkpoint.createCheckpoint(txContext);
534
System.out.println("Processed " + processedCount + " records");
535
}
536
}
537
538
System.out.println("Total records processed: " + processedCount);
539
}
540
});
541
} finally {
542
lrtm.close();
543
}
544
```
545
546
### Transaction Retry Logic
547
548
```java
549
// Implementing retry logic for transient transaction failures
550
public class TransactionRetryManager {
551
private final int maxRetries;
552
private final long baseRetryDelayMs;
553
private final double backoffMultiplier;
554
555
public TransactionRetryManager(int maxRetries, long baseRetryDelayMs, double backoffMultiplier) {
556
this.maxRetries = maxRetries;
557
this.baseRetryDelayMs = baseRetryDelayMs;
558
this.backoffMultiplier = backoffMultiplier;
559
}
560
561
public <T> T executeWithRetry(Connection connection, RetryableTransactionCallback<T> callback) throws SQLException {
562
SQLException lastException = null;
563
long retryDelay = baseRetryDelayMs;
564
565
for (int attempt = 1; attempt <= maxRetries; attempt++) {
566
try {
567
return executeTransaction(connection, callback);
568
569
} catch (SQLException e) {
570
lastException = e;
571
572
if (!isRetryableException(e) || attempt == maxRetries) {
573
throw e;
574
}
575
576
System.out.println("Transaction failed (attempt " + attempt + "/" + maxRetries + "): " + e.getMessage());
577
System.out.println("Retrying in " + retryDelay + "ms...");
578
579
try {
580
Thread.sleep(retryDelay);
581
} catch (InterruptedException ie) {
582
Thread.currentThread().interrupt();
583
throw new SQLException("Transaction retry interrupted", ie);
584
}
585
586
retryDelay = Math.round(retryDelay * backoffMultiplier);
587
}
588
}
589
590
throw new SQLException("Transaction failed after " + maxRetries + " attempts", lastException);
591
}
592
593
private <T> T executeTransaction(Connection connection, RetryableTransactionCallback<T> callback) throws SQLException {
594
connection.setAutoCommit(false);
595
boolean committed = false;
596
597
try {
598
T result = callback.execute(connection);
599
connection.commit();
600
committed = true;
601
return result;
602
603
} catch (SQLException e) {
604
if (!committed) {
605
try {
606
connection.rollback();
607
} catch (SQLException rollbackError) {
608
System.err.println("Error during rollback: " + rollbackError.getMessage());
609
}
610
}
611
throw e;
612
}
613
}
614
615
private boolean isRetryableException(SQLException e) {
616
// Check for retryable error conditions
617
String sqlState = e.getSQLState();
618
int errorCode = e.getErrorCode();
619
620
// Common retryable conditions
621
return sqlState != null && (
622
sqlState.startsWith("08") || // Connection exceptions
623
sqlState.equals("40001") || // Serialization failure
624
sqlState.equals("40P01") || // Deadlock detected
625
"CONNECTION_THROTTLED".equals(e.getMessage()) ||
626
"REGION_TOO_BUSY".equals(e.getMessage())
627
);
628
}
629
630
public interface RetryableTransactionCallback<T> {
631
T execute(Connection connection) throws SQLException;
632
}
633
}
634
635
// Usage
636
TransactionRetryManager retryManager = new TransactionRetryManager(3, 1000, 2.0);
637
638
try {
639
String result = retryManager.executeWithRetry(connection, (conn) -> {
640
// Transactional business logic that might fail transiently
641
try (Statement stmt = conn.createStatement()) {
642
stmt.executeUpdate("UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 'PROD123'");
643
644
ResultSet rs = stmt.executeQuery("SELECT quantity FROM inventory WHERE product_id = 'PROD123'");
645
if (rs.next()) {
646
int remainingQuantity = rs.getInt("quantity");
647
if (remainingQuantity < 0) {
648
throw new SQLException("Insufficient inventory");
649
}
650
651
stmt.executeUpdate(
652
"INSERT INTO order_items (order_id, product_id, quantity) VALUES ('ORD456', 'PROD123', 1)"
653
);
654
655
return "Order item created successfully, remaining quantity: " + remainingQuantity;
656
} else {
657
throw new SQLException("Product not found");
658
}
659
}
660
});
661
662
System.out.println("Transaction result: " + result);
663
664
} catch (SQLException e) {
665
System.err.println("Transaction failed permanently: " + e.getMessage());
666
}
667
```
668
669
### Transaction Monitoring and Debugging
670
671
```java
672
// Transaction monitoring and debugging utilities
673
public class TransactionMonitor {
674
private final Map<Long, TransactionInfo> activeTransactions = new ConcurrentHashMap<>();
675
676
public void startMonitoring(TransactionContext txContext) {
677
TransactionInfo info = new TransactionInfo(
678
txContext.getTransactionId(),
679
System.currentTimeMillis(),
680
Thread.currentThread().getName()
681
);
682
activeTransactions.put(txContext.getTransactionId(), info);
683
684
System.out.println("Started monitoring transaction: " + txContext.getTransactionId());
685
}
686
687
public void recordOperation(long transactionId, String operation, String details) {
688
TransactionInfo info = activeTransactions.get(transactionId);
689
if (info != null) {
690
info.addOperation(operation, details);
691
}
692
}
693
694
public void stopMonitoring(long transactionId, boolean committed) {
695
TransactionInfo info = activeTransactions.remove(transactionId);
696
if (info != null) {
697
info.setEndTime(System.currentTimeMillis());
698
info.setCommitted(committed);
699
700
// Log transaction summary
701
System.out.println("\n=== Transaction Summary ===");
702
System.out.println("Transaction ID: " + transactionId);
703
System.out.println("Thread: " + info.getThreadName());
704
System.out.println("Duration: " + info.getDuration() + "ms");
705
System.out.println("Status: " + (committed ? "COMMITTED" : "ROLLED BACK"));
706
System.out.println("Operations: " + info.getOperations().size());
707
708
for (TransactionOperation op : info.getOperations()) {
709
System.out.println(" " + op.getTimestamp() + " - " + op.getOperation() + ": " + op.getDetails());
710
}
711
System.out.println("=========================\n");
712
}
713
}
714
715
public void reportActiveTransactions() {
716
System.out.println("\n=== Active Transactions ===");
717
long currentTime = System.currentTimeMillis();
718
719
if (activeTransactions.isEmpty()) {
720
System.out.println("No active transactions");
721
} else {
722
for (TransactionInfo info : activeTransactions.values()) {
723
long duration = currentTime - info.getStartTime();
724
System.out.println("Transaction ID: " + info.getTransactionId());
725
System.out.println(" Thread: " + info.getThreadName());
726
System.out.println(" Duration: " + duration + "ms");
727
System.out.println(" Operations: " + info.getOperations().size());
728
}
729
}
730
System.out.println("==========================\n");
731
}
732
733
// Supporting classes
734
private static class TransactionInfo {
735
private final long transactionId;
736
private final long startTime;
737
private final String threadName;
738
private final List<TransactionOperation> operations = new ArrayList<>();
739
private long endTime;
740
private boolean committed;
741
742
public TransactionInfo(long transactionId, long startTime, String threadName) {
743
this.transactionId = transactionId;
744
this.startTime = startTime;
745
this.threadName = threadName;
746
}
747
748
public void addOperation(String operation, String details) {
749
operations.add(new TransactionOperation(System.currentTimeMillis(), operation, details));
750
}
751
752
// Getters and setters
753
public long getTransactionId() { return transactionId; }
754
public long getStartTime() { return startTime; }
755
public String getThreadName() { return threadName; }
756
public List<TransactionOperation> getOperations() { return operations; }
757
public long getEndTime() { return endTime; }
758
public void setEndTime(long endTime) { this.endTime = endTime; }
759
public boolean isCommitted() { return committed; }
760
public void setCommitted(boolean committed) { this.committed = committed; }
761
762
public long getDuration() {
763
return endTime > 0 ? endTime - startTime : System.currentTimeMillis() - startTime;
764
}
765
}
766
767
private static class TransactionOperation {
768
private final long timestamp;
769
private final String operation;
770
private final String details;
771
772
public TransactionOperation(long timestamp, String operation, String details) {
773
this.timestamp = timestamp;
774
this.operation = operation;
775
this.details = details;
776
}
777
778
public long getTimestamp() { return timestamp; }
779
public String getOperation() { return operation; }
780
public String getDetails() { return details; }
781
}
782
}
783
784
// Usage with monitored transactions
785
public class MonitoredTransactionExample {
786
private final TransactionMonitor monitor = new TransactionMonitor();
787
788
public void executeMonitoredTransaction(Connection connection) throws SQLException {
789
Configuration config = HBaseConfiguration.create();
790
PhoenixTransactionClient txClient = TransactionFactory.getTransactionClient(config);
791
TransactionContext txContext = txClient.newTransactionContext();
792
793
try {
794
// Start monitoring
795
monitor.startMonitoring(txContext);
796
797
txClient.beginTransaction(txContext);
798
connection.setAutoCommit(false);
799
800
// Record operations
801
monitor.recordOperation(txContext.getTransactionId(), "BEGIN", "Transaction started");
802
803
try (Statement stmt = connection.createStatement()) {
804
monitor.recordOperation(txContext.getTransactionId(), "UPDATE",
805
"UPDATE accounts SET balance = balance - 100 WHERE account_id = 1");
806
stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1");
807
808
monitor.recordOperation(txContext.getTransactionId(), "UPDATE",
809
"UPDATE accounts SET balance = balance + 100 WHERE account_id = 2");
810
stmt.executeUpdate("UPDATE accounts SET balance = balance + 100 WHERE account_id = 2");
811
812
monitor.recordOperation(txContext.getTransactionId(), "INSERT",
813
"INSERT INTO transaction_log (from_account, to_account, amount) VALUES (1, 2, 100)");
814
stmt.executeUpdate("INSERT INTO transaction_log (from_account, to_account, amount) VALUES (1, 2, 100)");
815
}
816
817
connection.commit();
818
txClient.commitTransaction(txContext);
819
820
monitor.recordOperation(txContext.getTransactionId(), "COMMIT", "Transaction committed");
821
monitor.stopMonitoring(txContext.getTransactionId(), true);
822
823
} catch (SQLException e) {
824
monitor.recordOperation(txContext.getTransactionId(), "ERROR", "Error: " + e.getMessage());
825
826
try {
827
connection.rollback();
828
txClient.rollbackTransaction(txContext);
829
monitor.recordOperation(txContext.getTransactionId(), "ROLLBACK", "Transaction rolled back");
830
} catch (SQLException rollbackError) {
831
monitor.recordOperation(txContext.getTransactionId(), "ROLLBACK_ERROR", "Rollback failed: " + rollbackError.getMessage());
832
}
833
834
monitor.stopMonitoring(txContext.getTransactionId(), false);
835
throw e;
836
} finally {
837
txClient.close();
838
}
839
}
840
841
public static void main(String[] args) throws SQLException {
842
MonitoredTransactionExample example = new MonitoredTransactionExample();
843
Connection connection = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
844
845
try {
846
// Execute monitored transaction
847
example.executeMonitoredTransaction(connection);
848
849
// Report active transactions
850
example.monitor.reportActiveTransactions();
851
852
} finally {
853
connection.close();
854
}
855
}
856
}
857
```
858
859
This completes the comprehensive documentation for Phoenix Core's transaction support, covering transaction clients, contexts, distributed transactions, long-running operations, retry logic, and monitoring capabilities.