or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, data-type-conversion.md, file-formats.md, index.md, metastore-operations.md, session-management.md, udf-integration.md

docs/metastore-operations.md

# Metastore Operations

Direct access to Hive metastore for programmatic database, table, partition, and function management through the HiveClient interface. This provides low-level control over Hive metastore operations beyond what's available through SQL.

## Capabilities

### HiveClient Interface

Core interface for interacting with Hive metastore programmatically.

```scala { .api }
/**
 * Interface to Hive client for metastore operations
 * Shared across internal and external classloaders
 */
private[hive] trait HiveClient {

  /** Returns the Hive Version of this client */
  def version: HiveVersion

  /** Returns configuration value for the given key */
  def getConf(key: String, defaultValue: String): String

  /** Returns the associated Hive SessionState */
  def getState: Any

  /** Execute HiveQL command and return results as strings */
  def runSqlHive(sql: String): Seq[String]

  /** Set output streams for Hive operations */
  def setOut(stream: PrintStream): Unit
  def setInfo(stream: PrintStream): Unit
  def setError(stream: PrintStream): Unit

  /** Add JAR to class loader */
  def addJar(path: String): Unit

  /** Create new client session sharing class loader and Hive client */
  def newSession(): HiveClient

  /** Run function within Hive state (SessionState, HiveConf, Hive client and class loader) */
  def withHiveState[A](f: => A): A

  /** Remove all metadata - for testing only */
  def reset(): Unit
}
```

### Database Operations

Comprehensive database management operations.

```scala { .api }
/**
 * Database management operations
 */
trait HiveClient {
  /** List database names matching pattern */
  def listDatabases(pattern: String): Seq[String]

  /** Get database metadata - throws exception if not found */
  def getDatabase(name: String): CatalogDatabase

  /** Check if database exists */
  def databaseExists(dbName: String): Boolean

  /** Set current database */
  def setCurrentDatabase(databaseName: String): Unit

  /** Create new database */
  def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit

  /**
   * Drop database
   * @param name - Database name to drop
   * @param ignoreIfNotExists - Don't throw error if database doesn't exist
   * @param cascade - Remove all associated objects (tables, functions)
   */
  def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit

  /** Alter existing database */
  def alterDatabase(database: CatalogDatabase): Unit
}
```

**Usage Examples:**

```scala
// Assuming you have access to HiveClient instance
val client: HiveClient = // obtained from HiveExternalCatalog or similar

// List all databases
val databases = client.listDatabases("*")
println(s"Available databases: ${databases.mkString(", ")}")

// Create new database
val newDb = CatalogDatabase(
  name = "analytics_db",
  description = "Analytics database",
  locationUri = "hdfs://cluster/user/hive/warehouse/analytics_db.db",
  properties = Map("owner" -> "analytics_team")
)
client.createDatabase(newDb, ignoreIfExists = true)

// Switch to database
client.setCurrentDatabase("analytics_db")

// Get database info
val dbInfo = client.getDatabase("analytics_db")
println(s"Database location: ${dbInfo.locationUri}")
```

### Table Operations

Complete table management functionality.

```scala { .api }
/**
 * Table management operations
 */
trait HiveClient {
  /** List all tables in database */
  def listTables(dbName: String): Seq[String]

  /** List tables matching pattern in database */
  def listTables(dbName: String, pattern: String): Seq[String]

  /** Check if table exists */
  def tableExists(dbName: String, tableName: String): Boolean

  /** Get table metadata - throws NoSuchTableException if not found */
  def getTable(dbName: String, tableName: String): CatalogTable

  /** Get table metadata - returns None if not found */
  def getTableOption(dbName: String, tableName: String): Option[CatalogTable]

  /** Create new table */
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit

  /** Drop table */
  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit

  /** Alter existing table */
  def alterTable(table: CatalogTable): Unit
  def alterTable(dbName: String, tableName: String, table: CatalogTable): Unit

  /** Update table schema and properties */
  def alterTableDataSchema(
    dbName: String,
    tableName: String,
    newDataSchema: StructType,
    schemaProps: Map[String, String]
  ): Unit

  /** Get partitions matching partial spec with optional filtering */
  def getPartitionsByFilter(
    catalogTable: CatalogTable,
    predicates: Seq[Expression]
  ): Seq[CatalogTablePartition]

  /** Add JAR to class loader for UDF/SerDe usage */
  def addJar(path: String): Unit

  /** Remove all metadata - used for testing only */
  def reset(): Unit
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.types._

// List tables
val tables = client.listTables("default")
println(s"Tables in default database: ${tables.mkString(", ")}")

// Create table
val tableSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

val newTable = CatalogTable(
  identifier = TableIdentifier("users", Some("default")),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty,
  schema = tableSchema,
  partitionColumnNames = Seq.empty,
  properties = Map("comment" -> "User information table")
)

client.createTable(newTable, ignoreIfExists = true)

// Get table info
val tableInfo = client.getTable("default", "users")
println(s"Table schema: ${tableInfo.schema}")
println(s"Table type: ${tableInfo.tableType}")

// Check if table exists
if (client.tableExists("default", "users")) {
  println("Table exists")
}
```

### Partition Operations

Comprehensive partition management for partitioned tables.

```scala { .api }
/**
 * Partition management operations
 */
trait HiveClient {
  /** Create one or more partitions */
  def createPartitions(
    db: String,
    table: String,
    parts: Seq[CatalogTablePartition],
    ignoreIfExists: Boolean
  ): Unit

  /** Drop one or more partitions */
  def dropPartitions(
    db: String,
    table: String,
    specs: Seq[TablePartitionSpec],
    ignoreIfNotExists: Boolean,
    purge: Boolean,
    retainData: Boolean
  ): Unit

  /** Rename existing partitions */
  def renamePartitions(
    db: String,
    table: String,
    specs: Seq[TablePartitionSpec],
    newSpecs: Seq[TablePartitionSpec]
  ): Unit

  /** Alter existing partitions */
  def alterPartitions(
    db: String,
    table: String,
    newParts: Seq[CatalogTablePartition]
  ): Unit

  /** Get partition - throws NoSuchPartitionException if not found */
  def getPartition(
    dbName: String,
    tableName: String,
    spec: TablePartitionSpec
  ): CatalogTablePartition

  /** Get partition - returns None if not found */
  def getPartitionOption(
    table: CatalogTable,
    spec: TablePartitionSpec
  ): Option[CatalogTablePartition]

  /** Get partitions matching partial spec */
  def getPartitions(
    catalogTable: CatalogTable,
    partialSpec: Option[TablePartitionSpec] = None
  ): Seq[CatalogTablePartition]

  /** Get partition names matching partial spec */
  def getPartitionNames(
    table: CatalogTable,
    partialSpec: Option[TablePartitionSpec] = None
  ): Seq[String]

  /** Get partitions filtered by predicates */
  def getPartitionsByFilter(
    catalogTable: CatalogTable,
    predicates: Seq[Expression]
  ): Seq[CatalogTablePartition]
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec

// Get table for partition operations
val table = client.getTable("sales_db", "monthly_sales")

// List all partitions
val allPartitions = client.getPartitions(table)
println(s"Total partitions: ${allPartitions.length}")

// Get specific partition
val partSpec: TablePartitionSpec = Map("year" -> "2023", "month" -> "12")
val partition = client.getPartitionOption(table, partSpec)
partition match {
  case Some(part) => println(s"Partition location: ${part.storage.locationUri}")
  case None => println("Partition not found")
}

// Create new partition
val newPartition = CatalogTablePartition(
  spec = Map("year" -> "2024", "month" -> "01"),
  storage = CatalogStorageFormat(
    locationUri = Some("hdfs://cluster/user/hive/warehouse/sales_db.db/monthly_sales/year=2024/month=01"),
    inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
    outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"),
    serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
  )
)

client.createPartitions("sales_db", "monthly_sales", Seq(newPartition), ignoreIfExists = true)
```

### Data Loading Operations

Operations for loading data into tables and partitions.

```scala { .api }
/**
 * Data loading operations
 */
trait HiveClient {
  /** Load data into static partition */
  def loadPartition(
    loadPath: String,
    dbName: String,
    tableName: String,
    partSpec: java.util.LinkedHashMap[String, String], // Hive requires LinkedHashMap ordering
    replace: Boolean,
    inheritTableSpecs: Boolean,
    isSrcLocal: Boolean
  ): Unit

  /** Load data into existing table */
  def loadTable(
    loadPath: String,
    tableName: String,
    replace: Boolean,
    isSrcLocal: Boolean
  ): Unit

  /** Load data creating dynamic partitions */
  def loadDynamicPartitions(
    loadPath: String,
    dbName: String,
    tableName: String,
    partSpec: java.util.LinkedHashMap[String, String], // Hive requires LinkedHashMap ordering
    replace: Boolean,
    numDP: Int
  ): Unit
}
```

### Function Operations

Management of user-defined functions in Hive metastore.

```scala { .api }
/**
 * Function management operations
 */
trait HiveClient {
  /** Create function in database */
  def createFunction(db: String, func: CatalogFunction): Unit

  /** Drop existing function */
  def dropFunction(db: String, name: String): Unit

  /** Rename existing function */
  def renameFunction(db: String, oldName: String, newName: String): Unit

  /** Alter existing function */
  def alterFunction(db: String, func: CatalogFunction): Unit

  /** Get function - throws NoSuchPermanentFunctionException if not found */
  def getFunction(db: String, name: String): CatalogFunction

  /** Get function - returns None if not found */
  def getFunctionOption(db: String, name: String): Option[CatalogFunction]

  /** Check if function exists */
  def functionExists(db: String, name: String): Boolean

  /** List functions matching pattern */
  def listFunctions(db: String, pattern: String): Seq[String]
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource}

// List functions
val functions = client.listFunctions("default", "*")
println(s"Available functions: ${functions.mkString(", ")}")

// Create custom function
val customFunction = CatalogFunction(
  identifier = FunctionIdentifier("my_upper", Some("default")),
  className = "com.example.MyUpperUDF",
  resources = Seq(FunctionResource(
    resourceType = org.apache.spark.sql.catalyst.catalog.FunctionResourceType.jar,
    uri = "hdfs://cluster/jars/my-udfs.jar"
  ))
)

client.createFunction("default", customFunction)

// Check if function exists
if (client.functionExists("default", "my_upper")) {
  println("Custom function created successfully")
}
```

## Types

### Hive Version Support

```scala { .api }
/**
 * Abstract class representing Hive version with dependencies
 */
abstract class HiveVersion(
  val fullVersion: String,
  val extraDeps: Seq[String] = Nil,
  val exclusions: Seq[String] = Nil
)

// Supported Hive versions
val allSupportedHiveVersions: Set[HiveVersion] = Set(
  v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3
)
```