0
# Metastore Operations
1
2
The Apache Spark Hive integration provides comprehensive metastore operations through the HiveClient interface, enabling full interaction with Hive metastore for database, table, partition, and function management.
3
4
## HiveClient Interface
5
6
The HiveClient trait provides the primary interface for all metastore operations, abstracting the underlying Hive metastore implementation.
7
8
```scala { .api }
9
trait HiveClient {
10
// Version and configuration
11
def version: HiveVersion
12
def getConf(key: String, defaultValue: String): String
13
def setConf(key: String, value: String): Unit
14
15
// SQL execution
16
def runSqlHive(sql: String): Seq[String]
17
18
// Session management
19
def newSession(): HiveClient
20
def reset(): Unit
21
def close(): Unit
22
23
// Database operations
24
def listDatabases(pattern: String): Seq[String]
25
def getDatabase(name: String): CatalogDatabase
26
def databaseExists(dbName: String): Boolean
27
def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
28
def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
29
def alterDatabase(database: CatalogDatabase): Unit
30
def setCurrentDatabase(databaseName: String): Unit
31
def getCurrentDatabase: String
32
33
// Table operations
34
def listTables(dbName: String): Seq[String]
35
def listTables(dbName: String, pattern: String): Seq[String]
36
def getTable(dbName: String, tableName: String): CatalogTable
37
def tableExists(dbName: String, tableName: String): Boolean
38
def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
39
def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
40
def alterTable(tableName: String, table: CatalogTable): Unit
41
def renameTable(dbName: String, oldName: String, newName: String): Unit
42
43
// Partition operations
44
def listPartitions(db: String, table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
45
def listPartitionNames(db: String, table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[String]
46
def getPartition(dbName: String, tableName: String, spec: TablePartitionSpec): CatalogTablePartition
47
def getPartitions(db: String, table: String, specs: Seq[TablePartitionSpec]): Seq[CatalogTablePartition]
48
def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit
49
def dropPartitions(db: String, table: String, specs: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit
50
def renamePartitions(db: String, table: String, specs: Seq[TablePartitionSpec], newSpecs: Seq[TablePartitionSpec]): Unit
51
def alterPartitions(db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit
52
53
// Function operations
54
def listFunctions(db: String, pattern: String): Seq[String]
55
def getFunction(db: String, name: String): CatalogFunction
56
def functionExists(db: String, name: String): Boolean
57
def createFunction(db: String, func: CatalogFunction): Unit
58
def dropFunction(db: String, name: String): Unit
59
def alterFunction(db: String, func: CatalogFunction): Unit
60
61
// Data loading operations
62
def loadTable(loadPath: String, tableName: String, replace: Boolean, isSrcLocal: Boolean): Unit
63
def loadPartition(loadPath: String, dbName: String, tableName: String, partSpec: java.util.LinkedHashMap[String, String], replace: Boolean, inheritTableSpecs: Boolean, isSrcLocal: Boolean): Unit
64
def loadDynamicPartitions(loadPath: String, dbName: String, tableName: String, partSpec: java.util.LinkedHashMap[String, String], replace: Boolean, numDP: Int): Unit
65
}
66
```
67
68
## Database Operations
69
70
### Creating Databases
71
72
```scala
73
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
74
75
// Create database through SparkSession
76
spark.sql("CREATE DATABASE IF NOT EXISTS my_database")
77
78
// Create database with properties
79
spark.sql("""
80
CREATE DATABASE analytics_db
81
COMMENT 'Analytics database for reporting'
82
LOCATION '/user/hive/warehouse/analytics.db'
83
WITH DBPROPERTIES ('owner'='analytics_team', 'created'='2023-01-01')
84
""")
85
86
// Create database programmatically (internal API)
87
val database = CatalogDatabase(
88
name = "my_db",
89
description = "My database",
90
locationUri = "/user/hive/warehouse/my_db.db",
91
properties = Map("owner" -> "user", "team" -> "data")
92
)
93
// Note: Direct HiveClient usage is internal API
94
```
95
96
### Managing Databases
97
98
```scala
99
// List databases
100
spark.sql("SHOW DATABASES").show()
101
102
// List databases with pattern
103
spark.sql("SHOW DATABASES LIKE 'analytics_*'").show()
104
105
// Get current database
106
val currentDb = spark.sql("SELECT current_database()").collect()(0).getString(0)
107
108
// Switch database
109
spark.sql("USE analytics_db")
110
111
// Drop database
112
spark.sql("DROP DATABASE IF EXISTS old_database CASCADE")
113
```
114
115
### Database Properties
116
117
```scala
118
// Show database details
119
spark.sql("DESCRIBE DATABASE EXTENDED analytics_db").show()
120
121
// Alter database properties
122
spark.sql("""
123
ALTER DATABASE analytics_db SET DBPROPERTIES ('modified'='2023-12-01', 'version'='2.0')
124
""")
125
126
// Set database location (requires a Hive 3.0+ metastore; earlier metastore versions cannot alter a database location)
127
spark.sql("ALTER DATABASE analytics_db SET LOCATION '/new/location/analytics.db'")
128
```
129
130
## Table Operations
131
132
### Creating Tables
133
134
```scala
135
// Create managed table
136
spark.sql("""
137
CREATE TABLE employee (
138
id INT,
139
name STRING,
140
department STRING,
141
salary DOUBLE,
142
hire_date DATE
143
) USING HIVE
144
STORED AS ORC
145
""")
146
147
// Create external table
148
spark.sql("""
149
CREATE TABLE external_employee (
150
id INT,
151
name STRING,
152
department STRING
153
) USING HIVE
154
STORED AS PARQUET
155
LOCATION '/data/external/employee'
156
""")
157
158
// Create partitioned table
159
spark.sql("""
160
CREATE TABLE partitioned_sales (
161
transaction_id STRING,
162
amount DOUBLE,
163
customer_id STRING
164
) USING HIVE
165
PARTITIONED BY (year INT, month INT)
166
STORED AS ORC
167
""")
168
```
169
170
### Table Management
171
172
```scala
173
// List tables
174
spark.sql("SHOW TABLES").show()
175
176
// List tables in specific database
177
spark.sql("SHOW TABLES IN analytics_db").show()
178
179
// List tables with pattern
180
spark.sql("SHOW TABLES LIKE 'employee_*'").show()
181
182
// Check if table exists
183
val tableExists = spark.catalog.tableExists("analytics_db", "employee")
184
185
// Get table information
186
spark.sql("DESCRIBE EXTENDED employee").show()
187
188
// Show table properties
189
spark.sql("SHOW TBLPROPERTIES employee").show()
190
```
191
192
### Altering Tables
193
194
```scala
195
// Add column
196
spark.sql("ALTER TABLE employee ADD COLUMNS (email STRING)")
197
198
// Rename column — note: Spark SQL does not support renaming columns of Hive-format tables
// via CHANGE COLUMN; run this in Hive directly (or use RENAME COLUMN on a v2 table)
199
spark.sql("ALTER TABLE employee CHANGE COLUMN email email_address STRING")
200
201
// Drop column — note: Spark SQL does not support DROP COLUMN on Hive-format tables;
// shown here as the Hive-side (Hive 3.0+) statement
202
spark.sql("ALTER TABLE employee DROP COLUMN email_address")
203
204
// Rename table
205
spark.sql("ALTER TABLE old_employee RENAME TO employee_backup")
206
207
// Set table properties
208
spark.sql("ALTER TABLE employee SET TBLPROPERTIES ('last_modified'='2023-12-01')")
209
210
// Change table location
211
spark.sql("ALTER TABLE external_employee SET LOCATION '/new/data/path'")
212
```
213
214
### Table Statistics
215
216
```scala
217
// Analyze table statistics
218
spark.sql("ANALYZE TABLE employee COMPUTE STATISTICS")
219
220
// Analyze column statistics
221
spark.sql("ANALYZE TABLE employee COMPUTE STATISTICS FOR COLUMNS id, salary")
222
223
// Show table stats
224
spark.sql("DESCRIBE EXTENDED employee").show()
225
```
226
227
## Partition Operations
228
229
### Creating Partitions
230
231
```scala
232
// Add partition
233
spark.sql("ALTER TABLE partitioned_sales ADD PARTITION (year=2023, month=12)")
234
235
// Add partition with location
236
spark.sql("""
237
ALTER TABLE partitioned_sales ADD PARTITION (year=2023, month=11)
238
LOCATION '/data/sales/2023/11'
239
""")
240
241
// Add multiple partitions
242
spark.sql("""
243
ALTER TABLE partitioned_sales ADD
244
PARTITION (year=2023, month=10)
245
PARTITION (year=2023, month=9)
246
""")
247
```
248
249
### Managing Partitions
250
251
```scala
252
// Show partitions
253
spark.sql("SHOW PARTITIONS partitioned_sales").show()
254
255
// Show partitions with filter
256
spark.sql("SHOW PARTITIONS partitioned_sales PARTITION(year=2023)").show()
257
258
// Drop partition
259
spark.sql("ALTER TABLE partitioned_sales DROP PARTITION (year=2022, month=1)")
260
261
// Drop partition if exists
262
spark.sql("ALTER TABLE partitioned_sales DROP IF EXISTS PARTITION (year=2022, month=2)")
263
```
264
265
### Dynamic Partitioning
266
267
```scala
268
// Enable dynamic partitioning
269
spark.conf.set("hive.exec.dynamic.partition", "true")
270
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
271
272
// Insert with dynamic partitioning
273
spark.sql("""
274
INSERT INTO TABLE partitioned_sales PARTITION(year, month)
275
SELECT transaction_id, amount, customer_id, year(date_col), month(date_col)
276
FROM source_sales
277
""")
278
279
// Overwrite partitions
280
spark.sql("""
281
INSERT OVERWRITE TABLE partitioned_sales PARTITION(year=2023, month)
282
SELECT transaction_id, amount, customer_id, month(date_col)
283
FROM source_sales
284
WHERE year(date_col) = 2023
285
""")
286
```
287
288
### Partition Properties
289
290
```scala
291
// Set partition properties
292
spark.sql("""
293
ALTER TABLE partitioned_sales PARTITION (year=2023, month=12)
294
SET SERDEPROPERTIES ('field.delim'='\t')
295
""")
296
297
// Change partition location
298
spark.sql("""
299
ALTER TABLE partitioned_sales PARTITION (year=2023, month=12)
300
SET LOCATION '/new/partition/location'
301
""")
302
```
303
304
## Function Operations
305
306
### Creating Functions
307
308
```scala
309
// Create temporary function
310
spark.sql("""
311
CREATE TEMPORARY FUNCTION my_upper AS 'com.example.UpperCaseUDF'
312
""")
313
314
// Create function with JAR
315
spark.sql("""
316
CREATE FUNCTION my_database.complex_function AS 'com.example.ComplexUDF'
317
USING JAR '/path/to/udf.jar'
318
""")
319
320
// Create function with multiple resources
321
spark.sql("""
322
CREATE FUNCTION analytics.ml_predict AS 'com.example.MLPredictUDF'
323
USING JAR '/path/to/ml-udf.jar',
324
FILE '/path/to/model.pkl'
325
""")
326
```
327
328
### Managing Functions
329
330
```scala
331
// List functions
332
spark.sql("SHOW FUNCTIONS").show()
333
334
// List functions with pattern
335
spark.sql("SHOW FUNCTIONS LIKE 'my_*'").show()
336
337
// List user-defined functions only
338
spark.sql("SHOW USER FUNCTIONS").show()
339
340
// Describe function
341
spark.sql("DESCRIBE FUNCTION my_upper").show()
342
343
// Show extended function info
344
spark.sql("DESCRIBE FUNCTION EXTENDED my_upper").show()
345
```
346
347
### Function Usage
348
349
```scala
350
// Use UDF in query
351
val result = spark.sql("""
352
SELECT my_upper(name) as upper_name
353
FROM employee
354
WHERE my_upper(name) LIKE 'JOHN%'
355
""")
356
357
// Use aggregate UDF
358
val avgResult = spark.sql("""
359
SELECT department, my_avg(salary) as avg_salary
360
FROM employee
361
GROUP BY department
362
""")
363
```
364
365
## Data Loading Operations
366
367
### Loading Data into Tables
368
369
```scala
370
// Load data from local file
371
spark.sql("""
372
LOAD DATA LOCAL INPATH '/local/path/data.txt'
373
INTO TABLE employee
374
""")
375
376
// Load data from HDFS
377
spark.sql("""
378
LOAD DATA INPATH '/hdfs/path/data.txt'
379
INTO TABLE employee
380
""")
381
382
// Load data with overwrite
383
spark.sql("""
384
LOAD DATA INPATH '/hdfs/path/new_data.txt'
385
OVERWRITE INTO TABLE employee
386
""")
387
```
388
389
### Loading Partitioned Data
390
391
```scala
392
// Load into specific partition
393
spark.sql("""
394
LOAD DATA INPATH '/data/2023/12/sales.txt'
395
INTO TABLE partitioned_sales PARTITION (year=2023, month=12)
396
""")
397
398
// Load with partition overwrite
399
spark.sql("""
400
LOAD DATA INPATH '/data/2023/11/sales.txt'
401
OVERWRITE INTO TABLE partitioned_sales PARTITION (year=2023, month=11)
402
""")
403
```
404
405
## Version Compatibility
406
407
### Supported Hive Versions
408
409
The metastore client supports multiple Hive versions:
410
411
```scala { .api }
412
abstract class HiveVersion {
413
def fullVersion: String
414
def extraDeps: Seq[String]
415
def exclusions: Seq[String]
416
}
417
418
object HiveVersion {
419
val v12 = hive.v12 // 0.12.0
420
val v13 = hive.v13 // 0.13.1
421
val v14 = hive.v14 // 0.14.0
422
val v1_0 = hive.v1_0 // 1.0.0
423
val v1_1 = hive.v1_1 // 1.1.0
424
val v1_2 = hive.v1_2 // 1.2.1
425
val v2_0 = hive.v2_0 // 2.0.1
426
val v2_1 = hive.v2_1 // 2.1.1
427
}
428
```
429
430
### Configuration for Different Versions
431
432
```scala
433
// Set metastore version — static config: must be set before the SparkSession is created
// (e.g. SparkSession.builder.config(...)); spark.conf.set after startup has no effect
434
spark.conf.set("spark.sql.hive.metastore.version", "2.1.1")
435
436
// Configure for specific Hive installation
437
spark.conf.set("spark.sql.hive.metastore.jars", "/path/to/hive/lib/*")
438
439
// Use Maven for version-specific JARs
440
spark.conf.set("spark.sql.hive.metastore.jars", "maven")
441
```
442
443
## Error Handling
444
445
### Common Metastore Errors
446
447
**Connection Issues:**
448
```scala
449
// Configure metastore URI
450
spark.conf.set("hive.metastore.uris", "thrift://metastore-host:9083")
451
452
// Set connection timeout
453
spark.conf.set("hive.metastore.client.connect.retry.delay", "5s")
454
spark.conf.set("hive.metastore.client.socket.timeout", "1800s")
455
```
456
457
**Permission Errors:**
458
```scala
459
// Configure metastore authentication
460
spark.conf.set("hive.metastore.sasl.enabled", "true")
461
spark.conf.set("hive.metastore.kerberos.principal", "hive/_HOST@REALM")
462
```
463
464
**Schema Validation:**
465
```scala
466
// Disable schema validation for development
467
spark.conf.set("hive.metastore.schema.verification", "false")
468
469
// Skip recording/checking the schema version in the metastore VERSION table
// (companion to disabled verification; use Hive's schematool to migrate schemas)
470
spark.conf.set("hive.metastore.schema.verification.record.version", "false")
471
```
472
473
## Performance Optimization
474
475
### Metastore Performance
476
477
```scala
478
// Enable metastore caching
479
spark.conf.set("hive.metastore.cache.pinobjtypes", "Table,Database,Type,FieldSchema,Order")
480
481
// Configure connection pooling
482
spark.conf.set("datanucleus.connectionPool.maxPoolSize", "10")
483
484
// Set batch fetch size
485
spark.conf.set("hive.metastore.batch.retrieve.max", "300")
486
```
487
488
### Partition Pruning
489
490
```scala
491
// Enable partition pruning
492
spark.conf.set("spark.sql.hive.metastorePartitionPruning", "true")
493
494
// Configure partition discovery
495
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "32")
496
```
497
498
## Types
499
500
```scala { .api }
501
// Database catalog entry
502
case class CatalogDatabase(
503
name: String,
504
description: String,
505
locationUri: String,
506
properties: Map[String, String]
507
)
508
509
// Table catalog entry
510
case class CatalogTable(
511
identifier: TableIdentifier,
512
tableType: CatalogTableType,
513
storage: CatalogStorageFormat,
514
schema: StructType,
515
partitionColumnNames: Seq[String] = Seq.empty,
516
bucketSpec: Option[BucketSpec] = None,
517
owner: String = "",
518
createTime: Long = System.currentTimeMillis,
519
lastAccessTime: Long = -1,
520
createVersion: String = "",
521
properties: Map[String, String] = Map.empty,
522
stats: Option[CatalogStatistics] = None,
523
viewText: Option[String] = None,
524
comment: Option[String] = None,
525
unsupportedFeatures: Seq[String] = Seq.empty,
526
tracksPartitionsInCatalog: Boolean = false,
527
schemaPreservesCase: Boolean = true,
528
ignoredProperties: Map[String, String] = Map.empty
529
)
530
531
// Partition catalog entry
532
case class CatalogTablePartition(
533
spec: TablePartitionSpec,
534
storage: CatalogStorageFormat,
535
parameters: Map[String, String] = Map.empty,
536
stats: Option[CatalogStatistics] = None
537
)
538
539
// Function catalog entry
540
case class CatalogFunction(
541
identifier: FunctionIdentifier,
542
className: String,
543
resources: Seq[FunctionResource],
544
description: Option[String] = None
545
)
546
547
// Table partition specification
548
type TablePartitionSpec = Map[String, String]
549
550
// Table identifier
551
case class TableIdentifier(table: String, database: Option[String] = None)
552
553
// Function identifier
554
case class FunctionIdentifier(funcName: String, database: Option[String] = None)
555
556
// Function resource
557
case class FunctionResource(resourceType: FunctionResourceType, uri: String)
558
559
// Storage format specification
560
case class CatalogStorageFormat(
561
locationUri: Option[String] = None,
562
inputFormat: Option[String] = None,
563
outputFormat: Option[String] = None,
564
serde: Option[String] = None,
565
compressed: Boolean = false,
566
properties: Map[String, String] = Map.empty
567
)
568
569
// Table statistics
570
case class CatalogStatistics(
571
sizeInBytes: Long,
572
rowCount: Option[Long] = None,
573
colStats: Map[String, CatalogColumnStat] = Map.empty
574
)
575
```