CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-phoenix--phoenix-core

Apache Phoenix Core library providing SQL-on-HBase functionality with JDBC connectivity, query compilation, and transaction support

Overview
Eval results
Files

docs/execution.md

Query Execution

Phoenix's query execution framework provides optimized query processing with support for parallel execution, result iteration, mutation state management, and distributed operations. The execution system leverages HBase's distributed architecture while providing SQL semantics.

Core Imports

import org.apache.phoenix.execute.*;
import org.apache.phoenix.iterate.*;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.schema.tuple.Tuple;

Mutation State Management

MutationState

Manages mutation state for Phoenix transactions, batching mutations for efficient execution.

/**
 * Manages mutation state for Phoenix transactions, batching mutations for
 * efficient execution.
 *
 * Signature listing only: method bodies are not shown here.
 */
public class MutationState implements SQLCloseable {
    // Constructor
    public MutationState(int maxSize, int maxSizeBytes, PhoenixConnection connection)

    // Mutation operations (single mutation, or a list of mutations, for one table)
    public void addMutation(PName tableName, Mutation mutation) throws SQLException
    public void addMutations(PName tableName, List<Mutation> mutations) throws SQLException

    // Batch management
    public void send() throws SQLException
    public void commit() throws SQLException
    public void rollback() throws SQLException

    // State information
    // NOTE(review): the usage example prints getEstimatedSize() with a "bytes" suffix — confirm the unit.
    public int getUpdateCount()
    public boolean hasUncommittedData()
    public long getEstimatedSize()
    public int getMaxSize()

    // Transaction management
    public void startTransaction() throws SQLException
    public void join(MutationState newMutation) throws SQLException
    public MutationState newMutationState(int maxSize, int maxSizeBytes)
}

Usage:

// Fetch the mutation state that belongs to the connection
PhoenixConnection conn = getPhoenixConnection();
MutationState state = conn.getMutationState();

// Single-row write: one Put keyed by "user123"
PName usersTable = PNameFactory.newName("users");
Put singleRow = new Put(Bytes.toBytes("user123"));
singleRow.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes("John Doe"));

state.addMutation(usersTable, singleRow);

// Bulk write: build 100 Puts first, then hand them over in one call
List<Mutation> pending = new ArrayList<>();
int n = 0;
while (n < 100) {
    Put row = new Put(Bytes.toBytes("user" + n));
    row.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes("User " + n));
    pending.add(row);
    n++;
}
state.addMutations(usersTable, pending);

// Snapshot the buffered state before committing
boolean dirty = state.hasUncommittedData();
long sizeEstimate = state.getEstimatedSize();
int updates = state.getUpdateCount();

System.out.println("Has uncommitted data: " + dirty);
System.out.println("Estimated size: " + sizeEstimate + " bytes");
System.out.println("Update count: " + updates);

// Commit everything added above
state.commit();

Query Execution Plans

QueryPlan Interface

Base interface for query execution plans providing iteration and metadata.

/**
 * Base interface for query execution plans providing iteration and metadata.
 *
 * Signature listing only: declarations are shown without terminators.
 */
public interface QueryPlan {
    // Plan execution: three overloads, optionally taking a scan grouper and an explicit scan
    ResultIterator iterator() throws SQLException
    ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException
    ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException

    // Plan information
    StatementContext getContext()
    ParameterMetaData getParameterMetaData()
    ExplainPlan getExplainPlan() throws SQLException

    // Cost estimation
    // NOTE(review): getEstimatedBytesToScan() returns a boxed Long while
    // getEstimatedRowsToScan() returns a primitive long — presumably the boxed
    // value may be null; confirm against the library.
    long getEstimatedSize()
    Cost getCost()
    long getEstimatedRowsToScan()
    Long getEstimatedBytesToScan()

    // Plan properties
    Operation getOperation()
    boolean useRoundRobinIterator()
}

BaseQueryPlan

Abstract base implementation of QueryPlan with common functionality.

/**
 * Abstract base implementation of QueryPlan with common functionality.
 *
 * Signature listing only: method bodies are not shown here.
 */
public abstract class BaseQueryPlan implements QueryPlan {
    // State shared with subclasses (protected, assigned once)
    protected final StatementContext context;
    protected final FilterableStatement statement;
    protected final TableRef tableRef;
    protected final RowProjector projector;

    // Common plan operations
    public StatementContext getContext()
    public ParameterMetaData getParameterMetaData()
    public Cost getCost()
    public Operation getOperation()

    // Abstract methods for subclasses
    public abstract ResultIterator iterator() throws SQLException
    public abstract ExplainPlan getExplainPlan() throws SQLException
}

Usage:

// Compile a parameterized SELECT into a query plan
String sql = "SELECT id, name, salary FROM employees WHERE department = ? ORDER BY salary DESC";
QueryCompiler compiler = new QueryCompiler();
QueryPlan plan = compiler.compile(sql, context);

// Read the plan's estimates and operation type up front
long sizeEstimate = plan.getEstimatedSize();
long rowEstimate = plan.getEstimatedRowsToScan();
Cost cost = plan.getCost();
Operation op = plan.getOperation();

System.out.println("Estimated size: " + sizeEstimate + " bytes");
System.out.println("Estimated rows: " + rowEstimate);
System.out.println("Operation: " + op);

// Print the explain output, one indented step per line
ExplainPlan explained = plan.getExplainPlan();
System.out.println("Query plan:");
for (String planStep : explained.getPlanSteps()) {
    System.out.println("  " + planStep);
}

// Run the plan and drain it; the iterator is closed when the try block exits
try (ResultIterator rows = plan.iterator()) {
    for (Tuple row = rows.next(); row != null; row = rows.next()) {
        // Process tuple
        processTuple(row);
    }
}

Result Iteration

ResultIterator

Interface for iterating over query results with support for peeking and closing.

/**
 * Interface for iterating over query results with support for peeking and closing.
 *
 * The usage examples in this document treat a null return from next() as the
 * end of the results.
 *
 * Signature listing only: declarations are shown without terminators.
 */
public interface ResultIterator extends SQLCloseable {
    // Iteration methods
    Tuple next() throws SQLException
    Tuple peek() throws SQLException

    // Iterator state
    void close() throws SQLException
    ExplainPlan getExplainPlan() throws SQLException

    // Aggregation support
    Aggregators getAggregators()
}

TableResultIterator

Interface for iterating over table scan results with additional scan information.

/**
 * Interface for iterating over table scan results with additional scan information.
 *
 * Signature listing only: declarations are shown without terminators.
 */
public interface TableResultIterator extends ResultIterator {
    // Scan information
    void initScanner() throws SQLException
    Scan getScan()
    void setScan(Scan scan)

    // Region information
    // NOTE(review): the two metric getters below are declared as returning long,
    // whereas StatementContext exposes ReadMetricQueue / OverAllQueryMetrics
    // objects — verify these return types against the library.
    HRegionLocation getRegionLocation()
    long getReadMetricQueue()
    long getOverAllQueryMetrics()
}

BaseResultIterator

Abstract base implementation providing common iterator functionality.

/**
 * Abstract base implementation providing common iterator functionality.
 *
 * Subclasses supply next(); peek(), close() and getExplainPlan() are listed
 * here as concrete methods.
 *
 * Signature listing only: method bodies are not shown here.
 */
public abstract class BaseResultIterator implements ResultIterator {
    // Common iterator operations
    public Tuple peek() throws SQLException
    public void close() throws SQLException
    public ExplainPlan getExplainPlan() throws SQLException

    // Abstract methods
    public abstract Tuple next() throws SQLException
}

Usage:

// Basic result iteration
QueryPlan plan = getQueryPlan();
try (ResultIterator iterator = plan.iterator()) {
    Tuple tuple;
    int rowCount = 0;

    while ((tuple = iterator.next()) != null) {
        rowCount++;

        // Access column values from tuple
        ImmutableBytesWritable ptr = new ImmutableBytesWritable();

        // Get first column value
        tuple.getKey(ptr, 0);
        Object value = PVarchar.INSTANCE.toObject(ptr);
        System.out.println("Column 0: " + value);

        // Peek at next tuple without consuming it
        Tuple nextTuple = iterator.peek();
        if (nextTuple != null) {
            System.out.println("Next tuple available");
        }
    }

    System.out.println("Processed " + rowCount + " rows");

    // Table scan iteration with scan details.
    // This must stay inside the try block: the iterator variable is scoped to the
    // try-with-resources statement and is closed as soon as the block exits.
    if (iterator instanceof TableResultIterator) {
        TableResultIterator tableIter = (TableResultIterator) iterator;
        Scan scan = tableIter.getScan();
        HRegionLocation region = tableIter.getRegionLocation();

        System.out.println("Scan start row: " + Bytes.toString(scan.getStartRow()));
        System.out.println("Scan stop row: " + Bytes.toString(scan.getStopRow()));
        System.out.println("Region: " + region.getRegionInfo().getRegionNameAsString());
    }
}

ParallelIteratorFactory

Factory for creating parallel result iterators for distributed query execution.

/**
 * Factory for creating parallel result iterators for distributed query execution.
 *
 * Signature listing only: declarations are shown without terminators.
 */
public interface ParallelIteratorFactory {
    // Iterator creation
    ResultIterator newIterator(StatementContext context, ResultIterator iterator,
                              Scan scan, String tableName) throws SQLException

    // Parallel execution support
    List<PeekingResultIterator> getIterators() throws SQLException
    void submitWork(Callable<Boolean> callable) throws SQLException
}

Usage:

// Create parallel iterators for distributed execution
ParallelIteratorFactory factory = getParallelIteratorFactory();
List<PeekingResultIterator> parallelIterators = factory.getIterators();

System.out.println("Created " + parallelIterators.size() + " parallel iterators");

// Process results from multiple iterators.
// newFixedThreadPool rejects a pool size of 0, so keep at least one thread even
// when the factory returned no iterators.
ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, parallelIterators.size()));
int totalRows = 0;
try {
    List<Future<Integer>> futures = new ArrayList<>();

    for (PeekingResultIterator iter : parallelIterators) {
        Future<Integer> future = executor.submit(() -> {
            int count = 0;
            // Each task owns its iterator and closes it when done
            try (ResultIterator iterator = iter) {
                Tuple tuple;
                while ((tuple = iterator.next()) != null) {
                    count++;
                    // Process tuple
                }
            }
            return count;
        });
        futures.add(future);
    }

    // Collect results
    for (Future<Integer> future : futures) {
        totalRows += future.get();
    }
} finally {
    // Release the pool's threads even if a task failed and future.get() threw
    executor.shutdown();
}

System.out.println("Total rows processed: " + totalRows);

Tuple Processing

Tuple

Interface representing a row of data with key-value access methods.

/**
 * Interface representing a row of data with key-value access methods.
 *
 * Accessors that take an ImmutableBytesWritable write their result into that
 * caller-supplied pointer instead of returning it, as the usage examples in
 * this document show.
 *
 * Signature listing only: declarations are shown without terminators.
 */
public interface Tuple {
    // Key access
    void getKey(ImmutableBytesWritable ptr)
    void getKey(ImmutableBytesWritable ptr, int position)

    // Value access
    boolean getValue(byte[] family, byte[] qualifier, ImmutableBytesWritable ptr)
    Cell getValue(byte[] family, byte[] qualifier)

    // Tuple properties
    boolean isImmutable()
    int size()
    KeyValue getValue(int index)

    // Comparison
    int compareTo(Tuple other)
}

KeyValueTuple

Implementation of Tuple backed by HBase KeyValue objects.

/**
 * Implementation of Tuple backed by HBase KeyValue objects.
 *
 * Signature listing only: method bodies are not shown, and only a subset of
 * the Tuple methods is listed here.
 */
public class KeyValueTuple implements Tuple {
    // Constructors: from a single KeyValue or from a list of KeyValues
    public KeyValueTuple(KeyValue keyValue)
    public KeyValueTuple(List<KeyValue> keyValues)

    // Tuple implementation
    public void getKey(ImmutableBytesWritable ptr)
    public boolean getValue(byte[] family, byte[] qualifier, ImmutableBytesWritable ptr)
    public int size()
    public KeyValue getValue(int index)
}

Usage:

// Walk every row produced by the plan; the iterator is closed when the try block exits
try (ResultIterator rows = plan.iterator()) {
    for (Tuple row = rows.next(); row != null; row = rows.next()) {
        // Row key, decoded as text
        ImmutableBytesWritable key = new ImmutableBytesWritable();
        row.getKey(key);
        String rowKey = Bytes.toString(key.get(), key.getOffset(), key.getLength());

        // Look up the cf:name column; print it only when getValue returns true
        ImmutableBytesWritable cell = new ImmutableBytesWritable();
        byte[] family = Bytes.toBytes("cf");
        byte[] qualifier = Bytes.toBytes("name");

        boolean found = row.getValue(family, qualifier, cell);
        if (found) {
            String name = Bytes.toString(cell.get(), cell.getOffset(), cell.getLength());
            System.out.println("Row " + rowKey + ", Name: " + name);
        }

        // Tuple-level properties
        int cellCount = row.size();
        boolean immutable = row.isImmutable();

        // Positional access: print qualifier and value of every non-null entry
        for (int idx = 0; idx < cellCount; idx++) {
            KeyValue kv = row.getValue(idx);
            if (kv == null) {
                continue;
            }
            String qual = Bytes.toString(kv.getQualifierArray(),
                                         kv.getQualifierOffset(),
                                         kv.getQualifierLength());
            String value = Bytes.toString(kv.getValueArray(),
                                          kv.getValueOffset(),
                                          kv.getValueLength());
            System.out.println("  " + qual + ": " + value);
        }
    }
}

Execution Context

StatementContext

Maintains execution context and state during query processing.

/**
 * Maintains execution context and state during query processing.
 *
 * Signature listing only: method bodies are not shown here.
 */
public class StatementContext {
    // Context information
    public PhoenixConnection getConnection()
    public ColumnResolver getResolver()
    public Scan getScan()
    public long getCurrentTime()

    // Execution state
    public SequenceManager getSequenceManager()
    public TupleProjector getTupleProjector()
    public GroupBy getGroupBy()
    public OrderBy getOrderBy()

    // Metrics and monitoring
    public ReadMetricQueue getReadMetricsQueue()
    public WriteMetricQueue getWriteMetricsQueue()
    public OverAllQueryMetrics getOverallQueryMetrics()

    // Execution configuration
    // NOTE(review): getLimit() and getOffset() return boxed Integer — presumably
    // null when the query has no LIMIT/OFFSET; confirm against the library.
    public int getPageSize()
    public Integer getLimit()
    public Integer getOffset()
}

Usage:

// The plan carries its own execution context
QueryPlan plan = getQueryPlan();
StatementContext ctx = plan.getContext();

// Connection and scan are both reachable through the context
PhoenixConnection connection = ctx.getConnection();
Scan scan = ctx.getScan();

// Tune the scan: row caching, batch size, and latest version only
scan.setCaching(1000);
scan.setBatch(100);
scan.setMaxVersions(1);

// Read the limits the context exposes
Integer rowLimit = ctx.getLimit();
Integer rowOffset = ctx.getOffset();
int pageSize = ctx.getPageSize();

System.out.println("Query limit: " + rowLimit);
System.out.println("Query offset: " + rowOffset);
System.out.println("Page size: " + pageSize);

// Report the size of the read and write metric queues
ReadMetricQueue reads = ctx.getReadMetricsQueue();
WriteMetricQueue writes = ctx.getWriteMetricsQueue();

System.out.println("Read metrics queue size: " + reads.size());
System.out.println("Write metrics queue size: " + writes.size());

Advanced Execution Features

Parallel Query Execution

// Configure parallel execution
/**
 * Runs several query plans concurrently on a fixed-size thread pool and
 * collects each plan's tuples into a QueryResult.
 *
 * The executor owns its thread pool. Call {@link #close()} (or use
 * try-with-resources) when finished, otherwise the pool's threads are never
 * released.
 */
public class ParallelQueryExecutor implements AutoCloseable {
    private final ExecutorService executorService;

    /**
     * @param parallelism number of worker threads; must be positive
     * @throws IllegalArgumentException if {@code parallelism} is not positive
     */
    public ParallelQueryExecutor(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive: " + parallelism);
        }
        this.executorService = Executors.newFixedThreadPool(parallelism);
    }

    /**
     * Submits one task per plan. Each task drains the plan's iterator into a
     * list and wraps it, together with the plan's estimated size, in a QueryResult.
     *
     * @param plans the plans to execute
     * @return one future per plan, in the same order as {@code plans}
     */
    public List<Future<QueryResult>> executeParallel(List<QueryPlan> plans) {
        List<Future<QueryResult>> futures = new ArrayList<>(plans.size());

        for (QueryPlan plan : plans) {
            Future<QueryResult> future = executorService.submit(() -> {
                List<Tuple> results = new ArrayList<>();
                // The iterator is closed when the task finishes or fails
                try (ResultIterator iterator = plan.iterator()) {
                    Tuple tuple;
                    while ((tuple = iterator.next()) != null) {
                        results.add(tuple);
                    }
                }
                return new QueryResult(results, plan.getEstimatedSize());
            });
            futures.add(future);
        }

        return futures;
    }

    /**
     * Stops accepting new work and lets already-submitted tasks finish.
     */
    @Override
    public void close() {
        executorService.shutdown();
    }
}

// Usage
List<QueryPlan> parallelPlans = createParallelPlans(originalPlan);
List<Tuple> allResults = new ArrayList<>();

// try-with-resources shuts the thread pool down even if a task fails
try (ParallelQueryExecutor executor = new ParallelQueryExecutor(4)) {
    List<Future<QueryResult>> futures = executor.executeParallel(parallelPlans);

    // Collect results
    for (Future<QueryResult> future : futures) {
        QueryResult result = future.get();
        allResults.addAll(result.getTuples());
    }
}

System.out.println("Parallel execution completed, total results: " + allResults.size());

Query Optimization and Caching

// Query result caching
/**
 * In-memory cache of query results keyed by a caller-supplied query key.
 * Entries older than five minutes are treated as expired and evicted on lookup.
 */
public class QueryCache {
    /** Maximum entry age in milliseconds (5 minutes). */
    private static final long MAX_AGE_MILLIS = 300_000L;

    private final Map<String, CachedResult> cache = new ConcurrentHashMap<>();

    /**
     * Looks up a cached result.
     *
     * @param queryKey key the result was stored under
     * @return the cached result, or {@code null} if there is none or it has expired
     */
    public CachedResult getCachedResult(String queryKey) {
        CachedResult cached = cache.get(queryKey);
        if (cached == null) {
            return null;
        }
        if (System.currentTimeMillis() - cached.getTimestamp() < MAX_AGE_MILLIS) {
            return cached;
        }
        // Evict only the stale entry that was just inspected. An unconditional
        // remove(key) could discard a fresh result another thread stored in between.
        cache.remove(queryKey, cached);
        return null;
    }

    /**
     * Stores {@code results} under {@code queryKey}, stamped with the current time.
     * Any existing entry for the key is replaced.
     */
    public void cacheResult(String queryKey, List<Tuple> results) {
        cache.put(queryKey, new CachedResult(results, System.currentTimeMillis()));
    }
}

// Cache-aside lookup around query execution
QueryCache queryCache = new QueryCache();
String queryKey = generateQueryKey(sql, parameters);

// Serve from the cache when an unexpired entry exists
CachedResult hit = queryCache.getCachedResult(queryKey);
if (hit != null) {
    System.out.println("Using cached results");
    return hit.getResults();
}

// Cache miss: run the plan and collect every tuple
List<Tuple> fetched = new ArrayList<>();
try (ResultIterator rows = plan.iterator()) {
    for (Tuple row = rows.next(); row != null; row = rows.next()) {
        fetched.add(row);
    }
}

// Remember the result for later lookups, then return it
queryCache.cacheResult(queryKey, fetched);
return fetched;

Custom Result Processing

// Custom result processor with aggregation
public class ResultProcessor {
    /**
     * Drains {@code iterator}, turning each tuple into a ProcessedRow and feeding
     * it to one aggregator per aggregate expression.
     *
     * @param iterator             source of tuples; read until next() returns null
     * @param aggregateExpressions expressions to aggregate; entries that are not
     *                             AggregateFunction instances are ignored
     * @return the processed rows, the evaluated aggregates keyed by the
     *         expression's string form, and the number of rows read
     * @throws SQLException if reading from the iterator fails
     */
    public ProcessedResult processResults(ResultIterator iterator,
                                        List<Expression> aggregateExpressions) throws SQLException {
        // One aggregator per aggregate expression, keyed by expr.toString()
        Map<String, Aggregator> aggregatorsByExpr = new HashMap<>();
        for (Expression candidate : aggregateExpressions) {
            if (!(candidate instanceof AggregateFunction)) {
                continue;
            }
            aggregatorsByExpr.put(candidate.toString(),
                                  ((AggregateFunction) candidate).newAggregator());
        }

        List<ProcessedRow> rows = new ArrayList<>();
        int rowsSeen = 0;

        for (Tuple current = iterator.next(); current != null; current = iterator.next()) {
            rowsSeen++;

            ProcessedRow row = new ProcessedRow();
            // Fresh pointer per row; after the column loop it still refers to the
            // last key position read and is passed on to the aggregators as-is.
            ImmutableBytesWritable ptr = new ImmutableBytesWritable();

            for (int col = 0; col < current.size(); col++) {
                current.getKey(ptr, col);
                if (ptr.getLength() <= 0) {
                    continue; // nothing stored at this position
                }
                String text = Bytes.toString(ptr.get(), ptr.getOffset(), ptr.getLength());
                row.addValue("column_" + col, text);
            }

            rows.add(row);

            for (Aggregator aggregator : aggregatorsByExpr.values()) {
                aggregator.aggregate(current, ptr);
            }
        }

        // Evaluate each aggregator; keep only those that report a value
        Map<String, Object> totals = new HashMap<>();
        ImmutableBytesWritable out = new ImmutableBytesWritable();
        for (Map.Entry<String, Aggregator> slot : aggregatorsByExpr.entrySet()) {
            boolean evaluated = slot.getValue().evaluate(null, out);
            if (evaluated) {
                totals.put(slot.getKey(), PVarchar.INSTANCE.toObject(out));
            }
        }

        return new ProcessedResult(rows, totals, rowsSeen);
    }
}

// Usage
ResultProcessor processor = new ResultProcessor();

// Two aggregates: a count over the constant 1 and a sum over the salary column
Expression countAll = new CountAggregateFunction(Arrays.asList(LiteralExpression.newConstant(1)));
Expression salarySum = new SumAggregateFunction(Arrays.asList(salaryColumn));
List<Expression> aggregateExprs = Arrays.asList(countAll, salarySum);

try (ResultIterator rows = plan.iterator()) {
    ProcessedResult outcome = processor.processResults(rows, aggregateExprs);

    System.out.println("Total rows: " + outcome.getTotalRows());
    System.out.println("Processed rows: " + outcome.getProcessedRows().size());

    // Print every aggregate by name
    Map<String, Object> aggregates = outcome.getAggregatedValues();
    aggregates.forEach((name, total) ->
        System.out.println("Aggregate " + name + ": " + total));
}

Execution Monitoring

// Monitor query execution
public class ExecutionMonitor {
    /**
     * Executes {@code plan} to completion while printing progress and summary
     * statistics to standard output.
     *
     * Prints the plan's estimates first, then a progress line every 1000 rows,
     * then totals and, if the context has a read metric queue, its size.
     *
     * @param plan the plan to execute and measure
     * @throws SQLException if creating or reading the iterator fails
     */
    public void monitorExecution(QueryPlan plan) throws SQLException {
        StatementContext context = plan.getContext();
        long startTime = System.currentTimeMillis();

        System.out.println("=== Query Execution Monitor ===");
        System.out.println("Start time: " + new Date(startTime));
        System.out.println("Estimated rows: " + plan.getEstimatedRowsToScan());
        System.out.println("Estimated bytes: " + plan.getEstimatedBytesToScan());

        int rowCount = 0;
        try (ResultIterator iterator = plan.iterator()) {
            Tuple tuple;
            while ((tuple = iterator.next()) != null) {
                rowCount++;

                // Log progress every 1000 rows
                if (rowCount % 1000 == 0) {
                    long elapsed = System.currentTimeMillis() - startTime;
                    double rowsPerSecond = rowsPerSecond(rowCount, elapsed);
                    System.out.println("Processed " + rowCount + " rows, " +
                                     String.format("%.2f", rowsPerSecond) + " rows/sec");
                }
            }

            // Final statistics
            long totalTime = System.currentTimeMillis() - startTime;
            double avgRowsPerSecond = rowsPerSecond(rowCount, totalTime);

            System.out.println("=== Execution Complete ===");
            System.out.println("Total rows: " + rowCount);
            System.out.println("Total time: " + totalTime + "ms");
            System.out.println("Average rate: " + String.format("%.2f", avgRowsPerSecond) + " rows/sec");

            // Access execution metrics if available
            ReadMetricQueue readMetrics = context.getReadMetricsQueue();
            if (readMetrics != null) {
                System.out.println("Read operations: " + readMetrics.size());
            }
        }
    }

    /**
     * Computes a throughput figure that is always finite.
     *
     * Elapsed time is clamped to at least one millisecond. A query that finishes
     * within the clock's resolution would otherwise divide by zero and print
     * "Infinity" (or "NaN" when no rows were read).
     */
    private static double rowsPerSecond(int rows, long elapsedMillis) {
        long safeMillis = Math.max(elapsedMillis, 1L);
        return rows / (safeMillis / 1000.0);
    }
}

// Usage
// Create a monitor and run it against an existing plan. monitorExecution
// executes the plan and prints its statistics to standard output.
ExecutionMonitor monitor = new ExecutionMonitor();
monitor.monitorExecution(queryPlan);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-phoenix--phoenix-core

docs

configuration.md

exceptions.md

execution.md

expressions.md

index.md

jdbc.md

mapreduce.md

monitoring.md

query-compilation.md

schema-metadata.md

server.md

transactions.md

types.md

tile.json