# Metastore Integration

Integration with Hive metastore for table metadata, database operations, and catalog management.

## Core Imports

```scala
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl, HiveVersion}
import org.apache.spark.sql.hive.HiveMetastoreCatalog
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.datasources.LogicalRelation
```

## Capabilities

### External Catalog Operations

Access to Hive metastore through the external catalog interface.

```scala { .api }
class HiveExternalCatalog(
    conf: SparkConf,
    hadoopConf: Configuration
) extends ExternalCatalog {

  /** Create database in Hive metastore */
  override def createDatabase(
      dbDefinition: CatalogDatabase,
      ignoreIfExists: Boolean
  ): Unit

  /** Drop database from Hive metastore */
  override def dropDatabase(
      db: String,
      ignoreIfNotExists: Boolean,
      cascade: Boolean
  ): Unit

  /** Create table in Hive metastore */
  override def createTable(
      tableDefinition: CatalogTable,
      ignoreIfExists: Boolean
  ): Unit

  /** Drop table from Hive metastore */
  override def dropTable(
      db: String,
      table: String,
      ignoreIfNotExists: Boolean,
      purge: Boolean
  ): Unit

  /** Get table metadata from Hive metastore */
  override def getTable(db: String, table: String): CatalogTable

  /** List all tables in database */
  override def listTables(db: String, pattern: Option[String]): Seq[String]
}
```
60
61
### Hive Client Interface
62
63
Low-level interface for Hive metastore client operations.
64
65
```scala { .api }
66
private[hive] trait HiveClient {
67
/** Get Hive version information */
68
def version: HiveVersion
69
70
/** Get database metadata */
71
def getDatabase(name: String): CatalogDatabase
72
73
/** List all databases */
74
def listDatabases(pattern: String): Seq[String]
75
76
/** Get table metadata with detailed schema information */
77
def getTable(dbName: String, tableName: String): CatalogTable
78
79
/** List tables in database matching pattern */
80
def listTables(dbName: String, pattern: String): Seq[String]
81
82
/** Create new table in metastore */
83
def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
84
85
/** Drop table from metastore */
86
def dropTable(
87
dbName: String,
88
tableName: String,
89
ignoreIfNotExists: Boolean,
90
purge: Boolean
91
): Unit
92
93
/** Get partitions for partitioned table */
94
def getPartitions(
95
table: CatalogTable,
96
spec: Option[TablePartitionSpec]
97
): Seq[CatalogTablePartition]
98
99
/** Create partition in partitioned table */
100
def createPartitions(
101
table: CatalogTable,
102
parts: Seq[CatalogTablePartition],
103
ignoreIfExists: Boolean
104
): Unit
105
}
106
```
107
108
### Metastore Catalog Conversion
109
110
Converts Hive table relations for optimized Spark processing.
111
112
```scala { .api }
113
class HiveMetastoreCatalog(sparkSession: SparkSession) {
114
/**
115
* Convert Hive table relation to optimized Spark relation
116
* @param relation Hive table relation to convert
117
* @param isWrite Whether this conversion is for write operations
118
* @returns Converted logical relation
119
*/
120
def convert(relation: HiveTableRelation, isWrite: Boolean): LogicalRelation
121
122
/**
123
* Convert storage format for data source operations
124
* @param storage Catalog storage format to convert
125
* @returns Converted storage format
126
*/
127
def convertStorageFormat(storage: CatalogStorageFormat): CatalogStorageFormat
128
}
129
```
130
131
## Configuration and Connection
132
133
### Client Implementation
134
135
```scala { .api }
136
private[hive] class HiveClientImpl(
137
version: HiveVersion,
138
sparkConf: SparkConf,
139
hadoopConf: Configuration,
140
extraClassPath: Seq[URL],
141
classLoader: ClassLoader,
142
config: Map[String, String]
143
) extends HiveClient {
144
145
/** Initialize connection to Hive metastore */
146
def initialize(): Unit
147
148
/** Close connection to Hive metastore */
149
def close(): Unit
150
151
/** Execute raw Hive metastore thrift call */
152
def runSqlHive(sql: String): Seq[String]
153
}
154
```
155
156
### Isolated Client Loader
157
158
Manages class loading isolation for different Hive versions.
159
160
```scala { .api }
161
private[hive] class IsolatedClientLoader(
162
version: HiveVersion,
163
sparkConf: SparkConf,
164
execJars: Seq[URL],
165
hadoopConf: Configuration,
166
config: Map[String, String]
167
) extends Logging {
168
169
/** Create isolated Hive client instance */
170
def createClient(): HiveClient
171
172
/** Get underlying class loader for this Hive version */
173
def classLoader: ClassLoader
174
}
175
```
176
177
## Usage Examples
178
179
### Basic Metastore Operations
180
181
```scala
182
import org.apache.spark.sql.SparkSession
183
184
val spark = SparkSession.builder()
185
.enableHiveSupport()
186
.getOrCreate()
187
188
// List databases
189
spark.sql("SHOW DATABASES").show()
190
191
// Create database
192
spark.sql("CREATE DATABASE IF NOT EXISTS my_database")
193
194
// Use database
195
spark.sql("USE my_database")
196
197
// Create external table pointing to Hive data
198
spark.sql("""
199
CREATE TABLE IF NOT EXISTS users (
200
id INT,
201
name STRING,
202
age INT
203
)
204
USING HIVE
205
LOCATION '/user/hive/warehouse/users'
206
""")
207
208
// Access table metadata
209
val tableMetadata = spark.catalog.getTable("my_database", "users")
210
println(s"Table location: ${tableMetadata}")
211
```
212
213
### Working with Partitioned Tables
214
215
```scala
216
// Create partitioned table
217
spark.sql("""
218
CREATE TABLE IF NOT EXISTS sales (
219
product_id INT,
220
quantity INT,
221
revenue DOUBLE
222
)
223
PARTITIONED BY (year INT, month INT)
224
USING HIVE
225
""")
226
227
// Add partition
228
spark.sql("""
229
ALTER TABLE sales
230
ADD PARTITION (year=2023, month=12)
231
LOCATION '/user/hive/warehouse/sales/year=2023/month=12'
232
""")
233
234
// Query specific partition
235
val decemberSales = spark.sql("""
236
SELECT * FROM sales
237
WHERE year = 2023 AND month = 12
238
""")
239
```
240
241
## Error Handling
242
243
Common metastore integration exceptions:
244
245
- **MetaException**: General Hive metastore errors
246
- **NoSuchObjectException**: When database or table doesn't exist
247
- **AlreadyExistsException**: When trying to create existing database/table
248
- **InvalidOperationException**: For invalid metastore operations
249
250
```scala
251
import org.apache.hadoop.hive.metastore.api.MetaException
252
253
try {
254
spark.sql("DROP TABLE non_existent_table")
255
} catch {
256
case e: AnalysisException if e.getMessage.contains("Table or view not found") =>
257
println("Table does not exist")
258
case e: MetaException =>
259
println(s"Metastore error: ${e.getMessage}")
260
}
261
```
262
263
## Types
264
265
### Hive Version Support
266
267
```scala { .api }
268
case class HiveVersion(
269
fullVersion: String,
270
majorVersion: Int,
271
minorVersion: Int
272
) {
273
def supportsFeature(feature: String): Boolean
274
}
275
```
276
277
### Table and Database Types
278
279
```scala { .api }
280
case class CatalogDatabase(
281
name: String,
282
description: String,
283
locationUri: String,
284
properties: Map[String, String]
285
)
286
287
case class CatalogTable(
288
identifier: TableIdentifier,
289
tableType: CatalogTableType,
290
storage: CatalogStorageFormat,
291
schema: StructType,
292
provider: Option[String],
293
partitionColumnNames: Seq[String],
294
bucketSpec: Option[BucketSpec],
295
properties: Map[String, String]
296
)
297
298
case class CatalogTablePartition(
299
spec: TablePartitionSpec,
300
storage: CatalogStorageFormat,
301
parameters: Map[String, String]
302
) {
303
def location: Option[URI]
304
def toRow: InternalRow
305
}
306
307
case class CatalogFunction(
308
identifier: FunctionIdentifier,
309
className: String,
310
resources: Seq[FunctionResource]
311
)
312
```