CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap-etl-api

CDAP ETL API provides comprehensive abstractions for building Extract, Transform, and Load pipeline applications on the CDAP platform

Pending
Overview
Eval results
Files

docs/sql-engine.md

SQL Engine Support

SQL engine integration for advanced query processing, optimization, and relational operations in CDAP ETL pipelines with support for multiple execution engines.

Core SQL Engine Interfaces

SQLEngine

Base interface for SQL engines providing core query processing capabilities.

package io.cdap.cdap.etl.api.engine.sql;

public interface SQLEngine {
    /**
     * Check if join operation is supported.
     *
     * @param joinDefinition the join to evaluate
     * @return true if this engine can execute the join as defined
     */
    boolean canJoin(SQLJoinDefinition joinDefinition);
    
    /**
     * Execute join operation.
     *
     * @param joinRequest join definition plus the output dataset name
     * @return the dataset holding the join result
     */
    SQLDataset join(SQLJoinRequest joinRequest);
    
    /**
     * Check if transform operation is supported.
     *
     * @param transformDefinition the transformation to evaluate
     * @return true if this engine can execute the transformation
     */
    boolean canTransform(SQLTransformDefinition transformDefinition);
    
    /**
     * Execute transform operation.
     *
     * @param transformRequest transform definition plus the output dataset name
     * @return the dataset holding the transformation result
     */
    SQLDataset transform(SQLTransformRequest transformRequest);
    
    /**
     * Check if dataset exists.
     *
     * @param datasetName name of the dataset to look up
     * @return true if the dataset is known to this engine
     */
    boolean exists(String datasetName);
    
    /**
     * Read data from dataset.
     *
     * @param readRequest identifies the dataset to read
     * @return the dataset contents
     */
    SQLDataset read(SQLReadRequest readRequest);
    
    /**
     * Write data to dataset.
     *
     * @param writeRequest target dataset, source data and write mode
     * @return summary of the completed write
     */
    SQLWriteResult write(SQLWriteRequest writeRequest);
    
    /**
     * Clean up dataset. Releases any engine-side resources associated with the
     * named dataset; implementations should tolerate unknown names.
     *
     * @param datasetName name of the dataset to release
     */
    void cleanup(String datasetName);
}

BatchSQLEngine

SQL engine specifically for batch processing operations.

package io.cdap.cdap.etl.api.engine.sql;

public interface BatchSQLEngine extends SQLEngine {
    // Marker interface: inherits all SQLEngine capabilities for batch operations.
    // Batch-specific optimizations may be added here in future versions.
}

SQL Engine Implementation Example:

/**
 * Example {@link BatchSQLEngine} backed by a Spark session. Datasets produced
 * by this engine are registered as Spark temp views so later operations can
 * reference them by name in SQL.
 *
 * <p>Not thread-safe: {@code registeredDatasets} is a plain HashMap.
 */
public class SparkSQLEngine implements BatchSQLEngine {
    
    // Session used to run all SQL; supplied by the caller, not owned here.
    private final SparkSession sparkSession;
    // Datasets this engine has registered, keyed by dataset name.
    private final Map<String, Dataset<Row>> registeredDatasets;
    
    public SparkSQLEngine(SparkSession sparkSession) {
        this.sparkSession = sparkSession;
        this.registeredDatasets = new HashMap<>();
    }
    
    /**
     * A join is supported when every participating dataset is available and
     * the join type is one of the four standard SQL join types.
     */
    @Override
    public boolean canJoin(SQLJoinDefinition joinDefinition) {
        for (String datasetName : joinDefinition.getDatasetNames()) {
            if (!exists(datasetName)) {
                return false;
            }
        }
        
        JoinType joinType = joinDefinition.getJoinType();
        return joinType == JoinType.INNER || 
               joinType == JoinType.LEFT_OUTER || 
               joinType == JoinType.RIGHT_OUTER || 
               joinType == JoinType.FULL_OUTER;
    }
    
    /**
     * Builds a SQL join statement from the definition, executes it, and
     * registers the result as a temp view under the requested output name.
     *
     * <p>NOTE(review): the query is assembled by string concatenation from the
     * definition's field/condition strings; callers must ensure those values
     * are trusted.
     */
    @Override
    public SQLDataset join(SQLJoinRequest joinRequest) {
        SQLJoinDefinition joinDef = joinRequest.getJoinDefinition();
        String outputDatasetName = joinRequest.getOutputDatasetName();
        
        StringBuilder query = new StringBuilder("SELECT ");
        
        // Empty selection means "all fields".
        List<String> selectedFields = joinDef.getSelectedFields();
        if (selectedFields.isEmpty()) {
            query.append("*");
        } else {
            query.append(String.join(", ", selectedFields));
        }
        
        query.append(" FROM ").append(joinDef.getMainDataset());
        
        for (JoinClause joinClause : joinDef.getJoinClauses()) {
            // BUGFIX: enum names such as LEFT_OUTER are not valid SQL keywords;
            // translate the underscore to a space to emit e.g. "LEFT OUTER JOIN".
            String sqlJoinType = joinClause.getJoinType().toString().replace('_', ' ');
            query.append(' ').append(sqlJoinType)
                 .append(" JOIN ").append(joinClause.getDatasetName())
                 .append(" ON ").append(joinClause.getOnCondition());
        }
        
        if (joinDef.getWhereCondition() != null) {
            query.append(" WHERE ").append(joinDef.getWhereCondition());
        }
        
        Dataset<Row> resultDataset = sparkSession.sql(query.toString());
        
        // Register the result so follow-up SQL can reference it by name.
        resultDataset.createOrReplaceTempView(outputDatasetName);
        registeredDatasets.put(outputDatasetName, resultDataset);
        
        return new SparkSQLDataset(outputDatasetName, resultDataset.schema(), resultDataset);
    }
    
    /**
     * A transform is supported when its input dataset exists and Spark can
     * plan the query (validated via EXPLAIN, which analyzes without running it).
     */
    @Override
    public boolean canTransform(SQLTransformDefinition transformDefinition) {
        if (!exists(transformDefinition.getInputDataset())) {
            return false;
        }
        
        try {
            sparkSession.sql("EXPLAIN " + transformDefinition.getSqlQuery());
            return true;
        } catch (Exception e) {
            // Any analysis failure means the query is not executable here.
            return false;
        }
    }
    
    /**
     * Executes the transformation query and registers its result as a temp
     * view under the requested output name.
     */
    @Override
    public SQLDataset transform(SQLTransformRequest transformRequest) {
        String outputDatasetName = transformRequest.getOutputDatasetName();
        String sqlQuery = transformRequest.getTransformDefinition().getSqlQuery();
        
        Dataset<Row> resultDataset = sparkSession.sql(sqlQuery);
        
        resultDataset.createOrReplaceTempView(outputDatasetName);
        registeredDatasets.put(outputDatasetName, resultDataset);
        
        return new SparkSQLDataset(outputDatasetName, resultDataset.schema(), resultDataset);
    }
    
    /** A dataset exists if this engine registered it, or Spark's catalog knows it. */
    @Override
    public boolean exists(String datasetName) {
        return registeredDatasets.containsKey(datasetName) ||
               sparkSession.catalog().tableExists(datasetName);
    }
    
    /** Returns a registered dataset, or resolves the name through Spark's catalog. */
    @Override
    public SQLDataset read(SQLReadRequest readRequest) {
        String datasetName = readRequest.getDatasetName();
        
        Dataset<Row> registered = registeredDatasets.get(datasetName);
        if (registered != null) {
            return new SparkSQLDataset(datasetName, registered.schema(), registered);
        }
        
        try {
            Dataset<Row> dataset = sparkSession.table(datasetName);
            return new SparkSQLDataset(datasetName, dataset.schema(), dataset);
        } catch (Exception e) {
            throw new SQLEngineException("Failed to read dataset: " + datasetName, e);
        }
    }
    
    /**
     * Persists the dataset as a table using the requested write mode.
     *
     * @throws SQLEngineException if the dataset is not Spark-backed or the
     *     write mode is not supported
     */
    @Override
    public SQLWriteResult write(SQLWriteRequest writeRequest) {
        String datasetName = writeRequest.getDatasetName();
        SQLDataset sqlDataset = writeRequest.getDataset();
        
        if (!(sqlDataset instanceof SparkSQLDataset)) {
            throw new SQLEngineException("Unsupported dataset type: " + 
                                       sqlDataset.getClass().getName());
        }
        
        Dataset<Row> dataset = ((SparkSQLDataset) sqlDataset).getSparkDataset();
        
        // Map the engine-level write mode onto Spark's SaveMode string once,
        // instead of duplicating the write call per branch.
        String sparkMode;
        WriteMode writeMode = writeRequest.getWriteMode();
        switch (writeMode) {
            case OVERWRITE:
                sparkMode = "overwrite";
                break;
            case APPEND:
                sparkMode = "append";
                break;
            case ERROR_IF_EXISTS:
                sparkMode = "errorifexists";
                break;
            default:
                throw new SQLEngineException("Unsupported write mode: " + writeMode);
        }
        
        dataset.write().mode(sparkMode).saveAsTable(datasetName);
        
        // NOTE(review): count() triggers a second evaluation of the dataset
        // unless it is cached; acceptable for an example, costly in production.
        return new SQLWriteResult(datasetName, dataset.count());
    }
    
    /** Drops the dataset from local tracking and from Spark's temp views (best-effort). */
    @Override
    public void cleanup(String datasetName) {
        registeredDatasets.remove(datasetName);
        
        try {
            sparkSession.catalog().dropTempView(datasetName);
        } catch (Exception ignored) {
            // Best-effort cleanup: a missing view is not an error here.
        }
    }
}

SQL Engine Input/Output

SQLEngineInput

Interface for SQL engine input operations.

package io.cdap.cdap.etl.api.engine.sql;

public interface SQLEngineInput {
    /**
     * Get input dataset name.
     *
     * @return name of the dataset this input reads from
     */
    String getDatasetName();
    
    /**
     * Get input schema.
     *
     * @return schema describing the records of this input
     */
    Schema getSchema();
}

SQLEngineOutput

Interface for SQL engine output operations.

package io.cdap.cdap.etl.api.engine.sql;

public interface SQLEngineOutput {
    /**
     * Get output dataset name.
     *
     * @return name of the dataset this output writes to
     */
    String getDatasetName();
    
    /**
     * Get output schema.
     *
     * @return schema describing the records of this output
     */
    Schema getSchema();
}

SQL Capabilities

StandardSQLCapabilities

Interface defining standard SQL capabilities.

package io.cdap.cdap.etl.api.engine.sql;

public interface StandardSQLCapabilities {
    /**
     * Check if SELECT operations are supported.
     *
     * @return true if the engine can execute SELECT queries
     */
    boolean supportsSelect();
    
    /**
     * Check if JOIN operations are supported.
     *
     * @return true if the engine can execute JOIN queries
     */
    boolean supportsJoin();
    
    /**
     * Check if GROUP BY operations are supported.
     *
     * @return true if the engine can execute GROUP BY aggregations
     */
    boolean supportsGroupBy();
    
    /**
     * Check if window functions are supported.
     *
     * @return true if the engine can execute window (OVER) functions
     */
    boolean supportsWindowFunctions();
    
    /**
     * Check if subqueries are supported.
     *
     * @return true if the engine can execute nested subqueries
     */
    boolean supportsSubqueries();
}

SQL Engine Capabilities

Push and Pull Capabilities

PullCapability

Capability to pull data from external sources.

package io.cdap.cdap.etl.api.engine.sql.capability;

public interface PullCapability {
    /**
     * Check if source can be pulled into SQL engine.
     *
     * @param sourceType identifier of the external source type (e.g. "jdbc")
     * @return true if this capability can pull from the given source type
     */
    boolean canPull(String sourceType);
    
    /**
     * Pull data from external source.
     *
     * @param pullRequest source type and configuration for the pull
     * @return dataset materialized inside the SQL engine
     */
    SQLDataset pull(PullRequest pullRequest);
}

PushCapability

Capability to push data to external sinks.

package io.cdap.cdap.etl.api.engine.sql.capability;

public interface PushCapability {
    /**
     * Check if data can be pushed to sink.
     *
     * @param sinkType identifier of the external sink type (e.g. "jdbc")
     * @return true if this capability can push to the given sink type
     */
    boolean canPush(String sinkType);
    
    /**
     * Push data to external sink.
     *
     * @param pushRequest dataset, sink type and configuration for the push
     * @return result describing the completed push
     */
    PushResult push(PushRequest pushRequest);
}

DefaultPullCapability

Default implementation of pull capability.

package io.cdap.cdap.etl.api.engine.sql.capability;

/**
 * Default {@link PullCapability} that recognizes a fixed set of source types
 * and leaves the actual pull unimplemented for subclasses to provide.
 */
public class DefaultPullCapability implements PullCapability {
    
    // Source types supported by default, lower-case for case-insensitive matching.
    private static final List<String> SUPPORTED_SOURCES = Arrays.asList("jdbc", "file", "kafka");
    
    /**
     * @param sourceType source type identifier; matching is case-insensitive
     * @return true if the type is one of the supported defaults; false for null
     */
    @Override
    public boolean canPull(String sourceType) {
        // Guard against null instead of letting toLowerCase() throw NPE.
        if (sourceType == null) {
            return false;
        }
        // Locale.ROOT keeps lower-casing stable regardless of the default
        // locale (e.g. the Turkish dotless-i would break "JDBC" otherwise).
        return SUPPORTED_SOURCES.contains(sourceType.toLowerCase(Locale.ROOT));
    }
    
    /**
     * @throws UnsupportedOperationException always; subclasses must override
     */
    @Override
    public SQLDataset pull(PullRequest pullRequest) {
        throw new UnsupportedOperationException("Pull not implemented");
    }
}

DefaultPushCapability

Default implementation of push capability.

package io.cdap.cdap.etl.api.engine.sql.capability;

/**
 * Default {@link PushCapability} that recognizes a fixed set of sink types
 * and leaves the actual push unimplemented for subclasses to provide.
 */
public class DefaultPushCapability implements PushCapability {
    
    // Sink types supported by default, lower-case for case-insensitive matching.
    private static final List<String> SUPPORTED_SINKS =
        Arrays.asList("jdbc", "file", "elasticsearch");
    
    /**
     * @param sinkType sink type identifier; matching is case-insensitive
     * @return true if the type is one of the supported defaults; false for null
     */
    @Override
    public boolean canPush(String sinkType) {
        // Guard against null instead of letting toLowerCase() throw NPE.
        if (sinkType == null) {
            return false;
        }
        // Locale.ROOT keeps lower-casing stable regardless of the default locale.
        return SUPPORTED_SINKS.contains(sinkType.toLowerCase(Locale.ROOT));
    }
    
    /**
     * @throws UnsupportedOperationException always; subclasses must override
     */
    @Override
    public PushResult push(PushRequest pushRequest) {
        throw new UnsupportedOperationException("Push not implemented");
    }
}

SQL Datasets

SQLDataset

Base interface for SQL datasets.

package io.cdap.cdap.etl.api.engine.sql.dataset;

public interface SQLDataset {
    /**
     * Get dataset name.
     *
     * @return unique name identifying this dataset within the engine
     */
    String getDatasetName();
    
    /**
     * Get dataset schema.
     *
     * @return schema describing the records of this dataset
     */
    Schema getSchema();
}

SQLDatasetConsumer

Consumer interface for SQL datasets.

package io.cdap.cdap.etl.api.engine.sql.dataset;

public interface SQLDatasetConsumer extends SQLDataset {
    /**
     * Consume data from the dataset.
     *
     * @return iterator over the dataset's records
     */
    Iterator<StructuredRecord> consume();
}

SQLDatasetProducer

Producer interface for SQL datasets.

package io.cdap.cdap.etl.api.engine.sql.dataset;

public interface SQLDatasetProducer extends SQLDataset {
    /**
     * Produce data to the dataset.
     *
     * @param records records to write into the dataset, consumed fully
     */
    void produce(Iterator<StructuredRecord> records);
}

SQLPullDataset

Dataset that can be pulled from external source.

package io.cdap.cdap.etl.api.engine.sql.dataset;

public interface SQLPullDataset extends SQLDatasetConsumer {
    /**
     * Pull data from external source. Expected to be called before
     * {@link #getPullStatistics()} so statistics reflect the completed pull.
     */
    void pull();
    
    /**
     * Get pull statistics.
     *
     * @return statistics describing the most recent pull
     */
    PullStatistics getPullStatistics();
}

SQLPushDataset

Dataset that can be pushed to external sink.

package io.cdap.cdap.etl.api.engine.sql.dataset;

public interface SQLPushDataset extends SQLDatasetProducer {
    /**
     * Push data to external sink. Expected to be called before
     * {@link #getPushStatistics()} so statistics reflect the completed push.
     */
    void push();
    
    /**
     * Get push statistics.
     *
     * @return statistics describing the most recent push
     */
    PushStatistics getPushStatistics();
}

SQLDatasetDescription

Description of SQL dataset with metadata.

package io.cdap.cdap.etl.api.engine.sql.dataset;

/**
 * Immutable description of a SQL dataset: its name, schema and free-form
 * properties. (The original stub left the method bodies empty, which does not
 * compile for value-returning methods; implemented here with final fields.)
 */
public class SQLDatasetDescription {
    
    private final String name;
    private final Schema schema;
    private final Map<String, String> properties;
    
    /**
     * Create dataset description.
     *
     * @param name dataset name
     * @param schema dataset schema
     * @param properties dataset properties; copied defensively, null treated as empty
     */
    public SQLDatasetDescription(String name, Schema schema, Map<String, String> properties) {
        this.name = name;
        this.schema = schema;
        // Defensive copy so callers cannot mutate this description afterwards.
        this.properties = properties == null
            ? Collections.<String, String>emptyMap()
            : Collections.unmodifiableMap(new HashMap<>(properties));
    }
    
    /**
     * Get dataset name.
     *
     * @return the dataset name
     */
    public String getName() {
        return name;
    }
    
    /**
     * Get dataset schema.
     *
     * @return the dataset schema
     */
    public Schema getSchema() {
        return schema;
    }
    
    /**
     * Get dataset properties.
     *
     * @return unmodifiable view of the properties
     */
    public Map<String, String> getProperties() {
        return properties;
    }
}

RecordCollection

Collection interface for records.

package io.cdap.cdap.etl.api.engine.sql.dataset;

public interface RecordCollection extends Iterable<StructuredRecord> {
    /**
     * Get collection size.
     *
     * @return number of records in the collection
     */
    long size();
    
    /**
     * Check if collection is empty.
     *
     * @return true if the collection contains no records
     */
    boolean isEmpty();
}

SQL Requests

SQLJoinRequest

Request object for SQL join operations.

package io.cdap.cdap.etl.api.engine.sql.request;

/**
 * Immutable request pairing a join definition with the name of the dataset
 * the join result should be registered under. (The original stub's empty
 * value-returning bodies did not compile; implemented with final fields.)
 */
public class SQLJoinRequest {
    
    private final SQLJoinDefinition joinDefinition;
    private final String outputDatasetName;
    
    /**
     * Create join request.
     *
     * @param joinDefinition the join to execute
     * @param outputDatasetName name to register the join result under
     */
    public SQLJoinRequest(SQLJoinDefinition joinDefinition, String outputDatasetName) {
        this.joinDefinition = joinDefinition;
        this.outputDatasetName = outputDatasetName;
    }
    
    /**
     * Get join definition.
     *
     * @return the join to execute
     */
    public SQLJoinDefinition getJoinDefinition() {
        return joinDefinition;
    }
    
    /**
     * Get output dataset name.
     *
     * @return name the result dataset will be registered under
     */
    public String getOutputDatasetName() {
        return outputDatasetName;
    }
}

SQLJoinDefinition

Definition for SQL join operations.

package io.cdap.cdap.etl.api.engine.sql.request;

public class SQLJoinDefinition {
    // NOTE(review): the SparkSQLEngine example above also calls
    // getMainDataset() and getWhereCondition() on this class, which are not
    // declared here — confirm against the real API and add them if missing.
    
    /**
     * Get participating datasets.
     *
     * @return names of all datasets taking part in the join
     */
    public List<String> getDatasetNames() {}
    
    /**
     * Get join type.
     *
     * @return the type of join (e.g. INNER, LEFT_OUTER)
     */
    public JoinType getJoinType() {}
    
    /**
     * Get selected fields.
     *
     * @return fields to project in the result; empty means all fields
     */
    public List<String> getSelectedFields() {}
    
    /**
     * Get join conditions.
     *
     * @return one clause per joined dataset, in join order
     */
    public List<JoinClause> getJoinClauses() {}
}

SQLTransformRequest

Request object for SQL transformation operations.

package io.cdap.cdap.etl.api.engine.sql.request;

/**
 * Immutable request pairing a transform definition with the name of the
 * dataset the result should be registered under. (The original stub's empty
 * value-returning bodies did not compile; implemented with final fields.)
 */
public class SQLTransformRequest {
    
    private final SQLTransformDefinition transformDefinition;
    private final String outputDatasetName;
    
    /**
     * Create transform request.
     *
     * @param transformDefinition the transformation to execute
     * @param outputDatasetName name to register the result under
     */
    public SQLTransformRequest(SQLTransformDefinition transformDefinition, String outputDatasetName) {
        this.transformDefinition = transformDefinition;
        this.outputDatasetName = outputDatasetName;
    }
    
    /**
     * Get transform definition.
     *
     * @return the transformation to execute
     */
    public SQLTransformDefinition getTransformDefinition() {
        return transformDefinition;
    }
    
    /**
     * Get output dataset name.
     *
     * @return name the result dataset will be registered under
     */
    public String getOutputDatasetName() {
        return outputDatasetName;
    }
}

SQLTransformDefinition

Definition for SQL transformations.

package io.cdap.cdap.etl.api.engine.sql.request;

/**
 * Immutable definition of a SQL transformation: the input dataset it reads
 * and the SQL query to apply. (The original stub's empty value-returning
 * bodies did not compile; implemented with final fields.)
 */
public class SQLTransformDefinition {
    
    private final String inputDataset;
    private final String sqlQuery;
    
    /**
     * Create transform definition.
     *
     * @param inputDataset name of the dataset the query reads from
     * @param sqlQuery the SQL statement implementing the transformation
     */
    public SQLTransformDefinition(String inputDataset, String sqlQuery) {
        this.inputDataset = inputDataset;
        this.sqlQuery = sqlQuery;
    }
    
    /**
     * Get input dataset name.
     *
     * @return name of the dataset the query reads from
     */
    public String getInputDataset() {
        return inputDataset;
    }
    
    /**
     * Get SQL query for transformation.
     *
     * @return the SQL statement implementing the transformation
     */
    public String getSqlQuery() {
        return sqlQuery;
    }
}

SQLReadRequest

Request object for SQL read operations.

package io.cdap.cdap.etl.api.engine.sql.request;

/**
 * Immutable request naming a dataset to read. (The original stub's empty
 * value-returning body did not compile; implemented with a final field.)
 */
public class SQLReadRequest {
    
    private final String datasetName;
    
    /**
     * Create read request.
     *
     * @param datasetName name of the dataset to read
     */
    public SQLReadRequest(String datasetName) {
        this.datasetName = datasetName;
    }
    
    /**
     * Get dataset name to read.
     *
     * @return name of the dataset to read
     */
    public String getDatasetName() {
        return datasetName;
    }
}

SQLWriteRequest

Request object for SQL write operations.

package io.cdap.cdap.etl.api.engine.sql.request;

/**
 * Immutable request describing a write: the target dataset name, the source
 * dataset, and the write mode. (The original stub's empty value-returning
 * bodies did not compile; implemented with final fields.)
 */
public class SQLWriteRequest {
    
    private final String datasetName;
    private final SQLDataset dataset;
    private final WriteMode writeMode;
    
    /**
     * Create write request.
     *
     * @param datasetName target dataset name
     * @param dataset source dataset to write
     * @param writeMode how to behave if the target already exists
     */
    public SQLWriteRequest(String datasetName, SQLDataset dataset, WriteMode writeMode) {
        this.datasetName = datasetName;
        this.dataset = dataset;
        this.writeMode = writeMode;
    }
    
    /**
     * Get target dataset name.
     *
     * @return the target dataset name
     */
    public String getDatasetName() {
        return datasetName;
    }
    
    /**
     * Get source dataset.
     *
     * @return the dataset to write
     */
    public SQLDataset getDataset() {
        return dataset;
    }
    
    /**
     * Get write mode.
     *
     * @return behavior when the target already exists
     */
    public WriteMode getWriteMode() {
        return writeMode;
    }
}

SQL Pull and Push Requests

SQLPullRequest

Request for SQL pull operations.

package io.cdap.cdap.etl.api.engine.sql.request;

/**
 * Immutable request to pull data from an external source into the SQL engine.
 * (The original stub's empty value-returning bodies did not compile;
 * implemented with final fields and a defensive property copy.)
 */
public class SQLPullRequest {
    
    private final String sourceType;
    private final Map<String, String> sourceProperties;
    
    /**
     * Create pull request.
     *
     * @param sourceType source type identifier (e.g. "jdbc")
     * @param sourceProperties source configuration; copied defensively, null treated as empty
     */
    public SQLPullRequest(String sourceType, Map<String, String> sourceProperties) {
        this.sourceType = sourceType;
        // Defensive copy so callers cannot mutate this request afterwards.
        this.sourceProperties = sourceProperties == null
            ? Collections.<String, String>emptyMap()
            : Collections.unmodifiableMap(new HashMap<>(sourceProperties));
    }
    
    /**
     * Get source type.
     *
     * @return the source type identifier
     */
    public String getSourceType() {
        return sourceType;
    }
    
    /**
     * Get source properties.
     *
     * @return unmodifiable view of the source configuration
     */
    public Map<String, String> getSourceProperties() {
        return sourceProperties;
    }
}

SQLPushRequest

Request for SQL push operations.

package io.cdap.cdap.etl.api.engine.sql.request;

/**
 * Immutable request to push a dataset to an external sink. (The original
 * stub's empty value-returning bodies did not compile; implemented with
 * final fields and a defensive property copy.)
 */
public class SQLPushRequest {
    
    private final SQLDataset dataset;
    private final String sinkType;
    private final Map<String, String> sinkProperties;
    
    /**
     * Create push request.
     *
     * @param dataset dataset to push
     * @param sinkType sink type identifier (e.g. "jdbc")
     * @param sinkProperties sink configuration; copied defensively, null treated as empty
     */
    public SQLPushRequest(SQLDataset dataset, String sinkType, 
                         Map<String, String> sinkProperties) {
        this.dataset = dataset;
        this.sinkType = sinkType;
        // Defensive copy so callers cannot mutate this request afterwards.
        this.sinkProperties = sinkProperties == null
            ? Collections.<String, String>emptyMap()
            : Collections.unmodifiableMap(new HashMap<>(sinkProperties));
    }
    
    /**
     * Get dataset to push.
     *
     * @return the dataset to push
     */
    public SQLDataset getDataset() {
        return dataset;
    }
    
    /**
     * Get sink type.
     *
     * @return the sink type identifier
     */
    public String getSinkType() {
        return sinkType;
    }
    
    /**
     * Get sink properties.
     *
     * @return unmodifiable view of the sink configuration
     */
    public Map<String, String> getSinkProperties() {
        return sinkProperties;
    }
}

Result Objects

SQLReadResult

Result from SQL read operations.

package io.cdap.cdap.etl.api.engine.sql.request;

/**
 * Immutable result of a read: the dataset that was read and its record count.
 * (The original stub's empty value-returning bodies did not compile;
 * implemented with final fields.)
 */
public class SQLReadResult {
    
    private final SQLDataset dataset;
    private final long recordCount;
    
    /**
     * Create read result.
     *
     * @param dataset the dataset that was read
     * @param recordCount number of records read
     */
    public SQLReadResult(SQLDataset dataset, long recordCount) {
        this.dataset = dataset;
        this.recordCount = recordCount;
    }
    
    /**
     * Get result dataset.
     *
     * @return the dataset that was read
     */
    public SQLDataset getDataset() {
        return dataset;
    }
    
    /**
     * Get number of records read.
     *
     * @return the record count
     */
    public long getRecordCount() {
        return recordCount;
    }
}

SQLWriteResult

Result from SQL write operations.

package io.cdap.cdap.etl.api.engine.sql.request;

/**
 * Immutable result of a write: the target dataset name and how many records
 * were written. (The original stub's empty value-returning bodies did not
 * compile; implemented with final fields.)
 */
public class SQLWriteResult {
    
    private final String datasetName;
    private final long recordCount;
    
    /**
     * Create write result.
     *
     * @param datasetName dataset name that was written to
     * @param recordCount number of records written
     */
    public SQLWriteResult(String datasetName, long recordCount) {
        this.datasetName = datasetName;
        this.recordCount = recordCount;
    }
    
    /**
     * Get dataset name written to.
     *
     * @return the target dataset name
     */
    public String getDatasetName() {
        return datasetName;
    }
    
    /**
     * Get number of records written.
     *
     * @return the record count
     */
    public long getRecordCount() {
        return recordCount;
    }
}

SQL Engine Exception Handling

SQLEngineException

Exception for SQL engine operations.

package io.cdap.cdap.etl.api.engine.sql;

/**
 * Unchecked exception for SQL engine operation failures.
 *
 * <p>Extends {@link RuntimeException} rather than {@code Exception}: every
 * engine method in this API throws it without a {@code throws} clause, which
 * only compiles for an unchecked exception. The constructors also chain to
 * {@code super} so the message and cause are preserved (the original stub's
 * empty bodies discarded both).
 */
public class SQLEngineException extends RuntimeException {
    /**
     * Create SQL engine exception.
     *
     * @param message description of the failure
     */
    public SQLEngineException(String message) {
        super(message);
    }
    
    /**
     * Create SQL engine exception with cause.
     *
     * @param message description of the failure
     * @param cause underlying exception, preserved for diagnostics
     */
    public SQLEngineException(String message, Throwable cause) {
        super(message, cause);
    }
}

Advanced SQL Engine Usage

Complex Query Optimization

public class QueryOptimizer {
    
    /**
     * Produces a statistics-informed rewrite of a transform's SQL query:
     * a join hint is prepended, selective predicates are pushed down, and a
     * partitioning hint is appended for GROUP BY queries.
     *
     * NOTE(review): analyzeQuery() is not defined in this excerpt, and
     * datasetStats is accepted but never consulted here — presumably the
     * helpers use it; confirm against the full implementation.
     * NOTE(review): many SQL dialects require hint comments with no space
     * after the opening marker ("/*+ ... " rather than "/* + ... ") — verify
     * the target engine accepts this form.
     *
     * @param originalDef the transform to optimize
     * @param datasetStats per-dataset statistics intended to guide optimization
     * @return a new definition with the same input dataset and the rewritten query
     */
    public static SQLTransformDefinition optimizeQuery(SQLTransformDefinition originalDef,
                                                      Map<String, Statistics> datasetStats) {
        String originalQuery = originalDef.getSqlQuery();
        
        // Parse and analyze query (helper not shown in this excerpt).
        QueryAnalysis analysis = analyzeQuery(originalQuery);
        
        StringBuilder optimizedQuery = new StringBuilder();
        
        // Hint comment placed before the statement to steer join strategy.
        if (analysis.hasJoins()) {
            optimizedQuery.append("/* + USE_HASH_JOIN */ ");
        }
        
        // The predicate-pushdown rewrite returns the full (possibly rewritten) query.
        optimizedQuery.append(pushDownPredicates(originalQuery, analysis));
        
        // Trailing hint comment suggesting partitioning on the grouping keys.
        if (analysis.hasGroupBy()) {
            optimizedQuery.append(" /* + PARTITION_BY(")
                         .append(String.join(", ", analysis.getGroupByFields()))
                         .append(") */");
        }
        
        return new SQLTransformDefinition(originalDef.getInputDataset(), 
                                        optimizedQuery.toString());
    }
    
    /**
     * Rewrites the query to apply selective WHERE predicates as early as
     * possible; returns the query unchanged when there is nothing to push.
     *
     * NOTE(review): rewriteWithEarlyFilters() is not defined in this excerpt;
     * a real implementation would need a SQL parser, as the comment says.
     */
    private static String pushDownPredicates(String query, QueryAnalysis analysis) {
        // Implementation to push WHERE clauses closer to data sources
        // This is a simplified example - real implementation would use SQL parser
        
        if (analysis.hasSelectivePredicates()) {
            // Rewrite query to apply filters early
            return rewriteWithEarlyFilters(query, analysis.getSelectivePredicates());
        }
        
        return query;
    }
}

Multi-Engine SQL Processing

/**
 * {@link BatchSQLEngine} that delegates each operation to a primary engine
 * and transparently falls back to a secondary engine when the primary cannot
 * handle the operation or fails while executing it.
 */
public class HybridSQLEngine implements BatchSQLEngine {
    
    // Preferred engine, tried first for every operation.
    private final BatchSQLEngine primaryEngine;
    // Secondary engine, used when the primary declines or fails.
    private final BatchSQLEngine fallbackEngine;
    
    public HybridSQLEngine(BatchSQLEngine primaryEngine, BatchSQLEngine fallbackEngine) {
        this.primaryEngine = primaryEngine;
        this.fallbackEngine = fallbackEngine;
    }
    
    /** Supported when either underlying engine supports the transformation. */
    @Override
    public boolean canTransform(SQLTransformDefinition transformDefinition) {
        if (primaryEngine.canTransform(transformDefinition)) {
            return true;
        }
        return fallbackEngine.canTransform(transformDefinition);
    }
    
    /**
     * Runs the transformation on the primary engine when possible; on failure
     * or refusal, retries on the fallback engine.
     *
     * @throws SQLEngineException when neither engine can handle the transformation
     */
    @Override
    public SQLDataset transform(SQLTransformRequest transformRequest) {
        SQLTransformDefinition definition = transformRequest.getTransformDefinition();
        
        if (primaryEngine.canTransform(definition)) {
            try {
                return primaryEngine.transform(transformRequest);
            } catch (Exception e) {
                // Primary failed at runtime; record it and try the fallback.
                logWarning("Primary engine failed, falling back", e);
            }
        }
        
        if (!fallbackEngine.canTransform(definition)) {
            throw new SQLEngineException("No suitable engine found for transformation");
        }
        return fallbackEngine.transform(transformRequest);
    }
    
    /**
     * Runs the join on the primary engine when possible; on failure or
     * refusal, retries on the fallback engine.
     *
     * @throws SQLEngineException when neither engine can handle the join
     */
    @Override
    public SQLDataset join(SQLJoinRequest joinRequest) {
        SQLJoinDefinition definition = joinRequest.getJoinDefinition();
        
        if (primaryEngine.canJoin(definition)) {
            try {
                return primaryEngine.join(joinRequest);
            } catch (Exception e) {
                // Primary failed at runtime; record it and try the fallback.
                logWarning("Primary engine join failed, falling back", e);
            }
        }
        
        if (!fallbackEngine.canJoin(definition)) {
            throw new SQLEngineException("No suitable engine found for join");
        }
        return fallbackEngine.join(joinRequest);
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap-etl-api

docs

actions-conditions.md

batch-processing.md

core-pipeline.md

data-connectors.md

index.md

join-operations.md

lineage-metadata.md

sql-engine.md

validation.md

tile.json