Apache Phoenix Core library providing SQL-on-HBase functionality with JDBC connectivity, query compilation, and transaction support
Phoenix's query execution framework provides optimized query processing with support for parallel execution, result iteration, mutation state management, and distributed operations. The execution system leverages HBase's distributed architecture while providing SQL semantics.
import org.apache.phoenix.execute.*;
import org.apache.phoenix.iterate.*;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.schema.tuple.Tuple;

Manages mutation state for Phoenix transactions, batching mutations for efficient execution.
public class MutationState implements SQLCloseable {
// Constructor
public MutationState(int maxSize, int maxSizeBytes, PhoenixConnection connection)
// Mutation operations
public void addMutation(PName tableName, Mutation mutation) throws SQLException
public void addMutations(PName tableName, List<Mutation> mutations) throws SQLException
// Batch management
public void send() throws SQLException
public void commit() throws SQLException
public void rollback() throws SQLException
// State information
public int getUpdateCount()
public boolean hasUncommittedData()
public long getEstimatedSize()
public int getMaxSize()
// Transaction management
public void startTransaction() throws SQLException
public void join(MutationState newMutation) throws SQLException
public MutationState newMutationState(int maxSize, int maxSizeBytes)
}

Usage:
PhoenixConnection connection = getPhoenixConnection();
MutationState mutationState = connection.getMutationState();
// Add individual mutations
PName tableName = PNameFactory.newName("users");
Put userPut = new Put(Bytes.toBytes("user123"));
userPut.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes("John Doe"));
mutationState.addMutation(tableName, userPut);
// Add batch of mutations
List<Mutation> batch = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Put put = new Put(Bytes.toBytes("user" + i));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes("User " + i));
batch.add(put);
}
mutationState.addMutations(tableName, batch);
// Check state before commit
boolean hasData = mutationState.hasUncommittedData();
long estimatedSize = mutationState.getEstimatedSize();
int updateCount = mutationState.getUpdateCount();
System.out.println("Has uncommitted data: " + hasData);
System.out.println("Estimated size: " + estimatedSize + " bytes");
System.out.println("Update count: " + updateCount);
// Commit mutations
mutationState.commit();

Base interface for query execution plans providing iteration and metadata.
public interface QueryPlan {
// Plan execution
ResultIterator iterator() throws SQLException
ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException
ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException
// Plan information
StatementContext getContext()
ParameterMetaData getParameterMetaData()
ExplainPlan getExplainPlan() throws SQLException
// Cost estimation
long getEstimatedSize()
Cost getCost()
long getEstimatedRowsToScan()
Long getEstimatedBytesToScan()
// Plan properties
Operation getOperation()
boolean useRoundRobinIterator()
}

Abstract base implementation of QueryPlan with common functionality.
public abstract class BaseQueryPlan implements QueryPlan {
protected final StatementContext context;
protected final FilterableStatement statement;
protected final TableRef tableRef;
protected final RowProjector projector;
// Common plan operations
public StatementContext getContext()
public ParameterMetaData getParameterMetaData()
public Cost getCost()
public Operation getOperation()
// Abstract methods for subclasses
public abstract ResultIterator iterator() throws SQLException
public abstract ExplainPlan getExplainPlan() throws SQLException
}

Usage:
// Get query plan from compiled statement
QueryCompiler compiler = new QueryCompiler();
String sql = "SELECT id, name, salary FROM employees WHERE department = ? ORDER BY salary DESC";
QueryPlan plan = compiler.compile(sql, context);
// Examine plan properties
long estimatedSize = plan.getEstimatedSize();
long estimatedRows = plan.getEstimatedRowsToScan();
Cost cost = plan.getCost();
Operation operation = plan.getOperation();
System.out.println("Estimated size: " + estimatedSize + " bytes");
System.out.println("Estimated rows: " + estimatedRows);
System.out.println("Operation: " + operation);
// Get explain plan
ExplainPlan explainPlan = plan.getExplainPlan();
System.out.println("Query plan:");
for (String step : explainPlan.getPlanSteps()) {
System.out.println(" " + step);
}
// Execute plan
try (ResultIterator iterator = plan.iterator()) {
Tuple tuple;
while ((tuple = iterator.next()) != null) {
// Process tuple
processTuple(tuple);
}
}

Interface for iterating over query results with support for peeking and closing.
public interface ResultIterator extends SQLCloseable {
// Iteration methods
Tuple next() throws SQLException
Tuple peek() throws SQLException
// Iterator state
void close() throws SQLException
ExplainPlan getExplainPlan() throws SQLException
// Aggregation support
Aggregators getAggregators()
}

Interface for iterating over table scan results with additional scan information.
public interface TableResultIterator extends ResultIterator {
// Scan information
void initScanner() throws SQLException
Scan getScan()
void setScan(Scan scan)
// Region information
HRegionLocation getRegionLocation()
ReadMetricQueue getReadMetricQueue()
OverAllQueryMetrics getOverAllQueryMetrics()
}

Abstract base implementation providing common iterator functionality.
public abstract class BaseResultIterator implements ResultIterator {
// Common iterator operations
public Tuple peek() throws SQLException
public void close() throws SQLException
public ExplainPlan getExplainPlan() throws SQLException
// Abstract methods
public abstract Tuple next() throws SQLException
}

Usage:
// Basic result iteration
QueryPlan plan = getQueryPlan();
try (ResultIterator iterator = plan.iterator()) {
Tuple tuple;
int rowCount = 0;
while ((tuple = iterator.next()) != null) {
rowCount++;
// Access column values from tuple
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
// Get first column value
tuple.getKey(ptr, 0);
Object value = PVarchar.INSTANCE.toObject(ptr);
System.out.println("Column 0: " + value);
// Peek at next tuple without consuming it
Tuple nextTuple = iterator.peek();
if (nextTuple != null) {
System.out.println("Next tuple available");
}
}
System.out.println("Processed " + rowCount + " rows");
}
// Table scan iteration with scan details
if (iterator instanceof TableResultIterator) {
TableResultIterator tableIter = (TableResultIterator) iterator;
Scan scan = tableIter.getScan();
HRegionLocation region = tableIter.getRegionLocation();
System.out.println("Scan start row: " + Bytes.toString(scan.getStartRow()));
System.out.println("Scan stop row: " + Bytes.toString(scan.getStopRow()));
System.out.println("Region: " + region.getRegionInfo().getRegionNameAsString());
}

Factory for creating parallel result iterators for distributed query execution.
public interface ParallelIteratorFactory {
// Iterator creation
ResultIterator newIterator(StatementContext context, ResultIterator iterator,
Scan scan, String tableName) throws SQLException
// Parallel execution support
List<PeekingResultIterator> getIterators() throws SQLException
void submitWork(Callable<Boolean> callable) throws SQLException
}

Usage:
// Create parallel iterators for distributed execution
ParallelIteratorFactory factory = getParallelIteratorFactory();
List<PeekingResultIterator> parallelIterators = factory.getIterators();
System.out.println("Created " + parallelIterators.size() + " parallel iterators");
// Process results from multiple iterators
ExecutorService executor = Executors.newFixedThreadPool(parallelIterators.size());
List<Future<Integer>> futures = new ArrayList<>();
for (PeekingResultIterator iter : parallelIterators) {
Future<Integer> future = executor.submit(() -> {
int count = 0;
try (ResultIterator iterator = iter) {
Tuple tuple;
while ((tuple = iterator.next()) != null) {
count++;
// Process tuple
}
}
return count;
});
futures.add(future);
}
// Collect results
int totalRows = 0;
for (Future<Integer> future : futures) {
totalRows += future.get();
}
System.out.println("Total rows processed: " + totalRows);
executor.shutdown();

Interface representing a row of data with key-value access methods.
public interface Tuple {
// Key access
void getKey(ImmutableBytesWritable ptr)
void getKey(ImmutableBytesWritable ptr, int position)
// Value access
boolean getValue(byte[] family, byte[] qualifier, ImmutableBytesWritable ptr)
Cell getValue(byte[] family, byte[] qualifier)
// Tuple properties
boolean isImmutable()
int size()
KeyValue getValue(int index)
// Comparison
int compareTo(Tuple other)
}

Implementation of Tuple backed by HBase KeyValue objects.
public class KeyValueTuple implements Tuple {
public KeyValueTuple(KeyValue keyValue)
public KeyValueTuple(List<KeyValue> keyValues)
// Tuple implementation
public void getKey(ImmutableBytesWritable ptr)
public boolean getValue(byte[] family, byte[] qualifier, ImmutableBytesWritable ptr)
public int size()
public KeyValue getValue(int index)
}

Usage:
// Process tuples from result iterator
try (ResultIterator iterator = plan.iterator()) {
Tuple tuple;
while ((tuple = iterator.next()) != null) {
// Access row key
ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
tuple.getKey(keyPtr);
String rowKey = Bytes.toString(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength());
// Access specific column values
ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
byte[] family = Bytes.toBytes("cf");
byte[] qualifier = Bytes.toBytes("name");
if (tuple.getValue(family, qualifier, valuePtr)) {
String name = Bytes.toString(valuePtr.get(), valuePtr.getOffset(), valuePtr.getLength());
System.out.println("Row " + rowKey + ", Name: " + name);
}
// Access tuple properties
int tupleSize = tuple.size();
boolean immutable = tuple.isImmutable();
// Access by index
for (int i = 0; i < tupleSize; i++) {
KeyValue kv = tuple.getValue(i);
if (kv != null) {
String qual = Bytes.toString(kv.getQualifierArray(),
kv.getQualifierOffset(),
kv.getQualifierLength());
String value = Bytes.toString(kv.getValueArray(),
kv.getValueOffset(),
kv.getValueLength());
System.out.println(" " + qual + ": " + value);
}
}
}
}

Maintains execution context and state during query processing.
public class StatementContext {
// Context information
public PhoenixConnection getConnection()
public ColumnResolver getResolver()
public Scan getScan()
public long getCurrentTime()
// Execution state
public SequenceManager getSequenceManager()
public TupleProjector getTupleProjector()
public GroupBy getGroupBy()
public OrderBy getOrderBy()
// Metrics and monitoring
public ReadMetricQueue getReadMetricsQueue()
public WriteMetricQueue getWriteMetricsQueue()
public OverAllQueryMetrics getOverallQueryMetrics()
// Execution configuration
public int getPageSize()
public Integer getLimit()
public Integer getOffset()
}

Usage:
// Access execution context
QueryPlan plan = getQueryPlan();
StatementContext context = plan.getContext();
// Get connection and scan information
PhoenixConnection connection = context.getConnection();
Scan scan = context.getScan();
// Configure scan properties
scan.setCaching(1000); // Set row caching
scan.setBatch(100); // Set batch size
scan.setMaxVersions(1); // Only get latest version
// Access execution limits
Integer limit = context.getLimit();
Integer offset = context.getOffset();
int pageSize = context.getPageSize();
System.out.println("Query limit: " + limit);
System.out.println("Query offset: " + offset);
System.out.println("Page size: " + pageSize);
// Access metrics
ReadMetricQueue readMetrics = context.getReadMetricsQueue();
WriteMetricQueue writeMetrics = context.getWriteMetricsQueue();
System.out.println("Read metrics queue size: " + readMetrics.size());
System.out.println("Write metrics queue size: " + writeMetrics.size());

// Configure parallel execution
/**
 * Executes multiple query plans concurrently on a fixed-size thread pool,
 * draining each plan's ResultIterator and collecting the tuples into a
 * QueryResult per plan.
 *
 * <p>Fix over the original: the executor was never shut down, so its
 * non-daemon worker threads leaked and could keep the JVM alive. The class
 * now exposes {@link #shutdown()} and implements {@link AutoCloseable} so it
 * can be used in try-with-resources.
 */
public class ParallelQueryExecutor implements AutoCloseable {
    private final int parallelism;
    private final ExecutorService executorService;

    /**
     * @param parallelism number of worker threads used to run plans concurrently; must be &gt; 0
     * @throws IllegalArgumentException if {@code parallelism} is not positive
     */
    public ParallelQueryExecutor(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive: " + parallelism);
        }
        this.parallelism = parallelism;
        this.executorService = Executors.newFixedThreadPool(parallelism);
    }

    /**
     * Submits each plan for asynchronous execution and returns one Future per plan.
     * Each task fully drains the plan's iterator (closed via try-with-resources)
     * and wraps the collected tuples plus the plan's estimated size in a QueryResult.
     *
     * @param plans query plans to execute; an empty list yields an empty result
     * @return futures in the same order as {@code plans}
     */
    public List<Future<QueryResult>> executeParallel(List<QueryPlan> plans) {
        List<Future<QueryResult>> futures = new ArrayList<>(plans.size());
        for (QueryPlan plan : plans) {
            Future<QueryResult> future = executorService.submit(() -> {
                List<Tuple> results = new ArrayList<>();
                try (ResultIterator iterator = plan.iterator()) {
                    Tuple tuple;
                    while ((tuple = iterator.next()) != null) {
                        results.add(tuple);
                    }
                }
                return new QueryResult(results, plan.getEstimatedSize());
            });
            futures.add(future);
        }
        return futures;
    }

    /** Initiates an orderly shutdown; previously submitted tasks still complete. */
    public void shutdown() {
        executorService.shutdown();
    }

    /** Equivalent to {@link #shutdown()}; enables try-with-resources usage. */
    @Override
    public void close() {
        shutdown();
    }
}
// Usage
ParallelQueryExecutor executor = new ParallelQueryExecutor(4);
List<QueryPlan> parallelPlans = createParallelPlans(originalPlan);
List<Future<QueryResult>> futures = executor.executeParallel(parallelPlans);
// Collect results
List<Tuple> allResults = new ArrayList<>();
for (Future<QueryResult> future : futures) {
QueryResult result = future.get();
allResults.addAll(result.getTuples());
}
System.out.println("Parallel execution completed, total results: " + allResults.size());

// Query result caching
/**
 * Simple in-memory query-result cache with lazy, time-based expiry.
 *
 * <p>Thread-safe: backed by a ConcurrentHashMap; stale entries are evicted on
 * lookup rather than by a background sweeper.
 *
 * <p>Fix over the original: {@code cache.remove(queryKey)} ran even when no
 * entry existed, and the unconditional remove raced with concurrent writers —
 * it could evict a *fresh* result another thread had just cached under the
 * same key. Eviction now uses the value-conditional
 * {@code ConcurrentHashMap.remove(key, value)} and only for the stale entry.
 */
public class QueryCache {
    private final Map<String, CachedResult> cache = new ConcurrentHashMap<>();
    // Maximum age of a cached entry in milliseconds before it is considered stale.
    private final long maxAge = 300000; // 5 minutes

    /**
     * Returns the cached result for {@code queryKey} if present and still fresh,
     * otherwise evicts the stale entry (if any) and returns {@code null}.
     */
    public CachedResult getCachedResult(String queryKey) {
        CachedResult cached = cache.get(queryKey);
        if (cached == null) {
            return null; // nothing cached; no eviction needed
        }
        if (System.currentTimeMillis() - cached.getTimestamp() < maxAge) {
            return cached;
        }
        // Expired: remove only this exact entry so a newer result cached
        // concurrently under the same key is not discarded.
        cache.remove(queryKey, cached);
        return null;
    }

    /** Caches {@code results} under {@code queryKey}, stamped with the current time. */
    public void cacheResult(String queryKey, List<Tuple> results) {
        cache.put(queryKey, new CachedResult(results, System.currentTimeMillis()));
    }
}
// Usage with query execution
QueryCache queryCache = new QueryCache();
String queryKey = generateQueryKey(sql, parameters);
// Check cache first
CachedResult cached = queryCache.getCachedResult(queryKey);
if (cached != null) {
System.out.println("Using cached results");
return cached.getResults();
}
// Execute query if not cached
List<Tuple> results = new ArrayList<>();
try (ResultIterator iterator = plan.iterator()) {
Tuple tuple;
while ((tuple = iterator.next()) != null) {
results.add(tuple);
}
}
// Cache results for future queries
queryCache.cacheResult(queryKey, results);
return results;

// Custom result processor with aggregation
/**
 * Drains a ResultIterator into materialized rows while feeding every tuple
 * through a set of aggregate expressions, then finalizes the aggregates
 * into plain Java values.
 */
public class ResultProcessor {
/**
 * Processes every remaining tuple from {@code iterator}.
 *
 * @param iterator source of tuples; fully consumed but NOT closed here —
 *                 the caller retains ownership (see try-with-resources at call site)
 * @param aggregateExpressions expressions to evaluate; only AggregateFunction
 *                             instances get an aggregator, others are ignored
 * @return processed rows, finalized aggregate values keyed by the expression's
 *         string form, and the total row count
 * @throws SQLException if iteration fails
 */
public ProcessedResult processResults(ResultIterator iterator,
List<Expression> aggregateExpressions) throws SQLException {
Map<String, Object> aggregatedValues = new HashMap<>();
List<ProcessedRow> processedRows = new ArrayList<>();
int totalRows = 0;
// Initialize aggregators: one per aggregate expression, keyed by expr.toString().
Map<String, Aggregator> aggregators = new HashMap<>();
for (Expression expr : aggregateExpressions) {
if (expr instanceof AggregateFunction) {
AggregateFunction aggFunc = (AggregateFunction) expr;
aggregators.put(expr.toString(), aggFunc.newAggregator());
}
}
// Process each row
Tuple tuple;
while ((tuple = iterator.next()) != null) {
totalRows++;
// Create processed row
ProcessedRow row = new ProcessedRow();
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
// Extract values from tuple; positions whose bytes are empty are skipped.
for (int i = 0; i < tuple.size(); i++) {
tuple.getKey(ptr, i);
if (ptr.getLength() > 0) {
String value = Bytes.toString(ptr.get(), ptr.getOffset(), ptr.getLength());
row.addValue("column_" + i, value);
}
}
processedRows.add(row);
// Update aggregators.
// NOTE(review): `ptr` still points at the LAST column extracted by the loop
// above when it is handed to aggregate() — confirm this is the intended
// input buffer for every aggregator, or whether each needs its own ptr.
for (Map.Entry<String, Aggregator> entry : aggregators.entrySet()) {
entry.getValue().aggregate(tuple, ptr);
}
}
// Finalize aggregates: evaluate() writes the result bytes into `result`,
// which are then decoded.
// NOTE(review): decoding every aggregate via PVarchar assumes string-encoded
// output — numeric aggregates (e.g. SUM) may require their own PDataType; verify.
ImmutableBytesWritable result = new ImmutableBytesWritable();
for (Map.Entry<String, Aggregator> entry : aggregators.entrySet()) {
if (entry.getValue().evaluate(null, result)) {
Object aggregatedValue = PVarchar.INSTANCE.toObject(result);
aggregatedValues.put(entry.getKey(), aggregatedValue);
}
}
return new ProcessedResult(processedRows, aggregatedValues, totalRows);
}
}
// Usage
ResultProcessor processor = new ResultProcessor();
List<Expression> aggregateExprs = Arrays.asList(
new CountAggregateFunction(Arrays.asList(LiteralExpression.newConstant(1))),
new SumAggregateFunction(Arrays.asList(salaryColumn))
);
try (ResultIterator iterator = plan.iterator()) {
ProcessedResult result = processor.processResults(iterator, aggregateExprs);
System.out.println("Total rows: " + result.getTotalRows());
System.out.println("Processed rows: " + result.getProcessedRows().size());
Map<String, Object> aggregates = result.getAggregatedValues();
for (Map.Entry<String, Object> entry : aggregates.entrySet()) {
System.out.println("Aggregate " + entry.getKey() + ": " + entry.getValue());
}
}

// Monitor query execution
/**
 * Runs a QueryPlan to completion, logging start-up estimates, periodic
 * throughput, and final statistics to stdout.
 */
public class ExecutionMonitor {

    /**
     * Executes {@code plan} until its iterator is exhausted, emitting a
     * progress line every 1000 rows and a summary at the end.
     *
     * @param plan the compiled query plan to drain
     * @throws SQLException if iteration fails
     */
    public void monitorExecution(QueryPlan plan) throws SQLException {
        StatementContext context = plan.getContext();
        long begin = System.currentTimeMillis();
        printHeader(plan, begin);

        int rows = 0;
        try (ResultIterator results = plan.iterator()) {
            while (results.next() != null) {
                rows++;
                if (rows % 1000 == 0) {
                    printProgress(rows, begin);
                }
            }
            printSummary(rows, begin, context);
        }
    }

    // Logs the banner, start timestamp, and the plan's scan estimates.
    private void printHeader(QueryPlan plan, long begin) {
        System.out.println("=== Query Execution Monitor ===");
        System.out.println("Start time: " + new Date(begin));
        System.out.println("Estimated rows: " + plan.getEstimatedRowsToScan());
        System.out.println("Estimated bytes: " + plan.getEstimatedBytesToScan());
    }

    // Logs a running rows/sec figure based on wall-clock time since start.
    private void printProgress(int rows, long begin) {
        long elapsed = System.currentTimeMillis() - begin;
        double rowsPerSecond = (double) rows / (elapsed / 1000.0);
        System.out.println("Processed " + rows + " rows, " +
                String.format("%.2f", rowsPerSecond) + " rows/sec");
    }

    // Logs final totals and, when available, the read-metrics queue size.
    private void printSummary(int rows, long begin, StatementContext context) {
        long totalTime = System.currentTimeMillis() - begin;
        double avgRowsPerSecond = (double) rows / (totalTime / 1000.0);
        System.out.println("=== Execution Complete ===");
        System.out.println("Total rows: " + rows);
        System.out.println("Total time: " + totalTime + "ms");
        System.out.println("Average rate: " + String.format("%.2f", avgRowsPerSecond) + " rows/sec");
        ReadMetricQueue readMetrics = context.getReadMetricsQueue();
        if (readMetrics != null) {
            System.out.println("Read operations: " + readMetrics.size());
        }
    }
}
// Usage
ExecutionMonitor monitor = new ExecutionMonitor();
monitor.monitorExecution(queryPlan);

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-phoenix--phoenix-core