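# Data Manipulation

The Hive dialect extends `INSERT` with partition support. It covers static partitions (values written in the `PARTITION` clause) and dynamic partitions (values taken from the trailing columns of the `SELECT`). It also supports a mix of both, with or without `OVERWRITE`.

## Capabilities

### Enhanced INSERT Operations

`RichSqlHiveInsert` is the parse-tree node for a Hive-dialect `INSERT`. On top of a regular insert, it records the static partition spec and the partition keys of the target table.

```java { .api }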
/**
12
* Enhanced INSERT statement for Hive tables with partition support
13
* Supports both static and dynamic partitioning with OVERWRITE option
14
*/
15
public class RichSqlHiveInsert extends RichSqlInsert {
16
/**
17
* Creates a new Hive INSERT statement with partition support
18
* @param pos Parser position information
19
* @param keywords INSERT keywords (INSERT, INSERT OVERWRITE, etc.)
20
* @param extendedKeywords Extended keywords for Hive-specific features
21
* @param targetTable Target table for the insert operation
22
* @param source Source data (SELECT query or VALUES clause)
23
* @param columnList Target column list (optional)
24
* @param staticPartitions Static partition specifications
25
* @param allPartKeys All partition key columns
26
*/
27
public RichSqlHiveInsert(SqlParserPos pos, SqlNodeList keywords, SqlNodeList extendedKeywords,
28
SqlNode targetTable, SqlNode source, SqlNodeList columnList,
29
SqlNodeList staticPartitions, SqlNodeList allPartKeys);
30
}
31
```

**Usage Examples:**

```java
// Basic INSERT into partitioned table
37
String basicInsertSql = """
38
INSERT INTO TABLE sales_data
39
PARTITION (year=2023, month=12)
40
SELECT id, customer_id, product_name, amount, transaction_date
41
FROM raw_sales_data
42
WHERE YEAR(transaction_date) = 2023
43
AND MONTH(transaction_date) = 12
44
""";
45
46
// INSERT OVERWRITE with static partitions
47
String insertOverwriteSql = """
48
INSERT OVERWRITE TABLE sales_data
49
PARTITION (year=2023, month=12)
50
SELECT id, customer_id, product_name, amount, transaction_date
51
FROM updated_sales_data
52
WHERE YEAR(transaction_date) = 2023
53
AND MONTH(transaction_date) = 12
54
""";
55
56
// Dynamic partition INSERT
57
String dynamicPartitionSql = """
58
INSERT INTO TABLE sales_data
59
PARTITION (year, month)
60
SELECT
61
id,
62
customer_id,
63
product_name,
64
amount,
65
transaction_date,
66
YEAR(transaction_date) as year,
67
MONTH(transaction_date) as month
68
FROM raw_sales_data
69
""";
70
71
// Mixed static and dynamic partitions
72
String mixedPartitionSql = """
73
INSERT INTO TABLE sales_data
74
PARTITION (year=2023, month)
75
SELECT
76
id,
77
customer_id,
78
product_name,
79
amount,
80
transaction_date,
81
MONTH(transaction_date) as month
82
FROM raw_sales_data
83
WHERE YEAR(transaction_date) = 2023
84
""";
85
86
// INSERT with explicit column list
87
String insertWithColumnsSql = """
88
INSERT INTO TABLE sales_data (id, customer_id, amount)
89
PARTITION (year=2023, month=12)
90
SELECT transaction_id, cust_id, total_amount
91
FROM external_data
92
""";
93
```

## Advanced INSERT Operations

### Bulk Data Loading

To bulk-load a large dataset, clean and cast it in a subquery, then let dynamic partitioning route each row to its `year`/`month` partition:

```java
// Bulk insert from external data source
103
String bulkInsertSql = """
104
INSERT OVERWRITE TABLE sales_data
105
PARTITION (year, month)
106
SELECT
107
CAST(id as BIGINT) as id,
108
TRIM(customer_id) as customer_id,
109
UPPER(product_name) as product_name,
110
CAST(amount as DECIMAL(10,2)) as amount,
111
CAST(transaction_date as DATE) as transaction_date,
112
YEAR(CAST(transaction_date as DATE)) as year,
113
MONTH(CAST(transaction_date as DATE)) as month
114
FROM (
115
SELECT
116
raw_id as id,
117
raw_customer as customer_id,
118
raw_product as product_name,
119
raw_amount as amount,
120
raw_date as transaction_date
121
FROM external_sales_table
122
WHERE raw_date IS NOT NULL
123
AND raw_amount > 0
124
AND raw_customer IS NOT NULL
125
) cleaned_data
126
""";
127
```

### Multi-Table INSERT

Hive's multi-insert form (`FROM source INSERT ... INSERT ...`) reads the source once. It writes each row to every target whose `WHERE` clause the row matches:

```java
// Multi-table insert pattern
135
String multiTableInsertSql = """
136
FROM raw_transaction_data rtd
137
INSERT INTO TABLE sales_data
138
PARTITION (year, month)
139
SELECT
140
id, customer_id, product_name, amount, transaction_date,
141
YEAR(transaction_date), MONTH(transaction_date)
142
WHERE transaction_type = 'SALE'
143
144
INSERT INTO TABLE refund_data
145
PARTITION (year, month)
146
SELECT
147
id, customer_id, product_name, ABS(amount), transaction_date,
148
YEAR(transaction_date), MONTH(transaction_date)
149
WHERE transaction_type = 'REFUND'
150
""";
151
```

### Conditional INSERT Operations

Use `CASE` expressions to derive column values during the insert:

```java
// Conditional insert with data transformation
159
String conditionalInsertSql = """
160
INSERT INTO TABLE customer_segments
161
PARTITION (segment_date)
162
SELECT
163
customer_id,
164
customer_name,
165
total_spent,
166
transaction_count,
167
CASE
168
WHEN total_spent > 10000 THEN 'Premium'
169
WHEN total_spent > 5000 THEN 'Gold'
170
WHEN total_spent > 1000 THEN 'Silver'
171
ELSE 'Bronze'
172
END as segment,
173
CURRENT_DATE as segment_date
174
FROM (
175
SELECT
176
c.customer_id,
177
c.customer_name,
178
COALESCE(SUM(s.amount), 0) as total_spent,
179
COALESCE(COUNT(s.id), 0) as transaction_count
180
FROM customer_profile c
181
LEFT JOIN sales_data s ON c.customer_id = s.customer_id
182
GROUP BY c.customer_id, c.customer_name
183
) aggregated_data
184
""";
185
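```

## Partition Management in INSERT Operations

### Dynamic Partition Configuration

Configure dynamic partitioning:

- `hive.exec.dynamic.partition.mode = nonstrict` lets every partition column be dynamic. In `strict` mode, at least one partition column must have a static value.
- The `max` settings cap how many dynamic partitions a single statement may create in total and per node.

```java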
// Enable dynamic partitioning (typically set at session level)
195
String enableDynamicPartitions = """
196
SET hive.exec.dynamic.partition = true;
197
SET hive.exec.dynamic.partition.mode = nonstrict;
198
SET hive.exec.max.dynamic.partitions = 10000;
199
SET hive.exec.max.dynamic.partitions.pernode = 1000;
200
""";
201
202
// Dynamic partition insert with configuration
203
String configuredDynamicInsert = """
204
INSERT INTO TABLE time_series_data
205
PARTITION (year, month, day)
206
SELECT
207
event_id,
208
user_id,
209
event_type,
210
event_timestamp,
211
YEAR(event_timestamp) as year,
212
MONTH(event_timestamp) as month,
213
DAY(event_timestamp) as day
214
FROM raw_events
215
WHERE event_timestamp >= '2023-01-01'
216
""";
217
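```

### Partition Pruning and Optimization

Filter the source on its partition column so that only the needed partitions are read. Restrict the dynamic partition values so that the insert creates only the partitions you expect:

```java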
// Optimized insert with partition pruning
225
String optimizedInsertSql = """
226
INSERT INTO TABLE sales_data_optimized
227
PARTITION (year=2023, month, region)
228
SELECT
229
id,
230
customer_id,
231
product_name,
232
amount,
233
transaction_date,
234
MONTH(transaction_date) as month,
235
customer_region as region
236
FROM sales_data
237
WHERE year = 2023 -- Partition pruning on source
238
AND customer_region IN ('US', 'EU', 'APAC') -- Limit partition creation
239
""";
240
```

## Data Quality and Validation

### INSERT with Data Quality Checks

Validate rows during the insert. Rows that fail a check are routed to an `INVALID` partition instead of being mixed with good data:

```java
// Insert with data quality validation
250
String qualityCheckedInsertSql = """
251
INSERT INTO TABLE validated_sales_data
252
PARTITION (year, month, quality_flag)
253
SELECT
254
id,
255
customer_id,
256
product_name,
257
amount,
258
transaction_date,
259
YEAR(transaction_date) as year,
260
MONTH(transaction_date) as month,
261
CASE
262
WHEN amount > 0
263
AND customer_id IS NOT NULL
264
AND product_name IS NOT NULL
265
AND transaction_date IS NOT NULL
266
THEN 'VALID'
267
ELSE 'INVALID'
268
END as quality_flag
269
FROM raw_sales_data
270
WHERE transaction_date >= '2023-01-01'
271
""";
272
```

### Upsert Pattern Implementation

Implement an upsert (insert or update). Non-transactional Hive tables do not support `UPDATE` or `MERGE`, so the affected partitions are rewritten with `INSERT OVERWRITE`:

```java
// Upsert pattern using INSERT OVERWRITE
280
String upsertPatternSql = """
281
-- Step 1: Create temporary table with new/updated data
282
CREATE TEMPORARY TABLE temp_updates AS
283
SELECT * FROM new_sales_data;
284
285
-- Step 2: Insert overwrite with merged data
286
INSERT OVERWRITE TABLE sales_data
287
PARTITION (year, month)
288
SELECT
289
COALESCE(updates.id, existing.id) as id,
290
COALESCE(updates.customer_id, existing.customer_id) as customer_id,
291
COALESCE(updates.product_name, existing.product_name) as product_name,
292
COALESCE(updates.amount, existing.amount) as amount,
293
COALESCE(updates.transaction_date, existing.transaction_date) as transaction_date,
294
YEAR(COALESCE(updates.transaction_date, existing.transaction_date)) as year,
295
MONTH(COALESCE(updates.transaction_date, existing.transaction_date)) as month
296
FROM (
297
SELECT * FROM sales_data
298
WHERE (year, month, id) NOT IN (
299
SELECT YEAR(transaction_date), MONTH(transaction_date), id
300
FROM temp_updates
301
)
302
) existing
303
FULL OUTER JOIN temp_updates updates ON existing.id = updates.id;
304
""";
305
```

## Performance Optimization

### Batch INSERT Operations

Wrap common loading patterns (per-partition batches, incremental loads, and deduplication) in a reusable helper:

```java
public class HiveDataLoader {
315
private TableEnvironment tableEnv;
316
317
public HiveDataLoader(TableEnvironment tableEnv) {
318
this.tableEnv = tableEnv;
319
}
320
321
/**
322
* Loads data in batches to optimize performance
323
*/
324
public void loadDataInBatches(String targetTable, String sourceQuery,
325
String partitionColumn, List<String> partitionValues) {
326
for (String partitionValue : partitionValues) {
327
String batchInsertSql = String.format("""
328
INSERT INTO TABLE %s
329
PARTITION (%s='%s')
330
%s
331
WHERE %s = '%s'
332
""", targetTable, partitionColumn, partitionValue,
333
sourceQuery, partitionColumn, partitionValue);
334
335
try {
336
tableEnv.executeSql(batchInsertSql);
337
System.out.println("Loaded partition: " + partitionValue);
338
} catch (Exception e) {
339
System.err.println("Failed to load partition " + partitionValue + ": " + e.getMessage());
340
}
341
}
342
}
343
344
/**
345
* Performs incremental data loading
346
*/
347
public void incrementalLoad(String targetTable, String sourceTable,
348
String timestampColumn, String lastLoadTimestamp) {
349
String incrementalInsertSql = String.format("""
350
INSERT INTO TABLE %s
351
PARTITION (load_date)
352
SELECT
353
*,
354
CURRENT_DATE as load_date
355
FROM %s
356
WHERE %s > '%s'
357
""", targetTable, sourceTable, timestampColumn, lastLoadTimestamp);
358
359
try {
360
tableEnv.executeSql(incrementalInsertSql);
361
System.out.println("Completed incremental load from " + lastLoadTimestamp);
362
} catch (Exception e) {
363
System.err.println("Incremental load failed: " + e.getMessage());
364
}
365
}
366
367
/**
368
* Handles duplicate detection and deduplication
369
*/
370
public void insertWithDeduplication(String targetTable, String sourceTable, String keyColumns) {
371
String deduplicatedInsertSql = String.format("""
372
INSERT OVERWRITE TABLE %s
373
PARTITION (load_date)
374
SELECT
375
t1.*,
376
CURRENT_DATE as load_date
377
FROM %s t1
378
JOIN (
379
SELECT %s, MAX(transaction_date) as max_date
380
FROM %s
381
GROUP BY %s
382
) t2 ON t1.%s = t2.%s
383
AND t1.transaction_date = t2.max_date
384
""", targetTable, sourceTable, keyColumns, sourceTable,
385
keyColumns, keyColumns, keyColumns);
386
387
try {
388
tableEnv.executeSql(deduplicatedInsertSql);
389
System.out.println("Completed insert with deduplication");
390
} catch (Exception e) {
391
System.err.println("Deduplication insert failed: " + e.getMessage());
392
}
393
}
394
}
395
396
// Usage example
397
HiveDataLoader loader = new HiveDataLoader(tableEnv);
398
399
// Load data in monthly batches
400
List<String> months = List.of("2023-01", "2023-02", "2023-03", "2023-04");
401
loader.loadDataInBatches("sales_data",
402
"SELECT * FROM raw_sales",
403
"year_month",
404
months);
405
406
// Perform incremental load
407
loader.incrementalLoad("sales_data", "raw_sales", "transaction_date", "2023-12-01");
408
409
// Insert with deduplication
410
loader.insertWithDeduplication("unique_sales_data", "raw_sales", "id, customer_id");
411
```