CDAP ETL API provides comprehensive abstractions for building Extract, Transform, and Load pipeline applications on the CDAP platform
—
SQL engine integration for advanced query processing, optimization, and relational operations in CDAP ETL pipelines with support for multiple execution engines.
Base interface for SQL engines providing core query processing capabilities.
package io.cdap.cdap.etl.api.engine.sql;
/**
 * Base interface for SQL engines providing core query processing capabilities.
 *
 * <p>Callers are expected to probe support with {@link #canJoin} and
 * {@link #canTransform} before invoking the corresponding operation.
 */
public interface SQLEngine {
/**
 * Check if join operation is supported.
 *
 * @param joinDefinition the join to evaluate
 * @return true if this engine can execute the given join
 */
boolean canJoin(SQLJoinDefinition joinDefinition);
/**
 * Execute join operation.
 *
 * @param joinRequest the join to run, including the output dataset name
 * @return the joined dataset
 */
SQLDataset join(SQLJoinRequest joinRequest);
/**
 * Check if transform operation is supported.
 *
 * @param transformDefinition the transformation to evaluate
 * @return true if this engine can execute the given transformation
 */
boolean canTransform(SQLTransformDefinition transformDefinition);
/**
 * Execute transform operation.
 *
 * @param transformRequest the transformation to run
 * @return the transformed dataset
 */
SQLDataset transform(SQLTransformRequest transformRequest);
/**
 * Check if dataset exists.
 *
 * @param datasetName name of the dataset to look up
 * @return true if a dataset with this name is known to the engine
 */
boolean exists(String datasetName);
/**
 * Read data from dataset.
 *
 * @param readRequest identifies the dataset to read
 * @return the requested dataset
 */
SQLDataset read(SQLReadRequest readRequest);
/**
 * Write data to dataset.
 *
 * @param writeRequest the dataset, target name and write mode
 * @return result describing the write
 */
SQLWriteResult write(SQLWriteRequest writeRequest);
/**
 * Clean up dataset.
 *
 * @param datasetName name of the dataset to release
 */
void cleanup(String datasetName);
}SQL engine specifically for batch processing operations.
package io.cdap.cdap.etl.api.engine.sql;
/**
 * SQL engine specifically for batch processing operations.
 *
 * <p>Currently a marker interface: it inherits all {@link SQLEngine}
 * capabilities unchanged.
 */
public interface BatchSQLEngine extends SQLEngine {
// Inherits all SQLEngine capabilities for batch operations
// Additional batch-specific optimizations may be added
}SQL Engine Implementation Example:
/**
 * Example {@link BatchSQLEngine} backed by a Spark session.
 *
 * <p>Datasets produced by joins and transforms are registered both as Spark
 * temp views and in a local map; {@link #exists} and {@link #read} consult the
 * map first and fall back to the Spark catalog.
 *
 * <p>NOTE(review): the backing {@code HashMap} is unsynchronized, so this
 * class is not thread-safe — confirm single-threaded use by callers.
 */
public class SparkSQLEngine implements BatchSQLEngine {
  private final SparkSession sparkSession;
  // Results registered by name; mirrors the temp views created on sparkSession.
  private final Map<String, Dataset<Row>> registeredDatasets;

  /**
   * Create an engine bound to the given Spark session.
   *
   * @param sparkSession session used to run SQL and resolve tables
   */
  public SparkSQLEngine(SparkSession sparkSession) {
    this.sparkSession = sparkSession;
    this.registeredDatasets = new HashMap<>();
  }

  /**
   * A join is supported when every participating dataset is known to this
   * engine and the join type is one of the four standard SQL join types.
   */
  @Override
  public boolean canJoin(SQLJoinDefinition joinDefinition) {
    for (String datasetName : joinDefinition.getDatasetNames()) {
      if (!exists(datasetName)) {
        return false;
      }
    }
    // Switch instead of a chain of == comparisons: any join type added to the
    // enum later is rejected here until it is explicitly supported.
    switch (joinDefinition.getJoinType()) {
      case INNER:
      case LEFT_OUTER:
      case RIGHT_OUTER:
      case FULL_OUTER:
        return true;
      default:
        return false;
    }
  }

  /**
   * Build and run a SQL join, register the result as a temp view under the
   * requested output name, and return it.
   *
   * <p>NOTE(review): field names, join conditions and the WHERE clause are
   * concatenated into the SQL text unescaped — acceptable only if these values
   * come from trusted pipeline configuration, not end-user input. Confirm.
   */
  @Override
  public SQLDataset join(SQLJoinRequest joinRequest) {
    SQLJoinDefinition joinDef = joinRequest.getJoinDefinition();
    String outputDatasetName = joinRequest.getOutputDatasetName();
    StringBuilder query = new StringBuilder();
    query.append("SELECT ");
    // Empty field list means "select everything".
    List<String> selectedFields = joinDef.getSelectedFields();
    if (selectedFields.isEmpty()) {
      query.append("*");
    } else {
      query.append(String.join(", ", selectedFields));
    }
    // Main dataset anchors the FROM clause; each join clause appends
    // "<TYPE> JOIN <dataset> ON <condition>".
    String mainDataset = joinDef.getMainDataset();
    query.append(" FROM ").append(mainDataset);
    for (JoinClause joinClause : joinDef.getJoinClauses()) {
      query.append(" ").append(joinClause.getJoinType().toString());
      query.append(" JOIN ").append(joinClause.getDatasetName());
      query.append(" ON ").append(joinClause.getOnCondition());
    }
    // Optional filter applied after the joins.
    if (joinDef.getWhereCondition() != null) {
      query.append(" WHERE ").append(joinDef.getWhereCondition());
    }
    Dataset<Row> resultDataset = sparkSession.sql(query.toString());
    // Register under the output name so later operations can reference it.
    resultDataset.createOrReplaceTempView(outputDatasetName);
    registeredDatasets.put(outputDatasetName, resultDataset);
    return new SparkSQLDataset(outputDatasetName, resultDataset.schema(), resultDataset);
  }

  /**
   * A transform is supported when its input dataset exists and Spark can plan
   * the query (validated via EXPLAIN, which analyzes without executing).
   */
  @Override
  public boolean canTransform(SQLTransformDefinition transformDefinition) {
    String inputDataset = transformDefinition.getInputDataset();
    String sqlQuery = transformDefinition.getSqlQuery();
    if (!exists(inputDataset)) {
      return false;
    }
    try {
      // EXPLAIN forces analysis/planning; a bad query throws here.
      sparkSession.sql("EXPLAIN " + sqlQuery);
      return true;
    } catch (Exception e) {
      // Any planning failure means the transform is unsupported.
      return false;
    }
  }

  /**
   * Run the transformation query and register its result as a temp view under
   * the requested output name.
   */
  @Override
  public SQLDataset transform(SQLTransformRequest transformRequest) {
    SQLTransformDefinition transformDef = transformRequest.getTransformDefinition();
    String outputDatasetName = transformRequest.getOutputDatasetName();
    String sqlQuery = transformDef.getSqlQuery();
    Dataset<Row> resultDataset = sparkSession.sql(sqlQuery);
    resultDataset.createOrReplaceTempView(outputDatasetName);
    registeredDatasets.put(outputDatasetName, resultDataset);
    return new SparkSQLDataset(outputDatasetName, resultDataset.schema(), resultDataset);
  }

  /**
   * A dataset exists if this engine registered it, or if the Spark catalog
   * knows a table by that name.
   */
  @Override
  public boolean exists(String datasetName) {
    return registeredDatasets.containsKey(datasetName) ||
        sparkSession.catalog().tableExists(datasetName);
  }

  /**
   * Read a dataset: prefer a locally registered result; otherwise resolve the
   * name as a Spark table.
   *
   * @throws SQLEngineException if the name resolves to neither
   */
  @Override
  public SQLDataset read(SQLReadRequest readRequest) {
    String datasetName = readRequest.getDatasetName();
    if (registeredDatasets.containsKey(datasetName)) {
      Dataset<Row> dataset = registeredDatasets.get(datasetName);
      return new SparkSQLDataset(datasetName, dataset.schema(), dataset);
    }
    try {
      Dataset<Row> dataset = sparkSession.table(datasetName);
      return new SparkSQLDataset(datasetName, dataset.schema(), dataset);
    } catch (Exception e) {
      // Preserve the cause so the failure is diagnosable.
      throw new SQLEngineException("Failed to read dataset: " + datasetName, e);
    }
  }

  /**
   * Persist a Spark-backed dataset as a table using the requested write mode.
   *
   * @throws SQLEngineException if the dataset is not Spark-backed or the
   *     write mode is not recognized
   */
  @Override
  public SQLWriteResult write(SQLWriteRequest writeRequest) {
    String datasetName = writeRequest.getDatasetName();
    SQLDataset sqlDataset = writeRequest.getDataset();
    // Guard clause: only Spark-backed datasets can be written by this engine.
    if (!(sqlDataset instanceof SparkSQLDataset)) {
      throw new SQLEngineException("Unsupported dataset type: " +
          sqlDataset.getClass().getName());
    }
    Dataset<Row> dataset = ((SparkSQLDataset) sqlDataset).getSparkDataset();
    WriteMode writeMode = writeRequest.getWriteMode();
    switch (writeMode) {
      case OVERWRITE:
        dataset.write().mode("overwrite").saveAsTable(datasetName);
        break;
      case APPEND:
        dataset.write().mode("append").saveAsTable(datasetName);
        break;
      case ERROR_IF_EXISTS:
        dataset.write().mode("errorifexists").saveAsTable(datasetName);
        break;
      default:
        throw new SQLEngineException("Unsupported write mode: " + writeMode);
    }
    // NOTE(review): count() triggers a separate Spark job after the write —
    // consider tracking row counts during the write if this proves expensive.
    return new SQLWriteResult(datasetName, dataset.count());
  }

  /**
   * Drop the local registration and temp view for the dataset, if any.
   * Best-effort: failures during view removal are intentionally ignored.
   */
  @Override
  public void cleanup(String datasetName) {
    registeredDatasets.remove(datasetName);
    try {
      sparkSession.catalog().dropTempView(datasetName);
    } catch (Exception ignored) {
      // Best-effort cleanup: the view may never have been registered.
    }
  }
}Interface for SQL engine input operations.
package io.cdap.cdap.etl.api.engine.sql;
/**
 * Interface for SQL engine input operations: identifies a dataset the engine
 * should read and the schema its records are expected to have.
 */
public interface SQLEngineInput {
/**
 * Get input dataset name.
 *
 * @return name of the dataset supplying input records
 */
String getDatasetName();
/**
 * Get input schema.
 *
 * @return schema of the input records
 */
Schema getSchema();
}Interface for SQL engine output operations.
package io.cdap.cdap.etl.api.engine.sql;
/**
 * Interface for SQL engine output operations: identifies a dataset the engine
 * should write and the schema its records will have.
 */
public interface SQLEngineOutput {
/**
 * Get output dataset name.
 *
 * @return name of the dataset receiving output records
 */
String getDatasetName();
/**
 * Get output schema.
 *
 * @return schema of the output records
 */
Schema getSchema();
}Interface defining standard SQL capabilities.
package io.cdap.cdap.etl.api.engine.sql;
/**
 * Interface defining standard SQL capabilities an engine may advertise.
 */
public interface StandardSQLCapabilities {
/**
 * Check if SELECT operations are supported.
 *
 * @return true if SELECT is supported
 */
boolean supportsSelect();
/**
 * Check if JOIN operations are supported.
 *
 * @return true if JOIN is supported
 */
boolean supportsJoin();
/**
 * Check if GROUP BY operations are supported.
 *
 * @return true if GROUP BY is supported
 */
boolean supportsGroupBy();
/**
 * Check if window functions are supported.
 *
 * @return true if window functions are supported
 */
boolean supportsWindowFunctions();
/**
 * Check if subqueries are supported.
 *
 * @return true if subqueries are supported
 */
boolean supportsSubqueries();
}Capability to pull data from external sources.
package io.cdap.cdap.etl.api.engine.sql.capability;
/**
 * Capability to pull data from external sources into the SQL engine.
 *
 * <p>NOTE(review): {@code pull} takes a {@code PullRequest}, but the request
 * class declared later in this file is {@code SQLPullRequest} — confirm which
 * type is intended.
 */
public interface PullCapability {
/**
 * Check if source can be pulled into SQL engine.
 *
 * @param sourceType type identifier of the external source (e.g. "jdbc")
 * @return true if this capability can pull from the given source type
 */
boolean canPull(String sourceType);
/**
 * Pull data from external source.
 *
 * @param pullRequest describes the source to pull from
 * @return the pulled data as an engine dataset
 */
SQLDataset pull(PullRequest pullRequest);
}Capability to push data to external sinks.
package io.cdap.cdap.etl.api.engine.sql.capability;
/**
 * Capability to push data from the SQL engine to external sinks.
 *
 * <p>NOTE(review): {@code push} takes a {@code PushRequest}, but the request
 * class declared later in this file is {@code SQLPushRequest} — confirm which
 * type is intended.
 */
public interface PushCapability {
/**
 * Check if data can be pushed to sink.
 *
 * @param sinkType type identifier of the external sink (e.g. "jdbc")
 * @return true if this capability can push to the given sink type
 */
boolean canPush(String sinkType);
/**
 * Push data to external sink.
 *
 * @param pushRequest describes the dataset and target sink
 * @return result of the push operation
 */
PushResult push(PushRequest pushRequest);
}Default implementation of pull capability.
package io.cdap.cdap.etl.api.engine.sql.capability;
/**
 * Default implementation of pull capability.
 *
 * <p>Supports a fixed set of source types; {@link #pull} is intentionally
 * unimplemented and must be overridden by concrete engines.
 */
public class DefaultPullCapability implements PullCapability {
/** Source types supported by default; membership is checked case-insensitively. */
private static final Set<String> SUPPORTED_SOURCES =
    new HashSet<>(Arrays.asList("jdbc", "file", "kafka"));
/**
 * Check whether the given source type is one of the defaults.
 *
 * @param sourceType source type identifier; null is treated as unsupported
 * @return true if the source type can be pulled
 */
@Override
public boolean canPull(String sourceType) {
// Null guard (the original NPE'd on null), and Locale.ROOT so the lowercase
// mapping is stable regardless of the default JVM locale (e.g. Turkish 'I').
return sourceType != null
    && SUPPORTED_SOURCES.contains(sourceType.toLowerCase(Locale.ROOT));
}
/**
 * Default pull implementation — always throws; subclasses must override.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public SQLDataset pull(PullRequest pullRequest) {
throw new UnsupportedOperationException("Pull not implemented");
}
}Default implementation of push capability.
package io.cdap.cdap.etl.api.engine.sql.capability;
/**
 * Default implementation of push capability.
 *
 * <p>Supports a fixed set of sink types; {@link #push} is intentionally
 * unimplemented and must be overridden by concrete engines.
 */
public class DefaultPushCapability implements PushCapability {
/** Sink types supported by default; membership is checked case-insensitively. */
private static final Set<String> SUPPORTED_SINKS =
    new HashSet<>(Arrays.asList("jdbc", "file", "elasticsearch"));
/**
 * Check whether the given sink type is one of the defaults.
 *
 * @param sinkType sink type identifier; null is treated as unsupported
 * @return true if the sink type can be pushed to
 */
@Override
public boolean canPush(String sinkType) {
// Null guard (the original NPE'd on null), and Locale.ROOT so the lowercase
// mapping is stable regardless of the default JVM locale.
return sinkType != null
    && SUPPORTED_SINKS.contains(sinkType.toLowerCase(Locale.ROOT));
}
/**
 * Default push implementation — always throws; subclasses must override.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public PushResult push(PushRequest pushRequest) {
throw new UnsupportedOperationException("Push not implemented");
}
}Base interface for SQL datasets.
package io.cdap.cdap.etl.api.engine.sql.dataset;
/**
 * Base interface for SQL datasets: a named collection of records with a schema.
 */
public interface SQLDataset {
/**
 * Get dataset name.
 *
 * @return name identifying this dataset within the engine
 */
String getDatasetName();
/**
 * Get dataset schema.
 *
 * @return schema of the records in this dataset
 */
Schema getSchema();
}Consumer interface for SQL datasets.
package io.cdap.cdap.etl.api.engine.sql.dataset;
/**
 * Consumer interface for SQL datasets: a dataset whose records can be
 * iterated out of the engine.
 */
public interface SQLDatasetConsumer extends SQLDataset {
/**
 * Consume data from the dataset.
 *
 * @return iterator over the dataset's records
 */
Iterator<StructuredRecord> consume();
}Producer interface for SQL datasets.
package io.cdap.cdap.etl.api.engine.sql.dataset;
/**
 * Producer interface for SQL datasets: a dataset that accepts records being
 * written into the engine.
 */
public interface SQLDatasetProducer extends SQLDataset {
/**
 * Produce data to the dataset.
 *
 * @param records records to write into this dataset
 */
void produce(Iterator<StructuredRecord> records);
}Dataset that can be pulled from external source.
package io.cdap.cdap.etl.api.engine.sql.dataset;
/**
 * Dataset that can be pulled from an external source into the engine.
 */
public interface SQLPullDataset extends SQLDatasetConsumer {
/**
 * Pull data from external source.
 */
void pull();
/**
 * Get pull statistics.
 *
 * @return statistics about the pull operation
 */
PullStatistics getPullStatistics();
}Dataset that can be pushed to external sink.
package io.cdap.cdap.etl.api.engine.sql.dataset;
/**
 * Dataset that can be pushed from the engine to an external sink.
 */
public interface SQLPushDataset extends SQLDatasetProducer {
/**
 * Push data to external sink.
 */
void push();
/**
 * Get push statistics.
 *
 * @return statistics about the push operation
 */
PushStatistics getPushStatistics();
}Description of SQL dataset with metadata.
package io.cdap.cdap.etl.api.engine.sql.dataset;
/**
 * Description of a SQL dataset with metadata: name, schema and free-form
 * string properties.
 */
public class SQLDatasetDescription {
/**
 * Create dataset description.
 *
 * @param name dataset name
 * @param schema dataset schema
 * @param properties additional metadata properties
 */
public SQLDatasetDescription(String name, Schema schema, Map<String, String> properties) {}
/**
 * Get dataset name.
 *
 * @return the dataset name
 */
public String getName() {}
/**
 * Get dataset schema.
 *
 * @return the dataset schema
 */
public Schema getSchema() {}
/**
 * Get dataset properties.
 *
 * @return metadata properties for this dataset
 */
public Map<String, String> getProperties() {}
}Collection interface for records.
package io.cdap.cdap.etl.api.engine.sql.dataset;
/**
 * Collection interface for structured records, iterable with a known size.
 */
public interface RecordCollection extends Iterable<StructuredRecord> {
/**
 * Get collection size.
 *
 * @return number of records in the collection
 */
long size();
/**
 * Check if collection is empty.
 *
 * @return true if the collection contains no records
 */
boolean isEmpty();
}Request object for SQL join operations.
package io.cdap.cdap.etl.api.engine.sql.request;
/**
 * Request object for SQL join operations: pairs a join definition with the
 * name to register the result under.
 */
public class SQLJoinRequest {
/**
 * Create join request.
 *
 * @param joinDefinition the join to execute
 * @param outputDatasetName name for the resulting dataset
 */
public SQLJoinRequest(SQLJoinDefinition joinDefinition, String outputDatasetName) {}
/**
 * Get join definition.
 *
 * @return the join to execute
 */
public SQLJoinDefinition getJoinDefinition() {}
/**
 * Get output dataset name.
 *
 * @return name for the resulting dataset
 */
public String getOutputDatasetName() {}
}Definition for SQL join operations.
package io.cdap.cdap.etl.api.engine.sql.request;
/**
 * Definition for SQL join operations.
 */
public class SQLJoinDefinition {
/**
 * Get participating datasets.
 *
 * @return names of all datasets involved in the join
 */
public List<String> getDatasetNames() {}
/**
 * Get join type.
 *
 * @return the type of join to perform
 */
public JoinType getJoinType() {}
/**
 * Get selected fields.
 *
 * @return fields to select; an empty list selects all fields
 */
public List<String> getSelectedFields() {}
/**
 * Get join conditions.
 *
 * @return the join clauses appended after the main dataset
 */
public List<JoinClause> getJoinClauses() {}
/**
 * Get the main dataset that anchors the FROM clause.
 * Added for consistency: SparkSQLEngine.join() calls this accessor but it
 * was missing from the declared API.
 *
 * @return name of the main (left-most) dataset
 */
public String getMainDataset() {}
/**
 * Get the optional WHERE condition.
 * Added for consistency: SparkSQLEngine.join() calls this accessor but it
 * was missing from the declared API.
 *
 * @return filter condition applied after the joins, or null if none
 */
public String getWhereCondition() {}
}Request object for SQL transformation operations.
package io.cdap.cdap.etl.api.engine.sql.request;
/**
 * Request object for SQL transformation operations: pairs a transform
 * definition with the name to register the result under.
 */
public class SQLTransformRequest {
/**
 * Create transform request.
 *
 * @param transformDefinition the transformation to execute
 * @param outputDatasetName name for the resulting dataset
 */
public SQLTransformRequest(SQLTransformDefinition transformDefinition, String outputDatasetName) {}
/**
 * Get transform definition.
 *
 * @return the transformation to execute
 */
public SQLTransformDefinition getTransformDefinition() {}
/**
 * Get output dataset name.
 *
 * @return name for the resulting dataset
 */
public String getOutputDatasetName() {}
}Definition for SQL transformations.
package io.cdap.cdap.etl.api.engine.sql.request;
/**
 * Definition for SQL transformations: an input dataset and the SQL query to
 * run against it.
 */
public class SQLTransformDefinition {
/**
 * Create transform definition.
 *
 * @param inputDataset name of the dataset the query reads
 * @param sqlQuery SQL text of the transformation
 */
public SQLTransformDefinition(String inputDataset, String sqlQuery) {}
/**
 * Get input dataset name.
 *
 * @return name of the dataset the query reads
 */
public String getInputDataset() {}
/**
 * Get SQL query for transformation.
 *
 * @return SQL text of the transformation
 */
public String getSqlQuery() {}
}Request object for SQL read operations.
package io.cdap.cdap.etl.api.engine.sql.request;
/**
 * Request object for SQL read operations: identifies the dataset to read.
 */
public class SQLReadRequest {
/**
 * Create read request.
 *
 * @param datasetName name of the dataset to read
 */
public SQLReadRequest(String datasetName) {}
/**
 * Get dataset name to read.
 *
 * @return name of the dataset to read
 */
public String getDatasetName() {}
}Request object for SQL write operations.
package io.cdap.cdap.etl.api.engine.sql.request;
/**
 * Request object for SQL write operations: the dataset to write, the target
 * name, and how to handle an existing target.
 */
public class SQLWriteRequest {
/**
 * Create write request.
 *
 * @param datasetName target dataset name
 * @param dataset dataset to write
 * @param writeMode how to handle an existing target (overwrite/append/error)
 */
public SQLWriteRequest(String datasetName, SQLDataset dataset, WriteMode writeMode) {}
/**
 * Get target dataset name.
 *
 * @return name to write the dataset under
 */
public String getDatasetName() {}
/**
 * Get source dataset.
 *
 * @return the dataset to write
 */
public SQLDataset getDataset() {}
/**
 * Get write mode.
 *
 * @return how to handle an existing target
 */
public WriteMode getWriteMode() {}
}Request for SQL pull operations.
package io.cdap.cdap.etl.api.engine.sql.request;
/**
 * Request for SQL pull operations: identifies an external source by type and
 * connection properties.
 */
public class SQLPullRequest {
/**
 * Create pull request.
 *
 * @param sourceType type identifier of the external source
 * @param sourceProperties connection/configuration properties for the source
 */
public SQLPullRequest(String sourceType, Map<String, String> sourceProperties) {}
/**
 * Get source type.
 *
 * @return type identifier of the external source
 */
public String getSourceType() {}
/**
 * Get source properties.
 *
 * @return connection/configuration properties for the source
 */
public Map<String, String> getSourceProperties() {}
}Request for SQL push operations.
package io.cdap.cdap.etl.api.engine.sql.request;
/**
 * Request for SQL push operations: the dataset to push plus the target sink's
 * type and connection properties.
 */
public class SQLPushRequest {
/**
 * Create push request.
 *
 * @param dataset dataset to push
 * @param sinkType type identifier of the external sink
 * @param sinkProperties connection/configuration properties for the sink
 */
public SQLPushRequest(SQLDataset dataset, String sinkType,
Map<String, String> sinkProperties) {}
/**
 * Get dataset to push.
 *
 * @return the dataset to push
 */
public SQLDataset getDataset() {}
/**
 * Get sink type.
 *
 * @return type identifier of the external sink
 */
public String getSinkType() {}
/**
 * Get sink properties.
 *
 * @return connection/configuration properties for the sink
 */
public Map<String, String> getSinkProperties() {}
}Result from SQL read operations.
package io.cdap.cdap.etl.api.engine.sql.request;
/**
 * Result from SQL read operations: the dataset produced and its record count.
 */
public class SQLReadResult {
/**
 * Create read result.
 *
 * @param dataset the dataset that was read
 * @param recordCount number of records read
 */
public SQLReadResult(SQLDataset dataset, long recordCount) {}
/**
 * Get result dataset.
 *
 * @return the dataset that was read
 */
public SQLDataset getDataset() {}
/**
 * Get number of records read.
 *
 * @return count of records in the result
 */
public long getRecordCount() {}
}Result from SQL write operations.
package io.cdap.cdap.etl.api.engine.sql.request;
/**
 * Result from SQL write operations: the target dataset name and how many
 * records were written.
 */
public class SQLWriteResult {
/**
 * Create write result.
 *
 * @param datasetName name of the dataset written to
 * @param recordCount number of records written
 */
public SQLWriteResult(String datasetName, long recordCount) {}
/**
 * Get dataset name written to.
 *
 * @return name of the target dataset
 */
public String getDatasetName() {}
/**
 * Get number of records written.
 *
 * @return count of records written
 */
public long getRecordCount() {}
}Exception for SQL engine operations.
package io.cdap.cdap.etl.api.engine.sql;
/**
 * Exception for SQL engine operations.
 *
 * <p>Extends {@link RuntimeException} rather than {@code Exception}: every
 * call site in this API (SparkSQLEngine, HybridSQLEngine) throws it without a
 * {@code throws} clause, which a checked exception would not permit.
 * Constructors delegate to {@code super} so the message and cause are
 * preserved instead of being silently dropped.
 */
public class SQLEngineException extends RuntimeException {
/**
 * Create SQL engine exception.
 *
 * @param message description of the failure
 */
public SQLEngineException(String message) {
super(message);
}
/**
 * Create SQL engine exception with cause.
 *
 * @param message description of the failure
 * @param cause underlying error that triggered this exception
 */
public SQLEngineException(String message, Throwable cause) {
super(message, cause);
}
}public class QueryOptimizer {
/**
 * Produce an optimized copy of a transform definition by prepending engine
 * hints and pushing selective predicates toward the data sources.
 *
 * <p>NOTE(review): relies on {@code analyzeQuery} and
 * {@code rewriteWithEarlyFilters}, which are not defined in this view —
 * confirm they exist elsewhere. {@code datasetStats} is accepted but never
 * read in this body; verify whether it should drive the optimizations.
 *
 * @param originalDef the transform to optimize (not modified)
 * @param datasetStats per-dataset statistics intended to inform optimization
 * @return a new definition with the same input dataset and a rewritten query
 */
public static SQLTransformDefinition optimizeQuery(SQLTransformDefinition originalDef,
Map<String, Statistics> datasetStats) {
String originalQuery = originalDef.getSqlQuery();
// Parse and analyze query
QueryAnalysis analysis = analyzeQuery(originalQuery);
// Apply optimizations based on statistics
StringBuilder optimizedQuery = new StringBuilder();
// Add query hints for join optimization
if (analysis.hasJoins()) {
optimizedQuery.append("/* + USE_HASH_JOIN */ ");
}
// Optimize WHERE clauses - push down predicates
optimizedQuery.append(pushDownPredicates(originalQuery, analysis));
// Add partitioning hints (appended as a trailing comment hint)
if (analysis.hasGroupBy()) {
optimizedQuery.append(" /* + PARTITION_BY(")
.append(String.join(", ", analysis.getGroupByFields()))
.append(") */");
}
return new SQLTransformDefinition(originalDef.getInputDataset(),
optimizedQuery.toString());
}
/**
 * Rewrite the query so selective WHERE predicates are applied as early as
 * possible; returns the query unchanged when none exist.
 *
 * @param query the original SQL text
 * @param analysis precomputed analysis of the query
 * @return the (possibly rewritten) query text
 */
private static String pushDownPredicates(String query, QueryAnalysis analysis) {
// Implementation to push WHERE clauses closer to data sources
// This is a simplified example - real implementation would use SQL parser
if (analysis.hasSelectivePredicates()) {
// Rewrite query to apply filters early
return rewriteWithEarlyFilters(query, analysis.getSelectivePredicates());
}
return query;
}
}public class HybridSQLEngine implements BatchSQLEngine {
// Engines tried in order: primary first, fallback on failure or no support.
private final BatchSQLEngine primaryEngine;
private final BatchSQLEngine fallbackEngine;
/**
 * Create a hybrid engine that prefers the primary engine and falls back to
 * the fallback engine when the primary cannot handle or fails an operation.
 *
 * @param primaryEngine preferred engine
 * @param fallbackEngine engine used when the primary is unsuitable or fails
 */
public HybridSQLEngine(BatchSQLEngine primaryEngine, BatchSQLEngine fallbackEngine) {
  this.primaryEngine = primaryEngine;
  this.fallbackEngine = fallbackEngine;
}
/** A join is supported if either delegate supports it. (Added: required by SQLEngine.) */
@Override
public boolean canJoin(SQLJoinDefinition joinDefinition) {
  return primaryEngine.canJoin(joinDefinition)
      || fallbackEngine.canJoin(joinDefinition);
}
/** A transform is supported if either delegate supports it. */
@Override
public boolean canTransform(SQLTransformDefinition transformDefinition) {
  return primaryEngine.canTransform(transformDefinition) ||
      fallbackEngine.canTransform(transformDefinition);
}
/**
 * Transform via the primary engine when possible; on primary failure or lack
 * of support, retry with the fallback engine.
 *
 * @throws SQLEngineException if neither engine supports the transformation
 */
@Override
public SQLDataset transform(SQLTransformRequest transformRequest) {
  if (primaryEngine.canTransform(transformRequest.getTransformDefinition())) {
    try {
      return primaryEngine.transform(transformRequest);
    } catch (Exception e) {
      // Primary failed at runtime; log and try the fallback below.
      logWarning("Primary engine failed, falling back", e);
    }
  }
  if (fallbackEngine.canTransform(transformRequest.getTransformDefinition())) {
    return fallbackEngine.transform(transformRequest);
  }
  throw new SQLEngineException("No suitable engine found for transformation");
}
/**
 * Join via the primary engine when possible; on primary failure or lack of
 * support, retry with the fallback engine.
 *
 * @throws SQLEngineException if neither engine supports the join
 */
@Override
public SQLDataset join(SQLJoinRequest joinRequest) {
  if (primaryEngine.canJoin(joinRequest.getJoinDefinition())) {
    try {
      return primaryEngine.join(joinRequest);
    } catch (Exception e) {
      logWarning("Primary engine join failed, falling back", e);
    }
  }
  if (fallbackEngine.canJoin(joinRequest.getJoinDefinition())) {
    return fallbackEngine.join(joinRequest);
  }
  throw new SQLEngineException("No suitable engine found for join");
}
/** A dataset exists if either delegate knows it. (Added: required by SQLEngine.) */
@Override
public boolean exists(String datasetName) {
  return primaryEngine.exists(datasetName) || fallbackEngine.exists(datasetName);
}
/**
 * Read via the primary engine, retrying with the fallback on failure.
 * (Added: required by SQLEngine but missing from the original class.)
 */
@Override
public SQLDataset read(SQLReadRequest readRequest) {
  try {
    return primaryEngine.read(readRequest);
  } catch (Exception e) {
    logWarning("Primary engine read failed, falling back", e);
    return fallbackEngine.read(readRequest);
  }
}
/**
 * Write via the primary engine, retrying with the fallback on failure.
 * (Added: required by SQLEngine but missing from the original class.)
 */
@Override
public SQLWriteResult write(SQLWriteRequest writeRequest) {
  try {
    return primaryEngine.write(writeRequest);
  } catch (Exception e) {
    logWarning("Primary engine write failed, falling back", e);
    return fallbackEngine.write(writeRequest);
  }
}
/**
 * Clean up on both delegates — either may hold the dataset.
 * (Added: required by SQLEngine but missing from the original class.)
 */
@Override
public void cleanup(String datasetName) {
  primaryEngine.cleanup(datasetName);
  fallbackEngine.cleanup(datasetName);
}
/**
 * Log a warning with its cause. (Added: the original class called this
 * method but never defined it.)
 */
private void logWarning(String message, Exception e) {
  java.util.logging.Logger.getLogger(HybridSQLEngine.class.getName())
      .log(java.util.logging.Level.WARNING, message, e);
}
}Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap-etl-api