# SQL Engine Support

SQL engine integration for advanced query processing, optimization, and relational operations in CDAP ETL pipelines with support for multiple execution engines.

## Core SQL Engine Interfaces

### SQLEngine

Base interface for SQL engines providing core query processing capabilities.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql;
12
13
public interface SQLEngine {
14
/**
15
* Check if join operation is supported.
16
*/
17
boolean canJoin(SQLJoinDefinition joinDefinition);
18
19
/**
20
* Execute join operation.
21
*/
22
SQLDataset join(SQLJoinRequest joinRequest);
23
24
/**
25
* Check if transform operation is supported.
26
*/
27
boolean canTransform(SQLTransformDefinition transformDefinition);
28
29
/**
30
* Execute transform operation.
31
*/
32
SQLDataset transform(SQLTransformRequest transformRequest);
33
34
/**
35
* Check if dataset exists.
36
*/
37
boolean exists(String datasetName);
38
39
/**
40
* Read data from dataset.
41
*/
42
SQLDataset read(SQLReadRequest readRequest);
43
44
/**
45
* Write data to dataset.
46
*/
47
SQLWriteResult write(SQLWriteRequest writeRequest);
48
49
/**
50
* Clean up dataset.
51
*/
52
void cleanup(String datasetName);
53
}
```

### BatchSQLEngine

SQL engine specifically for batch processing operations.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql;
62
63
public interface BatchSQLEngine extends SQLEngine {
64
// Inherits all SQLEngine capabilities for batch operations
65
// Additional batch-specific optimizations may be added
66
}
```

**SQL Engine Implementation Example:**

```java
public class SparkSQLEngine implements BatchSQLEngine {
72
73
private final SparkSession sparkSession;
74
private final Map<String, Dataset<Row>> registeredDatasets;
75
76
public SparkSQLEngine(SparkSession sparkSession) {
77
this.sparkSession = sparkSession;
78
this.registeredDatasets = new HashMap<>();
79
}
80
81
@Override
82
public boolean canJoin(SQLJoinDefinition joinDefinition) {
83
// Check if all required datasets are available
84
for (String datasetName : joinDefinition.getDatasetNames()) {
85
if (!exists(datasetName)) {
86
return false;
87
}
88
}
89
90
// Check if join type is supported
91
JoinType joinType = joinDefinition.getJoinType();
92
return joinType == JoinType.INNER ||
93
joinType == JoinType.LEFT_OUTER ||
94
joinType == JoinType.RIGHT_OUTER ||
95
joinType == JoinType.FULL_OUTER;
96
}
97
98
@Override
99
public SQLDataset join(SQLJoinRequest joinRequest) {
100
SQLJoinDefinition joinDef = joinRequest.getJoinDefinition();
101
String outputDatasetName = joinRequest.getOutputDatasetName();
102
103
// Build SQL join query
104
StringBuilder query = new StringBuilder();
105
query.append("SELECT ");
106
107
// Add selected fields
108
List<String> selectedFields = joinDef.getSelectedFields();
109
if (selectedFields.isEmpty()) {
110
query.append("*");
111
} else {
112
query.append(String.join(", ", selectedFields));
113
}
114
115
// Add FROM clause with main dataset
116
String mainDataset = joinDef.getMainDataset();
117
query.append(" FROM ").append(mainDataset);
118
119
// Add join clauses
120
for (JoinClause joinClause : joinDef.getJoinClauses()) {
121
query.append(" ").append(joinClause.getJoinType().toString());
122
query.append(" JOIN ").append(joinClause.getDatasetName());
123
query.append(" ON ").append(joinClause.getOnCondition());
124
}
125
126
// Add WHERE clause if specified
127
if (joinDef.getWhereCondition() != null) {
128
query.append(" WHERE ").append(joinDef.getWhereCondition());
129
}
130
131
// Execute query
132
Dataset<Row> resultDataset = sparkSession.sql(query.toString());
133
134
// Register result dataset
135
resultDataset.createOrReplaceTempView(outputDatasetName);
136
registeredDatasets.put(outputDatasetName, resultDataset);
137
138
return new SparkSQLDataset(outputDatasetName, resultDataset.schema(), resultDataset);
139
}
140
141
@Override
142
public boolean canTransform(SQLTransformDefinition transformDefinition) {
143
String inputDataset = transformDefinition.getInputDataset();
144
String sqlQuery = transformDefinition.getSqlQuery();
145
146
// Check if input dataset exists
147
if (!exists(inputDataset)) {
148
return false;
149
}
150
151
// Validate SQL query syntax
152
try {
153
sparkSession.sql("EXPLAIN " + sqlQuery);
154
return true;
155
} catch (Exception e) {
156
return false;
157
}
158
}
159
160
@Override
161
public SQLDataset transform(SQLTransformRequest transformRequest) {
162
SQLTransformDefinition transformDef = transformRequest.getTransformDefinition();
163
String outputDatasetName = transformRequest.getOutputDatasetName();
164
String sqlQuery = transformDef.getSqlQuery();
165
166
// Execute transformation query
167
Dataset<Row> resultDataset = sparkSession.sql(sqlQuery);
168
169
// Register result dataset
170
resultDataset.createOrReplaceTempView(outputDatasetName);
171
registeredDatasets.put(outputDatasetName, resultDataset);
172
173
return new SparkSQLDataset(outputDatasetName, resultDataset.schema(), resultDataset);
174
}
175
176
@Override
177
public boolean exists(String datasetName) {
178
return registeredDatasets.containsKey(datasetName) ||
179
sparkSession.catalog().tableExists(datasetName);
180
}
181
182
@Override
183
public SQLDataset read(SQLReadRequest readRequest) {
184
String datasetName = readRequest.getDatasetName();
185
186
if (registeredDatasets.containsKey(datasetName)) {
187
Dataset<Row> dataset = registeredDatasets.get(datasetName);
188
return new SparkSQLDataset(datasetName, dataset.schema(), dataset);
189
}
190
191
// Try reading as table
192
try {
193
Dataset<Row> dataset = sparkSession.table(datasetName);
194
return new SparkSQLDataset(datasetName, dataset.schema(), dataset);
195
} catch (Exception e) {
196
throw new SQLEngineException("Failed to read dataset: " + datasetName, e);
197
}
198
}
199
200
@Override
201
public SQLWriteResult write(SQLWriteRequest writeRequest) {
202
String datasetName = writeRequest.getDatasetName();
203
SQLDataset sqlDataset = writeRequest.getDataset();
204
205
if (sqlDataset instanceof SparkSQLDataset) {
206
SparkSQLDataset sparkDataset = (SparkSQLDataset) sqlDataset;
207
Dataset<Row> dataset = sparkDataset.getSparkDataset();
208
209
// Write dataset based on format
210
WriteMode writeMode = writeRequest.getWriteMode();
211
switch (writeMode) {
212
case OVERWRITE:
213
dataset.write().mode("overwrite").saveAsTable(datasetName);
214
break;
215
case APPEND:
216
dataset.write().mode("append").saveAsTable(datasetName);
217
break;
218
case ERROR_IF_EXISTS:
219
dataset.write().mode("errorifexists").saveAsTable(datasetName);
220
break;
221
default:
222
throw new SQLEngineException("Unsupported write mode: " + writeMode);
223
}
224
225
return new SQLWriteResult(datasetName, dataset.count());
226
}
227
228
throw new SQLEngineException("Unsupported dataset type: " +
229
sqlDataset.getClass().getName());
230
}
231
232
@Override
233
public void cleanup(String datasetName) {
234
registeredDatasets.remove(datasetName);
235
236
try {
237
sparkSession.catalog().dropTempView(datasetName);
238
} catch (Exception e) {
239
// Ignore cleanup errors
240
}
241
}
242
}
```

## SQL Engine Input/Output

### SQLEngineInput

Interface for SQL engine input operations.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql;
253
254
public interface SQLEngineInput {
255
/**
256
* Get input dataset name.
257
*/
258
String getDatasetName();
259
260
/**
261
* Get input schema.
262
*/
263
Schema getSchema();
264
}
```

### SQLEngineOutput

Interface for SQL engine output operations.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql;
273
274
public interface SQLEngineOutput {
275
/**
276
* Get output dataset name.
277
*/
278
String getDatasetName();
279
280
/**
281
* Get output schema.
282
*/
283
Schema getSchema();
284
}
```

## SQL Capabilities

### StandardSQLCapabilities

Interface defining standard SQL capabilities.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql;

/**
 * Interface defining standard SQL capabilities an engine may advertise.
 */
public interface StandardSQLCapabilities {
  /**
   * Check if SELECT operations are supported.
   */
  boolean supportsSelect();

  /**
   * Check if JOIN operations are supported.
   */
  boolean supportsJoin();

  /**
   * Check if GROUP BY operations are supported.
   */
  boolean supportsGroupBy();

  /**
   * Check if window functions are supported.
   */
  boolean supportsWindowFunctions();

  /**
   * Check if subqueries are supported.
   */
  boolean supportsSubqueries();
}
```

## SQL Engine Capabilities

### Push and Pull Capabilities

#### PullCapability

Capability to pull data from external sources.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql.capability;
334
335
public interface PullCapability {
336
/**
337
* Check if source can be pulled into SQL engine.
338
*/
339
boolean canPull(String sourceType);
340
341
/**
342
* Pull data from external source.
343
*/
344
SQLDataset pull(PullRequest pullRequest);
345
}
```

#### PushCapability

Capability to push data to external sinks.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql.capability;
354
355
public interface PushCapability {
356
/**
357
* Check if data can be pushed to sink.
358
*/
359
boolean canPush(String sinkType);
360
361
/**
362
* Push data to external sink.
363
*/
364
PushResult push(PushRequest pushRequest);
365
}
```

#### DefaultPullCapability

Default implementation of pull capability.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql.capability;
374
375
public class DefaultPullCapability implements PullCapability {
376
@Override
377
public boolean canPull(String sourceType) {
378
// Default implementation - check supported source types
379
return Arrays.asList("jdbc", "file", "kafka").contains(sourceType.toLowerCase());
380
}
381
382
@Override
383
public SQLDataset pull(PullRequest pullRequest) {
384
// Default pull implementation
385
throw new UnsupportedOperationException("Pull not implemented");
386
}
387
}
```

#### DefaultPushCapability

Default implementation of push capability.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql.capability;
396
397
public class DefaultPushCapability implements PushCapability {
398
@Override
399
public boolean canPush(String sinkType) {
400
// Default implementation - check supported sink types
401
return Arrays.asList("jdbc", "file", "elasticsearch").contains(sinkType.toLowerCase());
402
}
403
404
@Override
405
public PushResult push(PushRequest pushRequest) {
406
// Default push implementation
407
throw new UnsupportedOperationException("Push not implemented");
408
}
409
}
```

## SQL Datasets

### SQLDataset

Base interface for SQL datasets.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql.dataset;
420
421
public interface SQLDataset {
422
/**
423
* Get dataset name.
424
*/
425
String getDatasetName();
426
427
/**
428
* Get dataset schema.
429
*/
430
Schema getSchema();
431
}
```

### SQLDatasetConsumer

Consumer interface for SQL datasets.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql.dataset;
440
441
public interface SQLDatasetConsumer extends SQLDataset {
442
/**
443
* Consume data from the dataset.
444
*/
445
Iterator<StructuredRecord> consume();
446
}
```

### SQLDatasetProducer

Producer interface for SQL datasets.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql.dataset;
455
456
public interface SQLDatasetProducer extends SQLDataset {
457
/**
458
* Produce data to the dataset.
459
*/
460
void produce(Iterator<StructuredRecord> records);
461
}
```

### SQLPullDataset

Dataset that can be pulled from external source.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql.dataset;
470
471
public interface SQLPullDataset extends SQLDatasetConsumer {
472
/**
473
* Pull data from external source.
474
*/
475
void pull();
476
477
/**
478
* Get pull statistics.
479
*/
480
PullStatistics getPullStatistics();
481
}
482
```
483
484
### SQLPushDataset
485
486
Dataset that can be pushed to external sink.
487
488
```java { .api }
489
package io.cdap.cdap.etl.api.engine.sql.dataset;
490
491
public interface SQLPushDataset extends SQLDatasetProducer {
492
/**
493
* Push data to external sink.
494
*/
495
void push();
496
497
/**
498
* Get push statistics.
499
*/
500
PushStatistics getPushStatistics();
501
}
502
```
503
504
### SQLDatasetDescription
505
506
Description of SQL dataset with metadata.
507
508
```java { .api }
509
package io.cdap.cdap.etl.api.engine.sql.dataset;
510
511
public class SQLDatasetDescription {
512
/**
513
* Create dataset description.
514
*/
515
public SQLDatasetDescription(String name, Schema schema, Map<String, String> properties) {}
516
517
/**
518
* Get dataset name.
519
*/
520
public String getName() {}
521
522
/**
523
* Get dataset schema.
524
*/
525
public Schema getSchema() {}
526
527
/**
528
* Get dataset properties.
529
*/
530
public Map<String, String> getProperties() {}
531
}
532
```
533
534
### RecordCollection
535
536
Collection interface for records.
537
538
```java { .api }
539
package io.cdap.cdap.etl.api.engine.sql.dataset;
540
541
public interface RecordCollection extends Iterable<StructuredRecord> {
542
/**
543
* Get collection size.
544
*/
545
long size();
546
547
/**
548
* Check if collection is empty.
549
*/
550
boolean isEmpty();
551
}
```

## SQL Requests

### SQLJoinRequest

Request object for SQL join operations.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql.request;
562
563
public class SQLJoinRequest {
564
/**
565
* Create join request.
566
*/
567
public SQLJoinRequest(SQLJoinDefinition joinDefinition, String outputDatasetName) {}
568
569
/**
570
* Get join definition.
571
*/
572
public SQLJoinDefinition getJoinDefinition() {}
573
574
/**
575
* Get output dataset name.
576
*/
577
public String getOutputDatasetName() {}
578
}
579
```
580
581
### SQLJoinDefinition
582
583
Definition for SQL join operations.
584
585
```java { .api }
586
package io.cdap.cdap.etl.api.engine.sql.request;
587
588
public class SQLJoinDefinition {
589
/**
590
* Get participating datasets.
591
*/
592
public List<String> getDatasetNames() {}
593
594
/**
595
* Get join type.
596
*/
597
public JoinType getJoinType() {}
598
599
/**
600
* Get selected fields.
601
*/
602
public List<String> getSelectedFields() {}
603
604
/**
605
* Get join conditions.
606
*/
607
public List<JoinClause> getJoinClauses() {}
608
}
609
```
610
611
### SQLTransformRequest
612
613
Request object for SQL transformation operations.
614
615
```java { .api }
616
package io.cdap.cdap.etl.api.engine.sql.request;
617
618
public class SQLTransformRequest {
619
/**
620
* Create transform request.
621
*/
622
public SQLTransformRequest(SQLTransformDefinition transformDefinition, String outputDatasetName) {}
623
624
/**
625
* Get transform definition.
626
*/
627
public SQLTransformDefinition getTransformDefinition() {}
628
629
/**
630
* Get output dataset name.
631
*/
632
public String getOutputDatasetName() {}
633
}
634
```
635
636
### SQLTransformDefinition
637
638
Definition for SQL transformations.
639
640
```java { .api }
641
package io.cdap.cdap.etl.api.engine.sql.request;
642
643
public class SQLTransformDefinition {
644
/**
645
* Create transform definition.
646
*/
647
public SQLTransformDefinition(String inputDataset, String sqlQuery) {}
648
649
/**
650
* Get input dataset name.
651
*/
652
public String getInputDataset() {}
653
654
/**
655
* Get SQL query for transformation.
656
*/
657
public String getSqlQuery() {}
658
}
659
```
660
661
### SQLReadRequest
662
663
Request object for SQL read operations.
664
665
```java { .api }
666
package io.cdap.cdap.etl.api.engine.sql.request;
667
668
public class SQLReadRequest {
669
/**
670
* Create read request.
671
*/
672
public SQLReadRequest(String datasetName) {}
673
674
/**
675
* Get dataset name to read.
676
*/
677
public String getDatasetName() {}
678
}
679
```
680
681
### SQLWriteRequest
682
683
Request object for SQL write operations.
684
685
```java { .api }
686
package io.cdap.cdap.etl.api.engine.sql.request;
687
688
public class SQLWriteRequest {
689
/**
690
* Create write request.
691
*/
692
public SQLWriteRequest(String datasetName, SQLDataset dataset, WriteMode writeMode) {}
693
694
/**
695
* Get target dataset name.
696
*/
697
public String getDatasetName() {}
698
699
/**
700
* Get source dataset.
701
*/
702
public SQLDataset getDataset() {}
703
704
/**
705
* Get write mode.
706
*/
707
public WriteMode getWriteMode() {}
708
}
709
```
710
711
### SQL Pull and Push Requests
712
713
#### SQLPullRequest
714
715
Request for SQL pull operations.
716
717
```java { .api }
718
package io.cdap.cdap.etl.api.engine.sql.request;
719
720
public class SQLPullRequest {
721
/**
722
* Create pull request.
723
*/
724
public SQLPullRequest(String sourceType, Map<String, String> sourceProperties) {}
725
726
/**
727
* Get source type.
728
*/
729
public String getSourceType() {}
730
731
/**
732
* Get source properties.
733
*/
734
public Map<String, String> getSourceProperties() {}
735
}
```

#### SQLPushRequest

Request for SQL push operations.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql.request;
744
745
public class SQLPushRequest {
746
/**
747
* Create push request.
748
*/
749
public SQLPushRequest(SQLDataset dataset, String sinkType,
750
Map<String, String> sinkProperties) {}
751
752
/**
753
* Get dataset to push.
754
*/
755
public SQLDataset getDataset() {}
756
757
/**
758
* Get sink type.
759
*/
760
public String getSinkType() {}
761
762
/**
763
* Get sink properties.
764
*/
765
public Map<String, String> getSinkProperties() {}
766
}
```

## Result Objects

### SQLReadResult

Result from SQL read operations.

```java { .api }
package io.cdap.cdap.etl.api.engine.sql.request;
777
778
public class SQLReadResult {
779
/**
780
* Create read result.
781
*/
782
public SQLReadResult(SQLDataset dataset, long recordCount) {}
783
784
/**
785
* Get result dataset.
786
*/
787
public SQLDataset getDataset() {}
788
789
/**
790
* Get number of records read.
791
*/
792
public long getRecordCount() {}
793
}
794
```
795
796
### SQLWriteResult
797
798
Result from SQL write operations.
799
800
```java { .api }
801
package io.cdap.cdap.etl.api.engine.sql.request;
802
803
public class SQLWriteResult {
804
/**
805
* Create write result.
806
*/
807
public SQLWriteResult(String datasetName, long recordCount) {}
808
809
/**
810
* Get dataset name written to.
811
*/
812
public String getDatasetName() {}
813
814
/**
815
* Get number of records written.
816
*/
817
public long getRecordCount() {}
818
}
819
```
820
821
## SQL Engine Exception Handling
822
823
### SQLEngineException
824
825
Exception for SQL engine operations.
826
827
```java { .api }
828
package io.cdap.cdap.etl.api.engine.sql;

/**
 * Exception for SQL engine operations.
 *
 * <p>Extends {@link RuntimeException} (unchecked): engine implementations throw it from
 * {@code SQLEngine} methods, none of which declare a throws clause, so a checked
 * exception here would not compile.
 */
public class SQLEngineException extends RuntimeException {
  /**
   * Create SQL engine exception.
   */
  public SQLEngineException(String message) {}

  /**
   * Create SQL engine exception with cause.
   */
  public SQLEngineException(String message, Throwable cause) {}
}
841
```
842
843
## Advanced SQL Engine Usage
844
845
### Complex Query Optimization
846
847
```java
848
public class QueryOptimizer {
849
850
public static SQLTransformDefinition optimizeQuery(SQLTransformDefinition originalDef,
851
Map<String, Statistics> datasetStats) {
852
String originalQuery = originalDef.getSqlQuery();
853
854
// Parse and analyze query
855
QueryAnalysis analysis = analyzeQuery(originalQuery);
856
857
// Apply optimizations based on statistics
858
StringBuilder optimizedQuery = new StringBuilder();
859
860
// Add query hints for join optimization
861
if (analysis.hasJoins()) {
862
optimizedQuery.append("/* + USE_HASH_JOIN */ ");
863
}
864
865
// Optimize WHERE clauses - push down predicates
866
optimizedQuery.append(pushDownPredicates(originalQuery, analysis));
867
868
// Add partitioning hints
869
if (analysis.hasGroupBy()) {
870
optimizedQuery.append(" /* + PARTITION_BY(")
871
.append(String.join(", ", analysis.getGroupByFields()))
872
.append(") */");
873
}
874
875
return new SQLTransformDefinition(originalDef.getInputDataset(),
876
optimizedQuery.toString());
877
}
878
879
private static String pushDownPredicates(String query, QueryAnalysis analysis) {
880
// Implementation to push WHERE clauses closer to data sources
881
// This is a simplified example - real implementation would use SQL parser
882
883
if (analysis.hasSelectivePredicates()) {
884
// Rewrite query to apply filters early
885
return rewriteWithEarlyFilters(query, analysis.getSelectivePredicates());
886
}
887
888
return query;
889
}
890
}
```

### Multi-Engine SQL Processing

```java
public class HybridSQLEngine implements BatchSQLEngine {
897
898
private final BatchSQLEngine primaryEngine;
899
private final BatchSQLEngine fallbackEngine;
900
901
public HybridSQLEngine(BatchSQLEngine primaryEngine, BatchSQLEngine fallbackEngine) {
902
this.primaryEngine = primaryEngine;
903
this.fallbackEngine = fallbackEngine;
904
}
905
906
@Override
907
public boolean canTransform(SQLTransformDefinition transformDefinition) {
908
return primaryEngine.canTransform(transformDefinition) ||
909
fallbackEngine.canTransform(transformDefinition);
910
}
911
912
@Override
913
public SQLDataset transform(SQLTransformRequest transformRequest) {
914
// Try primary engine first
915
if (primaryEngine.canTransform(transformRequest.getTransformDefinition())) {
916
try {
917
return primaryEngine.transform(transformRequest);
918
} catch (Exception e) {
919
// Log warning and fall back
920
logWarning("Primary engine failed, falling back", e);
921
}
922
}
923
924
// Use fallback engine
925
if (fallbackEngine.canTransform(transformRequest.getTransformDefinition())) {
926
return fallbackEngine.transform(transformRequest);
927
}
928
929
throw new SQLEngineException("No suitable engine found for transformation");
930
}
931
932
@Override
933
public SQLDataset join(SQLJoinRequest joinRequest) {
934
// Similar hybrid approach for joins
935
if (primaryEngine.canJoin(joinRequest.getJoinDefinition())) {
936
try {
937
return primaryEngine.join(joinRequest);
938
} catch (Exception e) {
939
logWarning("Primary engine join failed, falling back", e);
940
}
941
}
942
943
if (fallbackEngine.canJoin(joinRequest.getJoinDefinition())) {
944
return fallbackEngine.join(joinRequest);
945
}
946
947
throw new SQLEngineException("No suitable engine found for join");
948
}
949
}
```