SQL parser component for Apache Flink that provides Hive dialect support for parsing Hive-specific DDL and DML statements.
—
Data manipulation: RichSqlHiveInsert provides enhanced INSERT statements for Hive tables, with comprehensive partition support for both static and dynamic partitioning and an OVERWRITE option.
/**
 * Enhanced INSERT statement for Hive tables with partition support.
 * Supports both static and dynamic partitioning with the OVERWRITE option.
 * NOTE(review): shown here as an API stub — the constructor has no body.
 */
public class RichSqlHiveInsert extends RichSqlInsert {
/**
* Creates a new Hive INSERT statement with partition support.
* @param pos Parser position information
* @param keywords INSERT keywords (INSERT, INSERT OVERWRITE, etc.)
* @param extendedKeywords Extended keywords for Hive-specific features
* @param targetTable Target table for the insert operation
* @param source Source data (SELECT query or VALUES clause)
* @param columnList Target column list (optional)
* @param staticPartitions Static partition specifications
* @param allPartKeys All partition key columns
*/
public RichSqlHiveInsert(SqlParserPos pos, SqlNodeList keywords, SqlNodeList extendedKeywords,
SqlNode targetTable, SqlNode source, SqlNodeList columnList,
SqlNodeList staticPartitions, SqlNodeList allPartKeys);
}
// Usage Examples:
// Basic INSERT appending rows into one static partition: both partition keys
// (year, month) are fixed literals in the PARTITION clause.
String basicInsertSql = """
INSERT INTO TABLE sales_data
PARTITION (year=2023, month=12)
SELECT id, customer_id, product_name, amount, transaction_date
FROM raw_sales_data
WHERE YEAR(transaction_date) = 2023
AND MONTH(transaction_date) = 12
""";
// INSERT OVERWRITE replaces the existing contents of the (year=2023, month=12)
// static partition instead of appending to it.
String insertOverwriteSql = """
INSERT OVERWRITE TABLE sales_data
PARTITION (year=2023, month=12)
SELECT id, customer_id, product_name, amount, transaction_date
FROM updated_sales_data
WHERE YEAR(transaction_date) = 2023
AND MONTH(transaction_date) = 12
""";
// Dynamic partition INSERT: the PARTITION clause lists keys without values, so
// Hive derives each row's target partition from the trailing SELECT columns
// (year, month), which must appear last and in PARTITION-clause order.
String dynamicPartitionSql = """
INSERT INTO TABLE sales_data
PARTITION (year, month)
SELECT
id,
customer_id,
product_name,
amount,
transaction_date,
YEAR(transaction_date) as year,
MONTH(transaction_date) as month
FROM raw_sales_data
""";
// Mixed partitioning: year is static (fixed to 2023) while month is dynamic,
// taken from the last SELECT column. Static keys must precede dynamic ones.
String mixedPartitionSql = """
INSERT INTO TABLE sales_data
PARTITION (year=2023, month)
SELECT
id,
customer_id,
product_name,
amount,
transaction_date,
MONTH(transaction_date) as month
FROM raw_sales_data
WHERE YEAR(transaction_date) = 2023
""";
// INSERT with an explicit target column list; source columns are mapped by
// position to (id, customer_id, amount). Presumably non-listed table columns
// receive NULL — confirm against the Hive version in use.
String insertWithColumnsSql = """
INSERT INTO TABLE sales_data (id, customer_id, amount)
PARTITION (year=2023, month=12)
SELECT transaction_id, cust_id, total_amount
FROM external_data
""";
Efficient bulk data loading patterns for large datasets:
// Bulk INSERT OVERWRITE with dynamic partitions: the inner query filters out
// bad rows and renames raw columns; the outer query casts/normalizes values
// and derives the year/month dynamic partition columns.
String bulkInsertSql = """
INSERT OVERWRITE TABLE sales_data
PARTITION (year, month)
SELECT
CAST(id as BIGINT) as id,
TRIM(customer_id) as customer_id,
UPPER(product_name) as product_name,
CAST(amount as DECIMAL(10,2)) as amount,
CAST(transaction_date as DATE) as transaction_date,
YEAR(CAST(transaction_date as DATE)) as year,
MONTH(CAST(transaction_date as DATE)) as month
FROM (
SELECT
raw_id as id,
raw_customer as customer_id,
raw_product as product_name,
raw_amount as amount,
raw_date as transaction_date
FROM external_sales_table
WHERE raw_date IS NOT NULL
AND raw_amount > 0
AND raw_customer IS NOT NULL
) cleaned_data
""";
Insert into multiple tables from a single source:
// Hive multi-insert: a single FROM clause feeds two INSERT branches, routing
// SALE and REFUND rows into separate tables in one pass over the source.
String multiTableInsertSql = """
FROM raw_transaction_data rtd
INSERT INTO TABLE sales_data
PARTITION (year, month)
SELECT
id, customer_id, product_name, amount, transaction_date,
YEAR(transaction_date), MONTH(transaction_date)
WHERE transaction_type = 'SALE'
INSERT INTO TABLE refund_data
PARTITION (year, month)
SELECT
id, customer_id, product_name, ABS(amount), transaction_date,
YEAR(transaction_date), MONTH(transaction_date)
WHERE transaction_type = 'REFUND'
""";
Use conditional logic within INSERT statements:
// Conditional insert: a CASE expression buckets each customer into a spending
// segment computed from an aggregated customer/sales LEFT JOIN, and the
// segment_date partition value is taken dynamically from CURRENT_DATE.
String conditionalInsertSql = """
INSERT INTO TABLE customer_segments
PARTITION (segment_date)
SELECT
customer_id,
customer_name,
total_spent,
transaction_count,
CASE
WHEN total_spent > 10000 THEN 'Premium'
WHEN total_spent > 5000 THEN 'Gold'
WHEN total_spent > 1000 THEN 'Silver'
ELSE 'Bronze'
END as segment,
CURRENT_DATE as segment_date
FROM (
SELECT
c.customer_id,
c.customer_name,
COALESCE(SUM(s.amount), 0) as total_spent,
COALESCE(COUNT(s.id), 0) as transaction_count
FROM customer_profile c
LEFT JOIN sales_data s ON c.customer_id = s.customer_id
GROUP BY c.customer_id, c.customer_name
) aggregated_data
""";
Configure dynamic partition behavior:
// Hive session properties enabling dynamic partitioning: "nonstrict" mode
// allows every partition key to be dynamic, and the max settings cap how many
// partitions a single job (and each node) may create.
String enableDynamicPartitions = """
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.exec.max.dynamic.partitions = 10000;
SET hive.exec.max.dynamic.partitions.pernode = 1000;
""";
// Three-level dynamic partition insert (year/month/day); relies on the
// dynamic-partition session settings being enabled first.
String configuredDynamicInsert = """
INSERT INTO TABLE time_series_data
PARTITION (year, month, day)
SELECT
event_id,
user_id,
event_type,
event_timestamp,
YEAR(event_timestamp) as year,
MONTH(event_timestamp) as month,
DAY(event_timestamp) as day
FROM raw_events
WHERE event_timestamp >= '2023-01-01'
""";
Optimize INSERT operations with partition pruning:
// Partition-pruned insert: the WHERE clause restricts which source partitions
// are read (year = 2023) and the region IN-list bounds the number of dynamic
// partitions that can be created in the target.
String optimizedInsertSql = """
INSERT INTO TABLE sales_data_optimized
PARTITION (year=2023, month, region)
SELECT
id,
customer_id,
product_name,
amount,
transaction_date,
MONTH(transaction_date) as month,
customer_region as region
FROM sales_data
WHERE year = 2023 -- Partition pruning on source
AND customer_region IN ('US', 'EU', 'APAC') -- Limit partition creation
""";
Implement data quality validation during INSERT:
// Insert with inline data-quality validation: each row is routed into a
// VALID or INVALID quality_flag partition based on null and positivity checks,
// so bad rows are kept but isolated rather than dropped.
String qualityCheckedInsertSql = """
INSERT INTO TABLE validated_sales_data
PARTITION (year, month, quality_flag)
SELECT
id,
customer_id,
product_name,
amount,
transaction_date,
YEAR(transaction_date) as year,
MONTH(transaction_date) as month,
CASE
WHEN amount > 0
AND customer_id IS NOT NULL
AND product_name IS NOT NULL
AND transaction_date IS NOT NULL
THEN 'VALID'
ELSE 'INVALID'
END as quality_flag
FROM raw_sales_data
WHERE transaction_date >= '2023-01-01'
""";
Implement upsert (insert or update) pattern:
// Upsert emulation via INSERT OVERWRITE: stage the new/updated rows in a temp
// table, then overwrite the affected partitions with the union of surviving
// existing rows (keys not replaced) and the staged updates.
String upsertPatternSql = """
-- Step 1: Create temporary table with new/updated data
CREATE TEMPORARY TABLE temp_updates AS
SELECT * FROM new_sales_data;
-- Step 2: Insert overwrite with merged data
INSERT OVERWRITE TABLE sales_data
PARTITION (year, month)
SELECT
COALESCE(updates.id, existing.id) as id,
COALESCE(updates.customer_id, existing.customer_id) as customer_id,
COALESCE(updates.product_name, existing.product_name) as product_name,
COALESCE(updates.amount, existing.amount) as amount,
COALESCE(updates.transaction_date, existing.transaction_date) as transaction_date,
YEAR(COALESCE(updates.transaction_date, existing.transaction_date)) as year,
MONTH(COALESCE(updates.transaction_date, existing.transaction_date)) as month
FROM (
SELECT * FROM sales_data
WHERE (year, month, id) NOT IN (
SELECT YEAR(transaction_date), MONTH(transaction_date), id
FROM temp_updates
)
) existing
FULL OUTER JOIN temp_updates updates ON existing.id = updates.id;
""";
Optimize performance with batch operations:
/**
 * Helper for loading data into partitioned Hive tables through a Flink
 * {@code TableEnvironment}. Supports partition-by-partition batch loading,
 * timestamp-based incremental loading, and loading with deduplication.
 *
 * <p>NOTE(review): all SQL is assembled with {@link String#format}, so table,
 * column and value arguments must come from trusted configuration, never from
 * end-user input (SQL injection risk otherwise).
 */
public class HiveDataLoader {
    /** Environment used to execute the generated SQL; immutable after construction. */
    private final TableEnvironment tableEnv;

    /**
     * @param tableEnv table environment used to run the INSERT statements; must not be null
     */
    public HiveDataLoader(TableEnvironment tableEnv) {
        if (tableEnv == null) {
            throw new NullPointerException("tableEnv");
        }
        this.tableEnv = tableEnv;
    }

    /**
     * Loads data one static partition at a time so each INSERT touches a single
     * partition. A failure in one partition is reported and the remaining
     * partitions are still attempted (best-effort, matching original behavior).
     *
     * @param targetTable     target Hive table
     * @param sourceQuery     SELECT statement producing the rows to load
     * @param partitionColumn partition key column present in both source and target
     * @param partitionValues partition values; one INSERT is issued per value
     */
    public void loadDataInBatches(String targetTable, String sourceQuery,
                                  String partitionColumn, List<String> partitionValues) {
        for (String partitionValue : partitionValues) {
            String batchInsertSql = String.format("""
                    INSERT INTO TABLE %s
                    PARTITION (%s='%s')
                    %s
                    WHERE %s = '%s'
                    """, targetTable, partitionColumn, partitionValue,
                    sourceQuery, partitionColumn, partitionValue);
            try {
                tableEnv.executeSql(batchInsertSql);
                System.out.println("Loaded partition: " + partitionValue);
            } catch (Exception e) {
                // Deliberate best-effort: keep loading the remaining partitions.
                System.err.println("Failed to load partition " + partitionValue + ": " + e.getMessage());
            }
        }
    }

    /**
     * Appends rows newer than the given watermark, stamping each row with
     * CURRENT_DATE as its dynamic {@code load_date} partition value.
     *
     * @param targetTable       target Hive table (partitioned by load_date)
     * @param sourceTable       source table to read from
     * @param timestampColumn   column compared against the watermark
     * @param lastLoadTimestamp exclusive lower bound for rows to load
     */
    public void incrementalLoad(String targetTable, String sourceTable,
                                String timestampColumn, String lastLoadTimestamp) {
        String incrementalInsertSql = String.format("""
                INSERT INTO TABLE %s
                PARTITION (load_date)
                SELECT
                *,
                CURRENT_DATE as load_date
                FROM %s
                WHERE %s > '%s'
                """, targetTable, sourceTable, timestampColumn, lastLoadTimestamp);
        try {
            tableEnv.executeSql(incrementalInsertSql);
            System.out.println("Completed incremental load from " + lastLoadTimestamp);
        } catch (Exception e) {
            System.err.println("Incremental load failed: " + e.getMessage());
        }
    }

    /**
     * Overwrites the target keeping, for each key, only the row with the latest
     * {@code transaction_date} in the source.
     *
     * <p>Fix: the original spliced {@code keyColumns} verbatim into
     * {@code t1.%s = t2.%s}, which produced invalid SQL for comma-separated
     * multi-column keys such as {@code "id, customer_id"}. The join condition
     * is now built as one equality per key; single-column behavior is unchanged.
     *
     * @param targetTable target Hive table (partitioned by load_date)
     * @param sourceTable source table possibly containing duplicate keys
     * @param keyColumns  comma-separated key column list used for deduplication
     */
    public void insertWithDeduplication(String targetTable, String sourceTable, String keyColumns) {
        // One "t1.k = t2.k" predicate per key column.
        StringBuilder joinCondition = new StringBuilder();
        for (String key : keyColumns.split("\\s*,\\s*")) {
            if (joinCondition.length() > 0) {
                joinCondition.append(" AND ");
            }
            joinCondition.append("t1.").append(key).append(" = t2.").append(key);
        }
        String deduplicatedInsertSql = String.format("""
                INSERT OVERWRITE TABLE %s
                PARTITION (load_date)
                SELECT
                t1.*,
                CURRENT_DATE as load_date
                FROM %s t1
                JOIN (
                SELECT %s, MAX(transaction_date) as max_date
                FROM %s
                GROUP BY %s
                ) t2 ON %s
                AND t1.transaction_date = t2.max_date
                """, targetTable, sourceTable, keyColumns, sourceTable,
                keyColumns, joinCondition);
        try {
            tableEnv.executeSql(deduplicatedInsertSql);
            System.out.println("Completed insert with deduplication");
        } catch (Exception e) {
            System.err.println("Deduplication insert failed: " + e.getMessage());
        }
    }
}
// Usage example: wire the loader to an existing TableEnvironment.
HiveDataLoader loader = new HiveDataLoader(tableEnv);
// Load four monthly partitions, issuing one INSERT per partition value.
List<String> months = List.of("2023-01", "2023-02", "2023-03", "2023-04");
loader.loadDataInBatches("sales_data",
"SELECT * FROM raw_sales",
"year_month",
months);
// Append only rows newer than the last-load watermark.
loader.incrementalLoad("sales_data", "raw_sales", "transaction_date", "2023-12-01");
// Rewrite the target keeping the latest row per (id, customer_id).
loader.insertWithDeduplication("unique_sales_data", "raw_sales", "id, customer_id");
Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-sql-parser-hive