or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration-utilities.md, file-formats.md, hive-client.md, index.md, metastore-integration.md, query-execution.md, session-configuration.md, udf-support.md

docs/metastore-integration.md

0

# Metastore Integration

1

2

Integration with Hive metastore for table metadata, database operations, and catalog management.

3

4

## Core Imports

5

6

```scala

7

import org.apache.spark.sql.hive.HiveExternalCatalog

8

import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl, HiveVersion}

9

import org.apache.spark.sql.hive.HiveMetastoreCatalog

10

import org.apache.spark.sql.catalyst.catalog._

11

import org.apache.spark.sql.execution.datasources.LogicalRelation

12

```

13

14

## Capabilities

15

16

### External Catalog Operations

17

18

Access to Hive metastore through the external catalog interface.

19

20

```scala { .api }

21

class HiveExternalCatalog(

22

conf: SparkConf,

23

hadoopConf: Configuration

24

) extends ExternalCatalog {

25

26

/** Create database in Hive metastore */

27

override def createDatabase(

28

dbDefinition: CatalogDatabase,

29

ignoreIfExists: Boolean

30

): Unit

31

32

/** Drop database from Hive metastore */

33

override def dropDatabase(

34

db: String,

35

ignoreIfNotExists: Boolean,

36

cascade: Boolean

37

): Unit

38

39

/** Create table in Hive metastore */

40

override def createTable(

41

tableDefinition: CatalogTable,

42

ignoreIfExists: Boolean

43

): Unit

44

45

/** Drop table from Hive metastore */

46

override def dropTable(

47

db: String,

48

table: String,

49

ignoreIfNotExists: Boolean,

50

purge: Boolean

51

): Unit

52

53

/** Get table metadata from Hive metastore */

54

override def getTable(db: String, table: String): CatalogTable

55

56

/** List all tables in database */

57

override def listTables(db: String, pattern: Option[String]): Seq[String]

58

}

59

```

60

61

### Hive Client Interface

62

63

Low-level interface for Hive metastore client operations.

64

65

```scala { .api }

66

private[hive] trait HiveClient {

67

/** Get Hive version information */

68

def version: HiveVersion

69

70

/** Get database metadata */

71

def getDatabase(name: String): CatalogDatabase

72

73

/** List all databases */

74

def listDatabases(pattern: String): Seq[String]

75

76

/** Get table metadata with detailed schema information */

77

def getTable(dbName: String, tableName: String): CatalogTable

78

79

/** List tables in database matching pattern */

80

def listTables(dbName: String, pattern: String): Seq[String]

81

82

/** Create new table in metastore */

83

def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit

84

85

/** Drop table from metastore */

86

def dropTable(

87

dbName: String,

88

tableName: String,

89

ignoreIfNotExists: Boolean,

90

purge: Boolean

91

): Unit

92

93

/** Get partitions for partitioned table */

94

def getPartitions(

95

table: CatalogTable,

96

spec: Option[TablePartitionSpec]

97

): Seq[CatalogTablePartition]

98

99

/** Create partition in partitioned table */

100

def createPartitions(

101

table: CatalogTable,

102

parts: Seq[CatalogTablePartition],

103

ignoreIfExists: Boolean

104

): Unit

105

}

106

```

107

108

### Metastore Catalog Conversion

109

110

Converts Hive table relations for optimized Spark processing.

111

112

```scala { .api }

113

class HiveMetastoreCatalog(sparkSession: SparkSession) {

114

/**

115

* Convert Hive table relation to optimized Spark relation

116

* @param relation Hive table relation to convert

117

* @param isWrite Whether this conversion is for write operations

118

* @return Converted logical relation

119

*/

120

def convert(relation: HiveTableRelation, isWrite: Boolean): LogicalRelation

121

122

/**

123

* Convert storage format for data source operations

124

* @param storage Catalog storage format to convert

125

* @return Converted storage format

126

*/

127

def convertStorageFormat(storage: CatalogStorageFormat): CatalogStorageFormat

128

}

129

```

130

131

## Configuration and Connection

132

133

### Client Implementation

134

135

```scala { .api }

136

private[hive] class HiveClientImpl(

137

version: HiveVersion,

138

sparkConf: SparkConf,

139

hadoopConf: Configuration,

140

extraClassPath: Seq[URL],

141

classLoader: ClassLoader,

142

config: Map[String, String]

143

) extends HiveClient {

144

145

/** Initialize connection to Hive metastore */

146

def initialize(): Unit

147

148

/** Close connection to Hive metastore */

149

def close(): Unit

150

151

/** Execute a raw HiveQL statement and return the resulting rows as strings */

152

def runSqlHive(sql: String): Seq[String]

153

}

154

```

155

156

### Isolated Client Loader

157

158

Manages class loading isolation for different Hive versions.

159

160

```scala { .api }

161

private[hive] class IsolatedClientLoader(

162

version: HiveVersion,

163

sparkConf: SparkConf,

164

execJars: Seq[URL],

165

hadoopConf: Configuration,

166

config: Map[String, String]

167

) extends Logging {

168

169

/** Create isolated Hive client instance */

170

def createClient(): HiveClient

171

172

/** Get underlying class loader for this Hive version */

173

def classLoader: ClassLoader

174

}

175

```

176

177

## Usage Examples

178

179

### Basic Metastore Operations

180

181

```scala

182

import org.apache.spark.sql.SparkSession

183

184

val spark = SparkSession.builder()

185

.enableHiveSupport()

186

.getOrCreate()

187

188

// List databases

189

spark.sql("SHOW DATABASES").show()

190

191

// Create database

192

spark.sql("CREATE DATABASE IF NOT EXISTS my_database")

193

194

// Use database

195

spark.sql("USE my_database")

196

197

// Create external table pointing to Hive data

198

spark.sql("""

199

CREATE TABLE IF NOT EXISTS users (

200

id INT,

201

name STRING,

202

age INT

203

)

204

USING HIVE

205

LOCATION '/user/hive/warehouse/users'

206

""")

207

208

// Access table metadata

209

val tableMetadata = spark.catalog.getTable("my_database", "users")

210

println(s"Table metadata: ${tableMetadata}")

211

```

212

213

### Working with Partitioned Tables

214

215

```scala

216

// Create partitioned table

217

spark.sql("""

218

CREATE TABLE IF NOT EXISTS sales (

219

product_id INT,

220

quantity INT,

221

revenue DOUBLE

222

)

223

PARTITIONED BY (year INT, month INT)

224

USING HIVE

225

""")

226

227

// Add partition

228

spark.sql("""

229

ALTER TABLE sales

230

ADD PARTITION (year=2023, month=12)

231

LOCATION '/user/hive/warehouse/sales/year=2023/month=12'

232

""")

233

234

// Query specific partition

235

val decemberSales = spark.sql("""

236

SELECT * FROM sales

237

WHERE year = 2023 AND month = 12

238

""")

239

```

240

241

## Error Handling

242

243

Common metastore integration exceptions:

244

245

- **MetaException**: General Hive metastore errors

246

- **NoSuchObjectException**: When database or table doesn't exist

247

- **AlreadyExistsException**: When trying to create existing database/table

248

- **InvalidOperationException**: For invalid metastore operations

249

250

```scala

251

import org.apache.hadoop.hive.metastore.api.MetaException

252

253

try {

254

spark.sql("DROP TABLE non_existent_table")

255

} catch {

256

case e: AnalysisException if e.getMessage.contains("Table or view not found") =>

257

println("Table does not exist")

258

case e: MetaException =>

259

println(s"Metastore error: ${e.getMessage}")

260

}

261

```

262

263

## Types

264

265

### Hive Version Support

266

267

```scala { .api }

268

case class HiveVersion(

269

fullVersion: String,

270

majorVersion: Int,

271

minorVersion: Int

272

) {

273

def supportsFeature(feature: String): Boolean

274

}

275

```

276

277

### Table and Database Types

278

279

```scala { .api }

280

case class CatalogDatabase(

281

name: String,

282

description: String,

283

locationUri: String,

284

properties: Map[String, String]

285

)

286

287

case class CatalogTable(

288

identifier: TableIdentifier,

289

tableType: CatalogTableType,

290

storage: CatalogStorageFormat,

291

schema: StructType,

292

provider: Option[String],

293

partitionColumnNames: Seq[String],

294

bucketSpec: Option[BucketSpec],

295

properties: Map[String, String]

296

)

297

298

case class CatalogTablePartition(

299

spec: TablePartitionSpec,

300

storage: CatalogStorageFormat,

301

parameters: Map[String, String]

302

) {

303

def location: Option[URI]

304

def toRow: InternalRow

305

}

306

307

case class CatalogFunction(

308

identifier: FunctionIdentifier,

309

className: String,

310

resources: Seq[FunctionResource]

311

)

312

```