CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-parser-hive

SQL parser component for Apache Flink that provides Hive dialect support for parsing Hive-specific DDL and DML statements

Pending
Overview
Eval results
Files

docs/data-manipulation.md

Data Manipulation

Data manipulation provides enhanced INSERT statements with comprehensive partition support for both static and dynamic partitioning in Hive tables.

Capabilities

Enhanced INSERT Operations

Enhanced INSERT statement for Hive tables with comprehensive partition support.

/**
 * Enhanced INSERT statement for Hive tables with partition support
 * Supports both static and dynamic partitioning with OVERWRITE option
 *
 * NOTE: signature summary only — the constructor body shown as ";" is a
 * documentation placeholder; the real implementation lives in the
 * flink-sql-parser-hive module.
 */
public class RichSqlHiveInsert extends RichSqlInsert {
    /**
     * Creates a new Hive INSERT statement with partition support
     * @param pos Parser position information
     * @param keywords INSERT keywords (INSERT, INSERT OVERWRITE, etc.)
     * @param extendedKeywords Extended keywords for Hive-specific features
     * @param targetTable Target table for the insert operation
     * @param source Source data (SELECT query or VALUES clause)
     * @param columnList Target column list (optional)
     * @param staticPartitions Static partition specifications
     * @param allPartKeys All partition key columns
     */
    public RichSqlHiveInsert(SqlParserPos pos, SqlNodeList keywords, SqlNodeList extendedKeywords,
                            SqlNode targetTable, SqlNode source, SqlNodeList columnList,
                            SqlNodeList staticPartitions, SqlNodeList allPartKeys);
}

Usage Examples:

// Basic INSERT into partitioned table
String basicInsertSql = """
    INSERT INTO TABLE sales_data
    PARTITION (year=2023, month=12)
    SELECT id, customer_id, product_name, amount, transaction_date
    FROM raw_sales_data
    WHERE YEAR(transaction_date) = 2023 
      AND MONTH(transaction_date) = 12
    """;

// INSERT OVERWRITE with static partitions
String insertOverwriteSql = """
    INSERT OVERWRITE TABLE sales_data
    PARTITION (year=2023, month=12)
    SELECT id, customer_id, product_name, amount, transaction_date
    FROM updated_sales_data
    WHERE YEAR(transaction_date) = 2023 
      AND MONTH(transaction_date) = 12
    """;

// Dynamic partition INSERT
String dynamicPartitionSql = """
    INSERT INTO TABLE sales_data
    PARTITION (year, month)
    SELECT 
        id, 
        customer_id, 
        product_name, 
        amount, 
        transaction_date,
        YEAR(transaction_date) as year,
        MONTH(transaction_date) as month
    FROM raw_sales_data
    """;

// Mixed static and dynamic partitions
String mixedPartitionSql = """
    INSERT INTO TABLE sales_data
    PARTITION (year=2023, month)
    SELECT 
        id, 
        customer_id, 
        product_name, 
        amount, 
        transaction_date,
        MONTH(transaction_date) as month
    FROM raw_sales_data
    WHERE YEAR(transaction_date) = 2023
    """;

// INSERT with explicit column list
String insertWithColumnsSql = """
    INSERT INTO TABLE sales_data (id, customer_id, amount)
    PARTITION (year=2023, month=12)
    SELECT transaction_id, cust_id, total_amount
    FROM external_data
    """;

Advanced INSERT Operations

Bulk Data Loading

Efficient bulk data loading patterns for large datasets:

// Bulk insert from external data source
String bulkInsertSql = """
    INSERT OVERWRITE TABLE sales_data
    PARTITION (year, month)
    SELECT 
        CAST(id as BIGINT) as id,
        TRIM(customer_id) as customer_id,
        UPPER(product_name) as product_name,
        CAST(amount as DECIMAL(10,2)) as amount,
        CAST(transaction_date as DATE) as transaction_date,
        YEAR(CAST(transaction_date as DATE)) as year,
        MONTH(CAST(transaction_date as DATE)) as month
    FROM (
        SELECT 
            raw_id as id,
            raw_customer as customer_id,
            raw_product as product_name,
            raw_amount as amount,
            raw_date as transaction_date
        FROM external_sales_table
        WHERE raw_date IS NOT NULL
          AND raw_amount > 0
          AND raw_customer IS NOT NULL
    ) cleaned_data
    """;

Multi-Table INSERT

Insert into multiple tables from a single source:

// Multi-table insert pattern
String multiTableInsertSql = """
    FROM raw_transaction_data rtd
    INSERT INTO TABLE sales_data
    PARTITION (year, month)
    SELECT 
        id, customer_id, product_name, amount, transaction_date,
        YEAR(transaction_date), MONTH(transaction_date)
    WHERE transaction_type = 'SALE'
    
    INSERT INTO TABLE refund_data  
    PARTITION (year, month)
    SELECT 
        id, customer_id, product_name, ABS(amount), transaction_date,
        YEAR(transaction_date), MONTH(transaction_date)
    WHERE transaction_type = 'REFUND'
    """;

Conditional INSERT Operations

Use conditional logic within INSERT statements:

// Conditional insert with data transformation
String conditionalInsertSql = """
    INSERT INTO TABLE customer_segments
    PARTITION (segment_date)
    SELECT 
        customer_id,
        customer_name,
        total_spent,
        transaction_count,
        CASE 
            WHEN total_spent > 10000 THEN 'Premium'
            WHEN total_spent > 5000 THEN 'Gold'
            WHEN total_spent > 1000 THEN 'Silver'
            ELSE 'Bronze'
        END as segment,
        CURRENT_DATE as segment_date
    FROM (
        SELECT 
            c.customer_id,
            c.customer_name,
            COALESCE(SUM(s.amount), 0) as total_spent,
            COALESCE(COUNT(s.id), 0) as transaction_count
        FROM customer_profile c
        LEFT JOIN sales_data s ON c.customer_id = s.customer_id
        GROUP BY c.customer_id, c.customer_name
    ) aggregated_data
    """;

Partition Management in INSERT Operations

Dynamic Partition Configuration

Configure dynamic partition behavior:

// Enable dynamic partitioning (typically set at session level)
String enableDynamicPartitions = """
    SET hive.exec.dynamic.partition = true;
    SET hive.exec.dynamic.partition.mode = nonstrict;
    SET hive.exec.max.dynamic.partitions = 10000;
    SET hive.exec.max.dynamic.partitions.pernode = 1000;
    """;

// Dynamic partition insert with configuration
String configuredDynamicInsert = """
    INSERT INTO TABLE time_series_data
    PARTITION (year, month, day)
    SELECT 
        event_id,
        user_id,
        event_type,
        event_timestamp,
        YEAR(event_timestamp) as year,
        MONTH(event_timestamp) as month,
        DAY(event_timestamp) as day
    FROM raw_events
    WHERE event_timestamp >= '2023-01-01'
    """;

Partition Pruning and Optimization

Optimize INSERT operations with partition pruning:

// Optimized insert with partition pruning
String optimizedInsertSql = """
    INSERT INTO TABLE sales_data_optimized
    PARTITION (year=2023, month, region)
    SELECT 
        id, 
        customer_id, 
        product_name, 
        amount, 
        transaction_date,
        MONTH(transaction_date) as month,
        customer_region as region
    FROM sales_data
    WHERE year = 2023  -- Partition pruning on source
      AND customer_region IN ('US', 'EU', 'APAC')  -- Limit partition creation
    """;

Data Quality and Validation

INSERT with Data Quality Checks

Implement data quality validation during INSERT:

// Insert with data quality validation
String qualityCheckedInsertSql = """
    INSERT INTO TABLE validated_sales_data
    PARTITION (year, month, quality_flag)
    SELECT 
        id,
        customer_id,
        product_name,
        amount,
        transaction_date,
        YEAR(transaction_date) as year,
        MONTH(transaction_date) as month,
        CASE 
            WHEN amount > 0 
             AND customer_id IS NOT NULL 
             AND product_name IS NOT NULL 
             AND transaction_date IS NOT NULL 
            THEN 'VALID'
            ELSE 'INVALID'
        END as quality_flag
    FROM raw_sales_data
    WHERE transaction_date >= '2023-01-01'
    """;

Upsert Pattern Implementation

Implement upsert (insert or update) pattern:

// Upsert pattern using INSERT OVERWRITE
String upsertPatternSql = """
    -- Step 1: Create temporary table with new/updated data
    CREATE TEMPORARY TABLE temp_updates AS
    SELECT * FROM new_sales_data;
    
    -- Step 2: Insert overwrite with merged data
    INSERT OVERWRITE TABLE sales_data
    PARTITION (year, month)
    SELECT 
        COALESCE(updates.id, existing.id) as id,
        COALESCE(updates.customer_id, existing.customer_id) as customer_id,
        COALESCE(updates.product_name, existing.product_name) as product_name,
        COALESCE(updates.amount, existing.amount) as amount,
        COALESCE(updates.transaction_date, existing.transaction_date) as transaction_date,
        YEAR(COALESCE(updates.transaction_date, existing.transaction_date)) as year,
        MONTH(COALESCE(updates.transaction_date, existing.transaction_date)) as month
    FROM (
        SELECT * FROM sales_data 
        WHERE (year, month, id) NOT IN (
            SELECT YEAR(transaction_date), MONTH(transaction_date), id 
            FROM temp_updates
        )
    ) existing
    FULL OUTER JOIN temp_updates updates ON existing.id = updates.id;
    """;

Performance Optimization

Batch INSERT Operations

Optimize performance with batch operations:

/**
 * Helper that generates and executes Hive-dialect INSERT statements for common
 * bulk-loading patterns (batched loads, incremental loads, deduplicated loads).
 *
 * <p>All SQL is built by string interpolation, so table/column names and values
 * must come from trusted sources (the Table API offers no parameter binding for
 * DDL/DML strings). Failures are reported to stderr and do not propagate.
 */
public class HiveDataLoader {
    // Environment used to execute the generated SQL; assumed to be configured
    // with the Hive dialect — confirm at the call site.
    private final TableEnvironment tableEnv;

    public HiveDataLoader(TableEnvironment tableEnv) {
        this.tableEnv = tableEnv;
    }

    /**
     * Loads data in batches to optimize performance, one static partition per
     * INSERT so a failure in one partition does not abort the remaining loads.
     *
     * @param targetTable     target Hive table name
     * @param sourceQuery     SELECT producing the rows to load; must not already
     *                        end in a WHERE clause, because one is appended
     * @param partitionColumn static partition column used to slice the load
     * @param partitionValues values of {@code partitionColumn}, one per batch
     */
    public void loadDataInBatches(String targetTable, String sourceQuery, 
                                 String partitionColumn, List<String> partitionValues) {
        for (String partitionValue : partitionValues) {
            String batchInsertSql = String.format("""
                INSERT INTO TABLE %s
                PARTITION (%s='%s')
                %s
                WHERE %s = '%s'
                """, targetTable, partitionColumn, partitionValue, 
                     sourceQuery, partitionColumn, partitionValue);
            
            try {
                tableEnv.executeSql(batchInsertSql);
                System.out.println("Loaded partition: " + partitionValue);
            } catch (Exception e) {
                // Best-effort: report and continue with the next partition.
                System.err.println("Failed to load partition " + partitionValue + ": " + e.getMessage());
            }
        }
    }
    
    /**
     * Performs incremental data loading: copies all rows newer than
     * {@code lastLoadTimestamp} into {@code targetTable}, stamping each row
     * with the current date as its {@code load_date} dynamic partition.
     *
     * @param targetTable       target table partitioned by {@code load_date}
     * @param sourceTable       table to read new rows from
     * @param timestampColumn   column compared against the watermark
     * @param lastLoadTimestamp exclusive lower bound of the previous load
     */
    public void incrementalLoad(String targetTable, String sourceTable, 
                               String timestampColumn, String lastLoadTimestamp) {
        String incrementalInsertSql = String.format("""
            INSERT INTO TABLE %s
            PARTITION (load_date)
            SELECT 
                *,
                CURRENT_DATE as load_date
            FROM %s
            WHERE %s > '%s'
            """, targetTable, sourceTable, timestampColumn, lastLoadTimestamp);
        
        try {
            tableEnv.executeSql(incrementalInsertSql);
            System.out.println("Completed incremental load from " + lastLoadTimestamp);
        } catch (Exception e) {
            System.err.println("Incremental load failed: " + e.getMessage());
        }
    }
    
    /**
     * Handles duplicate detection and deduplication: for each key, keeps only
     * the row with the latest {@code transaction_date}.
     *
     * @param targetTable target table (partitioned by {@code load_date})
     * @param sourceTable source table containing potential duplicates
     * @param keyColumns  comma-separated key columns, e.g. {@code "id, customer_id"}
     */
    public void insertWithDeduplication(String targetTable, String sourceTable, String keyColumns) {
        // Fix: the join condition previously interpolated the raw comma list,
        // generating "t1.id, customer_id = t2.id, customer_id" — invalid SQL for
        // multi-column keys. Build one equality predicate per key column instead.
        String[] keys = keyColumns.split("\\s*,\\s*");
        StringBuilder joinCondition = new StringBuilder();
        for (int i = 0; i < keys.length; i++) {
            if (i > 0) {
                joinCondition.append(" AND ");
            }
            joinCondition.append("t1.").append(keys[i]).append(" = t2.").append(keys[i]);
        }

        String deduplicatedInsertSql = String.format("""
            INSERT OVERWRITE TABLE %s
            PARTITION (load_date)
            SELECT 
                t1.*,
                CURRENT_DATE as load_date
            FROM %s t1
            JOIN (
                SELECT %s, MAX(transaction_date) as max_date
                FROM %s
                GROUP BY %s
            ) t2 ON %s 
                 AND t1.transaction_date = t2.max_date
            """, targetTable, sourceTable, keyColumns, sourceTable, 
                 keyColumns, joinCondition);
        
        try {
            tableEnv.executeSql(deduplicatedInsertSql);
            System.out.println("Completed insert with deduplication");
        } catch (Exception e) {
            System.err.println("Deduplication insert failed: " + e.getMessage());
        }
    }
}

// Usage example — tableEnv is assumed to be a Hive-dialect TableEnvironment
// created elsewhere; confirm the dialect is set before running these snippets.
HiveDataLoader loader = new HiveDataLoader(tableEnv);

// Load data in monthly batches: one INSERT per "year_month" partition value,
// so a failed month does not block the others.
List<String> months = List.of("2023-01", "2023-02", "2023-03", "2023-04");
loader.loadDataInBatches("sales_data", 
    "SELECT * FROM raw_sales", 
    "year_month", 
    months);

// Perform incremental load of rows with transaction_date after the watermark.
loader.incrementalLoad("sales_data", "raw_sales", "transaction_date", "2023-12-01");

// Insert with deduplication, keyed on the comma-separated column list.
loader.insertWithDeduplication("unique_sales_data", "raw_sales", "id, customer_id");

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-parser-hive

docs

constraint-system.md

data-manipulation.md

database-operations.md

index.md

parser-integration.md

partition-management.md

table-operations.md

type-system.md

utilities.md

view-operations.md

tile.json