# Hive Client Interface

A low-level interface to the Hive metastore client with version abstraction, supporting direct SQL execution and raw Hive operations. The `HiveClient` trait provides a unified interface across Hive versions (2.0.x through 4.0.x), while `HiveClientImpl` provides the concrete implementation.

## Capabilities

### HiveClient Interface

Core trait defining the interface to Hive metastore operations.

```scala { .api }
/**
 * External interface to the Hive client for all metastore operations.
 * Provides version abstraction and a unified API across Hive versions.
 */
trait HiveClient {
  def version: HiveVersion
  def getConf(key: String, defaultValue: String): String
  def getState: Any
  def userName: String
}
```

### SQL Execution

Execute Hive SQL directly through the client.

```scala { .api }
/**
 * Execute Hive SQL and return results.
 * @param sql SQL statement to execute
 * @return Sequence of result strings
 */
def runSqlHive(sql: String): Seq[String]
```

**Usage Example:**

```scala
val client: HiveClient = ??? // obtain client instance

val results = client.runSqlHive("SHOW TABLES")
results.foreach(println)

// Execute DDL
client.runSqlHive("CREATE TABLE test (id INT, name STRING)")
```

### I/O Stream Management

Control output streams for Hive operations.

```scala { .api }
/**
 * Set the output stream for Hive operations.
 * @param stream PrintStream for output
 */
def setOut(stream: PrintStream): Unit

/**
 * Set the info stream for Hive operations.
 * @param stream PrintStream for info messages
 */
def setInfo(stream: PrintStream): Unit

/**
 * Set the error stream for Hive operations.
 * @param stream PrintStream for error messages
 */
def setError(stream: PrintStream): Unit
```
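
**Usage Example:**

A sketch of redirecting all three streams into an in-memory buffer, which is useful for capturing Hive's console output (this assumes a `client` instance obtained elsewhere):

```scala
import java.io.{ByteArrayOutputStream, PrintStream}

val client: HiveClient = ??? // obtain client instance

// Redirect all Hive console output into an in-memory buffer
val buffer = new ByteArrayOutputStream()
val capture = new PrintStream(buffer)
client.setOut(capture)
client.setInfo(capture)
client.setError(capture)

client.runSqlHive("SHOW TABLES")
println(s"Captured Hive output:\n${buffer.toString}")
```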

### Database Operations

Direct database management through the Hive client.

```scala { .api }
/**
 * List all tables in a database.
 * @param dbName Database name
 * @return Sequence of table names
 */
def listTables(dbName: String): Seq[String]

/**
 * List tables matching a pattern.
 * @param dbName Database name
 * @param pattern Pattern to match (glob pattern)
 * @return Sequence of matching table names
 */
def listTables(dbName: String, pattern: String): Seq[String]

/**
 * Set the current database.
 * @param databaseName Database name to set as current
 */
def setCurrentDatabase(databaseName: String): Unit

/**
 * Get database metadata.
 * @param name Database name
 * @return Database definition
 */
def getDatabase(name: String): CatalogDatabase

/**
 * Check if a database exists.
 * @param dbName Database name
 * @return True if the database exists
 */
def databaseExists(dbName: String): Boolean

/**
 * List databases matching a pattern.
 * @param pattern Pattern to match (glob pattern)
 * @return Sequence of matching database names
 */
def listDatabases(pattern: String): Seq[String]

/**
 * Create a new database.
 * @param database Database definition
 * @param ignoreIfExists If true, don't fail if the database already exists
 */
def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit

/**
 * Drop a database.
 * @param name Database name
 * @param ignoreIfNotExists If true, don't fail if the database doesn't exist
 * @param cascade If true, drop all tables in the database
 */
def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit

/**
 * Modify database metadata.
 * @param database Updated database definition
 */
def alterDatabase(database: CatalogDatabase): Unit
```

**Usage Examples:**

```scala
import java.net.URI

// List databases
val databases = client.listDatabases("*")
println(s"Available databases: ${databases.mkString(", ")}")

// Set current database
client.setCurrentDatabase("my_database")

// Create database
val dbDef = CatalogDatabase(
  name = "test_db",
  description = "Test database",
  locationUri = new URI("hdfs://namenode:9000/user/hive/warehouse/test_db.db"),
  properties = Map.empty
)
client.createDatabase(dbDef, ignoreIfExists = true)
```

### Table Operations

Comprehensive table management, including raw Hive table access.

```scala { .api }
/**
 * Check if a table exists.
 * @param dbName Database name
 * @param tableName Table name
 * @return True if the table exists
 */
def tableExists(dbName: String, tableName: String): Boolean

/**
 * Get table metadata.
 * @param dbName Database name
 * @param tableName Table name
 * @return Table definition
 */
def getTable(dbName: String, tableName: String): CatalogTable

/**
 * Get table metadata, if the table exists.
 * @param dbName Database name
 * @param tableName Table name
 * @return Optional table definition
 */
def getTableOption(dbName: String, tableName: String): Option[CatalogTable]

/**
 * Get the raw Hive table object.
 * @param dbName Database name
 * @param tableName Table name
 * @return Raw Hive table wrapper
 */
def getRawHiveTable(dbName: String, tableName: String): RawHiveTable

/**
 * Get the raw Hive table object, if the table exists.
 * @param dbName Database name
 * @param tableName Table name
 * @return Optional raw Hive table wrapper
 */
def getRawHiveTableOption(dbName: String, tableName: String): Option[RawHiveTable]

/**
 * Get multiple tables by name.
 * @param dbName Database name
 * @param tableNames Sequence of table names
 * @return Sequence of table definitions
 */
def getTablesByName(dbName: String, tableNames: Seq[String]): Seq[CatalogTable]

/**
 * Create a new table.
 * @param table Table definition
 * @param ignoreIfExists If true, don't fail if the table already exists
 */
def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit

/**
 * Drop a table.
 * @param dbName Database name
 * @param tableName Table name
 * @param ignoreIfNotExists If true, don't fail if the table doesn't exist
 * @param purge If true, delete the table data immediately
 */
def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit

/**
 * Modify table metadata.
 * @param table Updated table definition
 */
def alterTable(table: CatalogTable): Unit

/**
 * Alter table properties.
 * @param dbName Database name
 * @param tableName Table name
 * @param props Properties to set
 */
def alterTableProps(dbName: String, tableName: String, props: Map[String, String]): Unit

/**
 * Alter the table's data schema.
 * @param dbName Database name
 * @param tableName Table name
 * @param newDataSchema New schema structure
 */
def alterTableDataSchema(dbName: String, tableName: String, newDataSchema: StructType): Unit
```

**Usage Examples:**

```scala
// Check if table exists
if (client.tableExists("my_db", "my_table")) {
  println("Table exists")
}

// Get table metadata
val table = client.getTable("my_db", "my_table")
println(s"Table schema: ${table.schema}")

// Get raw Hive table for advanced operations
val rawTable = client.getRawHiveTable("my_db", "my_table")
val hiveProps = rawTable.hiveTableProps()
println(s"Hive table properties: $hiveProps")
```

### Partition Operations

Manage table partitions through the Hive client.

```scala { .api }
/**
 * Create table partitions.
 * @param db Database name
 * @param table Table name
 * @param parts Sequence of partition definitions
 * @param ignoreIfExists If true, don't fail if the partitions already exist
 */
def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition],
    ignoreIfExists: Boolean): Unit

/**
 * Drop table partitions.
 * @param db Database name
 * @param table Table name
 * @param specs Sequence of partition specifications
 * @param ignoreIfNotExists If true, don't fail if the partitions don't exist
 * @param purge If true, delete partition data immediately
 * @param retainData If true, keep the partition data files
 */
def dropPartitions(db: String, table: String, specs: Seq[TablePartitionSpec],
    ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit

/**
 * Rename table partitions.
 * @param db Database name
 * @param table Table name
 * @param specs Current partition specifications
 * @param newSpecs New partition specifications
 */
def renamePartitions(db: String, table: String, specs: Seq[TablePartitionSpec],
    newSpecs: Seq[TablePartitionSpec]): Unit

/**
 * Modify partition metadata.
 * @param db Database name
 * @param table Table name
 * @param newParts Updated partition definitions
 */
def alterPartitions(db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit

/**
 * Get partition metadata.
 * @param db Database name
 * @param table Table name
 * @param spec Partition specification
 * @return Partition definition
 */
def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition

/**
 * Get partition metadata, if the partition exists.
 * @param db Database name
 * @param table Table name
 * @param spec Partition specification
 * @return Optional partition definition
 */
def getPartitionOption(db: String, table: String, spec: TablePartitionSpec): Option[CatalogTablePartition]

/**
 * Get partition names.
 * @param db Database name
 * @param table Table name
 * @param partialSpec Optional partial partition specification for filtering
 * @return Sequence of partition names
 */
def getPartitionNames(db: String, table: String,
    partialSpec: Option[TablePartitionSpec]): Seq[String]

/**
 * Get partitions.
 * @param db Database name
 * @param table Table name
 * @param partialSpec Optional partial partition specification for filtering
 * @return Sequence of partition definitions
 */
def getPartitions(db: String, table: String,
    partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]

/**
 * Get partitions matching filter predicates.
 * @param db Database name
 * @param table Table name
 * @param predicates Filter expressions
 * @return Sequence of matching partition definitions
 */
def getPartitionsByFilter(db: String, table: String,
    predicates: Seq[Expression]): Seq[CatalogTablePartition]
```
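
**Usage Example:**

A sketch of typical partition calls, assuming a `client` instance and a hypothetical table partitioned by `year` and `month`. In Spark, `TablePartitionSpec` is an alias for `Map[String, String]`, mapping partition column names to values:

```scala
val client: HiveClient = ??? // obtain client instance

// A full partition spec identifies one partition
val spec: TablePartitionSpec = Map("year" -> "2024", "month" -> "01")

// getPartitionOption avoids an exception when the partition is missing
client.getPartitionOption("my_db", "my_table", spec) match {
  case Some(part) => println(s"Partition location: ${part.location}")
  case None       => println("Partition not found")
}

// A partial spec filters partitions: here, all months of 2024
val partitions2024 = client.getPartitions(
  "my_db", "my_table", Some(Map("year" -> "2024")))
println(s"Found ${partitions2024.size} partitions for 2024")

// Drop the partition metadata but keep the data files
client.dropPartitions("my_db", "my_table", Seq(spec),
  ignoreIfNotExists = true, purge = false, retainData = true)
```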

### Data Loading Operations

Load data into tables and partitions through the Hive client.

```scala { .api }
/**
 * Load data into a partition.
 * @param db Database name
 * @param table Table name
 * @param loadPath Path to the data files
 * @param partition Partition specification
 * @param isOverwrite If true, overwrite existing data
 * @param inheritTableSpecs If true, inherit table specifications
 * @param isSrcLocal If true, the source is on the local filesystem
 */
def loadPartition(db: String, table: String, loadPath: String,
    partition: TablePartitionSpec, isOverwrite: Boolean,
    inheritTableSpecs: Boolean, isSrcLocal: Boolean): Unit

/**
 * Load data into a table.
 * @param db Database name
 * @param table Table name
 * @param loadPath Path to the data files
 * @param isOverwrite If true, overwrite existing data
 * @param isSrcLocal If true, the source is on the local filesystem
 */
def loadTable(db: String, table: String, loadPath: String,
    isOverwrite: Boolean, isSrcLocal: Boolean): Unit

/**
 * Load data into dynamic partitions.
 * @param db Database name
 * @param table Table name
 * @param loadPath Path to the data files
 * @param partition Partition specification
 * @param replace If true, replace existing partitions
 * @param numDP Number of dynamic partition columns
 */
def loadDynamicPartitions(db: String, table: String, loadPath: String,
    partition: TablePartitionSpec, replace: Boolean,
    numDP: Int): Unit
```
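
**Usage Example:**

For instance, loading a staged directory into a table, and into one static partition, might look like the following sketch (assuming a `client` instance; the HDFS paths and table names are placeholders):

```scala
val client: HiveClient = ??? // obtain client instance

// Overwrite the table's contents with files from a staging directory on HDFS
client.loadTable("my_db", "my_table",
  loadPath = "hdfs://namenode:9000/staging/my_table",
  isOverwrite = true, isSrcLocal = false)

// Load one static partition, inheriting storage settings from the table
client.loadPartition("my_db", "my_part_table",
  loadPath = "hdfs://namenode:9000/staging/year=2024/month=01",
  partition = Map("year" -> "2024", "month" -> "01"),
  isOverwrite = true, inheritTableSpecs = true, isSrcLocal = false)
```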

### Function Operations

Manage user-defined functions through the Hive client.

```scala { .api }
/**
 * Create a user-defined function.
 * @param db Database name
 * @param func Function definition
 */
def createFunction(db: String, func: CatalogFunction): Unit

/**
 * Drop a user-defined function.
 * @param db Database name
 * @param name Function name
 */
def dropFunction(db: String, name: String): Unit

/**
 * Rename a function.
 * @param db Database name
 * @param oldName Current function name
 * @param newName New function name
 */
def renameFunction(db: String, oldName: String, newName: String): Unit

/**
 * Modify function metadata.
 * @param db Database name
 * @param func Updated function definition
 */
def alterFunction(db: String, func: CatalogFunction): Unit

/**
 * Get function metadata.
 * @param db Database name
 * @param funcName Function name
 * @return Function definition
 */
def getFunction(db: String, funcName: String): CatalogFunction

/**
 * Get function metadata, if the function exists.
 * @param db Database name
 * @param funcName Function name
 * @return Optional function definition
 */
def getFunctionOption(db: String, funcName: String): Option[CatalogFunction]

/**
 * Check if a function exists.
 * @param db Database name
 * @param funcName Function name
 * @return True if the function exists
 */
def functionExists(db: String, funcName: String): Boolean

/**
 * List functions matching a pattern.
 * @param db Database name
 * @param pattern Pattern to match (glob pattern)
 * @return Sequence of matching function names
 */
def listFunctions(db: String, pattern: String): Seq[String]
```
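
**Usage Example:**

Registering and looking up a UDF might look like this sketch (assuming a `client` instance; the UDF class name and JAR path are placeholders, and `CatalogFunction` takes a function identifier, the implementing class name, and a list of resources):

```scala
val client: HiveClient = ??? // obtain client instance

// Register a UDF backed by a class in a JAR on HDFS
val func = CatalogFunction(
  identifier = FunctionIdentifier("my_upper", Some("my_db")),
  className = "com.example.hive.MyUpperUDF",
  resources = Seq(FunctionResource(JarResource, "hdfs:///udfs/my-udfs.jar")))
client.createFunction("my_db", func)

// Look it up again
if (client.functionExists("my_db", "my_upper")) {
  println(client.getFunction("my_db", "my_upper").className)
}

// List all functions in the database
client.listFunctions("my_db", "*").foreach(println)
```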

### Session Management

Manage client sessions and resources.

```scala { .api }
/**
 * Add a JAR file to the Hive session.
 * @param path Path to the JAR file
 */
def addJar(path: String): Unit

/**
 * Create a new client session.
 * @return New HiveClient instance
 */
def newSession(): HiveClient

/**
 * Execute code within the Hive state context.
 * @param f Function to execute
 * @return Result of the function execution
 */
def withHiveState[A](f: => A): A

/**
 * Reset client state.
 */
def reset(): Unit
```
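
**Usage Example:**

A sketch of session-scoped usage (assuming a `client` instance): `withHiveState` runs a block with the Hive session state and classloader set up on the current thread, and `newSession` creates a client whose session state (such as the current database) is isolated from the original:

```scala
val client: HiveClient = ??? // obtain client instance

// Make a UDF jar visible to this session
client.addJar("hdfs:///udfs/my-udfs.jar")

// Run a block with the Hive session state active on this thread
val tableCount = client.withHiveState {
  client.listTables("default").size
}
println(s"Tables in default: $tableCount")

// An isolated session: changing its current database does not
// affect the original client
val session = client.newSession()
session.setCurrentDatabase("other_db")
```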

### Raw Hive Table Interface

Interface for accessing raw Hive table objects.

```scala { .api }
/**
 * Raw Hive table abstraction providing access to the underlying Hive objects.
 */
trait RawHiveTable {
  /**
   * Get the raw Hive table object.
   * @return Raw Hive table
   */
  def rawTable: Object

  /**
   * Convert to a Spark catalog table.
   * @return Spark CatalogTable representation
   */
  def toCatalogTable: CatalogTable

  /**
   * Get Hive-specific table properties.
   * @return Map of Hive table properties
   */
  def hiveTableProps(): Map[String, String]
}
```

**Usage Example:**

```scala
val rawTable = client.getRawHiveTable("my_db", "my_table")

// Access the raw Hive object for advanced operations
val hiveTable = rawTable.rawTable
// Cast to the appropriate Hive type for specific operations

// Convert to the Spark representation
val catalogTable = rawTable.toCatalogTable
println(s"Table type: ${catalogTable.tableType}")

// Get Hive-specific properties
val hiveProps = rawTable.hiveTableProps()
hiveProps.foreach { case (key, value) =>
  println(s"$key = $value")
}
```

### Version Management

Hive version abstraction and supported versions.

```scala { .api }
/**
 * Represents a Hive version with its dependencies and exclusions.
 * @param fullVersion Full version string (e.g., "2.3.10")
 * @param extraDeps Extra dependencies required for this version
 * @param exclusions Dependencies to exclude for this version
 */
abstract class HiveVersion(
    val fullVersion: String,
    val extraDeps: Seq[String] = Nil,
    val exclusions: Seq[String] = Nil)
  extends Ordered[HiveVersion] {
  def compare(that: HiveVersion): Int
}

object hive {
  case object v2_0 extends HiveVersion("2.0.1")
  case object v2_1 extends HiveVersion("2.1.1")
  case object v2_2 extends HiveVersion("2.2.0")
  case object v2_3 extends HiveVersion("2.3.10")
  case object v3_0 extends HiveVersion("3.0.0")
  case object v3_1 extends HiveVersion("3.1.3")
  case object v4_0 extends HiveVersion("4.0.1")

  val allSupportedHiveVersions: Set[HiveVersion]
}
```

**Usage Example:**

```scala
import org.apache.spark.sql.hive.client.hive

// Check client version
val clientVersion = client.version
println(s"Using Hive version: ${clientVersion.fullVersion}")

// Compare versions
if (clientVersion >= hive.v3_0) {
  println("Using Hive 3.0 or later")
}

// List all supported versions
hive.allSupportedHiveVersions.foreach { version =>
  println(s"Supported: ${version.fullVersion}")
}
```

### Client Implementation

The concrete implementation provided by Spark.

```scala { .api }
/**
 * Concrete implementation of HiveClient.
 * Created through HiveUtils factory methods.
 */
class HiveClientImpl extends HiveClient {
  // Implementation details are internal
}
```
619
620
**Usage Example:**
621
622
```scala
623
import org.apache.spark.sql.hive.HiveUtils
624
625
// Create client for execution context
626
val executionClient = HiveUtils.newClientForExecution(sparkConf, hadoopConf)
627
628
// Create client for metadata operations
629
val metadataClient = HiveUtils.newClientForMetadata(
630
sparkConf,
631
hadoopConf,
632
Map("hive.metastore.uris" -> "thrift://localhost:9083")
633
)
634
635
// Use client for operations
636
val tables = metadataClient.listTables("default")
637
println(s"Tables in default database: ${tables.mkString(", ")}")
638
```