0
# Metastore Operations
1
2
Direct access to Hive metastore for programmatic database, table, partition, and function management through the HiveClient interface. This provides low-level control over Hive metastore operations beyond what's available through SQL.
3
4
## Capabilities
5
6
### HiveClient Interface
7
8
Core interface for interacting with Hive metastore programmatically.
9
10
```scala { .api }
11
/**
12
* Interface to Hive client for metastore operations
13
* Shared across internal and external classloaders
14
*/
15
private[hive] trait HiveClient {
16
17
/** Returns the Hive Version of this client */
18
def version: HiveVersion
19
20
/** Returns configuration value for the given key */
21
def getConf(key: String, defaultValue: String): String
22
23
/** Returns the associated Hive SessionState */
24
def getState: Any
25
26
/** Execute HiveQL command and return results as strings */
27
def runSqlHive(sql: String): Seq[String]
28
29
/** Set output streams for Hive operations */
30
def setOut(stream: PrintStream): Unit
31
def setInfo(stream: PrintStream): Unit
32
def setError(stream: PrintStream): Unit
33
34
/** Add JAR to class loader */
35
def addJar(path: String): Unit
36
37
/** Create new client session sharing class loader and Hive client */
38
def newSession(): HiveClient
39
40
/** Run function within Hive state (SessionState, HiveConf, Hive client and class loader) */
41
def withHiveState[A](f: => A): A
42
43
/** Remove all metadata - for testing only */
44
def reset(): Unit
45
}
46
```
47
48
### Database Operations
49
50
Comprehensive database management operations.
51
52
```scala { .api }
53
/**
54
* Database management operations
55
*/
56
trait HiveClient {
57
/** List database names matching pattern */
58
def listDatabases(pattern: String): Seq[String]
59
60
/** Get database metadata - throws exception if not found */
61
def getDatabase(name: String): CatalogDatabase
62
63
/** Check if database exists */
64
def databaseExists(dbName: String): Boolean
65
66
/** Set current database */
67
def setCurrentDatabase(databaseName: String): Unit
68
69
/** Create new database */
70
def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
71
72
/**
73
* Drop database
74
* @param name - Database name to drop
75
* @param ignoreIfNotExists - Don't throw error if database doesn't exist
76
* @param cascade - Remove all associated objects (tables, functions)
77
*/
78
def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
79
80
/** Alter existing database */
81
def alterDatabase(database: CatalogDatabase): Unit
82
}
83
```
84
85
**Usage Examples:**
86
87
```scala
88
// Assuming you have access to HiveClient instance
89
val client: HiveClient = ??? // obtained from HiveExternalCatalog or similar
90
91
// List all databases
92
val databases = client.listDatabases("*")
93
println(s"Available databases: ${databases.mkString(", ")}")
94
95
// Create new database
96
val newDb = CatalogDatabase(
97
name = "analytics_db",
98
description = "Analytics database",
99
locationUri = "hdfs://cluster/user/hive/warehouse/analytics_db.db",
100
properties = Map("owner" -> "analytics_team")
101
)
102
client.createDatabase(newDb, ignoreIfExists = true)
103
104
// Switch to database
105
client.setCurrentDatabase("analytics_db")
106
107
// Get database info
108
val dbInfo = client.getDatabase("analytics_db")
109
println(s"Database location: ${dbInfo.locationUri}")
110
```
111
112
### Table Operations
113
114
Complete table management functionality.
115
116
```scala { .api }
117
/**
118
* Table management operations
119
*/
120
trait HiveClient {
121
/** List all tables in database */
122
def listTables(dbName: String): Seq[String]
123
124
/** List tables matching pattern in database */
125
def listTables(dbName: String, pattern: String): Seq[String]
126
127
/** Check if table exists */
128
def tableExists(dbName: String, tableName: String): Boolean
129
130
/** Get table metadata - throws NoSuchTableException if not found */
131
def getTable(dbName: String, tableName: String): CatalogTable
132
133
/** Get table metadata - returns None if not found */
134
def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
135
136
/** Create new table */
137
def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
138
139
/** Drop table */
140
def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
141
142
/** Alter existing table */
143
def alterTable(table: CatalogTable): Unit
144
def alterTable(dbName: String, tableName: String, table: CatalogTable): Unit
145
146
/** Update the table's data schema and the schema-related table properties */
147
def alterTableDataSchema(
148
dbName: String,
149
tableName: String,
150
newDataSchema: StructType,
151
schemaProps: Map[String, String]
152
): Unit
153
154
/** Get partitions filtered by predicate expressions */
155
def getPartitionsByFilter(
156
catalogTable: CatalogTable,
157
predicates: Seq[Expression]
158
): Seq[CatalogTablePartition]
159
160
/** Add JAR to class loader for UDF/SerDe usage */
161
def addJar(path: String): Unit
162
163
/** Remove all metadata - used for testing only */
164
def reset(): Unit
165
}
166
```
167
168
**Usage Examples:**
169
170
```scala
171
import org.apache.spark.sql.catalyst.TableIdentifier
172
import org.apache.spark.sql.catalyst.catalog._
173
import org.apache.spark.sql.types._
174
175
// List tables
176
val tables = client.listTables("default")
177
println(s"Tables in default database: ${tables.mkString(", ")}")
178
179
// Create table
180
val tableSchema = StructType(Seq(
181
StructField("id", IntegerType, nullable = false),
182
StructField("name", StringType, nullable = true),
183
StructField("age", IntegerType, nullable = true)
184
))
185
186
val newTable = CatalogTable(
187
identifier = TableIdentifier("users", Some("default")),
188
tableType = CatalogTableType.MANAGED,
189
storage = CatalogStorageFormat.empty,
190
schema = tableSchema,
191
partitionColumnNames = Seq.empty,
192
properties = Map("comment" -> "User information table")
193
)
194
195
client.createTable(newTable, ignoreIfExists = true)
196
197
// Get table info
198
val tableInfo = client.getTable("default", "users")
199
println(s"Table schema: ${tableInfo.schema}")
200
println(s"Table type: ${tableInfo.tableType}")
201
202
// Check if table exists
203
if (client.tableExists("default", "users")) {
204
println("Table exists")
205
}
206
```
207
208
### Partition Operations
209
210
Comprehensive partition management for partitioned tables.
211
212
```scala { .api }
213
/**
214
* Partition management operations
215
*/
216
trait HiveClient {
217
/** Create one or more partitions */
218
def createPartitions(
219
db: String,
220
table: String,
221
parts: Seq[CatalogTablePartition],
222
ignoreIfExists: Boolean
223
): Unit
224
225
/** Drop one or more partitions */
226
def dropPartitions(
227
db: String,
228
table: String,
229
specs: Seq[TablePartitionSpec],
230
ignoreIfNotExists: Boolean,
231
purge: Boolean,
232
retainData: Boolean
233
): Unit
234
235
/** Rename existing partitions */
236
def renamePartitions(
237
db: String,
238
table: String,
239
specs: Seq[TablePartitionSpec],
240
newSpecs: Seq[TablePartitionSpec]
241
): Unit
242
243
/** Alter existing partitions */
244
def alterPartitions(
245
db: String,
246
table: String,
247
newParts: Seq[CatalogTablePartition]
248
): Unit
249
250
/** Get partition - throws NoSuchPartitionException if not found */
251
def getPartition(
252
dbName: String,
253
tableName: String,
254
spec: TablePartitionSpec
255
): CatalogTablePartition
256
257
/** Get partition - returns None if not found */
258
def getPartitionOption(
259
table: CatalogTable,
260
spec: TablePartitionSpec
261
): Option[CatalogTablePartition]
262
263
/** Get partitions matching partial spec */
264
def getPartitions(
265
catalogTable: CatalogTable,
266
partialSpec: Option[TablePartitionSpec] = None
267
): Seq[CatalogTablePartition]
268
269
/** Get partition names matching partial spec */
270
def getPartitionNames(
271
table: CatalogTable,
272
partialSpec: Option[TablePartitionSpec] = None
273
): Seq[String]
274
275
/** Get partitions filtered by predicates */
276
def getPartitionsByFilter(
277
catalogTable: CatalogTable,
278
predicates: Seq[Expression]
279
): Seq[CatalogTablePartition]
280
}
281
```
282
283
**Usage Examples:**
284
285
```scala
286
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
287
288
// Get table for partition operations
289
val table = client.getTable("sales_db", "monthly_sales")
290
291
// List all partitions
292
val allPartitions = client.getPartitions(table)
293
println(s"Total partitions: ${allPartitions.length}")
294
295
// Get specific partition
296
val partSpec: TablePartitionSpec = Map("year" -> "2023", "month" -> "12")
297
val partition = client.getPartitionOption(table, partSpec)
298
partition match {
299
case Some(part) => println(s"Partition location: ${part.storage.locationUri}")
300
case None => println("Partition not found")
301
}
302
303
// Create new partition
304
val newPartition = CatalogTablePartition(
305
spec = Map("year" -> "2024", "month" -> "01"),
306
storage = CatalogStorageFormat(
307
locationUri = Some("hdfs://cluster/user/hive/warehouse/sales_db.db/monthly_sales/year=2024/month=01"),
308
inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
309
outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"),
310
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
311
)
312
)
313
314
client.createPartitions("sales_db", "monthly_sales", Seq(newPartition), ignoreIfExists = true)
315
```
316
317
### Data Loading Operations
318
319
Operations for loading data into tables and partitions.
320
321
```scala { .api }
322
/**
323
* Data loading operations
324
*/
325
trait HiveClient {
326
/** Load data into static partition */
327
def loadPartition(
328
loadPath: String,
329
dbName: String,
330
tableName: String,
331
partSpec: java.util.LinkedHashMap[String, String], // Hive requires LinkedHashMap ordering
332
replace: Boolean,
333
inheritTableSpecs: Boolean,
334
isSrcLocal: Boolean
335
): Unit
336
337
/** Load data into existing table */
338
def loadTable(
339
loadPath: String,
340
tableName: String,
341
replace: Boolean,
342
isSrcLocal: Boolean
343
): Unit
344
345
/** Load data creating dynamic partitions */
346
def loadDynamicPartitions(
347
loadPath: String,
348
dbName: String,
349
tableName: String,
350
partSpec: java.util.LinkedHashMap[String, String], // Hive requires LinkedHashMap ordering
351
replace: Boolean,
352
numDP: Int
353
): Unit
354
}
355
```
356
357
### Function Operations
358
359
Management of user-defined functions in Hive metastore.
360
361
```scala { .api }
362
/**
363
* Function management operations
364
*/
365
trait HiveClient {
366
/** Create function in database */
367
def createFunction(db: String, func: CatalogFunction): Unit
368
369
/** Drop existing function */
370
def dropFunction(db: String, name: String): Unit
371
372
/** Rename existing function */
373
def renameFunction(db: String, oldName: String, newName: String): Unit
374
375
/** Alter existing function */
376
def alterFunction(db: String, func: CatalogFunction): Unit
377
378
/** Get function - throws NoSuchPermanentFunctionException if not found */
379
def getFunction(db: String, name: String): CatalogFunction
380
381
/** Get function - returns None if not found */
382
def getFunctionOption(db: String, name: String): Option[CatalogFunction]
383
384
/** Check if function exists */
385
def functionExists(db: String, name: String): Boolean
386
387
/** List functions matching pattern */
388
def listFunctions(db: String, pattern: String): Seq[String]
389
}
390
```
391
392
**Usage Examples:**
393
394
```scala
395
import org.apache.spark.sql.catalyst.FunctionIdentifier
396
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource}
397
398
// List functions
399
val functions = client.listFunctions("default", "*")
400
println(s"Available functions: ${functions.mkString(", ")}")
401
402
// Create custom function
403
val customFunction = CatalogFunction(
404
identifier = FunctionIdentifier("my_upper", Some("default")),
405
className = "com.example.MyUpperUDF",
406
resources = Seq(FunctionResource(
407
resourceType = org.apache.spark.sql.catalyst.catalog.JarResource,
408
uri = "hdfs://cluster/jars/my-udfs.jar"
409
))
410
)
411
412
client.createFunction("default", customFunction)
413
414
// Check if function exists
415
if (client.functionExists("default", "my_upper")) {
416
println("Custom function created successfully")
417
}
418
```
419
420
## Types
421
422
### Hive Version Support
423
424
```scala { .api }
425
/**
426
* Abstract class representing Hive version with dependencies
427
*/
428
abstract class HiveVersion(
429
val fullVersion: String,
430
val extraDeps: Seq[String] = Nil,
431
val exclusions: Seq[String] = Nil
432
)
433
434
// Supported Hive versions
435
val allSupportedHiveVersions: Set[HiveVersion] = Set(
436
v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3
437
)
438
```