# Metadata and Catalog Operations

Database, table, and metadata management through the catalog interface. Provides programmatic access to metastore operations, table management, and schema inspection capabilities.

## Capabilities

### Catalog Interface

Main interface for catalog operations and metadata management.

```scala { .api }
/**
 * Interface for catalog operations (databases, tables, functions, columns)
 */
trait Catalog {
  /** Get current database name */
  def currentDatabase(): String

  /** Set current database */
  def setCurrentDatabase(dbName: String): Unit

  /** List all available databases */
  def listDatabases(): Dataset[Database]

  /** List tables in current database */
  def listTables(): Dataset[Table]

  /** List tables in specified database */
  def listTables(dbName: String): Dataset[Table]

  /** List columns for specified table */
  def listColumns(tableName: String): Dataset[Column]
  def listColumns(dbName: String, tableName: String): Dataset[Column]

  /** List all available functions */
  def listFunctions(): Dataset[Function]

  /** List functions in specified database */
  def listFunctions(dbName: String): Dataset[Function]

  /** Check if database exists */
  def databaseExists(dbName: String): Boolean

  /** Check if table exists */
  def tableExists(tableName: String): Boolean
  def tableExists(dbName: String, tableName: String): Boolean

  /** Check if function exists */
  def functionExists(functionName: String): Boolean
  def functionExists(dbName: String, functionName: String): Boolean

  /** Get table metadata */
  def getTable(tableName: String): Table
  def getTable(dbName: String, tableName: String): Table

  /** Get function metadata */
  def getFunction(functionName: String): Function
  def getFunction(dbName: String, functionName: String): Function

  /** Cache table in memory */
  def cacheTable(tableName: String): Unit
  def cacheTable(tableName: String, storageLevel: StorageLevel): Unit

  /** Uncache table from memory */
  def uncacheTable(tableName: String): Unit

  /** Check if table is cached */
  def isCached(tableName: String): Boolean

  /** Clear all cached tables */
  def clearCache(): Unit

  /** Refresh table metadata */
  def refreshTable(tableName: String): Unit

  /** Refresh function metadata */
  def refreshFunction(functionName: String): Unit

  /** Create database */
  def createDatabase(dbName: String, ignoreIfExists: Boolean): Unit
  def createDatabase(dbName: String, ignoreIfExists: Boolean, path: String): Unit

  /** Drop database */
  def dropDatabase(dbName: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit

  /** Create table */
  def createTable(tableName: String, path: String): DataFrame
  def createTable(tableName: String, path: String, source: String): DataFrame
  def createTable(tableName: String, source: String, options: Map[String, String]): DataFrame
  def createTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame

  /** Drop temporary views */
  def dropTempView(viewName: String): Boolean
  def dropGlobalTempView(viewName: String): Boolean

  /** Recover partitions for table */
  def recoverPartitions(tableName: String): Unit
}
```

### Database Metadata

Represents database information in the catalog.

```scala { .api }
/**
 * Database metadata from the catalog
 */
case class Database(
  name: String,        // Database name
  description: String, // Database description
  locationUri: String  // Database location URI
)
```

### Table Metadata

Represents table information in the catalog.

```scala { .api }
/**
 * Table metadata from the catalog
 */
case class Table(
  name: String,        // Table name
  database: String,    // Database name
  description: String, // Table description
  tableType: String,   // Table type (MANAGED, EXTERNAL, VIEW, etc.)
  isTemporary: Boolean // Whether table is temporary
)
```

### Column Metadata

Represents column information in the catalog.

```scala { .api }
/**
 * Column metadata from the catalog
 */
case class Column(
  name: String,         // Column name
  description: String,  // Column description
  dataType: String,     // Column data type
  nullable: Boolean,    // Whether column can be null
  isPartition: Boolean, // Whether column is partition key
  isBucket: Boolean     // Whether column is bucket key
)
```

### Function Metadata

Represents function information in the catalog.

```scala { .api }
/**
 * Function metadata from the catalog
 */
case class Function(
  name: String,        // Function name
  database: String,    // Database name
  description: String, // Function description
  className: String,   // Implementation class name
  isTemporary: Boolean // Whether function is temporary
)
```

**Usage Examples:**

```scala
val catalog = spark.catalog

// Database operations
println(s"Current database: ${catalog.currentDatabase()}")

val databases = catalog.listDatabases().collect()
databases.foreach(db => println(s"Database: ${db.name} at ${db.locationUri}"))

catalog.setCurrentDatabase("my_database")

// Table operations
val tables = catalog.listTables().collect()
tables.foreach(table =>
  println(s"Table: ${table.name} (${table.tableType}) in ${table.database}")
)

// Check existence
if (catalog.tableExists("employees")) {
  val tableInfo = catalog.getTable("employees")
  println(s"Table type: ${tableInfo.tableType}")

  // Get column information
  val columns = catalog.listColumns("employees").collect()
  columns.foreach(col =>
    println(s"Column: ${col.name} (${col.dataType}, nullable: ${col.nullable})")
  )
}

// Function operations
val functions = catalog.listFunctions().collect()
functions.foreach(func =>
  println(s"Function: ${func.name} (${func.className})")
)

// Caching operations
catalog.cacheTable("frequently_used_table")
println(s"Table cached: ${catalog.isCached("frequently_used_table")}")

catalog.uncacheTable("frequently_used_table")
```

### Table Management Operations

Advanced table management and DDL operations.

**Creating and dropping tables:**

```scala
// Create external table
catalog.createTable(
  tableName = "external_data",
  path = "s3://bucket/path/to/data",
  source = "parquet"
)

// Create table with schema and options
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("created_date", DateType, nullable = true)
))

catalog.createTable(
  tableName = "structured_data",
  source = "delta",
  schema = schema,
  options = Map(
    "path" -> "s3://bucket/delta/table",
    "overwriteSchema" -> "true"
  )
)

// Create database
catalog.createDatabase("analytics", ignoreIfExists = true)
catalog.createDatabase("data_lake", ignoreIfExists = true, path = "s3://bucket/databases/data_lake")

// Drop database (cascade removes all tables)
catalog.dropDatabase("old_database", ignoreIfNotExists = true, cascade = true)

// Create temporary views
spark.sql("""
  CREATE OR REPLACE TEMPORARY VIEW active_users AS
  SELECT * FROM users WHERE status = 'active'
""")

// Drop temporary views
catalog.dropTempView("active_users")
```

### Caching and Performance

Table caching for improved query performance.

```scala
import org.apache.spark.storage.StorageLevel

// Cache with default storage level (MEMORY_AND_DISK)
catalog.cacheTable("hot_data")

// Cache with specific storage level
catalog.cacheTable("lookup_table", StorageLevel.MEMORY_ONLY)

// Check cache status
val cachedTables = catalog.listTables()
  .filter(_.isTemporary == false)
  .collect()
  .filter(table => catalog.isCached(table.name))

cachedTables.foreach(table =>
  println(s"Cached table: ${table.name}")
)

// Clear specific table from cache
catalog.uncacheTable("hot_data")

// Clear all cached tables
catalog.clearCache()

// Refresh table metadata after external changes
catalog.refreshTable("external_table")
```

### Partition Management

Working with partitioned tables.

```scala
// Recover partitions for external tables
catalog.recoverPartitions("partitioned_external_table")

// List partitions (using SQL for detailed partition info)
val partitions = spark.sql("SHOW PARTITIONS partitioned_table").collect()
partitions.foreach(row => println(s"Partition: ${row.getString(0)}"))

// Add specific partitions
spark.sql("""
  ALTER TABLE partitioned_table
  ADD PARTITION (year=2023, month=12)
  LOCATION 's3://bucket/data/year=2023/month=12'
""")

// Drop partition
spark.sql("""
  ALTER TABLE partitioned_table
  DROP PARTITION (year=2022, month=01)
""")
```

### Advanced Catalog Queries

Complex metadata queries and analysis.

```scala
// Find large tables
val largeTables = catalog.listTables()
  .select("name", "database", "tableType")
  .collect()
  .filter(_.getString(2) != "VIEW") // Exclude views

// Analyze table schemas
def analyzeTableSchema(tableName: String): Unit = {
  val columns = catalog.listColumns(tableName).collect()

  println(s"Schema analysis for $tableName:")
  println(s"Total columns: ${columns.length}")

  val partitionCols = columns.filter(_.isPartition)
  if (partitionCols.nonEmpty) {
    println(s"Partition columns: ${partitionCols.map(_.name).mkString(", ")}")
  }

  val bucketCols = columns.filter(_.isBucket)
  if (bucketCols.nonEmpty) {
    println(s"Bucket columns: ${bucketCols.map(_.name).mkString(", ")}")
  }

  val dataTypes = columns.groupBy(_.dataType).mapValues(_.length)
  println("Data type distribution:")
  dataTypes.foreach { case (dataType, count) =>
    println(s"  $dataType: $count columns")
  }
}

// Find tables with specific patterns
val userTables = catalog.listTables()
  .filter(col("name").like("%user%"))
  .collect()

// Cross-database analysis
val allDatabases = catalog.listDatabases().collect()
allDatabases.foreach { db =>
  val tableCount = catalog.listTables(db.name).count()
  println(s"Database ${db.name}: $tableCount tables")
}
```

### Integration with External Metastores

Working with Hive metastore and external catalog systems.

```scala
// Enable Hive support for metastore integration
val spark = SparkSession.builder()
  .appName("Catalog Operations")
  .enableHiveSupport()
  .getOrCreate()

// Set Hive metastore location
spark.conf.set("spark.sql.warehouse.dir", "/path/to/warehouse")
spark.conf.set("hive.metastore.uris", "thrift://metastore-host:9083")

// Create Hive-managed table
spark.sql("""
  CREATE TABLE IF NOT EXISTS hive_table (
    id BIGINT,
    name STRING,
    created_date DATE
  )
  USING HIVE
  PARTITIONED BY (year INT, month INT)
  STORED AS PARQUET
""")

// Register external table in Hive metastore
spark.sql("""
  CREATE TABLE external_parquet_table (
    id BIGINT,
    name STRING,
    value DOUBLE
  )
  USING PARQUET
  LOCATION 's3://bucket/path/to/parquet'
""")

// Work with Hive databases
spark.sql("CREATE DATABASE IF NOT EXISTS hive_db")
catalog.setCurrentDatabase("hive_db")

// MSCK repair for Hive tables
spark.sql("MSCK REPAIR TABLE partitioned_hive_table")
```

### Catalog Utility Functions

Helper functions for common catalog operations.

```scala
object CatalogUtils {
  def tableExists(spark: SparkSession, dbName: String, tableName: String): Boolean = {
    try {
      spark.catalog.getTable(dbName, tableName)
      true
    } catch {
      case _: AnalysisException => false
    }
  }

  def createDatabaseIfNotExists(spark: SparkSession, dbName: String, location: Option[String] = None): Unit = {
    if (!spark.catalog.databaseExists(dbName)) {
      location match {
        case Some(path) => spark.catalog.createDatabase(dbName, ignoreIfExists = true, path)
        case None => spark.catalog.createDatabase(dbName, ignoreIfExists = true)
      }
      println(s"Created database: $dbName")
    } else {
      println(s"Database already exists: $dbName")
    }
  }

  def getTableStatistics(spark: SparkSession, tableName: String): Map[String, Any] = {
    val table = spark.catalog.getTable(tableName)
    val columns = spark.catalog.listColumns(tableName).collect()

    Map(
      "tableName" -> table.name,
      "database" -> table.database,
      "tableType" -> table.tableType,
      "isTemporary" -> table.isTemporary,
      "columnCount" -> columns.length,
      "partitionColumns" -> columns.filter(_.isPartition).map(_.name),
      "bucketColumns" -> columns.filter(_.isBucket).map(_.name),
      "dataTypes" -> columns.map(_.dataType).distinct
    )
  }

  def copyTableSchema(spark: SparkSession, sourceTable: String, targetTable: String): Unit = {
    val sourceColumns = spark.catalog.listColumns(sourceTable).collect()
    val schema = StructType(sourceColumns.map { col =>
      val dataType = DataType.fromDDL(col.dataType)
      StructField(col.name, dataType, col.nullable)
    })

    // Create empty table with same schema
    val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    emptyDf.write.saveAsTable(targetTable)
  }
}

// Usage examples
CatalogUtils.createDatabaseIfNotExists(spark, "analytics", Some("s3://bucket/analytics"))

val stats = CatalogUtils.getTableStatistics(spark, "employees")
println(s"Table statistics: $stats")

if (CatalogUtils.tableExists(spark, "default", "source_table")) {
  CatalogUtils.copyTableSchema(spark, "source_table", "target_table")
}
```