or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · data-conversion.md · external-catalog.md · hive-client.md · index.md · udf-support.md

docs/external-catalog.md

# External Catalog Operations

Complete Hive metastore integration providing database, table, partition, and function operations through the Spark catalog interface. The `HiveExternalCatalog` serves as the primary bridge between Spark's catalog system and Hive's metastore.

## Capabilities

### HiveExternalCatalog Class

Main implementation of Spark's ExternalCatalog interface backed by Hive metastore.

```scala { .api }
/**
 * Hive-backed external catalog implementation for metastore operations
 * @param conf Spark configuration
 * @param hadoopConf Hadoop configuration
 */
class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration) extends ExternalCatalog {
  lazy val client: HiveClient
}
```

**Usage Example:**

```scala
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.SparkConf
import org.apache.hadoop.conf.Configuration

val conf = new SparkConf()
val hadoopConf = new Configuration()
val catalog = new HiveExternalCatalog(conf, hadoopConf)

// Access the underlying Hive client
val hiveClient = catalog.client
```

### Database Operations

Manage Hive databases through the external catalog interface.

```scala { .api }
/**
 * Create a new database in the Hive metastore
 * @param dbDefinition Database definition with metadata
 * @param ignoreIfExists If true, don't fail if database already exists
 */
def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit

/**
 * Drop a database from the Hive metastore
 * @param db Database name
 * @param ignoreIfNotExists If true, don't fail if database doesn't exist
 * @param cascade If true, drop all tables in the database
 */
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit

/**
 * Modify an existing database's metadata
 * @param dbDefinition Updated database definition
 */
def alterDatabase(dbDefinition: CatalogDatabase): Unit

/**
 * Get database metadata
 * @param db Database name
 * @return Database definition
 */
def getDatabase(db: String): CatalogDatabase

/**
 * Check if a database exists
 * @param db Database name
 * @return True if database exists
 */
def databaseExists(db: String): Boolean

/**
 * List all databases
 * @return Sequence of database names
 */
def listDatabases(): Seq[String]

/**
 * List databases matching a pattern
 * @param pattern Pattern to match (SQL LIKE pattern)
 * @return Sequence of matching database names
 */
def listDatabases(pattern: String): Seq[String]

/**
 * Set the current database
 * @param db Database name to set as current
 */
def setCurrentDatabase(db: String): Unit
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase

// Create a database
val dbDef = CatalogDatabase(
  name = "my_database",
  description = "Test database",
  locationUri = new URI("hdfs://namenode:9000/user/hive/warehouse/my_database.db"),
  properties = Map.empty
)
catalog.createDatabase(dbDef, ignoreIfExists = true)

// List databases
val databases = catalog.listDatabases()
println(s"Available databases: ${databases.mkString(", ")}")

// Get database info
val dbInfo = catalog.getDatabase("my_database")
println(s"Database location: ${dbInfo.locationUri}")
```

### Table Operations

Comprehensive table management including creation, modification, and metadata access.

```scala { .api }
/**
 * Create a new table in the Hive metastore
 * @param tableDefinition Complete table definition
 * @param ignoreIfExists If true, don't fail if table already exists
 */
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit

/**
 * Drop a table from the Hive metastore
 * @param db Database name
 * @param table Table name
 * @param ignoreIfNotExists If true, don't fail if table doesn't exist
 * @param purge If true, delete table data immediately
 */
def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit

/**
 * Rename a table
 * @param db Database name
 * @param oldName Current table name
 * @param newName New table name
 */
def renameTable(db: String, oldName: String, newName: String): Unit

/**
 * Modify table metadata
 * @param tableDefinition Updated table definition
 */
def alterTable(tableDefinition: CatalogTable): Unit

/**
 * Alter table's data schema
 * @param db Database name
 * @param table Table name
 * @param newDataSchema New schema structure
 */
def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit

/**
 * Update table statistics
 * @param db Database name
 * @param table Table name
 * @param stats Optional statistics to set
 */
def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit

/**
 * Get table metadata
 * @param db Database name
 * @param table Table name
 * @return Complete table definition
 */
def getTable(db: String, table: String): CatalogTable

/**
 * Get multiple tables by name
 * @param db Database name
 * @param tables Sequence of table names
 * @return Sequence of table definitions
 */
def getTablesByName(db: String, tables: Seq[String]): Seq[CatalogTable]

/**
 * Check if a table exists
 * @param db Database name
 * @param table Table name
 * @return True if table exists
 */
def tableExists(db: String, table: String): Boolean

/**
 * List all tables in a database
 * @param db Database name
 * @return Sequence of table names
 */
def listTables(db: String): Seq[String]

/**
 * List tables matching a pattern
 * @param db Database name
 * @param pattern Pattern to match (SQL LIKE pattern)
 * @return Sequence of matching table names
 */
def listTables(db: String, pattern: String): Seq[String]

/**
 * List all views in a database
 * @param db Database name
 * @return Sequence of view names
 */
def listViews(db: String): Seq[String]

/**
 * List views matching a pattern
 * @param db Database name
 * @param pattern Pattern to match (SQL LIKE pattern)
 * @return Sequence of matching view names
 */
def listViews(db: String, pattern: String): Seq[String]
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.types._

// Create a table
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("created_at", TimestampType, nullable = true)
))

val tableDef = CatalogTable(
  identifier = TableIdentifier("users", Some("my_database")),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty.copy(
    inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
    outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"),
    serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
  ),
  schema = schema
)

catalog.createTable(tableDef, ignoreIfExists = true)

// List tables
val tables = catalog.listTables("my_database")
println(s"Tables: ${tables.mkString(", ")}")

// Get table info
val tableInfo = catalog.getTable("my_database", "users")
println(s"Table schema: ${tableInfo.schema}")
```

### Data Loading Operations

Load data into Hive tables and partitions.

```scala { .api }
/**
 * Load data into a table
 * @param db Database name
 * @param table Table name
 * @param loadPath Path to data files
 * @param isOverwrite If true, overwrite existing data
 * @param isSrcLocal If true, source is local filesystem
 */
def loadTable(db: String, table: String, loadPath: String,
    isOverwrite: Boolean, isSrcLocal: Boolean): Unit

/**
 * Load data into a partition
 * @param db Database name
 * @param table Table name
 * @param loadPath Path to data files
 * @param partition Partition specification
 * @param isOverwrite If true, overwrite existing data
 * @param inheritTableSpecs If true, inherit table specifications
 * @param isSrcLocal If true, source is local filesystem
 */
def loadPartition(db: String, table: String, loadPath: String,
    partition: TablePartitionSpec, isOverwrite: Boolean,
    inheritTableSpecs: Boolean, isSrcLocal: Boolean): Unit

/**
 * Load data into dynamic partitions
 * @param db Database name
 * @param table Table name
 * @param loadPath Path to data files
 * @param partition Partition specification
 * @param replace If true, replace existing partitions
 * @param numDP Number of dynamic partitions
 */
def loadDynamicPartitions(db: String, table: String, loadPath: String,
    partition: TablePartitionSpec, replace: Boolean,
    numDP: Int): Unit
```

### Partition Operations

Manage table partitions including creation, modification, and querying.

```scala { .api }
/**
 * Create table partitions
 * @param db Database name
 * @param table Table name
 * @param parts Sequence of partition definitions
 * @param ignoreIfExists If true, don't fail if partitions already exist
 */
def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition],
    ignoreIfExists: Boolean): Unit

/**
 * Drop table partitions
 * @param db Database name
 * @param table Table name
 * @param parts Sequence of partition specifications
 * @param ignoreIfNotExists If true, don't fail if partitions don't exist
 * @param purge If true, delete partition data immediately
 * @param retainData If true, keep partition data files
 */
def dropPartitions(db: String, table: String, parts: Seq[TablePartitionSpec],
    ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit

/**
 * Rename table partitions
 * @param db Database name
 * @param table Table name
 * @param specs Current partition specifications
 * @param newSpecs New partition specifications
 */
def renamePartitions(db: String, table: String, specs: Seq[TablePartitionSpec],
    newSpecs: Seq[TablePartitionSpec]): Unit

/**
 * Modify partition metadata
 * @param db Database name
 * @param table Table name
 * @param parts Updated partition definitions
 */
def alterPartitions(db: String, table: String, parts: Seq[CatalogTablePartition]): Unit

/**
 * Get partition metadata
 * @param db Database name
 * @param table Table name
 * @param spec Partition specification
 * @return Partition definition
 */
def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition

/**
 * Get partition metadata optionally
 * @param db Database name
 * @param table Table name
 * @param spec Partition specification
 * @return Optional partition definition
 */
def getPartitionOption(db: String, table: String, spec: TablePartitionSpec): Option[CatalogTablePartition]

/**
 * List partition names
 * @param db Database name
 * @param table Table name
 * @param partialSpec Optional partial partition specification for filtering
 * @return Sequence of partition names
 */
def listPartitionNames(db: String, table: String,
    partialSpec: Option[TablePartitionSpec]): Seq[String]

/**
 * List partitions
 * @param db Database name
 * @param table Table name
 * @param partialSpec Optional partial partition specification for filtering
 * @return Sequence of partition definitions
 */
def listPartitions(db: String, table: String,
    partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]

/**
 * List partitions matching filter predicates
 * @param db Database name
 * @param table Table name
 * @param predicates Filter expressions
 * @param defaultTimeZoneId Default timezone for date/time operations
 * @return Sequence of matching partition definitions
 */
def listPartitionsByFilter(db: String, table: String, predicates: Seq[Expression],
    defaultTimeZoneId: String): Seq[CatalogTablePartition]
```

### Function Operations

Manage Hive user-defined functions in the metastore.

```scala { .api }
/**
 * Create a user-defined function
 * @param db Database name
 * @param funcDefinition Function definition
 */
def createFunction(db: String, funcDefinition: CatalogFunction): Unit

/**
 * Drop a user-defined function
 * @param db Database name
 * @param funcName Function name
 */
def dropFunction(db: String, funcName: String): Unit

/**
 * Modify function metadata
 * @param db Database name
 * @param funcDefinition Updated function definition
 */
def alterFunction(db: String, funcDefinition: CatalogFunction): Unit

/**
 * Rename a function
 * @param db Database name
 * @param oldName Current function name
 * @param newName New function name
 */
def renameFunction(db: String, oldName: String, newName: String): Unit

/**
 * Get function metadata
 * @param db Database name
 * @param funcName Function name
 * @return Function definition
 */
def getFunction(db: String, funcName: String): CatalogFunction

/**
 * Check if a function exists
 * @param db Database name
 * @param funcName Function name
 * @return True if function exists
 */
def functionExists(db: String, funcName: String): Boolean

/**
 * List functions matching a pattern
 * @param db Database name
 * @param pattern Pattern to match (SQL LIKE pattern)
 * @return Sequence of matching function names
 */
def listFunctions(db: String, pattern: String): Seq[String]
```

### Companion Object Utilities

Utility methods and constants for working with Hive tables.

```scala { .api }
object HiveExternalCatalog {
  // Metadata key constants
  val SPARK_SQL_PREFIX: String
  val DATASOURCE_PREFIX: String
  val DATASOURCE_PROVIDER: String
  val DATASOURCE_SCHEMA: String
  val STATISTICS_PREFIX: String
  val STATISTICS_TOTAL_SIZE: String
  val STATISTICS_NUM_ROWS: String
  val TABLE_PARTITION_PROVIDER: String
  val CREATED_SPARK_VERSION: String
  val EMPTY_DATA_SCHEMA: StructType

  /**
   * Check if a table is a data source table
   * @param table Table definition
   * @return True if table uses Spark data source format
   */
  def isDatasourceTable(table: CatalogTable): Boolean

  /**
   * Check if a data type is compatible with Hive
   * @param dt Data type to check
   * @return True if type is Hive-compatible
   */
  def isHiveCompatibleDataType(dt: DataType): Boolean
}
```

## Thread Safety

**Important**: `HiveExternalCatalog` is not thread-safe and requires external synchronization when accessed from multiple threads concurrently. Spark's catalog implementations typically handle this synchronization automatically.