or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md exceptions.md execution.md expressions.md index.md jdbc.md mapreduce.md monitoring.md query-compilation.md schema-metadata.md server.md transactions.md types.md

execution.mddocs/

0

# Query Execution

1

2

Phoenix's query execution framework provides optimized query processing with support for parallel execution, result iteration, mutation state management, and distributed operations. The execution system leverages HBase's distributed architecture while providing SQL semantics.

3

4

## Core Imports

5

6

```java

7

import org.apache.phoenix.execute.*;

8

import org.apache.phoenix.iterate.*;

9

import org.apache.phoenix.compile.QueryPlan;

10

import org.apache.phoenix.schema.tuple.Tuple;

11

```

12

13

## Mutation State Management

14

15

### MutationState

16

17

Manages mutation state for Phoenix transactions, batching mutations for efficient execution.

18

19

```java{ .api }

20

public class MutationState implements SQLCloseable {

21

// Constructor

22

public MutationState(int maxSize, int maxSizeBytes, PhoenixConnection connection)

23

24

// Mutation operations

25

public void addMutation(PName tableName, Mutation mutation) throws SQLException

26

public void addMutations(PName tableName, List<Mutation> mutations) throws SQLException

27

28

// Batch management

29

public void send() throws SQLException

30

public void commit() throws SQLException

31

public void rollback() throws SQLException

32

33

// State information

34

public int getUpdateCount()

35

public boolean hasUncommittedData()

36

public long getEstimatedSize()

37

public int getMaxSize()

38

39

// Transaction management

40

public void startTransaction() throws SQLException

41

public void join(MutationState newMutation) throws SQLException

42

public MutationState newMutationState(int maxSize, int maxSizeBytes)

43

}

44

```

45

46

**Usage:**

47

```java

48

PhoenixConnection connection = getPhoenixConnection();

49

MutationState mutationState = connection.getMutationState();

50

51

// Add individual mutations

52

PName tableName = PNameFactory.newName("users");

53

Put userPut = new Put(Bytes.toBytes("user123"));

54

userPut.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes("John Doe"));

55

56

mutationState.addMutation(tableName, userPut);

57

58

// Add batch of mutations

59

List<Mutation> batch = new ArrayList<>();

60

for (int i = 0; i < 100; i++) {

61

Put put = new Put(Bytes.toBytes("user" + i));

62

put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes("User " + i));

63

batch.add(put);

64

}

65

mutationState.addMutations(tableName, batch);

66

67

// Check state before commit

68

boolean hasData = mutationState.hasUncommittedData();

69

long estimatedSize = mutationState.getEstimatedSize();

70

int updateCount = mutationState.getUpdateCount();

71

72

System.out.println("Has uncommitted data: " + hasData);

73

System.out.println("Estimated size: " + estimatedSize + " bytes");

74

System.out.println("Update count: " + updateCount);

75

76

// Commit mutations

77

mutationState.commit();

78

```

79

80

## Query Execution Plans

81

82

### QueryPlan Interface

83

84

Base interface for query execution plans providing iteration and metadata.

85

86

```java{ .api }

87

public interface QueryPlan {

88

// Plan execution

89

ResultIterator iterator() throws SQLException

90

ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException

91

ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException

92

93

// Plan information

94

StatementContext getContext()

95

ParameterMetaData getParameterMetaData()

96

ExplainPlan getExplainPlan() throws SQLException

97

98

// Cost estimation

99

long getEstimatedSize()

100

Cost getCost()

101

long getEstimatedRowsToScan()

102

Long getEstimatedBytesToScan()

103

104

// Plan properties

105

Operation getOperation()

106

boolean useRoundRobinIterator()

107

}

108

```

109

110

### BaseQueryPlan

111

112

Abstract base implementation of QueryPlan with common functionality.

113

114

```java{ .api }

115

public abstract class BaseQueryPlan implements QueryPlan {

116

protected final StatementContext context;

117

protected final FilterableStatement statement;

118

protected final TableRef tableRef;

119

protected final RowProjector projector;

120

121

// Common plan operations

122

public StatementContext getContext()

123

public ParameterMetaData getParameterMetaData()

124

public Cost getCost()

125

public Operation getOperation()

126

127

// Abstract methods for subclasses

128

public abstract ResultIterator iterator() throws SQLException

129

public abstract ExplainPlan getExplainPlan() throws SQLException

130

}

131

```

132

133

**Usage:**

134

```java

135

// Get query plan from compiled statement

136

QueryCompiler compiler = new QueryCompiler();

137

String sql = "SELECT id, name, salary FROM employees WHERE department = ? ORDER BY salary DESC";

138

QueryPlan plan = compiler.compile(sql, context);

139

140

// Examine plan properties

141

long estimatedSize = plan.getEstimatedSize();

142

long estimatedRows = plan.getEstimatedRowsToScan();

143

Cost cost = plan.getCost();

144

Operation operation = plan.getOperation();

145

146

System.out.println("Estimated size: " + estimatedSize + " bytes");

147

System.out.println("Estimated rows: " + estimatedRows);

148

System.out.println("Operation: " + operation);

149

150

// Get explain plan

151

ExplainPlan explainPlan = plan.getExplainPlan();

152

System.out.println("Query plan:");

153

for (String step : explainPlan.getPlanSteps()) {

154

System.out.println(" " + step);

155

}

156

157

// Execute plan

158

try (ResultIterator iterator = plan.iterator()) {

159

Tuple tuple;

160

while ((tuple = iterator.next()) != null) {

161

// Process tuple

162

processTuple(tuple);

163

}

164

}

165

```

166

167

## Result Iteration

168

169

### ResultIterator

170

171

Interface for iterating over query results with support for peeking and closing.

172

173

```java{ .api }

174

public interface ResultIterator extends SQLCloseable {

175

// Iteration methods

176

Tuple next() throws SQLException

177

Tuple peek() throws SQLException

178

179

// Iterator state

180

void close() throws SQLException

181

ExplainPlan getExplainPlan() throws SQLException

182

183

// Aggregation support

184

Aggregators getAggregators()

185

}

186

```

187

188

### TableResultIterator

189

190

Interface for iterating over table scan results with additional scan information.

191

192

```java{ .api }

193

public interface TableResultIterator extends ResultIterator {

194

// Scan information

195

void initScanner() throws SQLException

196

Scan getScan()

197

void setScan(Scan scan)

198

199

// Region information

200

HRegionLocation getRegionLocation()

201

long getReadMetricQueue()

202

long getOverAllQueryMetrics()

203

}

204

```

205

206

### BaseResultIterator

207

208

Abstract base implementation providing common iterator functionality.

209

210

```java{ .api }

211

public abstract class BaseResultIterator implements ResultIterator {

212

// Common iterator operations

213

public Tuple peek() throws SQLException

214

public void close() throws SQLException

215

public ExplainPlan getExplainPlan() throws SQLException

216

217

// Abstract methods

218

public abstract Tuple next() throws SQLException

219

}

220

```

221

222

**Usage:**

223

```java

224

// Basic result iteration

225

QueryPlan plan = getQueryPlan();
try (ResultIterator iterator = plan.iterator()) {
    Tuple tuple;
    int rowCount = 0;

    // One pointer is enough: each getKey call overwrites what it points at.
    ImmutableBytesWritable ptr = new ImmutableBytesWritable();

    while ((tuple = iterator.next()) != null) {
        rowCount++;

        // Get first column value
        tuple.getKey(ptr, 0);
        Object value = PVarchar.INSTANCE.toObject(ptr);
        System.out.println("Column 0: " + value);

        // Peek at next tuple without consuming it
        Tuple nextTuple = iterator.peek();
        if (nextTuple != null) {
            System.out.println("Next tuple available");
        }
    }

    System.out.println("Processed " + rowCount + " rows");

    // Table scan iteration with scan details.
    // This has to stay inside the try-with-resources block: `iterator` is
    // only in scope here, and it is closed as soon as the block exits.
    if (iterator instanceof TableResultIterator) {
        TableResultIterator tableIter = (TableResultIterator) iterator;
        Scan scan = tableIter.getScan();
        HRegionLocation region = tableIter.getRegionLocation();

        System.out.println("Scan start row: " + Bytes.toString(scan.getStartRow()));
        System.out.println("Scan stop row: " + Bytes.toString(scan.getStopRow()));
        System.out.println("Region: " + region.getRegionInfo().getRegionNameAsString());
    }
}

261

```

262

263

### ParallelIteratorFactory

264

265

Factory for creating parallel result iterators for distributed query execution.

266

267

```java{ .api }

268

public interface ParallelIteratorFactory {

269

// Iterator creation

270

ResultIterator newIterator(StatementContext context, ResultIterator iterator,

271

Scan scan, String tableName) throws SQLException

272

273

// Parallel execution support

274

List<PeekingResultIterator> getIterators() throws SQLException

275

void submitWork(Callable<Boolean> callable) throws SQLException

276

}

277

```

278

279

**Usage:**

280

```java

281

// Create parallel iterators for distributed execution

282

ParallelIteratorFactory factory = getParallelIteratorFactory();
List<PeekingResultIterator> parallelIterators = factory.getIterators();

System.out.println("Created " + parallelIterators.size() + " parallel iterators");

// Process results from multiple iterators.
// newFixedThreadPool rejects a pool size of 0, so keep at least one worker
// even when there are no iterators to drain.
ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, parallelIterators.size()));
int totalRows = 0;
try {
    List<Future<Integer>> futures = new ArrayList<>();

    for (PeekingResultIterator iter : parallelIterators) {
        futures.add(executor.submit(() -> {
            int count = 0;
            // Each task owns its iterator and closes it when done.
            try (ResultIterator iterator = iter) {
                Tuple tuple;
                while ((tuple = iterator.next()) != null) {
                    count++;
                    // Process tuple
                }
            }
            return count;
        }));
    }

    // Collect results
    for (Future<Integer> future : futures) {
        totalRows += future.get();
    }
} finally {
    // Always release the pool threads, including when a task fails and
    // future.get() throws.
    executor.shutdown();
}

System.out.println("Total rows processed: " + totalRows);

314

```

315

316

## Tuple Processing

317

318

### Tuple

319

320

Interface representing a row of data with key-value access methods.

321

322

```java{ .api }

323

public interface Tuple {

324

// Key access

325

void getKey(ImmutableBytesWritable ptr)

326

void getKey(ImmutableBytesWritable ptr, int position)

327

328

// Value access

329

boolean getValue(byte[] family, byte[] qualifier, ImmutableBytesWritable ptr)

330

Cell getValue(byte[] family, byte[] qualifier)

331

332

// Tuple properties

333

boolean isImmutable()

334

int size()

335

KeyValue getValue(int index)

336

337

// Comparison

338

int compareTo(Tuple other)

339

}

340

```

341

342

### KeyValueTuple

343

344

Implementation of Tuple backed by HBase KeyValue objects.

345

346

```java{ .api }

347

public class KeyValueTuple implements Tuple {

348

public KeyValueTuple(KeyValue keyValue)

349

public KeyValueTuple(List<KeyValue> keyValues)

350

351

// Tuple implementation

352

public void getKey(ImmutableBytesWritable ptr)

353

public boolean getValue(byte[] family, byte[] qualifier, ImmutableBytesWritable ptr)

354

public int size()

355

public KeyValue getValue(int index)

356

}

357

```

358

359

**Usage:**

360

```java

361

// Process tuples from result iterator

362

try (ResultIterator iterator = plan.iterator()) {

363

Tuple tuple;

364

while ((tuple = iterator.next()) != null) {

365

// Access row key

366

ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();

367

tuple.getKey(keyPtr);

368

String rowKey = Bytes.toString(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength());

369

370

// Access specific column values

371

ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();

372

byte[] family = Bytes.toBytes("cf");

373

byte[] qualifier = Bytes.toBytes("name");

374

375

if (tuple.getValue(family, qualifier, valuePtr)) {

376

String name = Bytes.toString(valuePtr.get(), valuePtr.getOffset(), valuePtr.getLength());

377

System.out.println("Row " + rowKey + ", Name: " + name);

378

}

379

380

// Access tuple properties

381

int tupleSize = tuple.size();

382

boolean immutable = tuple.isImmutable();

383

384

// Access by index

385

for (int i = 0; i < tupleSize; i++) {

386

KeyValue kv = tuple.getValue(i);

387

if (kv != null) {

388

String qual = Bytes.toString(kv.getQualifierArray(),

389

kv.getQualifierOffset(),

390

kv.getQualifierLength());

391

String value = Bytes.toString(kv.getValueArray(),

392

kv.getValueOffset(),

393

kv.getValueLength());

394

System.out.println(" " + qual + ": " + value);

395

}

396

}

397

}

398

}

399

```

400

401

## Execution Context

402

403

### StatementContext

404

405

Maintains execution context and state during query processing.

406

407

```java{ .api }

408

public class StatementContext {

409

// Context information

410

public PhoenixConnection getConnection()

411

public ColumnResolver getResolver()

412

public Scan getScan()

413

public long getCurrentTime()

414

415

// Execution state

416

public SequenceManager getSequenceManager()

417

public TupleProjector getTupleProjector()

418

public GroupBy getGroupBy()

419

public OrderBy getOrderBy()

420

421

// Metrics and monitoring

422

public ReadMetricQueue getReadMetricsQueue()

423

public WriteMetricQueue getWriteMetricsQueue()

424

public OverAllQueryMetrics getOverallQueryMetrics()

425

426

// Execution configuration

427

public int getPageSize()

428

public Integer getLimit()

429

public Integer getOffset()

430

}

431

```

432

433

**Usage:**

434

```java

435

// Access execution context

436

QueryPlan plan = getQueryPlan();

437

StatementContext context = plan.getContext();

438

439

// Get connection and scan information

440

PhoenixConnection connection = context.getConnection();

441

Scan scan = context.getScan();

442

443

// Configure scan properties

444

scan.setCaching(1000); // Set row caching

445

scan.setBatch(100); // Set batch size

446

scan.setMaxVersions(1); // Only get latest version

447

448

// Access execution limits

449

Integer limit = context.getLimit();

450

Integer offset = context.getOffset();

451

int pageSize = context.getPageSize();

452

453

System.out.println("Query limit: " + limit);

454

System.out.println("Query offset: " + offset);

455

System.out.println("Page size: " + pageSize);

456

457

// Access metrics

458

ReadMetricQueue readMetrics = context.getReadMetricsQueue();

459

WriteMetricQueue writeMetrics = context.getWriteMetricsQueue();

460

461

System.out.println("Read metrics queue size: " + readMetrics.size());

462

System.out.println("Write metrics queue size: " + writeMetrics.size());

463

```

464

465

## Advanced Execution Features

466

467

### Parallel Query Execution

468

469

```java

470

// Configure parallel execution

471

public class ParallelQueryExecutor {

472

private final int parallelism;

473

private final ExecutorService executorService;

474

475

public ParallelQueryExecutor(int parallelism) {

476

this.parallelism = parallelism;

477

this.executorService = Executors.newFixedThreadPool(parallelism);

478

}

479

480

public List<Future<QueryResult>> executeParallel(List<QueryPlan> plans) {

481

List<Future<QueryResult>> futures = new ArrayList<>();

482

483

for (QueryPlan plan : plans) {

484

Future<QueryResult> future = executorService.submit(() -> {

485

List<Tuple> results = new ArrayList<>();

486

try (ResultIterator iterator = plan.iterator()) {

487

Tuple tuple;

488

while ((tuple = iterator.next()) != null) {

489

results.add(tuple);

490

}

491

}

492

return new QueryResult(results, plan.getEstimatedSize());

493

});

494

futures.add(future);

495

}

496

497

return futures;

498

}

499

}

500

501

// Usage

502

ParallelQueryExecutor executor = new ParallelQueryExecutor(4);

503

List<QueryPlan> parallelPlans = createParallelPlans(originalPlan);

504

List<Future<QueryResult>> futures = executor.executeParallel(parallelPlans);

505

506

// Collect results

507

List<Tuple> allResults = new ArrayList<>();

508

for (Future<QueryResult> future : futures) {

509

QueryResult result = future.get();

510

allResults.addAll(result.getTuples());

511

}

512

513

System.out.println("Parallel execution completed, total results: " + allResults.size());

514

```

515

516

### Query Optimization and Caching

517

518

```java

519

// Query result caching

520

public class QueryCache {

521

private final Map<String, CachedResult> cache = new ConcurrentHashMap<>();

522

private final long maxAge = 300000; // 5 minutes

523

524

public CachedResult getCachedResult(String queryKey) {

525

CachedResult cached = cache.get(queryKey);

526

if (cached != null && System.currentTimeMillis() - cached.getTimestamp() < maxAge) {

527

return cached;

528

}

529

cache.remove(queryKey);

530

return null;

531

}

532

533

public void cacheResult(String queryKey, List<Tuple> results) {

534

cache.put(queryKey, new CachedResult(results, System.currentTimeMillis()));

535

}

536

}

537

538

// Usage with query execution

539

QueryCache queryCache = new QueryCache();

540

String queryKey = generateQueryKey(sql, parameters);

541

542

// Check cache first

543

CachedResult cached = queryCache.getCachedResult(queryKey);

544

if (cached != null) {

545

System.out.println("Using cached results");

546

return cached.getResults();

547

}

548

549

// Execute query if not cached

550

List<Tuple> results = new ArrayList<>();

551

try (ResultIterator iterator = plan.iterator()) {

552

Tuple tuple;

553

while ((tuple = iterator.next()) != null) {

554

results.add(tuple);

555

}

556

}

557

558

// Cache results for future queries

559

queryCache.cacheResult(queryKey, results);

560

return results;

561

```

562

563

### Custom Result Processing

564

565

```java

566

// Custom result processor with aggregation

567

public class ResultProcessor {

568

public ProcessedResult processResults(ResultIterator iterator,

569

List<Expression> aggregateExpressions) throws SQLException {

570

Map<String, Object> aggregatedValues = new HashMap<>();

571

List<ProcessedRow> processedRows = new ArrayList<>();

572

int totalRows = 0;

573

574

// Initialize aggregators

575

Map<String, Aggregator> aggregators = new HashMap<>();

576

for (Expression expr : aggregateExpressions) {

577

if (expr instanceof AggregateFunction) {

578

AggregateFunction aggFunc = (AggregateFunction) expr;

579

aggregators.put(expr.toString(), aggFunc.newAggregator());

580

}

581

}

582

583

// Process each row

584

Tuple tuple;

585

while ((tuple = iterator.next()) != null) {

586

totalRows++;

587

588

// Create processed row

589

ProcessedRow row = new ProcessedRow();

590

ImmutableBytesWritable ptr = new ImmutableBytesWritable();

591

592

// Extract values from tuple

593

for (int i = 0; i < tuple.size(); i++) {

594

tuple.getKey(ptr, i);

595

if (ptr.getLength() > 0) {

596

String value = Bytes.toString(ptr.get(), ptr.getOffset(), ptr.getLength());

597

row.addValue("column_" + i, value);

598

}

599

}

600

601

processedRows.add(row);

602

603

// Update aggregators

604

for (Map.Entry<String, Aggregator> entry : aggregators.entrySet()) {

605

entry.getValue().aggregate(tuple, ptr);

606

}

607

}

608

609

// Finalize aggregates

610

ImmutableBytesWritable result = new ImmutableBytesWritable();

611

for (Map.Entry<String, Aggregator> entry : aggregators.entrySet()) {

612

if (entry.getValue().evaluate(null, result)) {

613

Object aggregatedValue = PVarchar.INSTANCE.toObject(result);

614

aggregatedValues.put(entry.getKey(), aggregatedValue);

615

}

616

}

617

618

return new ProcessedResult(processedRows, aggregatedValues, totalRows);

619

}

620

}

621

622

// Usage

623

ResultProcessor processor = new ResultProcessor();

624

List<Expression> aggregateExprs = Arrays.asList(

625

new CountAggregateFunction(Arrays.asList(LiteralExpression.newConstant(1))),

626

new SumAggregateFunction(Arrays.asList(salaryColumn))

627

);

628

629

try (ResultIterator iterator = plan.iterator()) {

630

ProcessedResult result = processor.processResults(iterator, aggregateExprs);

631

632

System.out.println("Total rows: " + result.getTotalRows());

633

System.out.println("Processed rows: " + result.getProcessedRows().size());

634

635

Map<String, Object> aggregates = result.getAggregatedValues();

636

for (Map.Entry<String, Object> entry : aggregates.entrySet()) {

637

System.out.println("Aggregate " + entry.getKey() + ": " + entry.getValue());

638

}

639

}

640

```

641

642

### Execution Monitoring

643

644

```java

645

// Monitor query execution

646

public class ExecutionMonitor {

647

public void monitorExecution(QueryPlan plan) throws SQLException {

648

StatementContext context = plan.getContext();

649

long startTime = System.currentTimeMillis();

650

651

System.out.println("=== Query Execution Monitor ===");

652

System.out.println("Start time: " + new Date(startTime));

653

System.out.println("Estimated rows: " + plan.getEstimatedRowsToScan());

654

System.out.println("Estimated bytes: " + plan.getEstimatedBytesToScan());

655

656

int rowCount = 0;

657

try (ResultIterator iterator = plan.iterator()) {

658

Tuple tuple;

659

while ((tuple = iterator.next()) != null) {

660

rowCount++;

661

662

// Log progress every 1000 rows

663

if (rowCount % 1000 == 0) {

664

long elapsed = System.currentTimeMillis() - startTime;

665

double rowsPerSecond = (double) rowCount / (elapsed / 1000.0);

666

System.out.println("Processed " + rowCount + " rows, " +

667

String.format("%.2f", rowsPerSecond) + " rows/sec");

668

}

669

}

670

671

// Final statistics

672

long totalTime = System.currentTimeMillis() - startTime;

673

double avgRowsPerSecond = (double) rowCount / (totalTime / 1000.0);

674

675

System.out.println("=== Execution Complete ===");

676

System.out.println("Total rows: " + rowCount);

677

System.out.println("Total time: " + totalTime + "ms");

678

System.out.println("Average rate: " + String.format("%.2f", avgRowsPerSecond) + " rows/sec");

679

680

// Access execution metrics if available

681

ReadMetricQueue readMetrics = context.getReadMetricsQueue();

682

if (readMetrics != null) {

683

System.out.println("Read operations: " + readMetrics.size());

684

}

685

}

686

}

687

}

688

689

// Usage

690

ExecutionMonitor monitor = new ExecutionMonitor();

691

monitor.monitorExecution(queryPlan);

692

```