0
# Query Execution
1
2
Phoenix's query execution framework provides optimized query processing with support for parallel execution, result iteration, mutation state management, and distributed operations. The execution system leverages HBase's distributed architecture while providing SQL semantics.
3
4
## Core Imports
5
6
```java
7
import org.apache.phoenix.execute.*;
8
import org.apache.phoenix.iterate.*;
9
import org.apache.phoenix.compile.QueryPlan;
10
import org.apache.phoenix.schema.tuple.Tuple;
11
```
12
13
## Mutation State Management
14
15
### MutationState
16
17
Manages mutation state for Phoenix transactions, batching mutations for efficient execution.
18
19
```java{ .api }
20
public class MutationState implements SQLCloseable {
21
// Constructor
22
public MutationState(int maxSize, int maxSizeBytes, PhoenixConnection connection)
23
24
// Mutation operations
25
public void addMutation(PName tableName, Mutation mutation) throws SQLException
26
public void addMutations(PName tableName, List<Mutation> mutations) throws SQLException
27
28
// Batch management
29
public void send() throws SQLException
30
public void commit() throws SQLException
31
public void rollback() throws SQLException
32
33
// State information
34
public int getUpdateCount()
35
public boolean hasUncommittedData()
36
public long getEstimatedSize()
37
public int getMaxSize()
38
39
// Transaction management
40
public void startTransaction() throws SQLException
41
public void join(MutationState newMutation) throws SQLException
42
public MutationState newMutationState(int maxSize, int maxSizeBytes)
43
}
44
```
45
46
**Usage:**
47
```java
48
PhoenixConnection connection = getPhoenixConnection();
49
MutationState mutationState = connection.getMutationState();
50
51
// Add individual mutations
52
PName tableName = PNameFactory.newName("users");
53
Put userPut = new Put(Bytes.toBytes("user123"));
54
userPut.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes("John Doe"));
55
56
mutationState.addMutation(tableName, userPut);
57
58
// Add batch of mutations
59
List<Mutation> batch = new ArrayList<>();
60
for (int i = 0; i < 100; i++) {
61
Put put = new Put(Bytes.toBytes("user" + i));
62
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes("User " + i));
63
batch.add(put);
64
}
65
mutationState.addMutations(tableName, batch);
66
67
// Check state before commit
68
boolean hasData = mutationState.hasUncommittedData();
69
long estimatedSize = mutationState.getEstimatedSize();
70
int updateCount = mutationState.getUpdateCount();
71
72
System.out.println("Has uncommitted data: " + hasData);
73
System.out.println("Estimated size: " + estimatedSize + " bytes");
74
System.out.println("Update count: " + updateCount);
75
76
// Commit mutations
77
mutationState.commit();
78
```
79
80
## Query Execution Plans
81
82
### QueryPlan Interface
83
84
Base interface for query execution plans providing iteration and metadata.
85
86
```java{ .api }
87
public interface QueryPlan {
88
// Plan execution
89
ResultIterator iterator() throws SQLException
90
ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException
91
ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException
92
93
// Plan information
94
StatementContext getContext()
95
ParameterMetaData getParameterMetaData()
96
ExplainPlan getExplainPlan() throws SQLException
97
98
// Cost estimation
99
long getEstimatedSize()
100
Cost getCost()
101
long getEstimatedRowsToScan()
102
Long getEstimatedBytesToScan()
103
104
// Plan properties
105
Operation getOperation()
106
boolean useRoundRobinIterator()
107
}
108
```
109
110
### BaseQueryPlan
111
112
Abstract base implementation of QueryPlan with common functionality.
113
114
```java{ .api }
115
public abstract class BaseQueryPlan implements QueryPlan {
116
protected final StatementContext context;
117
protected final FilterableStatement statement;
118
protected final TableRef tableRef;
119
protected final RowProjector projector;
120
121
// Common plan operations
122
public StatementContext getContext()
123
public ParameterMetaData getParameterMetaData()
124
public Cost getCost()
125
public Operation getOperation()
126
127
// Abstract methods for subclasses
128
public abstract ResultIterator iterator() throws SQLException
129
public abstract ExplainPlan getExplainPlan() throws SQLException
130
}
131
```
132
133
**Usage:**
134
```java
135
// Get query plan from compiled statement
136
QueryCompiler compiler = new QueryCompiler();
137
String sql = "SELECT id, name, salary FROM employees WHERE department = ? ORDER BY salary DESC";
138
QueryPlan plan = compiler.compile(sql, context);
139
140
// Examine plan properties
141
long estimatedSize = plan.getEstimatedSize();
142
long estimatedRows = plan.getEstimatedRowsToScan();
143
Cost cost = plan.getCost();
144
Operation operation = plan.getOperation();
145
146
System.out.println("Estimated size: " + estimatedSize + " bytes");
147
System.out.println("Estimated rows: " + estimatedRows);
148
System.out.println("Operation: " + operation);
149
150
// Get explain plan
151
ExplainPlan explainPlan = plan.getExplainPlan();
152
System.out.println("Query plan:");
153
for (String step : explainPlan.getPlanSteps()) {
154
System.out.println(" " + step);
155
}
156
157
// Execute plan
158
try (ResultIterator iterator = plan.iterator()) {
159
Tuple tuple;
160
while ((tuple = iterator.next()) != null) {
161
// Process tuple
162
processTuple(tuple);
163
}
164
}
165
```
166
167
## Result Iteration
168
169
### ResultIterator
170
171
Interface for iterating over query results with support for peeking and closing.
172
173
```java{ .api }
174
public interface ResultIterator extends SQLCloseable {
175
// Iteration methods
176
Tuple next() throws SQLException
177
Tuple peek() throws SQLException
178
179
// Iterator state
180
void close() throws SQLException
181
ExplainPlan getExplainPlan() throws SQLException
182
183
// Aggregation support
184
Aggregators getAggregators()
185
}
186
```
187
188
### TableResultIterator
189
190
Interface for iterating over table scan results with additional scan information.
191
192
```java{ .api }
193
public interface TableResultIterator extends ResultIterator {
194
// Scan information
195
void initScanner() throws SQLException
196
Scan getScan()
197
void setScan(Scan scan)
198
199
// Region information
200
HRegionLocation getRegionLocation()
201
long getReadMetricQueue()
202
long getOverAllQueryMetrics()
203
}
204
```
205
206
### BaseResultIterator
207
208
Abstract base implementation providing common iterator functionality.
209
210
```java{ .api }
211
public abstract class BaseResultIterator implements ResultIterator {
212
// Common iterator operations
213
public Tuple peek() throws SQLException
214
public void close() throws SQLException
215
public ExplainPlan getExplainPlan() throws SQLException
216
217
// Abstract methods
218
public abstract Tuple next() throws SQLException
219
}
220
```
221
222
**Usage:**
223
```java
224
// Basic result iteration
225
QueryPlan plan = getQueryPlan();
226
try (ResultIterator iterator = plan.iterator()) {
227
Tuple tuple;
228
int rowCount = 0;
229
230
while ((tuple = iterator.next()) != null) {
231
rowCount++;
232
233
// Access column values from tuple
234
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
235
236
// Get first column value
237
tuple.getKey(ptr, 0);
238
Object value = PVarchar.INSTANCE.toObject(ptr);
239
System.out.println("Column 0: " + value);
240
241
// Peek at next tuple without consuming it
242
Tuple nextTuple = iterator.peek();
243
if (nextTuple != null) {
244
System.out.println("Next tuple available");
245
}
246
}
247
248
System.out.println("Processed " + rowCount + " rows");
249
}
250
251
// Table scan iteration with scan details
252
if (iterator instanceof TableResultIterator) {
253
TableResultIterator tableIter = (TableResultIterator) iterator;
254
Scan scan = tableIter.getScan();
255
HRegionLocation region = tableIter.getRegionLocation();
256
257
System.out.println("Scan start row: " + Bytes.toString(scan.getStartRow()));
258
System.out.println("Scan stop row: " + Bytes.toString(scan.getStopRow()));
259
System.out.println("Region: " + region.getRegionInfo().getRegionNameAsString());
260
}
261
```
262
263
### ParallelIteratorFactory
264
265
Factory for creating parallel result iterators for distributed query execution.
266
267
```java{ .api }
268
public interface ParallelIteratorFactory {
269
// Iterator creation
270
ResultIterator newIterator(StatementContext context, ResultIterator iterator,
271
Scan scan, String tableName) throws SQLException
272
273
// Parallel execution support
274
List<PeekingResultIterator> getIterators() throws SQLException
275
void submitWork(Callable<Boolean> callable) throws SQLException
276
}
277
```
278
279
**Usage:**
280
```java
281
// Create parallel iterators for distributed execution
282
ParallelIteratorFactory factory = getParallelIteratorFactory();
283
List<PeekingResultIterator> parallelIterators = factory.getIterators();
284
285
System.out.println("Created " + parallelIterators.size() + " parallel iterators");
286
287
// Process results from multiple iterators
288
ExecutorService executor = Executors.newFixedThreadPool(parallelIterators.size());
289
List<Future<Integer>> futures = new ArrayList<>();
290
291
for (PeekingResultIterator iter : parallelIterators) {
292
Future<Integer> future = executor.submit(() -> {
293
int count = 0;
294
try (ResultIterator iterator = iter) {
295
Tuple tuple;
296
while ((tuple = iterator.next()) != null) {
297
count++;
298
// Process tuple
299
}
300
}
301
return count;
302
});
303
futures.add(future);
304
}
305
306
// Collect results
307
int totalRows = 0;
308
for (Future<Integer> future : futures) {
309
totalRows += future.get();
310
}
311
312
System.out.println("Total rows processed: " + totalRows);
313
executor.shutdown();
314
```
315
316
## Tuple Processing
317
318
### Tuple
319
320
Interface representing a row of data with key-value access methods.
321
322
```java{ .api }
323
public interface Tuple {
324
// Key access
325
void getKey(ImmutableBytesWritable ptr)
326
void getKey(ImmutableBytesWritable ptr, int position)
327
328
// Value access
329
boolean getValue(byte[] family, byte[] qualifier, ImmutableBytesWritable ptr)
330
Cell getValue(byte[] family, byte[] qualifier)
331
332
// Tuple properties
333
boolean isImmutable()
334
int size()
335
KeyValue getValue(int index)
336
337
// Comparison
338
int compareTo(Tuple other)
339
}
340
```
341
342
### KeyValueTuple
343
344
Implementation of Tuple backed by HBase KeyValue objects.
345
346
```java{ .api }
347
public class KeyValueTuple implements Tuple {
348
public KeyValueTuple(KeyValue keyValue)
349
public KeyValueTuple(List<KeyValue> keyValues)
350
351
// Tuple implementation
352
public void getKey(ImmutableBytesWritable ptr)
353
public boolean getValue(byte[] family, byte[] qualifier, ImmutableBytesWritable ptr)
354
public int size()
355
public KeyValue getValue(int index)
356
}
357
```
358
359
**Usage:**
360
```java
361
// Process tuples from result iterator
362
try (ResultIterator iterator = plan.iterator()) {
363
Tuple tuple;
364
while ((tuple = iterator.next()) != null) {
365
// Access row key
366
ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
367
tuple.getKey(keyPtr);
368
String rowKey = Bytes.toString(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength());
369
370
// Access specific column values
371
ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
372
byte[] family = Bytes.toBytes("cf");
373
byte[] qualifier = Bytes.toBytes("name");
374
375
if (tuple.getValue(family, qualifier, valuePtr)) {
376
String name = Bytes.toString(valuePtr.get(), valuePtr.getOffset(), valuePtr.getLength());
377
System.out.println("Row " + rowKey + ", Name: " + name);
378
}
379
380
// Access tuple properties
381
int tupleSize = tuple.size();
382
boolean immutable = tuple.isImmutable();
383
384
// Access by index
385
for (int i = 0; i < tupleSize; i++) {
386
KeyValue kv = tuple.getValue(i);
387
if (kv != null) {
388
String qual = Bytes.toString(kv.getQualifierArray(),
389
kv.getQualifierOffset(),
390
kv.getQualifierLength());
391
String value = Bytes.toString(kv.getValueArray(),
392
kv.getValueOffset(),
393
kv.getValueLength());
394
System.out.println(" " + qual + ": " + value);
395
}
396
}
397
}
398
}
399
```
400
401
## Execution Context
402
403
### StatementContext
404
405
Maintains execution context and state during query processing.
406
407
```java{ .api }
408
public class StatementContext {
409
// Context information
410
public PhoenixConnection getConnection()
411
public ColumnResolver getResolver()
412
public Scan getScan()
413
public long getCurrentTime()
414
415
// Execution state
416
public SequenceManager getSequenceManager()
417
public TupleProjector getTupleProjector()
418
public GroupBy getGroupBy()
419
public OrderBy getOrderBy()
420
421
// Metrics and monitoring
422
public ReadMetricQueue getReadMetricsQueue()
423
public WriteMetricQueue getWriteMetricsQueue()
424
public OverAllQueryMetrics getOverallQueryMetrics()
425
426
// Execution configuration
427
public int getPageSize()
428
public Integer getLimit()
429
public Integer getOffset()
430
}
431
```
432
433
**Usage:**
434
```java
435
// Access execution context
436
QueryPlan plan = getQueryPlan();
437
StatementContext context = plan.getContext();
438
439
// Get connection and scan information
440
PhoenixConnection connection = context.getConnection();
441
Scan scan = context.getScan();
442
443
// Configure scan properties
444
scan.setCaching(1000); // Set row caching
445
scan.setBatch(100); // Set batch size
446
scan.setMaxVersions(1); // Only get latest version
447
448
// Access execution limits
449
Integer limit = context.getLimit();
450
Integer offset = context.getOffset();
451
int pageSize = context.getPageSize();
452
453
System.out.println("Query limit: " + limit);
454
System.out.println("Query offset: " + offset);
455
System.out.println("Page size: " + pageSize);
456
457
// Access metrics
458
ReadMetricQueue readMetrics = context.getReadMetricsQueue();
459
WriteMetricQueue writeMetrics = context.getWriteMetricsQueue();
460
461
System.out.println("Read metrics queue size: " + readMetrics.size());
462
System.out.println("Write metrics queue size: " + writeMetrics.size());
463
```
464
465
## Advanced Execution Features
466
467
### Parallel Query Execution
468
469
```java
470
// Configure parallel execution
471
public class ParallelQueryExecutor {
472
private final int parallelism;
473
private final ExecutorService executorService;
474
475
public ParallelQueryExecutor(int parallelism) {
476
this.parallelism = parallelism;
477
this.executorService = Executors.newFixedThreadPool(parallelism);
478
}
479
480
public List<Future<QueryResult>> executeParallel(List<QueryPlan> plans) {
481
List<Future<QueryResult>> futures = new ArrayList<>();
482
483
for (QueryPlan plan : plans) {
484
Future<QueryResult> future = executorService.submit(() -> {
485
List<Tuple> results = new ArrayList<>();
486
try (ResultIterator iterator = plan.iterator()) {
487
Tuple tuple;
488
while ((tuple = iterator.next()) != null) {
489
results.add(tuple);
490
}
491
}
492
return new QueryResult(results, plan.getEstimatedSize());
493
});
494
futures.add(future);
495
}
496
497
return futures;
498
}
499
}
500
501
// Usage
502
ParallelQueryExecutor executor = new ParallelQueryExecutor(4);
503
List<QueryPlan> parallelPlans = createParallelPlans(originalPlan);
504
List<Future<QueryResult>> futures = executor.executeParallel(parallelPlans);
505
506
// Collect results
507
List<Tuple> allResults = new ArrayList<>();
508
for (Future<QueryResult> future : futures) {
509
QueryResult result = future.get();
510
allResults.addAll(result.getTuples());
511
}
512
513
System.out.println("Parallel execution completed, total results: " + allResults.size());
514
```
515
516
### Query Optimization and Caching
517
518
```java
519
// Query result caching
520
public class QueryCache {
521
private final Map<String, CachedResult> cache = new ConcurrentHashMap<>();
522
private final long maxAge = 300000; // 5 minutes
523
524
public CachedResult getCachedResult(String queryKey) {
525
CachedResult cached = cache.get(queryKey);
526
if (cached != null && System.currentTimeMillis() - cached.getTimestamp() < maxAge) {
527
return cached;
528
}
529
cache.remove(queryKey);
530
return null;
531
}
532
533
public void cacheResult(String queryKey, List<Tuple> results) {
534
cache.put(queryKey, new CachedResult(results, System.currentTimeMillis()));
535
}
536
}
537
538
// Usage with query execution
539
QueryCache queryCache = new QueryCache();
540
String queryKey = generateQueryKey(sql, parameters);
541
542
// Check cache first
543
CachedResult cached = queryCache.getCachedResult(queryKey);
544
if (cached != null) {
545
System.out.println("Using cached results");
546
return cached.getResults();
547
}
548
549
// Execute query if not cached
550
List<Tuple> results = new ArrayList<>();
551
try (ResultIterator iterator = plan.iterator()) {
552
Tuple tuple;
553
while ((tuple = iterator.next()) != null) {
554
results.add(tuple);
555
}
556
}
557
558
// Cache results for future queries
559
queryCache.cacheResult(queryKey, results);
560
return results;
561
```
562
563
### Custom Result Processing
564
565
```java
566
// Custom result processor with aggregation
567
public class ResultProcessor {
568
public ProcessedResult processResults(ResultIterator iterator,
569
List<Expression> aggregateExpressions) throws SQLException {
570
Map<String, Object> aggregatedValues = new HashMap<>();
571
List<ProcessedRow> processedRows = new ArrayList<>();
572
int totalRows = 0;
573
574
// Initialize aggregators
575
Map<String, Aggregator> aggregators = new HashMap<>();
576
for (Expression expr : aggregateExpressions) {
577
if (expr instanceof AggregateFunction) {
578
AggregateFunction aggFunc = (AggregateFunction) expr;
579
aggregators.put(expr.toString(), aggFunc.newAggregator());
580
}
581
}
582
583
// Process each row
584
Tuple tuple;
585
while ((tuple = iterator.next()) != null) {
586
totalRows++;
587
588
// Create processed row
589
ProcessedRow row = new ProcessedRow();
590
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
591
592
// Extract values from tuple
593
for (int i = 0; i < tuple.size(); i++) {
594
tuple.getKey(ptr, i);
595
if (ptr.getLength() > 0) {
596
String value = Bytes.toString(ptr.get(), ptr.getOffset(), ptr.getLength());
597
row.addValue("column_" + i, value);
598
}
599
}
600
601
processedRows.add(row);
602
603
// Update aggregators
604
for (Map.Entry<String, Aggregator> entry : aggregators.entrySet()) {
605
entry.getValue().aggregate(tuple, ptr);
606
}
607
}
608
609
// Finalize aggregates
610
ImmutableBytesWritable result = new ImmutableBytesWritable();
611
for (Map.Entry<String, Aggregator> entry : aggregators.entrySet()) {
612
if (entry.getValue().evaluate(null, result)) {
613
Object aggregatedValue = PVarchar.INSTANCE.toObject(result);
614
aggregatedValues.put(entry.getKey(), aggregatedValue);
615
}
616
}
617
618
return new ProcessedResult(processedRows, aggregatedValues, totalRows);
619
}
620
}
621
622
// Usage
623
ResultProcessor processor = new ResultProcessor();
624
List<Expression> aggregateExprs = Arrays.asList(
625
new CountAggregateFunction(Arrays.asList(LiteralExpression.newConstant(1))),
626
new SumAggregateFunction(Arrays.asList(salaryColumn))
627
);
628
629
try (ResultIterator iterator = plan.iterator()) {
630
ProcessedResult result = processor.processResults(iterator, aggregateExprs);
631
632
System.out.println("Total rows: " + result.getTotalRows());
633
System.out.println("Processed rows: " + result.getProcessedRows().size());
634
635
Map<String, Object> aggregates = result.getAggregatedValues();
636
for (Map.Entry<String, Object> entry : aggregates.entrySet()) {
637
System.out.println("Aggregate " + entry.getKey() + ": " + entry.getValue());
638
}
639
}
640
```
641
642
### Execution Monitoring
643
644
```java
645
// Monitor query execution
646
public class ExecutionMonitor {
647
public void monitorExecution(QueryPlan plan) throws SQLException {
648
StatementContext context = plan.getContext();
649
long startTime = System.currentTimeMillis();
650
651
System.out.println("=== Query Execution Monitor ===");
652
System.out.println("Start time: " + new Date(startTime));
653
System.out.println("Estimated rows: " + plan.getEstimatedRowsToScan());
654
System.out.println("Estimated bytes: " + plan.getEstimatedBytesToScan());
655
656
int rowCount = 0;
657
try (ResultIterator iterator = plan.iterator()) {
658
Tuple tuple;
659
while ((tuple = iterator.next()) != null) {
660
rowCount++;
661
662
// Log progress every 1000 rows
663
if (rowCount % 1000 == 0) {
664
long elapsed = System.currentTimeMillis() - startTime;
665
double rowsPerSecond = (double) rowCount / (elapsed / 1000.0);
666
System.out.println("Processed " + rowCount + " rows, " +
667
String.format("%.2f", rowsPerSecond) + " rows/sec");
668
}
669
}
670
671
// Final statistics
672
long totalTime = System.currentTimeMillis() - startTime;
673
double avgRowsPerSecond = (double) rowCount / (totalTime / 1000.0);
674
675
System.out.println("=== Execution Complete ===");
676
System.out.println("Total rows: " + rowCount);
677
System.out.println("Total time: " + totalTime + "ms");
678
System.out.println("Average rate: " + String.format("%.2f", avgRowsPerSecond) + " rows/sec");
679
680
// Access execution metrics if available
681
ReadMetricQueue readMetrics = context.getReadMetricsQueue();
682
if (readMetrics != null) {
683
System.out.println("Read operations: " + readMetrics.size());
684
}
685
}
686
}
687
}
688
689
// Usage
690
ExecutionMonitor monitor = new ExecutionMonitor();
691
monitor.monitorExecution(queryPlan);
692
```