# External Catalog Operations

Complete Hive metastore integration providing database, table, partition, and function operations through the Spark catalog interface. The `HiveExternalCatalog` serves as the primary bridge between Spark's catalog system and Hive's metastore.

## Capabilities

### HiveExternalCatalog Class

Main implementation of Spark's `ExternalCatalog` interface backed by a Hive metastore.

```scala { .api }
/**
 * Hive-backed external catalog implementation for metastore operations
 * @param conf Spark configuration
 * @param hadoopConf Hadoop configuration
 */
class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration) extends ExternalCatalog {
  lazy val client: HiveClient
}
```

**Usage Example:**

```scala
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.SparkConf
import org.apache.hadoop.conf.Configuration

val conf = new SparkConf()
val hadoopConf = new Configuration()
val catalog = new HiveExternalCatalog(conf, hadoopConf)

// Access the underlying Hive client
val hiveClient = catalog.client
```

### Database Operations

Manage Hive databases through the external catalog interface.

```scala { .api }
/**
 * Create a new database in the Hive metastore
 * @param dbDefinition Database definition with metadata
 * @param ignoreIfExists If true, don't fail if the database already exists
 */
def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit

/**
 * Drop a database from the Hive metastore
 * @param db Database name
 * @param ignoreIfNotExists If true, don't fail if the database doesn't exist
 * @param cascade If true, drop all tables in the database
 */
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit

/**
 * Modify an existing database's metadata
 * @param dbDefinition Updated database definition
 */
def alterDatabase(dbDefinition: CatalogDatabase): Unit

/**
 * Get database metadata
 * @param db Database name
 * @return Database definition
 */
def getDatabase(db: String): CatalogDatabase

/**
 * Check if a database exists
 * @param db Database name
 * @return True if the database exists
 */
def databaseExists(db: String): Boolean

/**
 * List all databases
 * @return Sequence of database names
 */
def listDatabases(): Seq[String]

/**
 * List databases matching a pattern
 * @param pattern Pattern to match (SQL LIKE pattern)
 * @return Sequence of matching database names
 */
def listDatabases(pattern: String): Seq[String]

/**
 * Set the current database
 * @param db Database name to set as current
 */
def setCurrentDatabase(db: String): Unit
```

**Usage Examples:**

```scala
import java.net.URI
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase

// Create a database
val dbDef = CatalogDatabase(
  name = "my_database",
  description = "Test database",
  locationUri = new URI("hdfs://namenode:9000/user/hive/warehouse/my_database.db"),
  properties = Map.empty
)
catalog.createDatabase(dbDef, ignoreIfExists = true)

// List databases
val databases = catalog.listDatabases()
println(s"Available databases: ${databases.mkString(", ")}")

// Get database info
val dbInfo = catalog.getDatabase("my_database")
println(s"Database location: ${dbInfo.locationUri}")
```

### Table Operations

Comprehensive table management including creation, modification, and metadata access.

```scala { .api }
/**
 * Create a new table in the Hive metastore
 * @param tableDefinition Complete table definition
 * @param ignoreIfExists If true, don't fail if the table already exists
 */
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit

/**
 * Drop a table from the Hive metastore
 * @param db Database name
 * @param table Table name
 * @param ignoreIfNotExists If true, don't fail if the table doesn't exist
 * @param purge If true, delete the table data immediately
 */
def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit

/**
 * Rename a table
 * @param db Database name
 * @param oldName Current table name
 * @param newName New table name
 */
def renameTable(db: String, oldName: String, newName: String): Unit

/**
 * Modify table metadata
 * @param tableDefinition Updated table definition
 */
def alterTable(tableDefinition: CatalogTable): Unit

/**
 * Alter a table's data schema
 * @param db Database name
 * @param table Table name
 * @param newDataSchema New schema structure
 */
def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit

/**
 * Update table statistics
 * @param db Database name
 * @param table Table name
 * @param stats Optional statistics to set
 */
def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit

/**
 * Get table metadata
 * @param db Database name
 * @param table Table name
 * @return Complete table definition
 */
def getTable(db: String, table: String): CatalogTable

/**
 * Get multiple tables by name
 * @param db Database name
 * @param tables Sequence of table names
 * @return Sequence of table definitions
 */
def getTablesByName(db: String, tables: Seq[String]): Seq[CatalogTable]

/**
 * Check if a table exists
 * @param db Database name
 * @param table Table name
 * @return True if the table exists
 */
def tableExists(db: String, table: String): Boolean

/**
 * List all tables in a database
 * @param db Database name
 * @return Sequence of table names
 */
def listTables(db: String): Seq[String]

/**
 * List tables matching a pattern
 * @param db Database name
 * @param pattern Pattern to match (SQL LIKE pattern)
 * @return Sequence of matching table names
 */
def listTables(db: String, pattern: String): Seq[String]

/**
 * List all views in a database
 * @param db Database name
 * @return Sequence of view names
 */
def listViews(db: String): Seq[String]

/**
 * List views matching a pattern
 * @param db Database name
 * @param pattern Pattern to match (SQL LIKE pattern)
 * @return Sequence of matching view names
 */
def listViews(db: String, pattern: String): Seq[String]
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types._

// Create a table
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("created_at", TimestampType, nullable = true)
))

val tableDef = CatalogTable(
  identifier = TableIdentifier("users", Some("my_database")),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty.copy(
    inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
    outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"),
    serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
  ),
  schema = schema
)

catalog.createTable(tableDef, ignoreIfExists = true)

// List tables
val tables = catalog.listTables("my_database")
println(s"Tables: ${tables.mkString(", ")}")

// Get table info
val tableInfo = catalog.getTable("my_database", "users")
println(s"Table schema: ${tableInfo.schema}")
```

### Data Loading Operations

Load data files into Hive tables and partitions.

```scala { .api }
/**
 * Load data into a table
 * @param db Database name
 * @param table Table name
 * @param loadPath Path to the data files
 * @param isOverwrite If true, overwrite existing data
 * @param isSrcLocal If true, the source is on the local filesystem
 */
def loadTable(db: String, table: String, loadPath: String,
    isOverwrite: Boolean, isSrcLocal: Boolean): Unit

/**
 * Load data into a partition
 * @param db Database name
 * @param table Table name
 * @param loadPath Path to the data files
 * @param partition Partition specification
 * @param isOverwrite If true, overwrite existing data
 * @param inheritTableSpecs If true, the partition inherits the table's storage specifications
 * @param isSrcLocal If true, the source is on the local filesystem
 */
def loadPartition(db: String, table: String, loadPath: String,
    partition: TablePartitionSpec, isOverwrite: Boolean,
    inheritTableSpecs: Boolean, isSrcLocal: Boolean): Unit

/**
 * Load data into dynamic partitions
 * @param db Database name
 * @param table Table name
 * @param loadPath Path to the data files
 * @param partition Partition specification
 * @param replace If true, replace existing partitions
 * @param numDP Number of dynamic partitions
 */
def loadDynamicPartitions(db: String, table: String, loadPath: String,
    partition: TablePartitionSpec, replace: Boolean,
    numDP: Int): Unit
```

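**Usage Example:**

A minimal sketch of the loading operations, assuming the `catalog` instance created earlier and illustrative database, table, and path names (`my_database`, `events`, the staging URI). Note that a `TablePartitionSpec` is a `Map` from partition column name to value.

```scala
// Hypothetical staging location; adjust to your environment
val stagingPath = "hdfs://namenode:9000/staging/users"

// Replace the table's contents with the files under the staging directory
catalog.loadTable("my_database", "users", stagingPath,
  isOverwrite = true, isSrcLocal = false)

// Load a single partition, overwriting it and inheriting the
// table-level storage format for the new partition
catalog.loadPartition("my_database", "events", stagingPath,
  partition = Map("dt" -> "2024-01-01"), isOverwrite = true,
  inheritTableSpecs = true, isSrcLocal = false)
```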
### Partition Operations

Manage table partitions including creation, modification, and querying.

```scala { .api }
/**
 * Create table partitions
 * @param db Database name
 * @param table Table name
 * @param parts Sequence of partition definitions
 * @param ignoreIfExists If true, don't fail if the partitions already exist
 */
def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition],
    ignoreIfExists: Boolean): Unit

/**
 * Drop table partitions
 * @param db Database name
 * @param table Table name
 * @param parts Sequence of partition specifications
 * @param ignoreIfNotExists If true, don't fail if the partitions don't exist
 * @param purge If true, delete partition data immediately
 * @param retainData If true, keep the partition data files
 */
def dropPartitions(db: String, table: String, parts: Seq[TablePartitionSpec],
    ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit

/**
 * Rename table partitions
 * @param db Database name
 * @param table Table name
 * @param specs Current partition specifications
 * @param newSpecs New partition specifications
 */
def renamePartitions(db: String, table: String, specs: Seq[TablePartitionSpec],
    newSpecs: Seq[TablePartitionSpec]): Unit

/**
 * Modify partition metadata
 * @param db Database name
 * @param table Table name
 * @param parts Updated partition definitions
 */
def alterPartitions(db: String, table: String, parts: Seq[CatalogTablePartition]): Unit

/**
 * Get partition metadata
 * @param db Database name
 * @param table Table name
 * @param spec Partition specification
 * @return Partition definition
 */
def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition

/**
 * Get partition metadata if the partition exists
 * @param db Database name
 * @param table Table name
 * @param spec Partition specification
 * @return Optional partition definition
 */
def getPartitionOption(db: String, table: String, spec: TablePartitionSpec): Option[CatalogTablePartition]

/**
 * List partition names
 * @param db Database name
 * @param table Table name
 * @param partialSpec Optional partial partition specification for filtering
 * @return Sequence of partition names
 */
def listPartitionNames(db: String, table: String,
    partialSpec: Option[TablePartitionSpec]): Seq[String]

/**
 * List partitions
 * @param db Database name
 * @param table Table name
 * @param partialSpec Optional partial partition specification for filtering
 * @return Sequence of partition definitions
 */
def listPartitions(db: String, table: String,
    partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]

/**
 * List partitions matching filter predicates
 * @param db Database name
 * @param table Table name
 * @param predicates Filter expressions
 * @param defaultTimeZoneId Default timezone for date/time operations
 * @return Sequence of matching partition definitions
 */
def listPartitionsByFilter(db: String, table: String, predicates: Seq[Expression],
    defaultTimeZoneId: String): Seq[CatalogTablePartition]
```

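**Usage Example:**

A sketch of common partition calls, assuming the `catalog` instance from earlier sections and a partitioned table `my_database.events` with a `dt` partition column (both names are illustrative).

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition}

// A TablePartitionSpec maps partition column names to values
val spec = Map("dt" -> "2024-01-01")

// Register a partition; storage defaults to the table's format here
val partition = CatalogTablePartition(
  spec = spec,
  storage = CatalogStorageFormat.empty
)
catalog.createPartitions("my_database", "events", Seq(partition), ignoreIfExists = true)

// List partition names, optionally narrowed by a partial spec
val names = catalog.listPartitionNames("my_database", "events", partialSpec = None)
println(s"Partitions: ${names.mkString(", ")}")

// Look up a single partition without failing when it is absent
catalog.getPartitionOption("my_database", "events", spec)
  .foreach(p => println(s"Partition location: ${p.storage.locationUri}"))
```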
### Function Operations

Manage Hive user-defined functions in the metastore.

```scala { .api }
/**
 * Create a user-defined function
 * @param db Database name
 * @param funcDefinition Function definition
 */
def createFunction(db: String, funcDefinition: CatalogFunction): Unit

/**
 * Drop a user-defined function
 * @param db Database name
 * @param funcName Function name
 */
def dropFunction(db: String, funcName: String): Unit

/**
 * Modify function metadata
 * @param db Database name
 * @param funcDefinition Updated function definition
 */
def alterFunction(db: String, funcDefinition: CatalogFunction): Unit

/**
 * Rename a function
 * @param db Database name
 * @param oldName Current function name
 * @param newName New function name
 */
def renameFunction(db: String, oldName: String, newName: String): Unit

/**
 * Get function metadata
 * @param db Database name
 * @param funcName Function name
 * @return Function definition
 */
def getFunction(db: String, funcName: String): CatalogFunction

/**
 * Check if a function exists
 * @param db Database name
 * @param funcName Function name
 * @return True if the function exists
 */
def functionExists(db: String, funcName: String): Boolean

/**
 * List functions matching a pattern
 * @param db Database name
 * @param pattern Pattern to match (SQL LIKE pattern)
 * @return Sequence of matching function names
 */
def listFunctions(db: String, pattern: String): Seq[String]
```

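**Usage Example:**

A sketch of registering a Hive UDF through the catalog, assuming the `catalog` instance from earlier sections; the function name, implementing class, and jar URI are all hypothetical placeholders.

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource}

// Define a UDF backed by a jar in HDFS (illustrative names)
val func = CatalogFunction(
  identifier = FunctionIdentifier("my_upper", Some("my_database")),
  className = "com.example.hive.MyUpperUDF",
  resources = Seq(FunctionResource(JarResource, "hdfs://namenode:9000/udfs/my-udfs.jar"))
)
catalog.createFunction("my_database", func)

// Verify registration and browse by pattern
println(catalog.functionExists("my_database", "my_upper"))
println(catalog.listFunctions("my_database", "my_*").mkString(", "))
```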
### Companion Object Utilities

Utility methods and constants for working with Hive tables.

```scala { .api }
object HiveExternalCatalog {
  // Metadata key constants
  val SPARK_SQL_PREFIX: String
  val DATASOURCE_PREFIX: String
  val DATASOURCE_PROVIDER: String
  val DATASOURCE_SCHEMA: String
  val STATISTICS_PREFIX: String
  val STATISTICS_TOTAL_SIZE: String
  val STATISTICS_NUM_ROWS: String
  val TABLE_PARTITION_PROVIDER: String
  val CREATED_SPARK_VERSION: String
  val EMPTY_DATA_SCHEMA: StructType

  /**
   * Check if a table is a data source table
   * @param table Table definition
   * @return True if the table uses a Spark data source format
   */
  def isDatasourceTable(table: CatalogTable): Boolean

  /**
   * Check if a data type is compatible with Hive
   * @param dt Data type to check
   * @return True if the type is Hive-compatible
   */
  def isHiveCompatibleDataType(dt: DataType): Boolean
}
```

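**Usage Example:**

A sketch of the companion utilities, assuming the `catalog` instance and the `my_database.users` table from earlier sections, and assuming the companion members are accessible as listed above.

```scala
import org.apache.spark.sql.hive.HiveExternalCatalog

val tableInfo = catalog.getTable("my_database", "users")

// Distinguish tables written with a Spark data source provider
// from plain Hive tables
if (HiveExternalCatalog.isDatasourceTable(tableInfo)) {
  println("Stored via a Spark data source provider")
}

// Property keys Spark records in the metastore live on the companion
println(HiveExternalCatalog.DATASOURCE_PROVIDER)
```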
## Thread Safety

**Important**: `HiveExternalCatalog` is not thread-safe; concurrent access from multiple threads requires external synchronization. Spark's own catalog layers typically provide this synchronization, so the caveat matters mainly when you invoke the catalog directly.
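
If you do call the catalog directly from multiple threads, one simple guard is to funnel every call through a single lock; a minimal sketch, assuming the `catalog` instance from earlier sections:

```scala
// One shared lock guarding all catalog calls made by this application
val catalogLock = new Object

def withCatalogLock[T](body: => T): T = catalogLock.synchronized(body)

// Each thread routes its catalog access through the lock
val exists = withCatalogLock(catalog.tableExists("my_database", "users"))
```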