# Hive Client Interface

Core interface for direct interaction with the Hive metastore, providing low-level access to databases, tables, partitions, and functions.

## Core Imports

```scala
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.client.HiveVersion
import org.apache.spark.sql.catalyst.catalog._
```

## Capabilities

### HiveClient Interface

Core abstraction for Hive metastore client operations with version compatibility.

```scala { .api }
/**
 * Interface for communicating with Hive metastore
 */
private[hive] trait HiveClient {

  /** Get Hive version information */
  def version: HiveVersion

  /**
   * Get configuration property from Hive
   * @param key Configuration key
   * @param defaultValue Default value if key not found
   * @return Configuration value
   */
  def getConf(key: String, defaultValue: String): String

  /**
   * Execute raw HiveQL statement
   * @param sql HiveQL statement to execute
   * @return Sequence of result strings
   */
  def runSqlHive(sql: String): Seq[String]
}
```

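Configuration lookups and raw HiveQL execution go directly through the client. A brief sketch, assuming a `hiveClient` instance is already in scope (obtained via internal APIs as in the Usage Examples section) and a live metastore:

```scala
// Assumes `hiveClient: HiveClient` is in scope; requires a running metastore.
val warehouseDir = hiveClient.getConf("hive.metastore.warehouse.dir", "/user/hive/warehouse")
println(s"Warehouse directory: $warehouseDir")

// Each returned string is one row of the statement's output.
val databaseRows: Seq[String] = hiveClient.runSqlHive("SHOW DATABASES")
databaseRows.foreach(println)
```

Note that `runSqlHive` bypasses Spark's own SQL parser entirely, so statements are interpreted by Hive's dialect rules.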
### Database Operations

Complete database management operations through the Hive metastore.

```scala { .api }
trait HiveClient {

  /**
   * Get database metadata
   * @param name Database name
   * @return CatalogDatabase with complete metadata
   */
  def getDatabase(name: String): CatalogDatabase

  /**
   * List all databases matching pattern
   * @param pattern SQL LIKE pattern for database names
   * @return Sequence of database names
   */
  def listDatabases(pattern: String): Seq[String]

  /**
   * Create new database in metastore
   * @param database Database definition to create
   * @param ignoreIfExists Skip creation if database exists
   */
  def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit

  /**
   * Drop database from metastore
   * @param name Database name to drop
   * @param ignoreIfNotExists Skip error if database doesn't exist
   * @param cascade Drop all tables in database first
   */
  def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit

  /**
   * Alter database properties
   * @param name Database name
   * @param database Updated database definition
   */
  def alterDatabase(name: String, database: CatalogDatabase): Unit
}
```

### Table Operations

Complete table management operations with schema and metadata support.

```scala { .api }
trait HiveClient {

  /**
   * Get table metadata with complete schema information
   * @param dbName Database name
   * @param tableName Table name
   * @return CatalogTable with full metadata
   */
  def getTable(dbName: String, tableName: String): CatalogTable

  /**
   * List tables in database matching pattern
   * @param dbName Database name
   * @param pattern SQL LIKE pattern for table names
   * @return Sequence of table names
   */
  def listTables(dbName: String, pattern: String): Seq[String]

  /**
   * Create new table in metastore
   * @param table Table definition to create
   * @param ignoreIfExists Skip creation if table exists
   */
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit

  /**
   * Drop table from metastore
   * @param dbName Database name
   * @param tableName Table name to drop
   * @param ignoreIfNotExists Skip error if table doesn't exist
   * @param purge Delete data files immediately
   */
  def dropTable(
    dbName: String,
    tableName: String,
    ignoreIfNotExists: Boolean,
    purge: Boolean
  ): Unit

  /**
   * Alter table schema and properties
   * @param dbName Database name
   * @param tableName Table name
   * @param table Updated table definition
   */
  def alterTable(dbName: String, tableName: String, table: CatalogTable): Unit

  /**
   * Rename table
   * @param dbName Database name
   * @param oldName Current table name
   * @param newName New table name
   */
  def renameTable(dbName: String, oldName: String, newName: String): Unit
}
```

### Partition Operations

Operations for managing table partitions with dynamic partition support.

```scala { .api }
trait HiveClient {

  /**
   * Get partitions for partitioned table
   * @param table Table to get partitions for
   * @param spec Optional partition specification to filter
   * @return Sequence of matching table partitions
   */
  def getPartitions(
    table: CatalogTable,
    spec: Option[TablePartitionSpec]
  ): Seq[CatalogTablePartition]

  /**
   * Get partition names matching specification
   * @param table Table to get partition names for
   * @param spec Optional partition specification to filter
   * @return Sequence of partition names
   */
  def getPartitionNames(
    table: CatalogTable,
    spec: Option[TablePartitionSpec]
  ): Seq[String]

  /**
   * Create partitions in partitioned table
   * @param table Table to create partitions in
   * @param parts Partition definitions to create
   * @param ignoreIfExists Skip creation if partitions exist
   */
  def createPartitions(
    table: CatalogTable,
    parts: Seq[CatalogTablePartition],
    ignoreIfExists: Boolean
  ): Unit

  /**
   * Drop partitions from partitioned table
   * @param db Database name
   * @param table Table name
   * @param specs Partition specifications to drop
   * @param ignoreIfNotExists Skip error if partitions don't exist
   * @param purge Delete partition data immediately
   * @param retainData Keep partition data files
   */
  def dropPartitions(
    db: String,
    table: String,
    specs: Seq[TablePartitionSpec],
    ignoreIfNotExists: Boolean,
    purge: Boolean,
    retainData: Boolean
  ): Unit

  /**
   * Alter partition metadata
   * @param db Database name
   * @param table Table name
   * @param newParts Updated partition definitions
   */
  def alterPartitions(
    db: String,
    table: String,
    newParts: Seq[CatalogTablePartition]
  ): Unit
}
```

### Function Operations

Operations for managing user-defined functions in Hive.

```scala { .api }
trait HiveClient {

  /**
   * Create user-defined function
   * @param db Database name
   * @param func Function definition to create
   */
  def createFunction(db: String, func: CatalogFunction): Unit

  /**
   * Drop user-defined function
   * @param db Database name
   * @param name Function name to drop
   */
  def dropFunction(db: String, name: String): Unit

  /**
   * List functions in database matching pattern
   * @param db Database name
   * @param pattern SQL LIKE pattern for function names
   * @return Sequence of function names
   */
  def listFunctions(db: String, pattern: String): Seq[String]

  /**
   * Get function metadata
   * @param db Database name
   * @param name Function name
   * @return CatalogFunction with complete metadata
   */
  def getFunction(db: String, name: String): CatalogFunction

  /**
   * Check if function exists
   * @param db Database name
   * @param name Function name
   * @return true if function exists
   */
  def functionExists(db: String, name: String): Boolean
}
```

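A typical create-if-absent flow for a UDF, sketched below. The database, function name, and implementing class are illustrative, and `hiveClient` is assumed in scope as in the Usage Examples; `CatalogFunction` takes a `FunctionIdentifier` and the fully qualified class name of the implementation:

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource}

// Hypothetical UDF registration; assumes `hiveClient: HiveClient` is in scope
// and a live metastore. "normalize_text" and the class name are placeholders.
val udf = CatalogFunction(
  identifier = FunctionIdentifier("normalize_text", Some("analytics_db")),
  className = "com.example.udf.NormalizeText",
  resources = Seq.empty[FunctionResource]
)

if (!hiveClient.functionExists("analytics_db", "normalize_text")) {
  hiveClient.createFunction("analytics_db", udf)
}

// Enumerate all functions registered in the database
hiveClient.listFunctions("analytics_db", "*").foreach(println)
```
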
## Implementation Classes

### HiveClientImpl

Concrete implementation of the HiveClient interface.

```scala { .api }
/**
 * Implementation of HiveClient using Hive metastore APIs
 */
private[hive] class HiveClientImpl(
  version: HiveVersion,
  sparkConf: SparkConf,
  hadoopConf: Configuration,
  extraClassPath: Seq[URL],
  classLoader: ClassLoader,
  config: Map[String, String]
) extends HiveClient {

  /** Initialize connection to Hive metastore */
  def initialize(): Unit

  /** Close connection to Hive metastore */
  def close(): Unit

  /** Reset connection state */
  def reset(): Unit

  /** Get underlying Hive metastore client */
  def client: IMetaStoreClient
}
```

### Version Management

```scala { .api }
/**
 * Hive version information and compatibility
 */
case class HiveVersion(
  fullVersion: String,
  majorVersion: Int,
  minorVersion: Int
) {
  /**
   * Check if this version supports a specific feature
   * @param feature Feature name to check
   * @return true if feature is supported
   */
  def supportsFeature(feature: String): Boolean

  /** Get version as comparable string */
  def versionString: String = s"$majorVersion.$minorVersion"
}
```

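A minimal, standalone sketch of how `versionString` composes, using a local stand-in case class (the real version types live inside Spark and are `private[hive]`, so they are not accessible from user code):

```scala
// Local stand-in mirroring the HiveVersion shape above; not Spark's own class.
case class HiveVersionInfo(fullVersion: String, majorVersion: Int, minorVersion: Int) {
  def versionString: String = s"$majorVersion.$minorVersion"
}

val v = HiveVersionInfo("2.3.9", 2, 3)
println(v.versionString)  // prints "2.3"
```

Dropping the patch component makes version strings directly comparable at the major/minor granularity that metastore compatibility checks care about.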
## Usage Examples

### Basic Database Operations

```scala
import java.net.URI

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
import org.apache.spark.sql.hive.HiveExternalCatalog

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Access HiveClient through internal APIs (advanced usage; these are private
// interfaces and may change between Spark versions)
val hiveClient = spark.sharedState.externalCatalog.unwrapped
  .asInstanceOf[HiveExternalCatalog].client

// List all databases
val databases = hiveClient.listDatabases("*")
println(s"Databases: ${databases.mkString(", ")}")

// Create database (locationUri is a java.net.URI in recent Spark versions)
val newDb = CatalogDatabase(
  name = "analytics_db",
  description = "Analytics database",
  locationUri = new URI("/user/hive/warehouse/analytics_db.db"),
  properties = Map("created_by" -> "spark_user")
)
hiveClient.createDatabase(newDb, ignoreIfExists = true)

// Get database metadata
val dbMetadata = hiveClient.getDatabase("analytics_db")
println(s"Database location: ${dbMetadata.locationUri}")
```

### Table Management

```scala
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException

// List tables in database
val tables = hiveClient.listTables("analytics_db", "*")
println(s"Tables: ${tables.mkString(", ")}")

// Get table metadata
try {
  val tableMetadata = hiveClient.getTable("analytics_db", "user_activity")
  println(s"Table type: ${tableMetadata.tableType}")
  println(s"Schema: ${tableMetadata.schema}")
  println(s"Partitions: ${tableMetadata.partitionColumnNames}")
} catch {
  case _: NoSuchTableException =>
    println("Table does not exist")
}

// Inspect table storage format
val table = hiveClient.getTable("analytics_db", "large_table")
println(s"Table storage: ${table.storage}")
```

### Partition Management

```scala
import java.net.URI

import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition

// Get partitions for partitioned table
val partitionedTable = hiveClient.getTable("analytics_db", "daily_events")
val partitions = hiveClient.getPartitions(partitionedTable, None)

println(s"Found ${partitions.length} partitions")
partitions.foreach { partition =>
  println(s"Partition: ${partition.spec}, Location: ${partition.storage.locationUri}")
}

// Get specific partition
val todayPartition = Map("year" -> "2023", "month" -> "12", "day" -> "15")
val specificPartitions = hiveClient.getPartitions(
  partitionedTable,
  Some(todayPartition)
)

// Create new partition (locationUri is an Option[java.net.URI] in recent
// Spark versions)
val newPartition = CatalogTablePartition(
  spec = Map("year" -> "2023", "month" -> "12", "day" -> "16"),
  storage = partitionedTable.storage.copy(
    locationUri = Some(new URI("/user/hive/warehouse/daily_events/year=2023/month=12/day=16"))
  ),
  parameters = Map("created_by" -> "spark_job")
)

hiveClient.createPartitions(
  partitionedTable,
  Seq(newPartition),
  ignoreIfExists = true
)
```

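Dropping partitions follows the same pattern; a sketch, assuming the same `hiveClient` and the illustrative `daily_events` table from above:

```scala
// Drop an old partition. With `purge = true` the data files are deleted
// immediately rather than moved to trash; `retainData = true` would instead
// remove only the metastore entry and keep the files on disk.
hiveClient.dropPartitions(
  db = "analytics_db",
  table = "daily_events",
  specs = Seq(Map("year" -> "2023", "month" -> "01", "day" -> "01")),
  ignoreIfNotExists = true,
  purge = true,
  retainData = false
)
```
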
## Error Handling

Common exceptions when working with HiveClient:

- **NoSuchDatabaseException**: when the database doesn't exist
- **NoSuchTableException**: when the table doesn't exist
- **AlreadyExistsException**: when creating objects that already exist
- **MetaException**: general metastore errors

```scala
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
import org.apache.hadoop.hive.metastore.api.MetaException

try {
  val table = hiveClient.getTable("nonexistent_db", "some_table")
} catch {
  case _: NoSuchDatabaseException =>
    println("Database does not exist")
  case _: NoSuchTableException =>
    println("Table does not exist")
  case e: MetaException =>
    println(s"Metastore error: ${e.getMessage}")
}
```

## Types

### Client Configuration Types

```scala { .api }
type TablePartitionSpec = Map[String, String]

case class HiveClientConfig(
  version: HiveVersion,
  extraClassPath: Seq[URL],
  config: Map[String, String]
)
```
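A partition specification is just a map from partition column names to values. A small standalone sketch of how such a spec corresponds to the `key=value` directory layout Hive uses on disk (pure Scala, no metastore required):

```scala
type TablePartitionSpec = Map[String, String]

val spec: TablePartitionSpec = Map("year" -> "2023", "month" -> "12", "day" -> "16")

// Hive lays out partition directories as key=value path segments;
// small immutable Maps iterate in insertion order, so this is deterministic here.
val pathSegment = spec.map { case (k, v) => s"$k=$v" }.mkString("/")
println(pathSegment)  // prints "year=2023/month=12/day=16"
```

This is why `dropPartitions` and `getPartitions` can filter on a partial spec: any subset of the key/value pairs identifies a family of directories.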