Apache Phoenix Core library providing SQL-on-HBase functionality with JDBC connectivity, query compilation, and transaction support
Phoenix provides comprehensive monitoring and metrics collection capabilities to track query performance, resource usage, and system health. The monitoring framework enables detailed analysis of Phoenix operations and supports performance optimization efforts.
import org.apache.phoenix.monitoring.*;
import org.apache.phoenix.jdbc.*;
import java.util.Map;
import java.util.concurrent.TimeUnit;Enumeration of all Phoenix metric types with detailed performance and resource monitoring.
public enum MetricType {
// Query performance metrics
QUERY_TIME("Query execution time", true),
QUERY_TIMEOUT("Query timeout occurrences", false),
QUERY_FAILED("Failed query count", false),
QUERY_EXECUTED("Executed query count", false),
// Scan metrics
SCAN_BYTES("Bytes scanned", true),
SCAN_ROWS("Rows scanned", true),
SCAN_TIME("Scan execution time", true),
// Memory metrics
MEMORY_CHUNK_BYTES("Memory chunk size in bytes", true),
MEMORY_WAIT_TIME("Memory wait time", true),
SPOOL_FILE_SIZE("Spool file size", true),
SPOOL_FILE_COUNTER("Number of spool files", false),
// Network and RPC metrics
BYTES_RECEIVED("Bytes received over network", true),
BYTES_SENT("Bytes sent over network", true),
RPC_TIME("RPC call time", true),
RPC_CALLS("Number of RPC calls", false),
// Cache metrics
CACHE_REFRESH_SPLITS_COUNTER("Cache refresh splits", false),
METADATA_CACHE_MISS_COUNTER("Metadata cache misses", false),
METADATA_CACHE_HIT_COUNTER("Metadata cache hits", false),
// Mutation metrics
MUTATION_BATCH_SIZE("Mutation batch size", true),
MUTATION_COMMIT_TIME("Mutation commit time", true),
MUTATION_BYTES("Mutation data size", true),
// Connection metrics
OPEN_PHOENIX_CONNECTIONS("Open Phoenix connections", false),
QUERY_SERVICES_COUNTER("Query services operations", false),
HCONNECTIONS_COUNTER("HBase connections", false);
private final String description;
private final boolean isTimeMetric;
MetricType(String description, boolean isTimeMetric) {
this.description = description;
this.isTimeMetric = isTimeMetric;
}
public String getDescription()
public String getMetricName()
public boolean isTimer()
public boolean isHistogram()
public boolean isCounter()
}Usage:
// Access metric information
for (MetricType metricType : MetricType.values()) {
System.out.println("Metric: " + metricType.getMetricName());
System.out.println("Description: " + metricType.getDescription());
System.out.println("Is Timer: " + metricType.isTimer());
System.out.println("Is Counter: " + metricType.isCounter());
System.out.println();
}
// Check specific metric properties
MetricType queryTime = MetricType.QUERY_TIME;
if (queryTime.isTimer()) {
System.out.println("Query time is measured as a timer metric");
}
MetricType queryCount = MetricType.QUERY_EXECUTED;
if (queryCount.isCounter()) {
System.out.println("Query executed is a counter metric");
}Global client-side metrics collection providing system-wide monitoring.
public class GlobalClientMetrics {
// Global metric instances
public static final GlobalMetric GLOBAL_QUERY_TIME =
GlobalClientMetrics.getInstance(MetricType.QUERY_TIME);
public static final GlobalMetric GLOBAL_SCAN_BYTES =
GlobalClientMetrics.getInstance(MetricType.SCAN_BYTES);
public static final GlobalMetric GLOBAL_MEMORY_CHUNK_BYTES =
GlobalClientMetrics.getInstance(MetricType.MEMORY_CHUNK_BYTES);
public static final GlobalMetric GLOBAL_MUTATION_BATCH_SIZE =
GlobalClientMetrics.getInstance(MetricType.MUTATION_BATCH_SIZE);
// Metric factory methods
public static GlobalMetric getInstance(MetricType metricType)
public static boolean isMetricsEnabled()
public static void setMetricsEnabled(boolean enabled)
// Metric collection methods
public static Map<MetricType, Long> getGlobalMetrics()
public static long getGlobalMetricValue(MetricType metricType)
public static void resetGlobalMetrics()
// Metric aggregation
public static Map<String, Object> getMetricsSummary()
public static String getMetricsReport()
}Individual global metric implementation with thread-safe operations.
public class GlobalMetric {
// Metric operations
public void increment()
public void increment(long delta)
public void update(long value)
public void decrement()
public void decrement(long delta)
// Metric queries
public long getValue()
public long getCount()
public double getMean()
public long getMax()
public long getMin()
// Reset operations
public void reset()
public MetricType getMetricType()
}Usage:
// Enable global metrics collection
GlobalClientMetrics.setMetricsEnabled(true);
// Use global metrics during operations
Connection connection = DriverManager.getConnection(url);
Statement stmt = connection.createStatement();
// Query execution automatically updates metrics
long startTime = System.currentTimeMillis();
ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM large_table");
// Access global metrics
long totalQueryTime = GlobalClientMetrics.GLOBAL_QUERY_TIME.getValue();
long totalScanBytes = GlobalClientMetrics.GLOBAL_SCAN_BYTES.getValue();
System.out.println("Total query time across all queries: " + totalQueryTime + "ms");
System.out.println("Total bytes scanned: " + totalScanBytes);
// Get comprehensive metrics summary
Map<String, Object> summary = GlobalClientMetrics.getMetricsSummary();
for (Map.Entry<String, Object> entry : summary.entrySet()) {
System.out.println(entry.getKey() + ": " + entry.getValue());
}
// Generate metrics report
String report = GlobalClientMetrics.getMetricsReport();
System.out.println("=== Phoenix Metrics Report ===");
System.out.println(report);
// Reset metrics for new measurement period
GlobalClientMetrics.resetGlobalMetrics();Manages metrics collection for individual Phoenix tables.
public class TableMetricsManager {
// Singleton access
public static TableMetricsManager getInstance()
// Table metrics management
public TableClientMetrics getTableMetrics(String tableName)
public void updateTableMetrics(String tableName, MetricType metricType, long value)
public void resetTableMetrics(String tableName)
public void removeTableMetrics(String tableName)
// Bulk operations
public Map<String, TableClientMetrics> getAllTableMetrics()
public void resetAllTableMetrics()
public int getTrackedTableCount()
// Configuration
public void setMaxTrackedTables(int maxTables)
public boolean isTableMetricsEnabled()
public void setTableMetricsEnabled(boolean enabled)
}Client-side metrics for individual Phoenix tables.
public class TableClientMetrics {
// Constructor
public TableClientMetrics(String tableName)
// Metric operations
public void incrementMetric(MetricType metricType)
public void incrementMetric(MetricType metricType, long delta)
public void updateMetric(MetricType metricType, long value)
// Metric queries
public long getMetricValue(MetricType metricType)
public Map<MetricType, Long> getAllMetrics()
public String getTableName()
// Statistics
public long getTotalQueryTime()
public long getTotalScanBytes()
public long getQueryCount()
public double getAverageQueryTime()
// Reset operations
public void reset()
public void resetMetric(MetricType metricType)
}Usage:
// Enable table-level metrics
TableMetricsManager manager = TableMetricsManager.getInstance();
manager.setTableMetricsEnabled(true);
manager.setMaxTrackedTables(100);
// Execute queries on specific tables
String tableName = "users";
Connection connection = DriverManager.getConnection(url);
// Table metrics are automatically collected during query execution
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName + " WHERE age > 25");
// Process results...
while (rs.next()) {
// Result processing
}
// Access table-specific metrics
TableClientMetrics tableMetrics = manager.getTableMetrics(tableName);
long queryTime = tableMetrics.getMetricValue(MetricType.QUERY_TIME);
long scanBytes = tableMetrics.getMetricValue(MetricType.SCAN_BYTES);
long queryCount = tableMetrics.getQueryCount();
double avgQueryTime = tableMetrics.getAverageQueryTime();
System.out.println("Table: " + tableName);
System.out.println("Total query time: " + queryTime + "ms");
System.out.println("Total scan bytes: " + scanBytes);
System.out.println("Query count: " + queryCount);
System.out.println("Average query time: " + String.format("%.2f", avgQueryTime) + "ms");
// Get metrics for all tables
Map<String, TableClientMetrics> allMetrics = manager.getAllTableMetrics();
for (Map.Entry<String, TableClientMetrics> entry : allMetrics.entrySet()) {
String table = entry.getKey();
TableClientMetrics metrics = entry.getValue();
System.out.println("\n=== Metrics for table: " + table + " ===");
Map<MetricType, Long> tableMetricValues = metrics.getAllMetrics();
for (Map.Entry<MetricType, Long> metricEntry : tableMetricValues.entrySet()) {
System.out.println(metricEntry.getKey().getMetricName() + ": " + metricEntry.getValue());
}
}
// Reset metrics for specific table
manager.resetTableMetrics(tableName);Connection interface providing access to metrics and monitoring information.
public interface PhoenixMonitoredConnection extends Connection {
// Metrics access
ReadMetricQueue getReadMetricsQueue()
WriteMetricQueue getWriteMetricsQueue()
OverAllQueryMetrics getOverallQueryMetrics()
// Query monitoring
QueryLogger getQueryLogger()
void enableQueryLogging(boolean enable)
boolean isQueryLoggingEnabled()
// Connection metrics
ConnectionMetrics getConnectionMetrics()
}Queue for collecting read operation metrics during query execution.
public class ReadMetricQueue {
// Queue operations
public void addMetric(ReadMetric metric)
public ReadMetric pollMetric()
public List<ReadMetric> drainMetrics()
// Queue state
public int size()
public boolean isEmpty()
public void clear()
// Aggregated metrics
public long getTotalReadTime()
public long getTotalBytesRead()
public long getTotalRowsRead()
public int getReadOperationCount()
}Queue for collecting write operation metrics during mutation execution.
public class WriteMetricQueue {
// Queue operations
public void addMetric(WriteMetric metric)
public WriteMetric pollMetric()
public List<WriteMetric> drainMetrics()
// Queue state
public int size()
public boolean isEmpty()
public void clear()
// Aggregated metrics
public long getTotalWriteTime()
public long getTotalBytesWritten()
public long getTotalRowsWritten()
public int getWriteOperationCount()
}Usage:
// Access query-level monitoring
PhoenixConnection phoenixConn = connection.unwrap(PhoenixConnection.class);
PhoenixMonitoredConnection monitoredConn = (PhoenixMonitoredConnection) phoenixConn;
// Enable query logging
monitoredConn.enableQueryLogging(true);
// Execute monitored query
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM orders WHERE order_date > '2023-01-01'");
// Process results
int rowCount = 0;
while (rs.next()) {
rowCount++;
}
// Access read metrics
ReadMetricQueue readMetrics = monitoredConn.getReadMetricsQueue();
System.out.println("Read operations: " + readMetrics.getReadOperationCount());
System.out.println("Total read time: " + readMetrics.getTotalReadTime() + "ms");
System.out.println("Total bytes read: " + readMetrics.getTotalBytesRead());
System.out.println("Total rows read: " + readMetrics.getTotalRowsRead());
// Drain metrics for processing
List<ReadMetric> allReadMetrics = readMetrics.drainMetrics();
for (ReadMetric metric : allReadMetrics) {
System.out.println("Read operation: " + metric.toString());
}
// For write operations
PreparedStatement pstmt = connection.prepareStatement("UPSERT INTO orders VALUES (?, ?, ?)");
for (int i = 0; i < 1000; i++) {
pstmt.setLong(1, i);
pstmt.setString(2, "Product " + i);
pstmt.setBigDecimal(3, new BigDecimal(Math.random() * 1000));
pstmt.executeUpdate();
}
connection.commit();
// Access write metrics
WriteMetricQueue writeMetrics = monitoredConn.getWriteMetricsQueue();
System.out.println("Write operations: " + writeMetrics.getWriteOperationCount());
System.out.println("Total write time: " + writeMetrics.getTotalWriteTime() + "ms");
System.out.println("Total bytes written: " + writeMetrics.getTotalBytesWritten());
System.out.println("Total rows written: " + writeMetrics.getTotalRowsWritten());Custom metrics collection and reporting system.
// Custom metrics collector for Phoenix operations
public class PhoenixMetricsCollector {
private final Map<String, QueryMetrics> queryMetrics = new ConcurrentHashMap<>();
private final TableMetricsManager tableManager = TableMetricsManager.getInstance();
private final ExecutorService reportingExecutor = Executors.newSingleThreadExecutor();
public void startMetricsCollection() {
// Enable all metrics
GlobalClientMetrics.setMetricsEnabled(true);
tableManager.setTableMetricsEnabled(true);
// Start periodic reporting
reportingExecutor.submit(this::periodicReporting);
}
public void recordQueryStart(String queryId, String sql, String tableName) {
QueryMetrics metrics = new QueryMetrics(queryId, sql, tableName);
metrics.setStartTime(System.currentTimeMillis());
queryMetrics.put(queryId, metrics);
}
public void recordQueryEnd(String queryId, int rowCount, boolean success) {
QueryMetrics metrics = queryMetrics.get(queryId);
if (metrics != null) {
metrics.setEndTime(System.currentTimeMillis());
metrics.setRowCount(rowCount);
metrics.setSuccess(success);
// Update table metrics
if (success) {
String tableName = metrics.getTableName();
long executionTime = metrics.getExecutionTime();
tableManager.updateTableMetrics(tableName, MetricType.QUERY_TIME, executionTime);
tableManager.updateTableMetrics(tableName, MetricType.QUERY_EXECUTED, 1);
} else {
tableManager.updateTableMetrics(metrics.getTableName(), MetricType.QUERY_FAILED, 1);
}
}
}
public void recordSlowQuery(String queryId, long thresholdMs) {
QueryMetrics metrics = queryMetrics.get(queryId);
if (metrics != null && metrics.getExecutionTime() > thresholdMs) {
System.out.println("SLOW QUERY DETECTED:");
System.out.println("Query ID: " + queryId);
System.out.println("Execution time: " + metrics.getExecutionTime() + "ms");
System.out.println("SQL: " + metrics.getSql());
System.out.println("Table: " + metrics.getTableName());
}
}
public MetricsSummary generateSummary() {
MetricsSummary summary = new MetricsSummary();
// Global metrics
Map<MetricType, Long> globalMetrics = GlobalClientMetrics.getGlobalMetrics();
summary.setGlobalMetrics(globalMetrics);
// Table metrics
Map<String, TableClientMetrics> allTableMetrics = tableManager.getAllTableMetrics();
summary.setTableMetrics(allTableMetrics);
// Query metrics
summary.setQueryMetrics(new HashMap<>(queryMetrics));
return summary;
}
private void periodicReporting() {
try {
while (!Thread.currentThread().isInterrupted()) {
Thread.sleep(60000); // Report every minute
MetricsSummary summary = generateSummary();
generateReport(summary);
// Clean up old query metrics (keep last hour)
cleanupOldMetrics();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void generateReport(MetricsSummary summary) {
System.out.println("\n=== Phoenix Metrics Report ===");
System.out.println("Report time: " + new Date());
// Global metrics report
Map<MetricType, Long> globalMetrics = summary.getGlobalMetrics();
System.out.println("\n--- Global Metrics ---");
for (Map.Entry<MetricType, Long> entry : globalMetrics.entrySet()) {
if (entry.getValue() > 0) {
System.out.println(entry.getKey().getDescription() + ": " + entry.getValue());
}
}
// Table metrics report
Map<String, TableClientMetrics> tableMetrics = summary.getTableMetrics();
System.out.println("\n--- Table Metrics ---");
for (Map.Entry<String, TableClientMetrics> entry : tableMetrics.entrySet()) {
String tableName = entry.getKey();
TableClientMetrics metrics = entry.getValue();
System.out.println("Table: " + tableName);
System.out.println(" Query count: " + metrics.getQueryCount());
System.out.println(" Average query time: " +
String.format("%.2f", metrics.getAverageQueryTime()) + "ms");
System.out.println(" Total scan bytes: " +
formatBytes(metrics.getMetricValue(MetricType.SCAN_BYTES)));
}
// Recent slow queries
System.out.println("\n--- Recent Slow Queries (>5s) ---");
long currentTime = System.currentTimeMillis();
int slowQueryCount = 0;
for (QueryMetrics queryMetrics : summary.getQueryMetrics().values()) {
if (queryMetrics.getExecutionTime() > 5000 &&
(currentTime - queryMetrics.getStartTime()) < 300000) { // Last 5 minutes
System.out.println("Query: " + queryMetrics.getQueryId());
System.out.println(" Time: " + queryMetrics.getExecutionTime() + "ms");
System.out.println(" Table: " + queryMetrics.getTableName());
System.out.println(" Rows: " + queryMetrics.getRowCount());
slowQueryCount++;
}
}
if (slowQueryCount == 0) {
System.out.println("No slow queries detected in the last 5 minutes.");
}
System.out.println("=== End Report ===\n");
}
private void cleanupOldMetrics() {
long cutoffTime = System.currentTimeMillis() - 3600000; // 1 hour ago
queryMetrics.entrySet().removeIf(entry ->
entry.getValue().getStartTime() < cutoffTime
);
}
private String formatBytes(long bytes) {
if (bytes < 1024) return bytes + " B";
if (bytes < 1024 * 1024) return String.format("%.2f KB", bytes / 1024.0);
if (bytes < 1024 * 1024 * 1024) return String.format("%.2f MB", bytes / (1024.0 * 1024.0));
return String.format("%.2f GB", bytes / (1024.0 * 1024.0 * 1024.0));
}
public void shutdown() {
reportingExecutor.shutdown();
try {
if (!reportingExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
reportingExecutor.shutdownNow();
}
} catch (InterruptedException e) {
reportingExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// Supporting classes
public class QueryMetrics {
private final String queryId;
private final String sql;
private final String tableName;
private long startTime;
private long endTime;
private int rowCount;
private boolean success;
public QueryMetrics(String queryId, String sql, String tableName) {
this.queryId = queryId;
this.sql = sql;
this.tableName = tableName;
}
public long getExecutionTime() {
return endTime - startTime;
}
// Getters and setters
public String getQueryId() { return queryId; }
public String getSql() { return sql; }
public String getTableName() { return tableName; }
public long getStartTime() { return startTime; }
public void setStartTime(long startTime) { this.startTime = startTime; }
public long getEndTime() { return endTime; }
public void setEndTime(long endTime) { this.endTime = endTime; }
public int getRowCount() { return rowCount; }
public void setRowCount(int rowCount) { this.rowCount = rowCount; }
public boolean isSuccess() { return success; }
public void setSuccess(boolean success) { this.success = success; }
}
public class MetricsSummary {
private Map<MetricType, Long> globalMetrics;
private Map<String, TableClientMetrics> tableMetrics;
private Map<String, QueryMetrics> queryMetrics;
// Getters and setters
public Map<MetricType, Long> getGlobalMetrics() { return globalMetrics; }
public void setGlobalMetrics(Map<MetricType, Long> globalMetrics) { this.globalMetrics = globalMetrics; }
public Map<String, TableClientMetrics> getTableMetrics() { return tableMetrics; }
public void setTableMetrics(Map<String, TableClientMetrics> tableMetrics) { this.tableMetrics = tableMetrics; }
public Map<String, QueryMetrics> getQueryMetrics() { return queryMetrics; }
public void setQueryMetrics(Map<String, QueryMetrics> queryMetrics) { this.queryMetrics = queryMetrics; }
}// Integration with Phoenix application
public class MonitoredPhoenixApplication {
private final PhoenixMetricsCollector metricsCollector = new PhoenixMetricsCollector();
private final Connection connection;
public MonitoredPhoenixApplication(String phoenixUrl) throws SQLException {
this.connection = DriverManager.getConnection(phoenixUrl);
metricsCollector.startMetricsCollection();
}
public void executeQuery(String sql, String tableName) throws SQLException {
String queryId = UUID.randomUUID().toString();
// Record query start
metricsCollector.recordQueryStart(queryId, sql, tableName);
try {
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(sql);
int rowCount = 0;
while (rs.next()) {
rowCount++;
// Process row
}
// Record successful completion
metricsCollector.recordQueryEnd(queryId, rowCount, true);
// Check for slow queries
metricsCollector.recordSlowQuery(queryId, 1000); // 1 second threshold
rs.close();
stmt.close();
} catch (SQLException e) {
// Record failed query
metricsCollector.recordQueryEnd(queryId, 0, false);
throw e;
}
}
public void generateMetricsReport() {
MetricsSummary summary = metricsCollector.generateSummary();
// Process summary as needed
}
public void close() throws SQLException {
metricsCollector.shutdown();
connection.close();
}
public static void main(String[] args) throws SQLException {
MonitoredPhoenixApplication app = new MonitoredPhoenixApplication("jdbc:phoenix:localhost:2181");
try {
// Execute various queries
app.executeQuery("SELECT COUNT(*) FROM users", "users");
app.executeQuery("SELECT * FROM orders WHERE order_date > '2023-01-01'", "orders");
app.executeQuery("SELECT customer_id, SUM(amount) FROM transactions GROUP BY customer_id", "transactions");
// Generate report
app.generateMetricsReport();
} finally {
app.close();
}
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-phoenix--phoenix-core