# Metastore Operations

The Apache Spark Hive integration provides comprehensive metastore operations through the HiveClient interface, enabling full interaction with Hive metastore for database, table, partition, and function management.

## HiveClient Interface

The HiveClient trait provides the primary interface for all metastore operations, abstracting the underlying Hive metastore implementation.

```scala { .api }
trait HiveClient {
  // Version and configuration
  def version: HiveVersion
  def getConf(key: String, defaultValue: String): String
  def setConf(key: String, value: String): Unit

  // SQL execution
  def runSqlHive(sql: String): Seq[String]

  // Session management
  def newSession(): HiveClient
  def reset(): Unit
  def close(): Unit

  // Database operations
  def listDatabases(pattern: String): Seq[String]
  def getDatabase(name: String): CatalogDatabase
  def databaseExists(dbName: String): Boolean
  def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
  def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
  def alterDatabase(database: CatalogDatabase): Unit
  def setCurrentDatabase(databaseName: String): Unit
  def getCurrentDatabase: String

  // Table operations
  def listTables(dbName: String): Seq[String]
  def listTables(dbName: String, pattern: String): Seq[String]
  def getTable(dbName: String, tableName: String): CatalogTable
  def tableExists(dbName: String, tableName: String): Boolean
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
  def alterTable(tableName: String, table: CatalogTable): Unit
  def renameTable(dbName: String, oldName: String, newName: String): Unit

  // Partition operations
  def listPartitions(db: String, table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
  def listPartitionNames(db: String, table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[String]
  def getPartition(dbName: String, tableName: String, spec: TablePartitionSpec): CatalogTablePartition
  def getPartitions(db: String, table: String, specs: Seq[TablePartitionSpec]): Seq[CatalogTablePartition]
  def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit
  def dropPartitions(db: String, table: String, specs: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit
  def renamePartitions(db: String, table: String, specs: Seq[TablePartitionSpec], newSpecs: Seq[TablePartitionSpec]): Unit
  def alterPartitions(db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit

  // Function operations
  def listFunctions(db: String, pattern: String): Seq[String]
  def getFunction(db: String, name: String): CatalogFunction
  def functionExists(db: String, name: String): Boolean
  def createFunction(db: String, func: CatalogFunction): Unit
  def dropFunction(db: String, name: String): Unit
  def alterFunction(db: String, func: CatalogFunction): Unit

  // Data loading operations
  def loadTable(loadPath: String, tableName: String, replace: Boolean, isSrcLocal: Boolean): Unit
  def loadPartition(loadPath: String, dbName: String, tableName: String, partSpec: java.util.LinkedHashMap[String, String], replace: Boolean, inheritTableSpecs: Boolean, isSrcLocal: Boolean): Unit
  def loadDynamicPartitions(loadPath: String, dbName: String, tableName: String, partSpec: java.util.LinkedHashMap[String, String], replace: Boolean, numDP: Int): Unit
}
```

## Database Operations

### Creating Databases

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase

// Create database through SparkSession
spark.sql("CREATE DATABASE IF NOT EXISTS my_database")

// Create database with properties
spark.sql("""
  CREATE DATABASE analytics_db
  COMMENT 'Analytics database for reporting'
  LOCATION '/user/hive/warehouse/analytics.db'
  WITH DBPROPERTIES ('owner'='analytics_team', 'created'='2023-01-01')
""")

// Create database programmatically (internal API)
val database = CatalogDatabase(
  name = "my_db",
  description = "My database",
  locationUri = "/user/hive/warehouse/my_db.db",
  properties = Map("owner" -> "user", "team" -> "data")
)
// Note: Direct HiveClient usage is internal API
```

### Managing Databases

```scala
// List databases
spark.sql("SHOW DATABASES").show()

// List databases with pattern
spark.sql("SHOW DATABASES LIKE 'analytics_*'").show()

// Get current database
val currentDb = spark.sql("SELECT current_database()").collect()(0).getString(0)

// Switch database
spark.sql("USE analytics_db")

// Drop database
spark.sql("DROP DATABASE IF EXISTS old_database CASCADE")
```

### Database Properties

```scala
// Show database details
spark.sql("DESCRIBE DATABASE EXTENDED analytics_db").show()

// Alter database properties
spark.sql("""
  ALTER DATABASE analytics_db SET DBPROPERTIES ('modified'='2023-12-01', 'version'='2.0')
""")

// Set database location
spark.sql("ALTER DATABASE analytics_db SET LOCATION '/new/location/analytics.db'")
```

## Table Operations

### Creating Tables

```scala
// Create managed table
spark.sql("""
  CREATE TABLE employee (
    id INT,
    name STRING,
    department STRING,
    salary DOUBLE,
    hire_date DATE
  ) USING HIVE
  STORED AS ORC
""")

// Create external table
spark.sql("""
  CREATE TABLE external_employee (
    id INT,
    name STRING,
    department STRING
  ) USING HIVE
  STORED AS PARQUET
  LOCATION '/data/external/employee'
""")

// Create partitioned table
spark.sql("""
  CREATE TABLE partitioned_sales (
    transaction_id STRING,
    amount DOUBLE,
    customer_id STRING
  ) USING HIVE
  PARTITIONED BY (year INT, month INT)
  STORED AS ORC
""")
```

### Table Management

```scala
// List tables
spark.sql("SHOW TABLES").show()

// List tables in specific database
spark.sql("SHOW TABLES IN analytics_db").show()

// List tables with pattern
spark.sql("SHOW TABLES LIKE 'employee_*'").show()

// Check if table exists
val tableExists = spark.catalog.tableExists("analytics_db", "employee")

// Get table information
spark.sql("DESCRIBE EXTENDED employee").show()

// Show table properties
spark.sql("SHOW TBLPROPERTIES employee").show()
```

### Altering Tables

```scala
// Add column
spark.sql("ALTER TABLE employee ADD COLUMN email STRING")

// Rename column (Hive 3.0+)
spark.sql("ALTER TABLE employee CHANGE COLUMN email email_address STRING")

// Drop column (Hive 3.0+)
spark.sql("ALTER TABLE employee DROP COLUMN email_address")

// Rename table
spark.sql("ALTER TABLE old_employee RENAME TO employee_backup")

// Set table properties
spark.sql("ALTER TABLE employee SET TBLPROPERTIES ('last_modified'='2023-12-01')")

// Change table location
spark.sql("ALTER TABLE external_employee SET LOCATION '/new/data/path'")
```

### Table Statistics

```scala
// Analyze table statistics
spark.sql("ANALYZE TABLE employee COMPUTE STATISTICS")

// Analyze column statistics
spark.sql("ANALYZE TABLE employee COMPUTE STATISTICS FOR COLUMNS id, salary")

// Show table stats
spark.sql("DESCRIBE EXTENDED employee").show()
```

## Partition Operations

### Creating Partitions

```scala
// Add partition
spark.sql("ALTER TABLE partitioned_sales ADD PARTITION (year=2023, month=12)")

// Add partition with location
spark.sql("""
  ALTER TABLE partitioned_sales ADD PARTITION (year=2023, month=11)
  LOCATION '/data/sales/2023/11'
""")

// Add multiple partitions
spark.sql("""
  ALTER TABLE partitioned_sales ADD
  PARTITION (year=2023, month=10)
  PARTITION (year=2023, month=9)
""")
```

### Managing Partitions

```scala
// Show partitions
spark.sql("SHOW PARTITIONS partitioned_sales").show()

// Show partitions with filter
spark.sql("SHOW PARTITIONS partitioned_sales PARTITION(year=2023)").show()

// Drop partition
spark.sql("ALTER TABLE partitioned_sales DROP PARTITION (year=2022, month=1)")

// Drop partition if exists
spark.sql("ALTER TABLE partitioned_sales DROP IF EXISTS PARTITION (year=2022, month=2)")
```

### Dynamic Partitioning

```scala
// Enable dynamic partitioning
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

// Insert with dynamic partitioning
spark.sql("""
  INSERT INTO TABLE partitioned_sales PARTITION(year, month)
  SELECT transaction_id, amount, customer_id, year(date_col), month(date_col)
  FROM source_sales
""")

// Overwrite partitions
spark.sql("""
  INSERT OVERWRITE TABLE partitioned_sales PARTITION(year=2023, month)
  SELECT transaction_id, amount, customer_id, month(date_col)
  FROM source_sales
  WHERE year(date_col) = 2023
""")
```

### Partition Properties

```scala
// Set partition properties
spark.sql("""
  ALTER TABLE partitioned_sales PARTITION (year=2023, month=12)
  SET SERDEPROPERTIES ('field.delim'='\t')
""")

// Change partition location
spark.sql("""
  ALTER TABLE partitioned_sales PARTITION (year=2023, month=12)
  SET LOCATION '/new/partition/location'
""")
```

## Function Operations

### Creating Functions

```scala
// Create temporary function
spark.sql("""
  CREATE TEMPORARY FUNCTION my_upper AS 'com.example.UpperCaseUDF'
""")

// Create function with JAR
spark.sql("""
  CREATE FUNCTION my_database.complex_function AS 'com.example.ComplexUDF'
  USING JAR '/path/to/udf.jar'
""")

// Create function with multiple resources
spark.sql("""
  CREATE FUNCTION analytics.ml_predict AS 'com.example.MLPredictUDF'
  USING JAR '/path/to/ml-udf.jar',
  FILE '/path/to/model.pkl'
""")
```

### Managing Functions

```scala
// List functions
spark.sql("SHOW FUNCTIONS").show()

// List functions with pattern
spark.sql("SHOW FUNCTIONS LIKE 'my_*'").show()

// List user-defined functions only
spark.sql("SHOW USER FUNCTIONS").show()

// Describe function
spark.sql("DESCRIBE FUNCTION my_upper").show()

// Show extended function info
spark.sql("DESCRIBE FUNCTION EXTENDED my_upper").show()
```

### Function Usage

```scala
// Use UDF in query
val result = spark.sql("""
  SELECT my_upper(name) as upper_name
  FROM employee
  WHERE my_upper(name) LIKE 'JOHN%'
""")

// Use aggregate UDF
val avgResult = spark.sql("""
  SELECT department, my_avg(salary) as avg_salary
  FROM employee
  GROUP BY department
""")
```

## Data Loading Operations

### Loading Data into Tables

```scala
// Load data from local file
spark.sql("""
  LOAD DATA LOCAL INPATH '/local/path/data.txt'
  INTO TABLE employee
""")

// Load data from HDFS
spark.sql("""
  LOAD DATA INPATH '/hdfs/path/data.txt'
  INTO TABLE employee
""")

// Load data with overwrite
spark.sql("""
  LOAD DATA INPATH '/hdfs/path/new_data.txt'
  OVERWRITE INTO TABLE employee
""")
```

### Loading Partitioned Data

```scala
// Load into specific partition
spark.sql("""
  LOAD DATA INPATH '/data/2023/12/sales.txt'
  INTO TABLE partitioned_sales PARTITION (year=2023, month=12)
""")

// Load with partition overwrite
spark.sql("""
  LOAD DATA INPATH '/data/2023/11/sales.txt'
  OVERWRITE INTO TABLE partitioned_sales PARTITION (year=2023, month=11)
""")
```

## Version Compatibility

### Supported Hive Versions

The metastore client supports multiple Hive versions:

```scala { .api }
abstract class HiveVersion {
  def fullVersion: String
  def extraDeps: Seq[String]
  def exclusions: Seq[String]
}

object HiveVersion {
  val v12 = hive.v12   // 0.12.0
  val v13 = hive.v13   // 0.13.1
  val v14 = hive.v14   // 0.14.0
  val v1_0 = hive.v1_0 // 1.0.0
  val v1_1 = hive.v1_1 // 1.1.0
  val v1_2 = hive.v1_2 // 1.2.1
  val v2_0 = hive.v2_0 // 2.0.1
  val v2_1 = hive.v2_1 // 2.1.1
}
```

### Configuration for Different Versions

```scala
// Set metastore version
spark.conf.set("spark.sql.hive.metastore.version", "2.1.1")

// Configure for specific Hive installation
spark.conf.set("spark.sql.hive.metastore.jars", "/path/to/hive/lib/*")

// Use Maven for version-specific JARs
spark.conf.set("spark.sql.hive.metastore.jars", "maven")
```

## Error Handling

### Common Metastore Errors

**Connection Issues:**

```scala
// Configure metastore URI
spark.conf.set("hive.metastore.uris", "thrift://metastore-host:9083")

// Set connection timeout
spark.conf.set("hive.metastore.client.connect.retry.delay", "5s")
spark.conf.set("hive.metastore.client.socket.timeout", "1800s")
```

**Permission Errors:**

```scala
// Configure metastore authentication
spark.conf.set("hive.metastore.sasl.enabled", "true")
spark.conf.set("hive.metastore.kerberos.principal", "hive/_HOST@REALM")
```

**Schema Validation:**

```scala
// Disable schema validation for development
spark.conf.set("hive.metastore.schema.verification", "false")

// Enable auto schema migration
spark.conf.set("hive.metastore.schema.verification.record.version", "false")
```

## Performance Optimization

### Metastore Performance

```scala
// Enable metastore caching
spark.conf.set("hive.metastore.cache.pinobjtypes", "Table,Database,Type,FieldSchema,Order")

// Configure connection pooling
spark.conf.set("hive.metastore.ds.connection.url.max.connections", "10")

// Set batch fetch size
spark.conf.set("hive.metastore.batch.retrieve.max", "300")
```

### Partition Pruning

```scala
// Enable partition pruning
spark.conf.set("spark.sql.hive.metastorePartitionPruning", "true")

// Configure partition discovery
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "32")
```

## Types

```scala { .api }
// Database catalog entry
case class CatalogDatabase(
  name: String,
  description: String,
  locationUri: String,
  properties: Map[String, String]
)

// Table catalog entry
case class CatalogTable(
  identifier: TableIdentifier,
  tableType: CatalogTableType,
  storage: CatalogStorageFormat,
  schema: StructType,
  partitionColumnNames: Seq[String] = Seq.empty,
  bucketSpec: Option[BucketSpec] = None,
  owner: String = "",
  createTime: Long = System.currentTimeMillis,
  lastAccessTime: Long = -1,
  createVersion: String = "",
  properties: Map[String, String] = Map.empty,
  stats: Option[CatalogStatistics] = None,
  viewText: Option[String] = None,
  comment: Option[String] = None,
  unsupportedFeatures: Seq[String] = Seq.empty,
  tracksPartitionsInCatalog: Boolean = false,
  schemaPreservesCase: Boolean = true,
  ignoredProperties: Map[String, String] = Map.empty
)

// Partition catalog entry
case class CatalogTablePartition(
  spec: TablePartitionSpec,
  storage: CatalogStorageFormat,
  parameters: Map[String, String] = Map.empty,
  stats: Option[CatalogStatistics] = None
)

// Function catalog entry
case class CatalogFunction(
  identifier: FunctionIdentifier,
  className: String,
  resources: Seq[FunctionResource],
  description: Option[String] = None
)

// Table partition specification
type TablePartitionSpec = Map[String, String]

// Table identifier
case class TableIdentifier(table: String, database: Option[String] = None)

// Function identifier
case class FunctionIdentifier(funcName: String, database: Option[String] = None)

// Function resource
case class FunctionResource(resourceType: FunctionResourceType, uri: String)

// Storage format specification
case class CatalogStorageFormat(
  locationUri: Option[String] = None,
  inputFormat: Option[String] = None,
  outputFormat: Option[String] = None,
  serde: Option[String] = None,
  compressed: Boolean = false,
  properties: Map[String, String] = Map.empty
)

// Table statistics
case class CatalogStatistics(
  sizeInBytes: Long,
  rowCount: Option[Long] = None,
  colStats: Map[String, CatalogColumnStat] = Map.empty
)
```