Apache Phoenix Core library providing SQL-on-HBase functionality with JDBC connectivity, query compilation, and transaction support
Phoenix provides extensive configuration options and utility classes for customizing behavior, optimizing performance, and handling common operations. The configuration system integrates with Hadoop configuration while providing Phoenix-specific settings.
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ConnectionProperty;
import org.apache.phoenix.util.*;
import org.apache.hadoop.conf.Configuration;
import java.util.Properties;

Default configuration options for Phoenix query services with comprehensive tuning parameters.
// Default values for Phoenix query-service tuning knobs. All members are
// compile-time constants; the getXxx accessors read the live (Hadoop-backed)
// configuration with a caller-supplied fallback.
public class QueryServicesOptions {
// Connection settings
public static final int DEFAULT_KEEP_ALIVE_MS = 60000;
public static final int DEFAULT_THREAD_POOL_SIZE = 128;
public static final boolean DEFAULT_QUEUE_SIZE_BIG = false;
// Query execution settings
public static final int DEFAULT_QUERY_TIMEOUT_MS = 600000; // 10 minutes
public static final int DEFAULT_THREAD_TIMEOUT_MS = 600000;
public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 20971520; // 20MB
// Scan settings
public static final int DEFAULT_SCAN_CACHE_SIZE = 1000;
public static final int DEFAULT_MAX_SERVER_CACHE_SIZE = 104857600; // 100MB
public static final long DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS = 30000;
// Index settings
public static final boolean DEFAULT_INDEX_ASYNC_BUILD_ENABLED = true;
public static final int DEFAULT_INDEX_HANDLER_COUNT = 30;
// Date/time settings
// Referenced by ConnectionProperty.PHOENIX_QUERY_DATE_FORMAT_TIMEZONE but missing
// from this stub. NOTE(review): "GMT" matches Phoenix's DateUtil default — confirm
// against the real QueryServicesOptions class.
public static final String DEFAULT_DATE_FORMAT_TIMEZONE = "GMT";
// Statistics settings
// FIX: these were declared boolean yet initialized with numeric values; they are a
// byte width and a millisecond interval, so they must be numeric (long) constants.
public static final long DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES = 104857600; // 100MB
public static final long DEFAULT_STATS_UPDATE_FREQ_MS = 900000; // 15 minutes
// Get configuration values, falling back to defaultValue when the key is unset.
public static boolean getBoolean(String name, boolean defaultValue)
public static int getInt(String name, int defaultValue)
public static long getLong(String name, long defaultValue)
public static String getString(String name, String defaultValue)
}
Enumeration of Phoenix connection properties with type information and validation.
// Typed connection properties; each constant carries
// (propertyName, description, dataType, defaultValue, required).
public enum ConnectionProperty {
// Core connection properties
// Client-side query timeout; default matches QueryServicesOptions.DEFAULT_QUERY_TIMEOUT_MS.
PHOENIX_QUERY_TIMEOUT_MS("phoenix.query.timeoutMs", "Query timeout in milliseconds",
DataType.INTEGER, "600000", false),
PHOENIX_QUERY_DATE_FORMAT_TIMEZONE("phoenix.query.dateFormatTimeZone",
"Timezone for date formatting", DataType.STRING,
// Default delegates to QueryServicesOptions — presumably a timezone ID string; confirm.
QueryServicesOptions.DEFAULT_DATE_FORMAT_TIMEZONE, false),
// Performance tuning
SCAN_CACHE_SIZE("ScanCacheSize", "Number of rows to cache for scans",
DataType.INTEGER, "1000", false),
AUTO_COMMIT("AutoCommit", "Auto commit transactions",
DataType.BOOLEAN, "false", false),
// Security and authentication (null default means "not configured")
AUTHENTICATION("Authentication", "Authentication method",
DataType.STRING, null, false),
KERBEROS_PRINCIPAL("KerberosPrincipal", "Kerberos principal",
DataType.STRING, null, false),
// Transaction settings
TRANSACTIONAL("Transactional", "Enable transactional behavior",
DataType.BOOLEAN, "false", false),
TRANSACTION_TIMEOUT("TransactionTimeout", "Transaction timeout in seconds",
DataType.INTEGER, "300", false);
// Accessors for the per-constant metadata declared above.
public String getPropertyName()
public String getDescription()
public DataType getDataType()
public String getDefaultValue()
public boolean isRequired()
// Parses/validates stringValue into this property's DataType; the usage example
// below catches IllegalArgumentException from it — confirm the thrown type.
public Object getValue(String stringValue)
// Renders a typed value back to its string form.
public String toString(Object value)
}Usage:
// Set connection properties
Properties props = new Properties();
props.setProperty(ConnectionProperty.PHOENIX_QUERY_TIMEOUT_MS.getPropertyName(), "300000");
props.setProperty(ConnectionProperty.SCAN_CACHE_SIZE.getPropertyName(), "2000");
props.setProperty(ConnectionProperty.AUTO_COMMIT.getPropertyName(), "false");
// Create connection with properties
String url = "jdbc:phoenix:localhost:2181";
Connection connection = DriverManager.getConnection(url, props);
// Get property values from connection
// unwrap exposes the Phoenix-specific API behind the plain JDBC Connection
PhoenixConnection phoenixConn = connection.unwrap(PhoenixConnection.class);
String timeoutValue = phoenixConn.getClientInfo(
ConnectionProperty.PHOENIX_QUERY_TIMEOUT_MS.getPropertyName()
);
String cacheSize = phoenixConn.getClientInfo(
ConnectionProperty.SCAN_CACHE_SIZE.getPropertyName()
);
System.out.println("Query timeout: " + timeoutValue + "ms");
System.out.println("Scan cache size: " + cacheSize);
// Validate property values: getValue parses each string into the property's
// declared DataType and signals bad input with IllegalArgumentException
for (ConnectionProperty prop : ConnectionProperty.values()) {
String value = props.getProperty(prop.getPropertyName());
if (value != null) {
try {
Object validatedValue = prop.getValue(value);
System.out.println(prop.getPropertyName() + " = " + validatedValue);
} catch (IllegalArgumentException e) {
System.err.println("Invalid value for " + prop.getPropertyName() + ": " + value);
}
}
}Utility methods for byte array operations optimized for Phoenix/HBase usage.
// Static helpers for raw byte[] manipulation (Phoenix row keys and values are byte arrays).
public class ByteUtil {
// Byte array concatenation
public static byte[] concat(byte[]... arrays)
public static byte[] concat(byte[] first, byte[] second)
// Byte array comparison — presumably unsigned lexicographic, matching HBase key order; confirm.
public static int compare(byte[] left, byte[] right)
public static boolean equals(byte[] left, byte[] right)
// Byte array utilities
// Presumably copies the pointed-at range only when ptr does not already own an
// exact-sized backing array — TODO confirm.
public static byte[] copyKeyBytesIfNecessary(ImmutableBytesWritable ptr)
public static int getVarCharLength(byte[] bytes, int offset, int length)
public static int calculateHashCode(ImmutableBytesWritable ptr)
// Constants: shared zero-length sentinels; callers must not mutate EMPTY_BYTE_ARRAY.
public static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
public static final ImmutableBytesWritable EMPTY_BYTE_ARRAY_PTR =
new ImmutableBytesWritable(EMPTY_BYTE_ARRAY);
}Utility methods for date/time operations with timezone and format handling.
// Date parsing/formatting/arithmetic helpers.
// NOTE(review): the API is built on legacy java.util.Date, and the format fields
// below are public static non-final — callers can mutate global formatting behavior.
public class DateUtil {
// Date formatting
public static String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
public static String DEFAULT_MS_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
// Date parsing and formatting
// Single-arg variants presumably use DEFAULT_DATE_FORMAT — confirm.
public static Date parseDate(String dateString)
public static Date parseDate(String dateString, String formatString)
public static String formatDate(Date date)
public static String formatDate(Date date, String formatString)
// Timezone handling
public static Date parseDate(String dateString, String formatString, TimeZone timeZone)
public static String formatDate(Date date, String formatString, TimeZone timeZone)
// Date arithmetic
public static Date addDays(Date date, int days)
public static Date addMonths(Date date, int months)
public static Date addYears(Date date, int years)
// Date comparison utilities
public static boolean isSameDay(Date date1, Date date2)
public static long daysBetween(Date startDate, Date endDate)
}Utility methods for schema operations including name handling and DDL generation.
// Helpers for schema/table/column name handling and DDL generation.
public class SchemaUtil {
// Name formatting
public static String getTableName(String schemaName, String tableName)
public static String getColumnName(String familyName, String columnName)
// Escaped form is intended for embedding in SQL text (see usage below with names
// containing spaces) — confirm quoting rules before passing untrusted input.
public static String getEscapedTableName(String schemaName, String tableName)
// Name parsing
public static String getSchemaNameFromFullName(String tableName)
public static String getTableNameFromFullName(String fullTableName)
// DDL utilities
public static String getCreateTableStatement(PTable table)
public static String getDropTableStatement(PName tableName)
public static String getCreateIndexStatement(PTable index)
// Schema validation: validateIdentifier throws (usage below catches
// IllegalArgumentException); isValidIdentifier is the non-throwing predicate form.
public static void validateIdentifier(String identifier)
public static boolean isValidIdentifier(String identifier)
// Column utilities
public static String getColumnDisplayName(byte[] cf, byte[] cq)
public static byte[] getEmptyColumnFamily(PName defaultColumnFamily)
}Utility methods for query operations and SQL generation.
// SQL construction and analysis helpers.
public class QueryUtil {
// Query construction
public static String constructSelectStatement(PTable table, List<PColumn> columns)
public static String constructUpsertStatement(PTable table, List<PColumn> columns)
public static String constructDeleteStatement(PTable table)
// Parameter handling
// NOTE(review): bindParams returns SQL text with values substituted — prefer
// setParameters + PreparedStatement for untrusted input (injection risk).
public static String bindParams(String query, List<Object> parameters)
public static PreparedStatement setParameters(PreparedStatement stmt, List<Object> params)
// Query analysis (statement-type sniffing on the SQL string)
public static boolean isSelect(String sql)
public static boolean isUpsert(String sql)
public static boolean isDDL(String sql)
// Result processing
public static List<String> getColumnNames(ResultSetMetaData metaData)
public static List<Object> getRowValues(ResultSet rs, ResultSetMetaData metaData)
}Client-side utility methods for connection management and error handling.
// Client-side helpers: connection plumbing, quiet resource cleanup, retry classification.
public class ClientUtil {
// Connection utilities
public static PhoenixConnection getPhoenixConnection(Connection conn) throws SQLException
public static boolean isConnectionClosed(Connection conn)
// closeQuietly variants swallow close-time exceptions — intended for finally blocks only.
public static void closeQuietly(Connection conn)
public static void closeQuietly(Statement stmt)
public static void closeQuietly(ResultSet rs)
// Error handling
public static SQLException parseServerException(SQLException e)
public static boolean isRetryableException(SQLException e)
public static void handleRetryableException(SQLException e, int maxRetries)
// Performance utilities
public static void enableQueryLogging(Connection conn)
public static void disableQueryLogging(Connection conn)
public static QueryMetrics getQueryMetrics(Statement stmt)
}Usage:
// ByteUtil operations
byte[] array1 = Bytes.toBytes("Hello");
byte[] array2 = Bytes.toBytes(" World");
byte[] combined = ByteUtil.concat(array1, array2);
String result = Bytes.toString(combined); // "Hello World"
// content comparison, not reference comparison — false here, the arrays differ
boolean areEqual = ByteUtil.equals(array1, array2);
int comparison = ByteUtil.compare(array1, array2);
// DateUtil operations
String dateString = "2023-12-25 14:30:00";
Date parsedDate = DateUtil.parseDate(dateString);
String formattedDate = DateUtil.formatDate(parsedDate, DateUtil.DEFAULT_MS_DATE_FORMAT);
// Add time periods
Date futureDate = DateUtil.addDays(parsedDate, 30);
Date nextMonth = DateUtil.addMonths(parsedDate, 1);
long daysDiff = DateUtil.daysBetween(parsedDate, futureDate);
System.out.println("Original date: " + formattedDate);
System.out.println("30 days later: " + DateUtil.formatDate(futureDate));
System.out.println("Days between: " + daysDiff);
// SchemaUtil operations
String fullTableName = SchemaUtil.getTableName("myschema", "users");
String escapedName = SchemaUtil.getEscapedTableName("my schema", "user table");
String schemaName = SchemaUtil.getSchemaNameFromFullName("myschema.users");
System.out.println("Full table name: " + fullTableName);
System.out.println("Escaped name: " + escapedName);
System.out.println("Schema name: " + schemaName);
// Validate identifiers: boolean predicate vs. throwing check
boolean isValid = SchemaUtil.isValidIdentifier("valid_name");
try {
SchemaUtil.validateIdentifier("invalid-name!");
} catch (IllegalArgumentException e) {
System.err.println("Invalid identifier: " + e.getMessage());
}
// QueryUtil operations
PTable table = connection.getTable(PNameFactory.newName("users"));
List<PColumn> columns = table.getColumns();
String selectQuery = QueryUtil.constructSelectStatement(table, columns);
String upsertQuery = QueryUtil.constructUpsertStatement(table, columns);
System.out.println("Generated SELECT: " + selectQuery);
System.out.println("Generated UPSERT: " + upsertQuery);
// ClientUtil operations
PhoenixConnection phoenixConn = ClientUtil.getPhoenixConnection(connection);
boolean isClosed = ClientUtil.isConnectionClosed(connection);
// Safe resource cleanup: closeQuietly suppresses close-time exceptions so the
// finally block cannot mask an exception thrown from the try body
Statement stmt = null;
ResultSet rs = null;
try {
stmt = connection.createStatement();
rs = stmt.executeQuery("SELECT COUNT(*) FROM users");
// Process results...
} finally {
ClientUtil.closeQuietly(rs);
ClientUtil.closeQuietly(stmt);
}Utility for loading CSV data into Phoenix tables with flexible configuration options.
// Streams CSV rows into a Phoenix table as UPSERTs.
// NOTE(review): strict mode presumably fails fast on malformed rows instead of
// counting them in getErrorCount — confirm.
public class CSVCommonsLoader {
// Constructor
public CSVCommonsLoader(PhoenixConnection conn, String tableName,
List<ColumnInfo> columns, boolean strict)
// CSV loading (single value, stream, or reader source)
public void upsert(String csvValue) throws SQLException
public void upsert(InputStream inputStream) throws SQLException
public void upsert(Reader reader) throws SQLException
// Configuration of the CSV dialect
public void setDelimiter(char delimiter)
public void setQuoteChar(char quoteChar)
public void setEscapeChar(char escapeChar)
public void setArrayElementSeparator(String separator)
// Statistics accumulated across upsert calls
public long getUpsertCount()
public long getErrorCount()
}Information about columns for data loading operations with type and format specifications.
// Per-column metadata (name, java.sql.Types code, display name, precision/scale,
// array separator) consumed by CSVCommonsLoader when binding values.
public class ColumnInfo {
// Constructor
public ColumnInfo(String columnName, int sqlType)
public ColumnInfo(String columnName, int sqlType, String columnDisplayName)
// Column properties
public String getColumnName()
public int getSqlType()
public String getColumnDisplayName()
public PDataType getDataType()
// Type-specific settings (Integer wrappers: null means "unset")
public Integer getPrecision()
public Integer getScale()
public void setPrecision(Integer precision)
public void setScale(Integer scale)
// Array handling
public boolean isArrayType()
public String getArrayElementSeparator()
public void setArrayElementSeparator(String separator)
}Usage:
// Set up column information for CSV loading; order must match the CSV field order
List<ColumnInfo> columns = Arrays.asList(
new ColumnInfo("id", Types.BIGINT),
new ColumnInfo("name", Types.VARCHAR),
new ColumnInfo("email", Types.VARCHAR),
new ColumnInfo("salary", Types.DECIMAL),
new ColumnInfo("hire_date", Types.DATE),
new ColumnInfo("skills", Types.ARRAY) // Array column
);
// Configure decimal precision (index 3 = "salary" above)
ColumnInfo salaryColumn = columns.get(3);
salaryColumn.setPrecision(10);
salaryColumn.setScale(2);
// Configure array separator (index 5 = "skills" above)
ColumnInfo skillsColumn = columns.get(5);
skillsColumn.setArrayElementSeparator("|");
// Create CSV loader (strict = true)
PhoenixConnection phoenixConn = connection.unwrap(PhoenixConnection.class);
CSVCommonsLoader csvLoader = new CSVCommonsLoader(phoenixConn, "employees", columns, true);
// Configure CSV format
csvLoader.setDelimiter(',');
csvLoader.setQuoteChar('"');
csvLoader.setEscapeChar('\\');
// Load CSV data
String csvData = """
1,"John Doe","john@example.com",75000.50,"2023-01-15","Java|Python|SQL"
2,"Jane Smith","jane@example.com",82000.00,"2023-02-01","Python|JavaScript|React"
3,"Bob Johnson","bob@example.com",68000.75,"2023-03-10","Java|Spring|Hibernate"
""";
try (StringReader reader = new StringReader(csvData)) {
csvLoader.upsert(reader);
}
// Get loading statistics
long upsertCount = csvLoader.getUpsertCount();
long errorCount = csvLoader.getErrorCount();
System.out.println("Records loaded: " + upsertCount);
System.out.println("Errors: " + errorCount);
// Load from file
try (FileInputStream fileInput = new FileInputStream("employees.csv")) {
csvLoader.upsert(fileInput);
}
// Comprehensive connection configuration
// Builds a Properties bag combining typed ConnectionProperty keys with raw
// phoenix.* string keys; passed to DriverManager.getConnection below.
public Properties createPhoenixProperties() {
Properties props = new Properties();
// Query settings
props.setProperty(ConnectionProperty.PHOENIX_QUERY_TIMEOUT_MS.getPropertyName(), "900000"); // 15 minutes
props.setProperty(ConnectionProperty.SCAN_CACHE_SIZE.getPropertyName(), "5000");
props.setProperty(ConnectionProperty.AUTO_COMMIT.getPropertyName(), "false");
// Date/time settings
props.setProperty(ConnectionProperty.PHOENIX_QUERY_DATE_FORMAT_TIMEZONE.getPropertyName(), "UTC");
// Performance tuning (raw keys — no ConnectionProperty constants exist for these)
props.setProperty("phoenix.query.spoolThresholdBytes", "52428800"); // 50MB
props.setProperty("phoenix.query.maxGlobalMemoryPercentage", "20");
props.setProperty("phoenix.query.targetConcurrency", "16");
// Retry settings
props.setProperty("phoenix.query.maxRetries", "5");
props.setProperty("phoenix.query.retryInitialBackoffMs", "1000");
// Security settings (if needed)
// props.setProperty(ConnectionProperty.AUTHENTICATION.getPropertyName(), "KERBEROS");
// props.setProperty(ConnectionProperty.KERBEROS_PRINCIPAL.getPropertyName(), "user@REALM");
return props;
}
// Use configured connection (URL lists the ZooKeeper quorum and root znode)
Properties props = createPhoenixProperties();
String url = "jdbc:phoenix:zk1,zk2,zk3:2181:/hbase";
Connection connection = DriverManager.getConnection(url, props);
// Configure Phoenix for high-performance workloads
// Workload presets applied per-connection via JDBC client info properties.
public class PerformanceTuningConfig {
// Bulk loading: large batches, large memory budget, manual commit.
public static void configureForBulkOperations(PhoenixConnection connection) throws SQLException {
// Increase batch sizes
connection.setClientInfo("phoenix.mutate.batchSize", "10000");
connection.setClientInfo("phoenix.query.maxGlobalMemorySize", "2147483648"); // 2GB
// Optimize scans
connection.setClientInfo("phoenix.scan.cacheSize", "10000");
connection.setClientInfo("phoenix.scan.maxResultSize", "104857600"); // 100MB
// Disable auto-commit for batch operations
connection.setAutoCommit(false);
}
// OLTP: small scans, short timeout, commit per statement.
public static void configureForOLTP(PhoenixConnection connection) throws SQLException {
// Optimize for small, fast queries
connection.setClientInfo("phoenix.scan.cacheSize", "100");
connection.setClientInfo("phoenix.query.timeoutMs", "30000"); // 30 seconds
// Enable auto-commit for single operations
connection.setAutoCommit(true);
}
// Analytics: long timeout, bigger memory share and spool threshold.
public static void configureForAnalytics(PhoenixConnection connection) throws SQLException {
// Optimize for large analytical queries
connection.setClientInfo("phoenix.query.timeoutMs", "1800000"); // 30 minutes
connection.setClientInfo("phoenix.scan.cacheSize", "1000");
connection.setClientInfo("phoenix.query.maxGlobalMemoryPercentage", "40");
// Use larger spool threshold
connection.setClientInfo("phoenix.query.spoolThresholdBytes", "209715200"); // 200MB
}
}
// Apply performance configurations
PhoenixConnection phoenixConn = connection.unwrap(PhoenixConnection.class);
// For bulk data loading
PerformanceTuningConfig.configureForBulkOperations(phoenixConn);
// For OLTP workload
// PerformanceTuningConfig.configureForOLTP(phoenixConn);
// For analytics workload
// PerformanceTuningConfig.configureForAnalytics(phoenixConn);
// Helper class for common Phoenix operations
// Convenience wrapper bundling common Phoenix admin/query operations on one connection.
public class PhoenixHelper {
private final PhoenixConnection connection;
public PhoenixHelper(Connection connection) throws SQLException {
this.connection = connection.unwrap(PhoenixConnection.class);
}
// Executes ddl only when tableName is not resolvable through Phoenix metadata.
public void createTableIfNotExists(String tableName, String ddl) throws SQLException {
try {
// Existence probe: getTable throws TableNotFoundException for unknown tables
// (the returned PTable itself is not needed — the original bound it to an unused local).
connection.getTable(PNameFactory.newName(tableName));
System.out.println("Table " + tableName + " already exists");
} catch (TableNotFoundException e) {
// FIX: the original leaked the Statement when execute(ddl) threw, because
// closeQuietly ran only on the success path; try-with-resources closes it on every path.
try (Statement stmt = connection.createStatement()) {
stmt.execute(ddl);
System.out.println("Created table: " + tableName);
}
}
}
// Returns COUNT(*) for tableName, or 0 if the result set is unexpectedly empty.
// NOTE(review): tableName is concatenated into SQL; getEscapedTableName presumably
// quotes it, but do not pass untrusted input — confirm its escaping semantics.
public long getTableRowCount(String tableName) throws SQLException {
String sql = "SELECT COUNT(*) FROM " + SchemaUtil.getEscapedTableName(null, tableName);
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
if (rs.next()) {
return rs.getLong(1);
}
return 0;
}
}
// Lists TABLE-type object names in schemaName via standard JDBC metadata.
public List<String> getTableNames(String schemaName) throws SQLException {
DatabaseMetaData metaData = connection.getMetaData();
List<String> tableNames = new ArrayList<>();
try (ResultSet rs = metaData.getTables(null, schemaName, "%", new String[]{"TABLE"})) {
while (rs.next()) {
tableNames.add(rs.getString("TABLE_NAME"));
}
}
return tableNames;
}
// Refreshes table statistics so the optimizer has current guideposts for planning.
public void optimizeTable(String tableName) throws SQLException {
String sql = "UPDATE STATISTICS " + SchemaUtil.getEscapedTableName(null, tableName);
try (Statement stmt = connection.createStatement()) {
stmt.execute(sql);
System.out.println("Updated statistics for table: " + tableName);
}
}
}
// Usage
PhoenixHelper helper = new PhoenixHelper(connection);
// Create table if it doesn't exist (DDL as a text block)
String createTableDDL = """
CREATE TABLE IF NOT EXISTS products (
id BIGINT PRIMARY KEY,
name VARCHAR(100),
price DECIMAL(10,2),
category VARCHAR(50)
)
""";
helper.createTableIfNotExists("products", createTableDDL);
// Get table information
long rowCount = helper.getTableRowCount("products");
List<String> allTables = helper.getTableNames("PUBLIC");
System.out.println("Products table has " + rowCount + " rows");
System.out.println("All tables: " + allTables);
// Optimize table
helper.optimizeTable("products");

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-phoenix--phoenix-core