Apache Phoenix Core library providing SQL-on-HBase functionality with JDBC connectivity, query compilation, and transaction support
Phoenix provides comprehensive ACID transaction support through integration with transaction managers and distributed transaction protocols. The transaction framework enables consistent data operations across multiple tables and region servers while maintaining Phoenix's SQL semantics.
import org.apache.phoenix.transaction.*;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
import java.sql.*;Interface for Phoenix transaction clients providing transaction lifecycle management.
public interface PhoenixTransactionClient {
// Transaction lifecycle
TransactionContext newTransactionContext() throws SQLException
void beginTransaction(TransactionContext context) throws SQLException
void commitTransaction(TransactionContext context) throws SQLException
void rollbackTransaction(TransactionContext context) throws SQLException
// Transaction state
boolean isTransactionRunning(TransactionContext context)
long getTransactionId(TransactionContext context)
TransactionStatus getTransactionStatus(TransactionContext context)
// Transaction configuration
void setTimeout(long timeoutInSeconds)
long getTimeout()
// Cleanup
void close() throws SQLException
}Represents a transaction context with state and metadata information.
public interface TransactionContext {
// Transaction identification
long getTransactionId()
long getStartTime()
TransactionStatus getStatus()
// Transaction properties
IsolationLevel getIsolationLevel()
long getTimeoutMs()
boolean isReadOnly()
// Transaction operations
void setReadOnly(boolean readOnly)
void setTimeout(long timeoutMs)
void addCheckpoint() throws SQLException
// Transaction metadata
Map<String, Object> getProperties()
void setProperty(String key, Object value)
}Factory for creating transaction-related objects and clients.
public class TransactionFactory {
// Transaction client creation
public static PhoenixTransactionClient getTransactionClient(Configuration config)
public static PhoenixTransactionClient getTransactionClient(ConnectionQueryServices services)
// Transaction provider detection
public static TransactionProcessor.Builder getTransactionProcessor(Configuration config,
String tableName)
public static boolean isTransactionEnabled(Configuration config)
// Transaction configuration
public static void configureTransactionManager(Configuration config, String provider)
public static String getConfiguredTransactionManager(Configuration config)
}Usage:
// Basic transaction setup
Configuration config = HBaseConfiguration.create();
config.set("phoenix.transactions.enabled", "true");
config.set("data.tx.snapshot.dir", "/tmp/phoenix-tx");
// Create transaction client
PhoenixTransactionClient txClient = TransactionFactory.getTransactionClient(config);
// Create transaction context
TransactionContext txContext = txClient.newTransactionContext();
// Configure transaction properties
txContext.setTimeout(300000); // 5 minutes
txContext.setReadOnly(false);
try {
// Begin transaction
txClient.beginTransaction(txContext);
// Perform transactional operations
Connection connection = getConnection();
connection.setAutoCommit(false);
Statement stmt = connection.createStatement();
stmt.executeUpdate("UPSERT INTO accounts (id, balance) VALUES (1, 1000)");
stmt.executeUpdate("UPSERT INTO accounts (id, balance) VALUES (2, 2000)");
// Transfer money between accounts
stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE id = 1");
stmt.executeUpdate("UPDATE accounts SET balance = balance + 100 WHERE id = 2");
// Commit transaction
connection.commit();
txClient.commitTransaction(txContext);
System.out.println("Transaction completed successfully");
System.out.println("Transaction ID: " + txContext.getTransactionId());
} catch (SQLException e) {
// Rollback on error
try {
connection.rollback();
txClient.rollbackTransaction(txContext);
System.out.println("Transaction rolled back due to error: " + e.getMessage());
} catch (SQLException rollbackError) {
System.err.println("Error during rollback: " + rollbackError.getMessage());
}
throw e;
} finally {
txClient.close();
}// Create transactional tables with Phoenix DDL
public class TransactionalTableManager {
public void createTransactionalTable(Connection connection, String tableName) throws SQLException {
String createTableSQL = String.format("""
CREATE TABLE %s (
account_id BIGINT NOT NULL,
account_name VARCHAR(100),
balance DECIMAL(15,2),
last_updated TIMESTAMP,
CONSTRAINT pk PRIMARY KEY (account_id)
) TRANSACTIONAL=true
""", tableName);
try (Statement stmt = connection.createStatement()) {
stmt.execute(createTableSQL);
System.out.println("Created transactional table: " + tableName);
}
}
public void alterTableToTransactional(Connection connection, String tableName) throws SQLException {
String alterTableSQL = String.format("ALTER TABLE %s SET TRANSACTIONAL=true", tableName);
try (Statement stmt = connection.createStatement()) {
stmt.execute(alterTableSQL);
System.out.println("Altered table to transactional: " + tableName);
}
}
public boolean isTableTransactional(Connection connection, String tableName) throws SQLException {
String sql = """
SELECT TRANSACTIONAL FROM SYSTEM.CATALOG
WHERE TABLE_NAME = ? AND TABLE_TYPE = 'TABLE'
""";
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setString(1, tableName.toUpperCase());
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
Boolean transactional = rs.getBoolean("TRANSACTIONAL");
return !rs.wasNull() && transactional;
}
return false;
}
}
}
// Usage
TransactionalTableManager tableManager = new TransactionalTableManager();
// Create transactional table
tableManager.createTransactionalTable(connection, "accounts");
tableManager.createTransactionalTable(connection, "transactions");
// Check if table is transactional
boolean isTransactional = tableManager.isTableTransactional(connection, "accounts");
System.out.println("Accounts table is transactional: " + isTransactional);
// Convert existing table to transactional
tableManager.alterTableToTransactional(connection, "orders");// Working with transaction isolation levels
public class TransactionIsolationManager {
public enum IsolationLevel {
SERIALIZABLE(Connection.TRANSACTION_SERIALIZABLE),
REPEATABLE_READ(Connection.TRANSACTION_REPEATABLE_READ),
READ_COMMITTED(Connection.TRANSACTION_READ_COMMITTED),
READ_UNCOMMITTED(Connection.TRANSACTION_READ_UNCOMMITTED);
private final int jdbcLevel;
IsolationLevel(int jdbcLevel) {
this.jdbcLevel = jdbcLevel;
}
public int getJdbcLevel() { return jdbcLevel; }
}
public void setTransactionIsolation(Connection connection, IsolationLevel level) throws SQLException {
connection.setTransactionIsolation(level.getJdbcLevel());
System.out.println("Set transaction isolation to: " + level);
}
public IsolationLevel getTransactionIsolation(Connection connection) throws SQLException {
int jdbcLevel = connection.getTransactionIsolation();
for (IsolationLevel level : IsolationLevel.values()) {
if (level.getJdbcLevel() == jdbcLevel) {
return level;
}
}
return IsolationLevel.READ_COMMITTED; // Default
}
public void demonstrateIsolationLevels(Connection connection) throws SQLException {
// Test different isolation levels
for (IsolationLevel level : IsolationLevel.values()) {
System.out.println("\n=== Testing Isolation Level: " + level + " ===");
setTransactionIsolation(connection, level);
connection.setAutoCommit(false);
try {
// Perform test operations
testIsolationLevel(connection, level);
connection.commit();
System.out.println("Transaction committed successfully");
} catch (SQLException e) {
connection.rollback();
System.out.println("Transaction rolled back: " + e.getMessage());
}
}
}
private void testIsolationLevel(Connection connection, IsolationLevel level) throws SQLException {
try (Statement stmt = connection.createStatement()) {
// Read initial state
ResultSet rs = stmt.executeQuery("SELECT balance FROM accounts WHERE account_id = 1");
if (rs.next()) {
BigDecimal initialBalance = rs.getBigDecimal("balance");
System.out.println("Initial balance: " + initialBalance);
}
// Perform update
int updatedRows = stmt.executeUpdate(
"UPDATE accounts SET balance = balance + 100 WHERE account_id = 1"
);
System.out.println("Updated " + updatedRows + " rows");
// Read updated state
rs = stmt.executeQuery("SELECT balance FROM accounts WHERE account_id = 1");
if (rs.next()) {
BigDecimal newBalance = rs.getBigDecimal("balance");
System.out.println("New balance: " + newBalance);
}
}
}
}
// Usage
TransactionIsolationManager isolationManager = new TransactionIsolationManager();
// Set specific isolation level
isolationManager.setTransactionIsolation(connection,
TransactionIsolationManager.IsolationLevel.SERIALIZABLE);
// Test all isolation levels
isolationManager.demonstrateIsolationLevels(connection);// Managing distributed transactions across multiple Phoenix connections
public class DistributedTransactionManager {
private final List<Connection> connections;
private final PhoenixTransactionClient txClient;
public DistributedTransactionManager(List<String> jdbcUrls) throws SQLException {
this.connections = new ArrayList<>();
for (String url : jdbcUrls) {
connections.add(DriverManager.getConnection(url));
}
Configuration config = HBaseConfiguration.create();
this.txClient = TransactionFactory.getTransactionClient(config);
}
public void executeDistributedTransaction(DistributedTransactionCallback callback) throws SQLException {
TransactionContext txContext = txClient.newTransactionContext();
boolean allConnectionsReady = false;
try {
// Begin transaction
txClient.beginTransaction(txContext);
// Prepare all connections
for (Connection conn : connections) {
conn.setAutoCommit(false);
}
allConnectionsReady = true;
// Execute business logic
callback.execute(connections, txContext);
// Commit on all connections
for (Connection conn : connections) {
conn.commit();
}
// Commit distributed transaction
txClient.commitTransaction(txContext);
System.out.println("Distributed transaction completed successfully");
} catch (Exception e) {
System.err.println("Distributed transaction failed: " + e.getMessage());
// Rollback all connections
if (allConnectionsReady) {
for (Connection conn : connections) {
try {
conn.rollback();
} catch (SQLException rollbackError) {
System.err.println("Error rolling back connection: " + rollbackError.getMessage());
}
}
}
// Rollback distributed transaction
try {
txClient.rollbackTransaction(txContext);
} catch (SQLException txRollbackError) {
System.err.println("Error rolling back transaction: " + txRollbackError.getMessage());
}
if (e instanceof SQLException) {
throw (SQLException) e;
} else {
throw new SQLException("Distributed transaction failed", e);
}
}
}
public interface DistributedTransactionCallback {
void execute(List<Connection> connections, TransactionContext txContext) throws SQLException;
}
public void close() throws SQLException {
for (Connection conn : connections) {
conn.close();
}
txClient.close();
}
}
// Usage
List<String> jdbcUrls = Arrays.asList(
"jdbc:phoenix:cluster1:2181",
"jdbc:phoenix:cluster2:2181",
"jdbc:phoenix:cluster3:2181"
);
DistributedTransactionManager dtm = new DistributedTransactionManager(jdbcUrls);
try {
dtm.executeDistributedTransaction((connections, txContext) -> {
// Business logic spanning multiple Phoenix clusters
Connection conn1 = connections.get(0);
Connection conn2 = connections.get(1);
Connection conn3 = connections.get(2);
// Update data across clusters
try (Statement stmt1 = conn1.createStatement();
Statement stmt2 = conn2.createStatement();
Statement stmt3 = conn3.createStatement()) {
stmt1.executeUpdate("UPDATE accounts SET balance = balance - 1000 WHERE account_id = 'ACC1'");
stmt2.executeUpdate("UPDATE accounts SET balance = balance + 500 WHERE account_id = 'ACC2'");
stmt3.executeUpdate("UPDATE accounts SET balance = balance + 500 WHERE account_id = 'ACC3'");
System.out.println("Distributed operations completed");
}
});
} finally {
dtm.close();
}// Managing long-running transactions with checkpointing
public class LongRunningTransactionManager {
private final PhoenixTransactionClient txClient;
private final Connection connection;
public LongRunningTransactionManager(Connection connection) throws SQLException {
this.connection = connection;
Configuration config = HBaseConfiguration.create();
this.txClient = TransactionFactory.getTransactionClient(config);
}
public void executeLongRunningTransaction(LongRunningCallback callback) throws SQLException {
TransactionContext txContext = txClient.newTransactionContext();
// Set longer timeout for long-running transaction
txContext.setTimeout(1800000); // 30 minutes
try {
txClient.beginTransaction(txContext);
connection.setAutoCommit(false);
// Execute with periodic checkpointing
callback.execute(connection, txContext, this::createCheckpoint);
// Final commit
connection.commit();
txClient.commitTransaction(txContext);
System.out.println("Long-running transaction completed successfully");
System.out.println("Transaction ID: " + txContext.getTransactionId());
System.out.println("Duration: " + (System.currentTimeMillis() - txContext.getStartTime()) + "ms");
} catch (Exception e) {
System.err.println("Long-running transaction failed: " + e.getMessage());
try {
connection.rollback();
txClient.rollbackTransaction(txContext);
} catch (SQLException rollbackError) {
System.err.println("Error during rollback: " + rollbackError.getMessage());
}
if (e instanceof SQLException) {
throw (SQLException) e;
} else {
throw new SQLException("Long-running transaction failed", e);
}
}
}
private void createCheckpoint(TransactionContext txContext) throws SQLException {
System.out.println("Creating transaction checkpoint...");
txContext.addCheckpoint();
// Log checkpoint creation
System.out.println("Checkpoint created for transaction: " + txContext.getTransactionId());
System.out.println("Elapsed time: " + (System.currentTimeMillis() - txContext.getStartTime()) + "ms");
}
public interface LongRunningCallback {
void execute(Connection connection, TransactionContext txContext,
CheckpointCallback checkpointCallback) throws SQLException;
}
public interface CheckpointCallback {
void createCheckpoint(TransactionContext txContext) throws SQLException;
}
public void close() throws SQLException {
txClient.close();
}
}
// Usage example: Batch processing with checkpoints
LongRunningTransactionManager lrtm = new LongRunningTransactionManager(connection);
try {
lrtm.executeLongRunningTransaction((conn, txContext, checkpoint) -> {
// Process large batch of records
String selectSQL = "SELECT * FROM large_table WHERE processed = false ORDER BY id";
String updateSQL = "UPDATE large_table SET processed = true, processed_date = ? WHERE id = ?";
try (Statement selectStmt = conn.createStatement();
PreparedStatement updateStmt = conn.prepareStatement(updateSQL);
ResultSet rs = selectStmt.executeQuery(selectSQL)) {
int processedCount = 0;
Timestamp processedDate = new Timestamp(System.currentTimeMillis());
while (rs.next()) {
long id = rs.getLong("id");
// Process the record (business logic)
processRecord(rs);
// Update processed status
updateStmt.setTimestamp(1, processedDate);
updateStmt.setLong(2, id);
updateStmt.executeUpdate();
processedCount++;
// Create checkpoint every 1000 records
if (processedCount % 1000 == 0) {
checkpoint.createCheckpoint(txContext);
System.out.println("Processed " + processedCount + " records");
}
}
System.out.println("Total records processed: " + processedCount);
}
});
} finally {
lrtm.close();
}// Implementing retry logic for transient transaction failures
public class TransactionRetryManager {
private final int maxRetries;
private final long baseRetryDelayMs;
private final double backoffMultiplier;
public TransactionRetryManager(int maxRetries, long baseRetryDelayMs, double backoffMultiplier) {
this.maxRetries = maxRetries;
this.baseRetryDelayMs = baseRetryDelayMs;
this.backoffMultiplier = backoffMultiplier;
}
public <T> T executeWithRetry(Connection connection, RetryableTransactionCallback<T> callback) throws SQLException {
SQLException lastException = null;
long retryDelay = baseRetryDelayMs;
for (int attempt = 1; attempt <= maxRetries; attempt++) {
try {
return executeTransaction(connection, callback);
} catch (SQLException e) {
lastException = e;
if (!isRetryableException(e) || attempt == maxRetries) {
throw e;
}
System.out.println("Transaction failed (attempt " + attempt + "/" + maxRetries + "): " + e.getMessage());
System.out.println("Retrying in " + retryDelay + "ms...");
try {
Thread.sleep(retryDelay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new SQLException("Transaction retry interrupted", ie);
}
retryDelay = Math.round(retryDelay * backoffMultiplier);
}
}
throw new SQLException("Transaction failed after " + maxRetries + " attempts", lastException);
}
private <T> T executeTransaction(Connection connection, RetryableTransactionCallback<T> callback) throws SQLException {
connection.setAutoCommit(false);
boolean committed = false;
try {
T result = callback.execute(connection);
connection.commit();
committed = true;
return result;
} catch (SQLException e) {
if (!committed) {
try {
connection.rollback();
} catch (SQLException rollbackError) {
System.err.println("Error during rollback: " + rollbackError.getMessage());
}
}
throw e;
}
}
private boolean isRetryableException(SQLException e) {
// Check for retryable error conditions
String sqlState = e.getSQLState();
int errorCode = e.getErrorCode();
// Common retryable conditions
return sqlState != null && (
sqlState.startsWith("08") || // Connection exceptions
sqlState.equals("40001") || // Serialization failure
sqlState.equals("40P01") || // Deadlock detected
"CONNECTION_THROTTLED".equals(e.getMessage()) ||
"REGION_TOO_BUSY".equals(e.getMessage())
);
}
public interface RetryableTransactionCallback<T> {
T execute(Connection connection) throws SQLException;
}
}
// Usage
TransactionRetryManager retryManager = new TransactionRetryManager(3, 1000, 2.0);
try {
String result = retryManager.executeWithRetry(connection, (conn) -> {
// Transactional business logic that might fail transiently
try (Statement stmt = conn.createStatement()) {
stmt.executeUpdate("UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 'PROD123'");
ResultSet rs = stmt.executeQuery("SELECT quantity FROM inventory WHERE product_id = 'PROD123'");
if (rs.next()) {
int remainingQuantity = rs.getInt("quantity");
if (remainingQuantity < 0) {
throw new SQLException("Insufficient inventory");
}
stmt.executeUpdate(
"INSERT INTO order_items (order_id, product_id, quantity) VALUES ('ORD456', 'PROD123', 1)"
);
return "Order item created successfully, remaining quantity: " + remainingQuantity;
} else {
throw new SQLException("Product not found");
}
}
});
System.out.println("Transaction result: " + result);
} catch (SQLException e) {
System.err.println("Transaction failed permanently: " + e.getMessage());
}// Transaction monitoring and debugging utilities
public class TransactionMonitor {
private final Map<Long, TransactionInfo> activeTransactions = new ConcurrentHashMap<>();
public void startMonitoring(TransactionContext txContext) {
TransactionInfo info = new TransactionInfo(
txContext.getTransactionId(),
System.currentTimeMillis(),
Thread.currentThread().getName()
);
activeTransactions.put(txContext.getTransactionId(), info);
System.out.println("Started monitoring transaction: " + txContext.getTransactionId());
}
public void recordOperation(long transactionId, String operation, String details) {
TransactionInfo info = activeTransactions.get(transactionId);
if (info != null) {
info.addOperation(operation, details);
}
}
public void stopMonitoring(long transactionId, boolean committed) {
TransactionInfo info = activeTransactions.remove(transactionId);
if (info != null) {
info.setEndTime(System.currentTimeMillis());
info.setCommitted(committed);
// Log transaction summary
System.out.println("\n=== Transaction Summary ===");
System.out.println("Transaction ID: " + transactionId);
System.out.println("Thread: " + info.getThreadName());
System.out.println("Duration: " + info.getDuration() + "ms");
System.out.println("Status: " + (committed ? "COMMITTED" : "ROLLED BACK"));
System.out.println("Operations: " + info.getOperations().size());
for (TransactionOperation op : info.getOperations()) {
System.out.println(" " + op.getTimestamp() + " - " + op.getOperation() + ": " + op.getDetails());
}
System.out.println("=========================\n");
}
}
public void reportActiveTransactions() {
System.out.println("\n=== Active Transactions ===");
long currentTime = System.currentTimeMillis();
if (activeTransactions.isEmpty()) {
System.out.println("No active transactions");
} else {
for (TransactionInfo info : activeTransactions.values()) {
long duration = currentTime - info.getStartTime();
System.out.println("Transaction ID: " + info.getTransactionId());
System.out.println(" Thread: " + info.getThreadName());
System.out.println(" Duration: " + duration + "ms");
System.out.println(" Operations: " + info.getOperations().size());
}
}
System.out.println("==========================\n");
}
// Supporting classes
private static class TransactionInfo {
private final long transactionId;
private final long startTime;
private final String threadName;
private final List<TransactionOperation> operations = new ArrayList<>();
private long endTime;
private boolean committed;
public TransactionInfo(long transactionId, long startTime, String threadName) {
this.transactionId = transactionId;
this.startTime = startTime;
this.threadName = threadName;
}
public void addOperation(String operation, String details) {
operations.add(new TransactionOperation(System.currentTimeMillis(), operation, details));
}
// Getters and setters
public long getTransactionId() { return transactionId; }
public long getStartTime() { return startTime; }
public String getThreadName() { return threadName; }
public List<TransactionOperation> getOperations() { return operations; }
public long getEndTime() { return endTime; }
public void setEndTime(long endTime) { this.endTime = endTime; }
public boolean isCommitted() { return committed; }
public void setCommitted(boolean committed) { this.committed = committed; }
public long getDuration() {
return endTime > 0 ? endTime - startTime : System.currentTimeMillis() - startTime;
}
}
private static class TransactionOperation {
private final long timestamp;
private final String operation;
private final String details;
public TransactionOperation(long timestamp, String operation, String details) {
this.timestamp = timestamp;
this.operation = operation;
this.details = details;
}
public long getTimestamp() { return timestamp; }
public String getOperation() { return operation; }
public String getDetails() { return details; }
}
}
// Usage with monitored transactions
public class MonitoredTransactionExample {
private final TransactionMonitor monitor = new TransactionMonitor();
public void executeMonitoredTransaction(Connection connection) throws SQLException {
Configuration config = HBaseConfiguration.create();
PhoenixTransactionClient txClient = TransactionFactory.getTransactionClient(config);
TransactionContext txContext = txClient.newTransactionContext();
try {
// Start monitoring
monitor.startMonitoring(txContext);
txClient.beginTransaction(txContext);
connection.setAutoCommit(false);
// Record operations
monitor.recordOperation(txContext.getTransactionId(), "BEGIN", "Transaction started");
try (Statement stmt = connection.createStatement()) {
monitor.recordOperation(txContext.getTransactionId(), "UPDATE",
"UPDATE accounts SET balance = balance - 100 WHERE account_id = 1");
stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1");
monitor.recordOperation(txContext.getTransactionId(), "UPDATE",
"UPDATE accounts SET balance = balance + 100 WHERE account_id = 2");
stmt.executeUpdate("UPDATE accounts SET balance = balance + 100 WHERE account_id = 2");
monitor.recordOperation(txContext.getTransactionId(), "INSERT",
"INSERT INTO transaction_log (from_account, to_account, amount) VALUES (1, 2, 100)");
stmt.executeUpdate("INSERT INTO transaction_log (from_account, to_account, amount) VALUES (1, 2, 100)");
}
connection.commit();
txClient.commitTransaction(txContext);
monitor.recordOperation(txContext.getTransactionId(), "COMMIT", "Transaction committed");
monitor.stopMonitoring(txContext.getTransactionId(), true);
} catch (SQLException e) {
monitor.recordOperation(txContext.getTransactionId(), "ERROR", "Error: " + e.getMessage());
try {
connection.rollback();
txClient.rollbackTransaction(txContext);
monitor.recordOperation(txContext.getTransactionId(), "ROLLBACK", "Transaction rolled back");
} catch (SQLException rollbackError) {
monitor.recordOperation(txContext.getTransactionId(), "ROLLBACK_ERROR", "Rollback failed: " + rollbackError.getMessage());
}
monitor.stopMonitoring(txContext.getTransactionId(), false);
throw e;
} finally {
txClient.close();
}
}
public static void main(String[] args) throws SQLException {
MonitoredTransactionExample example = new MonitoredTransactionExample();
Connection connection = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
try {
// Execute monitored transaction
example.executeMonitoredTransaction(connection);
// Report active transactions
example.monitor.reportActiveTransactions();
} finally {
connection.close();
}
}
}This completes the comprehensive documentation for Phoenix Core's transaction support, covering transaction clients, contexts, distributed transactions, long-running operations, retry logic, and monitoring capabilities.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-phoenix--phoenix-core