or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

constraint-system.md data-manipulation.md database-operations.md index.md parser-integration.md partition-management.md table-operations.md type-system.md utilities.md view-operations.md

docs/data-manipulation.md

0

# Data Manipulation

1

2

Data manipulation provides enhanced INSERT statements with comprehensive partition support for both static and dynamic partitioning in Hive tables.

3

4

## Capabilities

5

6

### Enhanced INSERT Operations

7

8

Enhanced INSERT statement for Hive tables with comprehensive partition support.

9

10

```java { .api }

11

/**

12

* Enhanced INSERT statement for Hive tables with partition support

13

* Supports both static and dynamic partitioning with OVERWRITE option

14

*/

15

public class RichSqlHiveInsert extends RichSqlInsert {

16

/**

17

* Creates a new Hive INSERT statement with partition support

18

* @param pos Parser position information

19

* @param keywords INSERT keywords (INSERT, INSERT OVERWRITE, etc.)

20

* @param extendedKeywords Extended keywords for Hive-specific features

21

* @param targetTable Target table for the insert operation

22

* @param source Source data (SELECT query or VALUES clause)

23

* @param columnList Target column list (optional)

24

* @param staticPartitions Static partition specifications

25

* @param allPartKeys All partition key columns

26

*/

27

public RichSqlHiveInsert(SqlParserPos pos, SqlNodeList keywords, SqlNodeList extendedKeywords,

28

SqlNode targetTable, SqlNode source, SqlNodeList columnList,

29

SqlNodeList staticPartitions, SqlNodeList allPartKeys);

30

}

31

```

32

33

**Usage Examples:**

34

35

```java

36

// Basic INSERT into partitioned table

37

String basicInsertSql = """

38

INSERT INTO TABLE sales_data

39

PARTITION (year=2023, month=12)

40

SELECT id, customer_id, product_name, amount, transaction_date

41

FROM raw_sales_data

42

WHERE YEAR(transaction_date) = 2023

43

AND MONTH(transaction_date) = 12

44

""";

45

46

// INSERT OVERWRITE with static partitions

47

String insertOverwriteSql = """

48

INSERT OVERWRITE TABLE sales_data

49

PARTITION (year=2023, month=12)

50

SELECT id, customer_id, product_name, amount, transaction_date

51

FROM updated_sales_data

52

WHERE YEAR(transaction_date) = 2023

53

AND MONTH(transaction_date) = 12

54

""";

55

56

// Dynamic partition INSERT

57

String dynamicPartitionSql = """

58

INSERT INTO TABLE sales_data

59

PARTITION (year, month)

60

SELECT

61

id,

62

customer_id,

63

product_name,

64

amount,

65

transaction_date,

66

YEAR(transaction_date) as year,

67

MONTH(transaction_date) as month

68

FROM raw_sales_data

69

""";

70

71

// Mixed static and dynamic partitions

72

String mixedPartitionSql = """

73

INSERT INTO TABLE sales_data

74

PARTITION (year=2023, month)

75

SELECT

76

id,

77

customer_id,

78

product_name,

79

amount,

80

transaction_date,

81

MONTH(transaction_date) as month

82

FROM raw_sales_data

83

WHERE YEAR(transaction_date) = 2023

84

""";

85

86

// INSERT with explicit column list

87

String insertWithColumnsSql = """

88

INSERT INTO TABLE sales_data (id, customer_id, amount)

89

PARTITION (year=2023, month=12)

90

SELECT transaction_id, cust_id, total_amount

91

FROM external_data

92

""";

93

```

94

95

## Advanced INSERT Operations

96

97

### Bulk Data Loading

98

99

Efficient bulk data loading patterns for large datasets:

100

101

```java

102

// Bulk insert from external data source

103

String bulkInsertSql = """

104

INSERT OVERWRITE TABLE sales_data

105

PARTITION (year, month)

106

SELECT

107

CAST(id as BIGINT) as id,

108

TRIM(customer_id) as customer_id,

109

UPPER(product_name) as product_name,

110

CAST(amount as DECIMAL(10,2)) as amount,

111

CAST(transaction_date as DATE) as transaction_date,

112

YEAR(CAST(transaction_date as DATE)) as year,

113

MONTH(CAST(transaction_date as DATE)) as month

114

FROM (

115

SELECT

116

raw_id as id,

117

raw_customer as customer_id,

118

raw_product as product_name,

119

raw_amount as amount,

120

raw_date as transaction_date

121

FROM external_sales_table

122

WHERE raw_date IS NOT NULL

123

AND raw_amount > 0

124

AND raw_customer IS NOT NULL

125

) cleaned_data

126

""";

127

```

128

129

### Multi-Table INSERT

130

131

Insert into multiple tables from a single source:

132

133

```java

134

// Multi-table insert pattern

135

String multiTableInsertSql = """

136

FROM raw_transaction_data rtd

137

INSERT INTO TABLE sales_data

138

PARTITION (year, month)

139

SELECT

140

id, customer_id, product_name, amount, transaction_date,

141

YEAR(transaction_date), MONTH(transaction_date)

142

WHERE transaction_type = 'SALE'

143

144

INSERT INTO TABLE refund_data

145

PARTITION (year, month)

146

SELECT

147

id, customer_id, product_name, ABS(amount), transaction_date,

148

YEAR(transaction_date), MONTH(transaction_date)

149

WHERE transaction_type = 'REFUND'

150

""";

151

```

152

153

### Conditional INSERT Operations

154

155

Use conditional logic within INSERT statements:

156

157

```java

158

// Conditional insert with data transformation

159

String conditionalInsertSql = """

160

INSERT INTO TABLE customer_segments

161

PARTITION (segment_date)

162

SELECT

163

customer_id,

164

customer_name,

165

total_spent,

166

transaction_count,

167

CASE

168

WHEN total_spent > 10000 THEN 'Premium'

169

WHEN total_spent > 5000 THEN 'Gold'

170

WHEN total_spent > 1000 THEN 'Silver'

171

ELSE 'Bronze'

172

END as segment,

173

CURRENT_DATE as segment_date

174

FROM (

175

SELECT

176

c.customer_id,

177

c.customer_name,

178

COALESCE(SUM(s.amount), 0) as total_spent,

179

COALESCE(COUNT(s.id), 0) as transaction_count

180

FROM customer_profile c

181

LEFT JOIN sales_data s ON c.customer_id = s.customer_id

182

GROUP BY c.customer_id, c.customer_name

183

) aggregated_data

184

""";

185

```

186

187

## Partition Management in INSERT Operations

188

189

### Dynamic Partition Configuration

190

191

Configure dynamic partition behavior:

192

193

```java

194

// Enable dynamic partitioning (typically set at session level)

195

String enableDynamicPartitions = """

196

SET hive.exec.dynamic.partition = true;

197

SET hive.exec.dynamic.partition.mode = nonstrict;

198

SET hive.exec.max.dynamic.partitions = 10000;

199

SET hive.exec.max.dynamic.partitions.pernode = 1000;

200

""";

201

202

// Dynamic partition insert with configuration

203

String configuredDynamicInsert = """

204

INSERT INTO TABLE time_series_data

205

PARTITION (year, month, day)

206

SELECT

207

event_id,

208

user_id,

209

event_type,

210

event_timestamp,

211

YEAR(event_timestamp) as year,

212

MONTH(event_timestamp) as month,

213

DAY(event_timestamp) as day

214

FROM raw_events

215

WHERE event_timestamp >= '2023-01-01'

216

""";

217

```

218

219

### Partition Pruning and Optimization

220

221

Optimize INSERT operations with partition pruning:

222

223

```java

224

// Optimized insert with partition pruning

225

String optimizedInsertSql = """

226

INSERT INTO TABLE sales_data_optimized

227

PARTITION (year=2023, month, region)

228

SELECT

229

id,

230

customer_id,

231

product_name,

232

amount,

233

transaction_date,

234

MONTH(transaction_date) as month,

235

customer_region as region

236

FROM sales_data

237

WHERE year = 2023 -- Partition pruning on source

238

AND customer_region IN ('US', 'EU', 'APAC') -- Limit partition creation

239

""";

240

```

241

242

## Data Quality and Validation

243

244

### INSERT with Data Quality Checks

245

246

Implement data quality validation during INSERT:

247

248

```java

249

// Insert with data quality validation

250

String qualityCheckedInsertSql = """

251

INSERT INTO TABLE validated_sales_data

252

PARTITION (year, month, quality_flag)

253

SELECT

254

id,

255

customer_id,

256

product_name,

257

amount,

258

transaction_date,

259

YEAR(transaction_date) as year,

260

MONTH(transaction_date) as month,

261

CASE

262

WHEN amount > 0

263

AND customer_id IS NOT NULL

264

AND product_name IS NOT NULL

265

AND transaction_date IS NOT NULL

266

THEN 'VALID'

267

ELSE 'INVALID'

268

END as quality_flag

269

FROM raw_sales_data

270

WHERE transaction_date >= '2023-01-01'

271

""";

272

```

273

274

### Upsert Pattern Implementation

275

276

Implement upsert (insert or update) pattern:

277

278

```java

279

// Upsert pattern using INSERT OVERWRITE

280

String upsertPatternSql = """

281

-- Step 1: Create temporary table with new/updated data

282

CREATE TEMPORARY TABLE temp_updates AS

283

SELECT * FROM new_sales_data;

284

285

-- Step 2: Insert overwrite with merged data

286

INSERT OVERWRITE TABLE sales_data

287

PARTITION (year, month)

288

SELECT

289

COALESCE(updates.id, existing.id) as id,

290

COALESCE(updates.customer_id, existing.customer_id) as customer_id,

291

COALESCE(updates.product_name, existing.product_name) as product_name,

292

COALESCE(updates.amount, existing.amount) as amount,

293

COALESCE(updates.transaction_date, existing.transaction_date) as transaction_date,

294

YEAR(COALESCE(updates.transaction_date, existing.transaction_date)) as year,

295

MONTH(COALESCE(updates.transaction_date, existing.transaction_date)) as month

296

FROM (

297

SELECT * FROM sales_data

298

WHERE (year, month, id) NOT IN (

299

SELECT YEAR(transaction_date), MONTH(transaction_date), id

300

FROM temp_updates

301

)

302

) existing

303

FULL OUTER JOIN temp_updates updates ON existing.id = updates.id;

304

""";

305

```

306

307

## Performance Optimization

308

309

### Batch INSERT Operations

310

311

Optimize performance with batch operations:

312

313

```java

314

public class HiveDataLoader {

315

private TableEnvironment tableEnv;

316

317

public HiveDataLoader(TableEnvironment tableEnv) {

318

this.tableEnv = tableEnv;

319

}

320

321

/**

322

* Loads data in batches to optimize performance

323

*/

324

public void loadDataInBatches(String targetTable, String sourceQuery,

325

String partitionColumn, List<String> partitionValues) {

326

for (String partitionValue : partitionValues) {

327

String batchInsertSql = String.format("""

328

INSERT INTO TABLE %s

329

PARTITION (%s='%s')

330

%s

331

WHERE %s = '%s'

332

""", targetTable, partitionColumn, partitionValue,

333

sourceQuery, partitionColumn, partitionValue);

334

335

try {

336

tableEnv.executeSql(batchInsertSql);

337

System.out.println("Loaded partition: " + partitionValue);

338

} catch (Exception e) {

339

System.err.println("Failed to load partition " + partitionValue + ": " + e.getMessage());

340

}

341

}

342

}

343

344

/**

345

* Performs incremental data loading

346

*/

347

public void incrementalLoad(String targetTable, String sourceTable,

348

String timestampColumn, String lastLoadTimestamp) {

349

String incrementalInsertSql = String.format("""

350

INSERT INTO TABLE %s

351

PARTITION (load_date)

352

SELECT

353

*,

354

CURRENT_DATE as load_date

355

FROM %s

356

WHERE %s > '%s'

357

""", targetTable, sourceTable, timestampColumn, lastLoadTimestamp);

358

359

try {

360

tableEnv.executeSql(incrementalInsertSql);

361

System.out.println("Completed incremental load from " + lastLoadTimestamp);

362

} catch (Exception e) {

363

System.err.println("Incremental load failed: " + e.getMessage());

364

}

365

}

366

367

/**

368

* Handles duplicate detection and deduplication

369

*/

370

public void insertWithDeduplication(String targetTable, String sourceTable, String keyColumns) {

371

String deduplicatedInsertSql = String.format("""

372

INSERT OVERWRITE TABLE %s

373

PARTITION (load_date)

374

SELECT

375

t1.*,

376

CURRENT_DATE as load_date

377

FROM %s t1

378

JOIN (

379

SELECT %s, MAX(transaction_date) as max_date

380

FROM %s

381

GROUP BY %s

382

) t2 ON t1.%s = t2.%s

383

AND t1.transaction_date = t2.max_date

384

""", targetTable, sourceTable, keyColumns, sourceTable,

385

keyColumns, keyColumns, keyColumns);

386

387

try {

388

tableEnv.executeSql(deduplicatedInsertSql);

389

System.out.println("Completed insert with deduplication");

390

} catch (Exception e) {

391

System.err.println("Deduplication insert failed: " + e.getMessage());

392

}

393

}

394

}

395

396

// Usage example

397

HiveDataLoader loader = new HiveDataLoader(tableEnv);

398

399

// Load data in monthly batches

400

List<String> months = List.of("2023-01", "2023-02", "2023-03", "2023-04");

401

loader.loadDataInBatches("sales_data",

402

"SELECT * FROM raw_sales",

403

"year_month",

404

months);

405

406

// Perform incremental load

407

loader.incrementalLoad("sales_data", "raw_sales", "transaction_date", "2023-12-01");

408

409

// Insert with deduplication

410

loader.insertWithDeduplication("unique_sales_data", "raw_sales", "id, customer_id");

411

```