# SQL Engine Support

SQL engine integration for advanced query processing, optimization, and relational operations in CDAP ETL pipelines with support for multiple execution engines.

## Core SQL Engine Interfaces

### SQLEngine

Base interface for SQL engines providing core query processing capabilities.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql;

12

13

public interface SQLEngine {

14

/**

15

* Check if join operation is supported.

16

*/

17

boolean canJoin(SQLJoinDefinition joinDefinition);

18

19

/**

20

* Execute join operation.

21

*/

22

SQLDataset join(SQLJoinRequest joinRequest);

23

24

/**

25

* Check if transform operation is supported.

26

*/

27

boolean canTransform(SQLTransformDefinition transformDefinition);

28

29

/**

30

* Execute transform operation.

31

*/

32

SQLDataset transform(SQLTransformRequest transformRequest);

33

34

/**

35

* Check if dataset exists.

36

*/

37

boolean exists(String datasetName);

38

39

/**

40

* Read data from dataset.

41

*/

42

SQLDataset read(SQLReadRequest readRequest);

43

44

/**

45

* Write data to dataset.

46

*/

47

SQLWriteResult write(SQLWriteRequest writeRequest);

48

49

/**

50

* Clean up dataset.

51

*/

52

void cleanup(String datasetName);

53

}

```

### BatchSQLEngine

SQL engine specifically for batch processing operations.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql;

62

63

public interface BatchSQLEngine extends SQLEngine {

64

// Inherits all SQLEngine capabilities for batch operations

65

// Additional batch-specific optimizations may be added

66

}

```

**SQL Engine Implementation Example:**

```java

public class SparkSQLEngine implements BatchSQLEngine {

72

73

private final SparkSession sparkSession;

74

private final Map<String, Dataset<Row>> registeredDatasets;

75

76

public SparkSQLEngine(SparkSession sparkSession) {

77

this.sparkSession = sparkSession;

78

this.registeredDatasets = new HashMap<>();

79

}

80

81

@Override

82

public boolean canJoin(SQLJoinDefinition joinDefinition) {

83

// Check if all required datasets are available

84

for (String datasetName : joinDefinition.getDatasetNames()) {

85

if (!exists(datasetName)) {

86

return false;

87

}

88

}

89

90

// Check if join type is supported

91

JoinType joinType = joinDefinition.getJoinType();

92

return joinType == JoinType.INNER ||

93

joinType == JoinType.LEFT_OUTER ||

94

joinType == JoinType.RIGHT_OUTER ||

95

joinType == JoinType.FULL_OUTER;

96

}

97

98

@Override

99

public SQLDataset join(SQLJoinRequest joinRequest) {

100

SQLJoinDefinition joinDef = joinRequest.getJoinDefinition();

101

String outputDatasetName = joinRequest.getOutputDatasetName();

102

103

// Build SQL join query

104

StringBuilder query = new StringBuilder();

105

query.append("SELECT ");

106

107

// Add selected fields

108

List<String> selectedFields = joinDef.getSelectedFields();

109

if (selectedFields.isEmpty()) {

110

query.append("*");

111

} else {

112

query.append(String.join(", ", selectedFields));

113

}

114

115

// Add FROM clause with main dataset

116

String mainDataset = joinDef.getMainDataset();

117

query.append(" FROM ").append(mainDataset);

118

119

// Add join clauses

120

for (JoinClause joinClause : joinDef.getJoinClauses()) {

121

query.append(" ").append(joinClause.getJoinType().toString());

122

query.append(" JOIN ").append(joinClause.getDatasetName());

123

query.append(" ON ").append(joinClause.getOnCondition());

124

}

125

126

// Add WHERE clause if specified

127

if (joinDef.getWhereCondition() != null) {

128

query.append(" WHERE ").append(joinDef.getWhereCondition());

129

}

130

131

// Execute query

132

Dataset<Row> resultDataset = sparkSession.sql(query.toString());

133

134

// Register result dataset

135

resultDataset.createOrReplaceTempView(outputDatasetName);

136

registeredDatasets.put(outputDatasetName, resultDataset);

137

138

return new SparkSQLDataset(outputDatasetName, resultDataset.schema(), resultDataset);

139

}

140

141

@Override

142

public boolean canTransform(SQLTransformDefinition transformDefinition) {

143

String inputDataset = transformDefinition.getInputDataset();

144

String sqlQuery = transformDefinition.getSqlQuery();

145

146

// Check if input dataset exists

147

if (!exists(inputDataset)) {

148

return false;

149

}

150

151

// Validate SQL query syntax

152

try {

153

sparkSession.sql("EXPLAIN " + sqlQuery);

154

return true;

155

} catch (Exception e) {

156

return false;

157

}

158

}

159

160

@Override

161

public SQLDataset transform(SQLTransformRequest transformRequest) {

162

SQLTransformDefinition transformDef = transformRequest.getTransformDefinition();

163

String outputDatasetName = transformRequest.getOutputDatasetName();

164

String sqlQuery = transformDef.getSqlQuery();

165

166

// Execute transformation query

167

Dataset<Row> resultDataset = sparkSession.sql(sqlQuery);

168

169

// Register result dataset

170

resultDataset.createOrReplaceTempView(outputDatasetName);

171

registeredDatasets.put(outputDatasetName, resultDataset);

172

173

return new SparkSQLDataset(outputDatasetName, resultDataset.schema(), resultDataset);

174

}

175

176

@Override

177

public boolean exists(String datasetName) {

178

return registeredDatasets.containsKey(datasetName) ||

179

sparkSession.catalog().tableExists(datasetName);

180

}

181

182

@Override

183

public SQLDataset read(SQLReadRequest readRequest) {

184

String datasetName = readRequest.getDatasetName();

185

186

if (registeredDatasets.containsKey(datasetName)) {

187

Dataset<Row> dataset = registeredDatasets.get(datasetName);

188

return new SparkSQLDataset(datasetName, dataset.schema(), dataset);

189

}

190

191

// Try reading as table

192

try {

193

Dataset<Row> dataset = sparkSession.table(datasetName);

194

return new SparkSQLDataset(datasetName, dataset.schema(), dataset);

195

} catch (Exception e) {

196

throw new SQLEngineException("Failed to read dataset: " + datasetName, e);

197

}

198

}

199

200

@Override

201

public SQLWriteResult write(SQLWriteRequest writeRequest) {

202

String datasetName = writeRequest.getDatasetName();

203

SQLDataset sqlDataset = writeRequest.getDataset();

204

205

if (sqlDataset instanceof SparkSQLDataset) {

206

SparkSQLDataset sparkDataset = (SparkSQLDataset) sqlDataset;

207

Dataset<Row> dataset = sparkDataset.getSparkDataset();

208

209

// Write dataset based on format

210

WriteMode writeMode = writeRequest.getWriteMode();

211

switch (writeMode) {

212

case OVERWRITE:

213

dataset.write().mode("overwrite").saveAsTable(datasetName);

214

break;

215

case APPEND:

216

dataset.write().mode("append").saveAsTable(datasetName);

217

break;

218

case ERROR_IF_EXISTS:

219

dataset.write().mode("errorifexists").saveAsTable(datasetName);

220

break;

221

default:

222

throw new SQLEngineException("Unsupported write mode: " + writeMode);

223

}

224

225

return new SQLWriteResult(datasetName, dataset.count());

226

}

227

228

throw new SQLEngineException("Unsupported dataset type: " +

229

sqlDataset.getClass().getName());

230

}

231

232

@Override

233

public void cleanup(String datasetName) {

234

registeredDatasets.remove(datasetName);

235

236

try {

237

sparkSession.catalog().dropTempView(datasetName);

238

} catch (Exception e) {

239

// Ignore cleanup errors

240

}

241

}

242

}

```

## SQL Engine Input/Output

### SQLEngineInput

Interface for SQL engine input operations.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql;

253

254

public interface SQLEngineInput {

255

/**

256

* Get input dataset name.

257

*/

258

String getDatasetName();

259

260

/**

261

* Get input schema.

262

*/

263

Schema getSchema();

264

}

```

### SQLEngineOutput

Interface for SQL engine output operations.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql;

273

274

public interface SQLEngineOutput {

275

/**

276

* Get output dataset name.

277

*/

278

String getDatasetName();

279

280

/**

281

* Get output schema.

282

*/

283

Schema getSchema();

284

}

285

```

286

287

## SQL Capabilities

288

289

### StandardSQLCapabilities

290

291

Interface defining standard SQL capabilities.

292

293

```java { .api }

294

package io.cdap.cdap.etl.api.engine.sql;

/**
 * Interface defining standard SQL capabilities a SQL engine may advertise.
 */
public interface StandardSQLCapabilities {
  /**
   * Check if SELECT operations are supported.
   */
  boolean supportsSelect();

  /**
   * Check if JOIN operations are supported.
   */
  boolean supportsJoin();

  /**
   * Check if GROUP BY operations are supported.
   */
  boolean supportsGroupBy();

  /**
   * Check if window functions are supported.
   */
  boolean supportsWindowFunctions();

  /**
   * Check if subqueries are supported.
   */
  boolean supportsSubqueries();
}

```

## SQL Engine Capabilities

### Push and Pull Capabilities

#### PullCapability

Capability to pull data from external sources.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.capability;

334

335

public interface PullCapability {

336

/**

337

* Check if source can be pulled into SQL engine.

338

*/

339

boolean canPull(String sourceType);

340

341

/**

342

* Pull data from external source.

343

*/

344

SQLDataset pull(PullRequest pullRequest);

345

}

```

#### PushCapability

Capability to push data to external sinks.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.capability;

354

355

public interface PushCapability {

356

/**

357

* Check if data can be pushed to sink.

358

*/

359

boolean canPush(String sinkType);

360

361

/**

362

* Push data to external sink.

363

*/

364

PushResult push(PushRequest pushRequest);

365

}

```

#### DefaultPullCapability

Default implementation of pull capability.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.capability;

374

375

public class DefaultPullCapability implements PullCapability {

376

@Override

377

public boolean canPull(String sourceType) {

378

// Default implementation - check supported source types

379

return Arrays.asList("jdbc", "file", "kafka").contains(sourceType.toLowerCase());

380

}

381

382

@Override

383

public SQLDataset pull(PullRequest pullRequest) {

384

// Default pull implementation

385

throw new UnsupportedOperationException("Pull not implemented");

386

}

387

}

```

#### DefaultPushCapability

Default implementation of push capability.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.capability;

396

397

public class DefaultPushCapability implements PushCapability {

398

@Override

399

public boolean canPush(String sinkType) {

400

// Default implementation - check supported sink types

401

return Arrays.asList("jdbc", "file", "elasticsearch").contains(sinkType.toLowerCase());

402

}

403

404

@Override

405

public PushResult push(PushRequest pushRequest) {

406

// Default push implementation

407

throw new UnsupportedOperationException("Push not implemented");

408

}

409

}

```

## SQL Datasets

### SQLDataset

Base interface for SQL datasets.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.dataset;

420

421

public interface SQLDataset {

422

/**

423

* Get dataset name.

424

*/

425

String getDatasetName();

426

427

/**

428

* Get dataset schema.

429

*/

430

Schema getSchema();

431

}

```

### SQLDatasetConsumer

Consumer interface for SQL datasets.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.dataset;

440

441

public interface SQLDatasetConsumer extends SQLDataset {

442

/**

443

* Consume data from the dataset.

444

*/

445

Iterator<StructuredRecord> consume();

446

}

```

### SQLDatasetProducer

Producer interface for SQL datasets.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.dataset;

455

456

public interface SQLDatasetProducer extends SQLDataset {

457

/**

458

* Produce data to the dataset.

459

*/

460

void produce(Iterator<StructuredRecord> records);

461

}

```

### SQLPullDataset

Dataset that can be pulled from external source.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.dataset;

470

471

public interface SQLPullDataset extends SQLDatasetConsumer {

472

/**

473

* Pull data from external source.

474

*/

475

void pull();

476

477

/**

478

* Get pull statistics.

479

*/

480

PullStatistics getPullStatistics();

481

}

```

### SQLPushDataset

Dataset that can be pushed to external sink.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.dataset;

490

491

public interface SQLPushDataset extends SQLDatasetProducer {

492

/**

493

* Push data to external sink.

494

*/

495

void push();

496

497

/**

498

* Get push statistics.

499

*/

500

PushStatistics getPushStatistics();

501

}

```

### SQLDatasetDescription

Description of SQL dataset with metadata.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.dataset;

510

511

public class SQLDatasetDescription {

512

/**

513

* Create dataset description.

514

*/

515

public SQLDatasetDescription(String name, Schema schema, Map<String, String> properties) {}

516

517

/**

518

* Get dataset name.

519

*/

520

public String getName() {}

521

522

/**

523

* Get dataset schema.

524

*/

525

public Schema getSchema() {}

526

527

/**

528

* Get dataset properties.

529

*/

530

public Map<String, String> getProperties() {}

531

}

```

### RecordCollection

Collection interface for records.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.dataset;

540

541

public interface RecordCollection extends Iterable<StructuredRecord> {

542

/**

543

* Get collection size.

544

*/

545

long size();

546

547

/**

548

* Check if collection is empty.

549

*/

550

boolean isEmpty();

551

}

```

## SQL Requests

### SQLJoinRequest

Request object for SQL join operations.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.request;

562

563

public class SQLJoinRequest {

564

/**

565

* Create join request.

566

*/

567

public SQLJoinRequest(SQLJoinDefinition joinDefinition, String outputDatasetName) {}

568

569

/**

570

* Get join definition.

571

*/

572

public SQLJoinDefinition getJoinDefinition() {}

573

574

/**

575

* Get output dataset name.

576

*/

577

public String getOutputDatasetName() {}

578

}

```

### SQLJoinDefinition

Definition for SQL join operations.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.request;

587

588

public class SQLJoinDefinition {

589

/**

590

* Get participating datasets.

591

*/

592

public List<String> getDatasetNames() {}

593

594

/**

595

* Get join type.

596

*/

597

public JoinType getJoinType() {}

598

599

/**

600

* Get selected fields.

601

*/

602

public List<String> getSelectedFields() {}

603

604

/**

605

* Get join conditions.

606

*/

607

public List<JoinClause> getJoinClauses() {}

608

}

```

### SQLTransformRequest

Request object for SQL transformation operations.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.request;

617

618

public class SQLTransformRequest {

619

/**

620

* Create transform request.

621

*/

622

public SQLTransformRequest(SQLTransformDefinition transformDefinition, String outputDatasetName) {}

623

624

/**

625

* Get transform definition.

626

*/

627

public SQLTransformDefinition getTransformDefinition() {}

628

629

/**

630

* Get output dataset name.

631

*/

632

public String getOutputDatasetName() {}

633

}

```

### SQLTransformDefinition

Definition for SQL transformations.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.request;

642

643

public class SQLTransformDefinition {

644

/**

645

* Create transform definition.

646

*/

647

public SQLTransformDefinition(String inputDataset, String sqlQuery) {}

648

649

/**

650

* Get input dataset name.

651

*/

652

public String getInputDataset() {}

653

654

/**

655

* Get SQL query for transformation.

656

*/

657

public String getSqlQuery() {}

658

}

```

### SQLReadRequest

Request object for SQL read operations.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.request;

667

668

public class SQLReadRequest {

669

/**

670

* Create read request.

671

*/

672

public SQLReadRequest(String datasetName) {}

673

674

/**

675

* Get dataset name to read.

676

*/

677

public String getDatasetName() {}

678

}

```

### SQLWriteRequest

Request object for SQL write operations.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.request;

687

688

public class SQLWriteRequest {

689

/**

690

* Create write request.

691

*/

692

public SQLWriteRequest(String datasetName, SQLDataset dataset, WriteMode writeMode) {}

693

694

/**

695

* Get target dataset name.

696

*/

697

public String getDatasetName() {}

698

699

/**

700

* Get source dataset.

701

*/

702

public SQLDataset getDataset() {}

703

704

/**

705

* Get write mode.

706

*/

707

public WriteMode getWriteMode() {}

708

}

```

### SQL Pull and Push Requests

#### SQLPullRequest

Request for SQL pull operations.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.request;

719

720

public class SQLPullRequest {

721

/**

722

* Create pull request.

723

*/

724

public SQLPullRequest(String sourceType, Map<String, String> sourceProperties) {}

725

726

/**

727

* Get source type.

728

*/

729

public String getSourceType() {}

730

731

/**

732

* Get source properties.

733

*/

734

public Map<String, String> getSourceProperties() {}

735

}

```

#### SQLPushRequest

Request for SQL push operations.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.request;

744

745

public class SQLPushRequest {

746

/**

747

* Create push request.

748

*/

749

public SQLPushRequest(SQLDataset dataset, String sinkType,

750

Map<String, String> sinkProperties) {}

751

752

/**

753

* Get dataset to push.

754

*/

755

public SQLDataset getDataset() {}

756

757

/**

758

* Get sink type.

759

*/

760

public String getSinkType() {}

761

762

/**

763

* Get sink properties.

764

*/

765

public Map<String, String> getSinkProperties() {}

766

}

```

## Result Objects

### SQLReadResult

Result from SQL read operations.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.request;

777

778

public class SQLReadResult {

779

/**

780

* Create read result.

781

*/

782

public SQLReadResult(SQLDataset dataset, long recordCount) {}

783

784

/**

785

* Get result dataset.

786

*/

787

public SQLDataset getDataset() {}

788

789

/**

790

* Get number of records read.

791

*/

792

public long getRecordCount() {}

793

}

```

### SQLWriteResult

Result from SQL write operations.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql.request;

802

803

public class SQLWriteResult {

804

/**

805

* Create write result.

806

*/

807

public SQLWriteResult(String datasetName, long recordCount) {}

808

809

/**

810

* Get dataset name written to.

811

*/

812

public String getDatasetName() {}

813

814

/**

815

* Get number of records written.

816

*/

817

public long getRecordCount() {}

818

}

```

## SQL Engine Exception Handling

### SQLEngineException

Exception for SQL engine operations.

```java { .api }

package io.cdap.cdap.etl.api.engine.sql;

/**
 * Exception for SQL engine operations.
 *
 * Extends RuntimeException (unchecked): the examples in this document throw it from
 * methods such as SQLEngine.read/write whose signatures declare no checked exceptions,
 * so a checked exception here would not compile.
 */
public class SQLEngineException extends RuntimeException {
  /**
   * Create SQL engine exception.
   */
  public SQLEngineException(String message) {}

  /**
   * Create SQL engine exception with cause.
   */
  public SQLEngineException(String message, Throwable cause) {}
}

841

```

842

843

## Advanced SQL Engine Usage

844

845

### Complex Query Optimization

846

847

```java

848

public class QueryOptimizer {

849

850

public static SQLTransformDefinition optimizeQuery(SQLTransformDefinition originalDef,

851

Map<String, Statistics> datasetStats) {

852

String originalQuery = originalDef.getSqlQuery();

853

854

// Parse and analyze query

855

QueryAnalysis analysis = analyzeQuery(originalQuery);

856

857

// Apply optimizations based on statistics

858

StringBuilder optimizedQuery = new StringBuilder();

859

860

// Add query hints for join optimization

861

if (analysis.hasJoins()) {

862

optimizedQuery.append("/* + USE_HASH_JOIN */ ");

863

}

864

865

// Optimize WHERE clauses - push down predicates

866

optimizedQuery.append(pushDownPredicates(originalQuery, analysis));

867

868

// Add partitioning hints

869

if (analysis.hasGroupBy()) {

870

optimizedQuery.append(" /* + PARTITION_BY(")

871

.append(String.join(", ", analysis.getGroupByFields()))

872

.append(") */");

873

}

874

875

return new SQLTransformDefinition(originalDef.getInputDataset(),

876

optimizedQuery.toString());

877

}

878

879

private static String pushDownPredicates(String query, QueryAnalysis analysis) {

880

// Implementation to push WHERE clauses closer to data sources

881

// This is a simplified example - real implementation would use SQL parser

882

883

if (analysis.hasSelectivePredicates()) {

884

// Rewrite query to apply filters early

885

return rewriteWithEarlyFilters(query, analysis.getSelectivePredicates());

886

}

887

888

return query;

889

}

890

}

```

### Multi-Engine SQL Processing

```java

public class HybridSQLEngine implements BatchSQLEngine {

897

898

private final BatchSQLEngine primaryEngine;

899

private final BatchSQLEngine fallbackEngine;

900

901

public HybridSQLEngine(BatchSQLEngine primaryEngine, BatchSQLEngine fallbackEngine) {

902

this.primaryEngine = primaryEngine;

903

this.fallbackEngine = fallbackEngine;

904

}

905

906

@Override

907

public boolean canTransform(SQLTransformDefinition transformDefinition) {

908

return primaryEngine.canTransform(transformDefinition) ||

909

fallbackEngine.canTransform(transformDefinition);

910

}

911

912

@Override

913

public SQLDataset transform(SQLTransformRequest transformRequest) {

914

// Try primary engine first

915

if (primaryEngine.canTransform(transformRequest.getTransformDefinition())) {

916

try {

917

return primaryEngine.transform(transformRequest);

918

} catch (Exception e) {

919

// Log warning and fall back

920

logWarning("Primary engine failed, falling back", e);

921

}

922

}

923

924

// Use fallback engine

925

if (fallbackEngine.canTransform(transformRequest.getTransformDefinition())) {

926

return fallbackEngine.transform(transformRequest);

927

}

928

929

throw new SQLEngineException("No suitable engine found for transformation");

930

}

931

932

@Override

933

public SQLDataset join(SQLJoinRequest joinRequest) {

934

// Similar hybrid approach for joins

935

if (primaryEngine.canJoin(joinRequest.getJoinDefinition())) {

936

try {

937

return primaryEngine.join(joinRequest);

938

} catch (Exception e) {

939

logWarning("Primary engine join failed, falling back", e);

940

}

941

}

942

943

if (fallbackEngine.canJoin(joinRequest.getJoinDefinition())) {

944

return fallbackEngine.join(joinRequest);

945

}

946

947

throw new SQLEngineException("No suitable engine found for join");

948

}

949

}

```