# Catalog and Metadata Management

The Spark SQL Catalog provides a programmatic interface for managing databases, tables, functions, and cached data. It serves as the central metadata repository and enables runtime introspection of the Spark SQL environment.

## Catalog Interface
```scala { .api }
class Catalog {
  // Database operations
  def currentDatabase: String
  def setCurrentDatabase(dbName: String): Unit
  def listDatabases(): Dataset[Database]
  def databaseExists(dbName: String): Boolean
  def getDatabase(dbName: String): Database

  // Table operations
  def listTables(): Dataset[Table]
  def listTables(dbName: String): Dataset[Table]
  def tableExists(tableName: String): Boolean
  def tableExists(dbName: String, tableName: String): Boolean
  def getTable(tableName: String): Table
  def getTable(dbName: String, tableName: String): Table
  def listColumns(tableName: String): Dataset[Column]
  def listColumns(dbName: String, tableName: String): Dataset[Column]

  // Function operations
  def listFunctions(): Dataset[Function]
  def listFunctions(dbName: String): Dataset[Function]
  def functionExists(functionName: String): Boolean
  def functionExists(dbName: String, functionName: String): Boolean
  def getFunction(functionName: String): Function
  def getFunction(dbName: String, functionName: String): Function

  // Temporary view operations
  def dropTempView(viewName: String): Boolean
  def dropGlobalTempView(viewName: String): Boolean

  // Table management (Experimental)
  def createTable(tableName: String): DataFrameWriter[Row]
  def createTable(tableName: String, path: String): DataFrameWriter[Row]
  // NOTE(review): this overload has the same erased signature as the
  // (tableName, path) variant above, so the two could not coexist in real
  // Scala. Spark's actual Catalog API disambiguates the source-based form
  // with an options map: createTable(tableName, source, options) — verify
  // against the Spark version in use.
  def createTable(tableName: String, source: String): DataFrameWriter[Row]
  def createTable(tableName: String, path: String, source: String): DataFrameWriter[Row]

  // Maintenance operations
  def recoverPartitions(tableName: String): Unit

  // Caching operations
  def isCached(tableName: String): Boolean
  def cacheTable(tableName: String): Unit
  def cacheTable(tableName: String, storageLevel: StorageLevel): Unit
  def uncacheTable(tableName: String): Unit
  def clearCache(): Unit
  def refreshTable(tableName: String): Unit
  def refreshByPath(path: String): Unit
}
```
## Database Management

### Database Operations

**Usage Examples:**

```scala
val catalog = spark.catalog

// Get current database
val currentDb = catalog.currentDatabase
println(s"Current database: $currentDb")

// Change database
catalog.setCurrentDatabase("my_database")

// List all databases
val databases = catalog.listDatabases()
databases.show()

// Check if database exists
val exists = catalog.databaseExists("production")
if (!exists) {
  spark.sql("CREATE DATABASE production")
}

// Get database information
val dbInfo = catalog.getDatabase("production")
println(s"Database: ${dbInfo.name}, Description: ${dbInfo.description}")
```

### Database Metadata

```scala { .api }
case class Database(
  name: String,
  description: String,
  locationUri: String
)
```

**Usage Examples:**

```scala
// Inspect database details
catalog.listDatabases().collect().foreach { db =>
  println(s"Database: ${db.name}")
  println(s"Description: ${db.description}")
  println(s"Location: ${db.locationUri}")
  println("---")
}

// Filter databases (note: .show() returns Unit, so the result is printed,
// not captured)
catalog.listDatabases()
  .filter(col("name").like("%prod%"))
  .select("name", "description")
  .show()
```
## Table Management

### Table Operations

```scala { .api }
case class Table(
  name: String,
  database: String,
  description: String,
  tableType: String,
  isTemporary: Boolean
)
```

**Usage Examples:**

```scala
// List all tables in current database
val tables = catalog.listTables()
tables.show()

// List tables in specific database
val prodTables = catalog.listTables("production")
prodTables.filter(col("tableType") === "MANAGED").show()

// Check if table exists
val tableExists = catalog.tableExists("users")
val specificExists = catalog.tableExists("production", "sales")

// Get table information
val tableInfo = catalog.getTable("users")
println(s"Table: ${tableInfo.name}")
println(s"Database: ${tableInfo.database}")
println(s"Type: ${tableInfo.tableType}")
println(s"Is Temporary: ${tableInfo.isTemporary}")

// List all temporary tables
catalog.listTables()
  .filter(col("isTemporary") === true)
  .select("name", "database")
  .show()
```

### Column Metadata

```scala { .api }
case class Column(
  name: String,
  description: String,
  dataType: String,
  nullable: Boolean,
  isPartition: Boolean,
  isBucket: Boolean
)
```

**Usage Examples:**

```scala
// List columns for a table
val columns = catalog.listColumns("users")
columns.show()

// Get detailed column information
columns.collect().foreach { col =>
  println(s"Column: ${col.name}")
  println(s"Type: ${col.dataType}")
  println(s"Nullable: ${col.nullable}")
  println(s"Partition: ${col.isPartition}")
  println(s"Bucket: ${col.isBucket}")
  println("---")
}

// Find partition columns (printed via .show(), which returns Unit)
catalog.listColumns("partitioned_table")
  .filter(col("isPartition") === true)
  .select("name", "dataType")
  .show()

// Analyze table schema
def analyzeTableSchema(tableName: String): Unit = {
  val columns = catalog.listColumns(tableName)

  println(s"Schema for table: $tableName")
  println("=" * 50)

  columns.collect().foreach { col =>
    val nullable = if (col.nullable) "NULL" else "NOT NULL"
    val partition = if (col.isPartition) " (PARTITION)" else ""
    val bucket = if (col.isBucket) " (BUCKET)" else ""

    println(f"${col.name}%-20s ${col.dataType}%-15s $nullable$partition$bucket")
  }
}

analyzeTableSchema("my_table")
```
## Function Management

### Function Operations

```scala { .api }
case class Function(
  name: String,
  database: String,
  description: String,
  className: String,
  isTemporary: Boolean
)
```

**Usage Examples:**

```scala
// List all functions
val functions = catalog.listFunctions()
functions.show()

// List functions in specific database
val dbFunctions = catalog.listFunctions("my_database")
dbFunctions.show()

// Check if function exists
val funcExists = catalog.functionExists("my_udf")
val specificFuncExists = catalog.functionExists("my_database", "custom_agg")

// Get function information
val funcInfo = catalog.getFunction("upper")
println(s"Function: ${funcInfo.name}")
println(s"Database: ${funcInfo.database}")
println(s"Class: ${funcInfo.className}")
println(s"Is Temporary: ${funcInfo.isTemporary}")

// Filter user-defined functions
val udfs = catalog.listFunctions()
  .filter(col("isTemporary") === true)
  .select("name", "description")
  .show()

// Categorize functions
def categorizeFunction(func: Function): String = {
  if (func.isTemporary) "UDF"
  else if (func.className.startsWith("org.apache.spark.sql.catalyst.expressions")) "Built-in"
  else "System"
}

val categorized = catalog.listFunctions().collect().groupBy(categorizeFunction)
categorized.foreach { case (category, funcs) =>
  println(s"$category functions: ${funcs.length}")
}
```
## Temporary View Management

**Usage Examples:**

```scala
// Create temporary views
df.createOrReplaceTempView("temp_users")
df.createGlobalTempView("global_temp_data")

// List temporary tables (printed via .show(), which returns Unit)
catalog.listTables()
  .filter(col("isTemporary") === true)
  .show()

// Drop temporary views
val dropped = catalog.dropTempView("temp_users")
println(s"View dropped: $dropped")

val globalDropped = catalog.dropGlobalTempView("global_temp_data")
println(s"Global view dropped: $globalDropped")

// Manage temporary views
def cleanupTempViews(): Unit = {
  val tempViews = catalog.listTables()
    .filter(col("isTemporary") === true)
    .select("name")
    .collect()
    .map(_.getString(0))

  tempViews.foreach { viewName =>
    val success = catalog.dropTempView(viewName)
    println(s"Dropped temp view $viewName: $success")
  }
}
```
## Caching Operations

### Table Caching

**Usage Examples:**

```scala
import org.apache.spark.storage.StorageLevel

// Cache a table
catalog.cacheTable("frequently_used_table")

// Cache with specific storage level
catalog.cacheTable("large_table", StorageLevel.MEMORY_AND_DISK_SER)

// Check if table is cached
val isCached = catalog.isCached("my_table")
println(s"Table cached: $isCached")

// Uncache a table
catalog.uncacheTable("my_table")

// Clear all cached tables
catalog.clearCache()

// Cache management workflow
def manageCaching(tableName: String): Unit = {
  if (!catalog.isCached(tableName)) {
    println(s"Caching table: $tableName")
    catalog.cacheTable(tableName)
  } else {
    println(s"Table $tableName is already cached")
  }
}

// Cache frequently accessed tables
val frequentTables = Seq("dim_users", "fact_sales", "lookup_regions")
frequentTables.foreach(manageCaching)
```

### Cache Monitoring

```scala
// Monitor cached tables
def showCachedTables(): Unit = {
  val allTables = catalog.listTables().collect()

  println("Cached Tables:")
  println("=" * 40)

  allTables.foreach { table =>
    val cached = catalog.isCached(table.name)
    if (cached) {
      println(s"${table.database}.${table.name}")
    }
  }
}

showCachedTables()

// Refresh cached table metadata
catalog.refreshTable("my_cached_table")

// Refresh by file path (for file-based tables)
catalog.refreshByPath("/path/to/parquet/files")
```
## Table Creation and Management

### Creating Tables

**Usage Examples:**

```scala
// Create table from DataFrame (Experimental API)
val writer = catalog.createTable("new_table")
df.write
  .mode("overwrite")
  .option("path", "/path/to/table")
  .saveAsTable("new_table")

// Create external table
val externalWriter = catalog.createTable("external_table", "/external/path")
df.write
  .mode("overwrite")
  .format("parquet")
  .save("/external/path")

// Create table with specific format
val formatWriter = catalog.createTable("json_table", "json")
df.write
  .mode("overwrite")
  .format("json")
  .saveAsTable("json_table")

// Create partitioned table using SQL
spark.sql("""
  CREATE TABLE partitioned_sales (
    id BIGINT,
    amount DOUBLE,
    customer_id STRING,
    sale_date DATE
  )
  USING PARQUET
  PARTITIONED BY (year INT, month INT)
  LOCATION '/data/sales'
""")
```

### Partition Recovery

```scala
// Recover partitions for external tables
catalog.recoverPartitions("external_partitioned_table")

// Workflow for partition management
def managePartitions(tableName: String): Unit = {
  println(s"Recovering partitions for: $tableName")

  try {
    catalog.recoverPartitions(tableName)
    println("Partition recovery completed successfully")

    // Refresh the table to update metadata
    catalog.refreshTable(tableName)
    println("Table metadata refreshed")

  } catch {
    case e: Exception =>
      println(s"Partition recovery failed: ${e.getMessage}")
  }
}

managePartitions("my_partitioned_table")
```
## Metadata Introspection

### Schema Discovery

```scala
// Comprehensive metadata inspection
def inspectCatalog(): Unit = {
  println("=== CATALOG INSPECTION ===")

  // Current context
  println(s"Current Database: ${catalog.currentDatabase}")
  println()

  // Database summary
  val databases = catalog.listDatabases().collect()
  println(s"Total Databases: ${databases.length}")
  databases.foreach(db => println(s"  - ${db.name}: ${db.description}"))
  println()

  // Table summary
  val tables = catalog.listTables().collect()
  val managedTables = tables.count(_.tableType == "MANAGED")
  val externalTables = tables.count(_.tableType == "EXTERNAL")
  val tempTables = tables.count(_.isTemporary)

  println(s"Total Tables: ${tables.length}")
  println(s"  - Managed: $managedTables")
  println(s"  - External: $externalTables")
  println(s"  - Temporary: $tempTables")
  println()

  // Function summary
  val functions = catalog.listFunctions().collect()
  val builtinFuncs = functions.count(!_.isTemporary)
  val udfs = functions.count(_.isTemporary)

  println(s"Total Functions: ${functions.length}")
  println(s"  - Built-in: $builtinFuncs")
  println(s"  - UDFs: $udfs")
  println()

  // Cache summary
  val cachedTables = tables.count(table => catalog.isCached(table.name))
  println(s"Cached Tables: $cachedTables")
}

inspectCatalog()
```

### Table Lineage and Dependencies

```scala
// Find table dependencies
def findTableDependencies(tableName: String): Unit = {
  try {
    val table = catalog.getTable(tableName)
    println(s"Table: ${table.name}")
    println(s"Database: ${table.database}")
    println(s"Type: ${table.tableType}")

    // Get columns with their characteristics
    val columns = catalog.listColumns(tableName).collect()
    val partitionCols = columns.filter(_.isPartition).map(_.name)
    val bucketCols = columns.filter(_.isBucket).map(_.name)

    if (partitionCols.nonEmpty) {
      println(s"Partitioned by: ${partitionCols.mkString(", ")}")
    }

    if (bucketCols.nonEmpty) {
      println(s"Bucketed by: ${bucketCols.mkString(", ")}")
    }

    // Check if cached
    if (catalog.isCached(tableName)) {
      println("Status: CACHED")
    }

  } catch {
    case e: Exception =>
      println(s"Error inspecting table $tableName: ${e.getMessage}")
  }
}

// Usage
findTableDependencies("my_important_table")
```
## Integration with SQL DDL

### DDL Operations through Catalog

```scala
// Create database
spark.sql("CREATE DATABASE IF NOT EXISTS analytics")
catalog.setCurrentDatabase("analytics")

// Verify creation
if (catalog.databaseExists("analytics")) {
  println("Analytics database created successfully")
}

// Create managed table
spark.sql("""
  CREATE TABLE user_analytics (
    user_id STRING,
    session_count BIGINT,
    total_duration DOUBLE,
    last_activity TIMESTAMP
  )
  USING DELTA
  TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')
""")

// Verify table creation
if (catalog.tableExists("user_analytics")) {
  val table = catalog.getTable("user_analytics")
  println(s"Created table: ${table.name} (${table.tableType})")

  // Show schema
  catalog.listColumns("user_analytics").show()
}

// Create view
spark.sql("""
  CREATE VIEW active_users AS
  SELECT user_id, session_count
  FROM user_analytics
  WHERE last_activity > current_date() - INTERVAL 30 DAYS
""")

// List all objects in the database
println("Tables and Views:")
catalog.listTables("analytics").show()
```
## Error Handling and Best Practices

### Robust Catalog Operations

```scala
// Safe catalog operations with error handling
def safeTableOperation(tableName: String)(operation: => Unit): Unit = {
  try {
    if (catalog.tableExists(tableName)) {
      operation
    } else {
      println(s"Table $tableName does not exist")
    }
  } catch {
    case e: Exception =>
      println(s"Error operating on table $tableName: ${e.getMessage}")
  }
}

// Safe caching
def safeCacheTable(tableName: String): Unit = {
  safeTableOperation(tableName) {
    if (!catalog.isCached(tableName)) {
      catalog.cacheTable(tableName)
      println(s"Cached table: $tableName")
    } else {
      println(s"Table $tableName is already cached")
    }
  }
}

// Safe cleanup
def safeCleanup(tempViewName: String): Unit = {
  try {
    val dropped = catalog.dropTempView(tempViewName)
    if (dropped) {
      println(s"Dropped temporary view: $tempViewName")
    } else {
      println(s"Temporary view $tempViewName was not found")
    }
  } catch {
    case e: Exception =>
      println(s"Error dropping view $tempViewName: ${e.getMessage}")
  }
}

// Batch operations with error handling
def batchTableOperations(tableNames: Seq[String]): Unit = {
  tableNames.foreach { tableName =>
    safeTableOperation(tableName) {
      // Refresh and cache frequently used tables
      catalog.refreshTable(tableName)
      catalog.cacheTable(tableName)
      println(s"Processed: $tableName")
    }
  }
}
```