or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregations.md · catalog.md · data-io.md · dataframe-dataset.md · functions-expressions.md · index.md · session-management.md · streaming.md

docs/catalog.md

0

# Catalog and Metadata Management

1

2

The Spark SQL Catalog provides a programmatic interface for managing databases, tables, functions, and cached data. It serves as the central metadata repository and enables runtime introspection of the Spark SQL environment.

3

4

## Catalog Interface

5

6

```scala { .api }

7

class Catalog {

8

// Database operations

9

def currentDatabase: String

10

def setCurrentDatabase(dbName: String): Unit

11

def listDatabases(): Dataset[Database]

12

def databaseExists(dbName: String): Boolean

13

def getDatabase(dbName: String): Database

14

15

// Table operations

16

def listTables(): Dataset[Table]

17

def listTables(dbName: String): Dataset[Table]

18

def tableExists(tableName: String): Boolean

19

def tableExists(dbName: String, tableName: String): Boolean

20

def getTable(tableName: String): Table

21

def getTable(dbName: String, tableName: String): Table

22

def listColumns(tableName: String): Dataset[Column]

23

def listColumns(dbName: String, tableName: String): Dataset[Column]

24

25

// Function operations

26

def listFunctions(): Dataset[Function]

27

def listFunctions(dbName: String): Dataset[Function]

28

def functionExists(functionName: String): Boolean

29

def functionExists(dbName: String, functionName: String): Boolean

30

def getFunction(functionName: String): Function

31

def getFunction(dbName: String, functionName: String): Function

32

33

// Temporary view operations

34

def dropTempView(viewName: String): Boolean

35

def dropGlobalTempView(viewName: String): Boolean

36

37

// Table management (Experimental)
// Note: createTable returns a DataFrame referencing the new table,
// not a DataFrameWriter.
def createTable(tableName: String, path: String): DataFrame
def createTable(tableName: String, path: String, source: String): DataFrame
def createTable(tableName: String, source: String, options: Map[String, String]): DataFrame
def createTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame

42

43

// Maintenance operations

44

def recoverPartitions(tableName: String): Unit

45

46

// Caching operations

47

def isCached(tableName: String): Boolean

48

def cacheTable(tableName: String): Unit

49

def cacheTable(tableName: String, storageLevel: StorageLevel): Unit

50

def uncacheTable(tableName: String): Unit

51

def clearCache(): Unit

52

def refreshTable(tableName: String): Unit

53

def refreshByPath(path: String): Unit

54

}

55

```

56

57

## Database Management

58

59

### Database Operations

60

61

**Usage Examples:**

62

63

```scala

64

val catalog = spark.catalog

65

66

// Get current database

67

val currentDb = catalog.currentDatabase

68

println(s"Current database: $currentDb")

69

70

// Change database

71

catalog.setCurrentDatabase("my_database")

72

73

// List all databases

74

val databases = catalog.listDatabases()

75

databases.show()

76

77

// Check if database exists

78

val exists = catalog.databaseExists("production")

79

if (!exists) {

80

spark.sql("CREATE DATABASE production")

81

}

82

83

// Get database information

84

val dbInfo = catalog.getDatabase("production")

85

println(s"Database: ${dbInfo.name}, Description: ${dbInfo.description}")

86

```

87

88

### Database Metadata

89

90

```scala { .api }

91

case class Database(

92

name: String,

93

description: String,

94

locationUri: String

95

)

96

```

97

98

**Usage Examples:**

99

100

```scala

101

// Inspect database details

102

catalog.listDatabases().collect().foreach { db =>

103

println(s"Database: ${db.name}")

104

println(s"Description: ${db.description}")

105

println(s"Location: ${db.locationUri}")

106

println("---")

107

}

108

109

// Filter databases

110

catalog.listDatabases()

111

.filter(col("name").like("%prod%"))

112

.select("name", "description")

113

.show()

114

```

115

116

## Table Management

117

118

### Table Operations

119

120

```scala { .api }

121

case class Table(

122

name: String,

123

database: String,

124

description: String,

125

tableType: String,

126

isTemporary: Boolean

127

)

128

```

129

130

**Usage Examples:**

131

132

```scala

133

// List all tables in current database

134

val tables = catalog.listTables()

135

tables.show()

136

137

// List tables in specific database

138

val prodTables = catalog.listTables("production")

139

prodTables.filter(col("tableType") === "MANAGED").show()

140

141

// Check if table exists

142

val tableExists = catalog.tableExists("users")

143

val specificExists = catalog.tableExists("production", "sales")

144

145

// Get table information

146

val tableInfo = catalog.getTable("users")

147

println(s"Table: ${tableInfo.name}")

148

println(s"Database: ${tableInfo.database}")

149

println(s"Type: ${tableInfo.tableType}")

150

println(s"Is Temporary: ${tableInfo.isTemporary}")

151

152

// List all temporary tables

153

catalog.listTables()

154

.filter(col("isTemporary") === true)

155

.select("name", "database")

156

.show()

157

```

158

159

### Column Metadata

160

161

```scala { .api }

162

case class Column(

163

name: String,

164

description: String,

165

dataType: String,

166

nullable: Boolean,

167

isPartition: Boolean,

168

isBucket: Boolean

169

)

170

```

171

172

**Usage Examples:**

173

174

```scala

175

// List columns for a table

176

val columns = catalog.listColumns("users")

177

columns.show()

178

179

// Get detailed column information

180

columns.collect().foreach { col =>

181

println(s"Column: ${col.name}")

182

println(s"Type: ${col.dataType}")

183

println(s"Nullable: ${col.nullable}")

184

println(s"Partition: ${col.isPartition}")

185

println(s"Bucket: ${col.isBucket}")

186

println("---")

187

}

188

189

// Find partition columns

190

catalog.listColumns("partitioned_table")

191

.filter(col("isPartition") === true)

192

.select("name", "dataType")

193

.show()

194

195

// Analyze table schema

196

def analyzeTableSchema(tableName: String): Unit = {

197

val columns = catalog.listColumns(tableName)

198

199

println(s"Schema for table: $tableName")

200

println("=" * 50)

201

202

columns.collect().foreach { col =>

203

val nullable = if (col.nullable) "NULL" else "NOT NULL"

204

val partition = if (col.isPartition) " (PARTITION)" else ""

205

val bucket = if (col.isBucket) " (BUCKET)" else ""

206

207

println(f"${col.name}%-20s ${col.dataType}%-15s $nullable$partition$bucket")

208

}

209

}

210

211

analyzeTableSchema("my_table")

212

```

213

214

## Function Management

215

216

### Function Operations

217

218

```scala { .api }

219

case class Function(

220

name: String,

221

database: String,

222

description: String,

223

className: String,

224

isTemporary: Boolean

225

)

226

```

227

228

**Usage Examples:**

229

230

```scala

231

// List all functions

232

val functions = catalog.listFunctions()

233

functions.show()

234

235

// List functions in specific database

236

val dbFunctions = catalog.listFunctions("my_database")

237

dbFunctions.show()

238

239

// Check if function exists

240

val funcExists = catalog.functionExists("my_udf")

241

val specificFuncExists = catalog.functionExists("my_database", "custom_agg")

242

243

// Get function information

244

val funcInfo = catalog.getFunction("upper")

245

println(s"Function: ${funcInfo.name}")

246

println(s"Database: ${funcInfo.database}")

247

println(s"Class: ${funcInfo.className}")

248

println(s"Is Temporary: ${funcInfo.isTemporary}")

249

250

// Filter user-defined functions

251

catalog.listFunctions()

252

.filter(col("isTemporary") === true)

253

.select("name", "description")

254

.show()

255

256

// Categorize functions

257

def categorizeFunction(func: Function): String = {

258

if (func.isTemporary) "UDF"

259

else if (func.className.startsWith("org.apache.spark.sql.catalyst.expressions")) "Built-in"

260

else "System"

261

}

262

263

val categorized = catalog.listFunctions().collect().groupBy(categorizeFunction)

264

categorized.foreach { case (category, funcs) =>

265

println(s"$category functions: ${funcs.length}")

266

}

267

```

268

269

## Temporary View Management

270

271

**Usage Examples:**

272

273

```scala

274

// Create temporary views

275

df.createOrReplaceTempView("temp_users")

276

df.createGlobalTempView("global_temp_data")

277

278

// List temporary tables

279

catalog.listTables()

280

.filter(col("isTemporary") === true)

281

.show()

282

283

// Drop temporary views

284

val dropped = catalog.dropTempView("temp_users")

285

println(s"View dropped: $dropped")

286

287

val globalDropped = catalog.dropGlobalTempView("global_temp_data")

288

println(s"Global view dropped: $globalDropped")

289

290

// Manage temporary views

291

def cleanupTempViews(): Unit = {

292

val tempViews = catalog.listTables()

293

.filter(col("isTemporary") === true)

294

.select("name")

295

.collect()

296

.map(_.getString(0))

297

298

tempViews.foreach { viewName =>

299

val success = catalog.dropTempView(viewName)

300

println(s"Dropped temp view $viewName: $success")

301

}

302

}

303

```

304

305

## Caching Operations

306

307

### Table Caching

308

309

**Usage Examples:**

310

311

```scala

312

import org.apache.spark.storage.StorageLevel

313

314

// Cache a table

315

catalog.cacheTable("frequently_used_table")

316

317

// Cache with specific storage level

318

catalog.cacheTable("large_table", StorageLevel.MEMORY_AND_DISK_SER)

319

320

// Check if table is cached

321

val isCached = catalog.isCached("my_table")

322

println(s"Table cached: $isCached")

323

324

// Uncache a table

325

catalog.uncacheTable("my_table")

326

327

// Clear all cached tables

328

catalog.clearCache()

329

330

// Cache management workflow

331

def manageCaching(tableName: String): Unit = {

332

if (!catalog.isCached(tableName)) {

333

println(s"Caching table: $tableName")

334

catalog.cacheTable(tableName)

335

} else {

336

println(s"Table $tableName is already cached")

337

}

338

}

339

340

// Cache frequently accessed tables

341

val frequentTables = Seq("dim_users", "fact_sales", "lookup_regions")

342

frequentTables.foreach(manageCaching)

343

```

344

345

### Cache Monitoring

346

347

```scala

348

// Monitor cached tables

349

def showCachedTables(): Unit = {

350

val allTables = catalog.listTables().collect()

351

352

println("Cached Tables:")

353

println("=" * 40)

354

355

allTables.foreach { table =>

356

val cached = catalog.isCached(table.name)

357

if (cached) {

358

println(s"${table.database}.${table.name}")

359

}

360

}

361

}

362

363

showCachedTables()

364

365

// Refresh cached table metadata

366

catalog.refreshTable("my_cached_table")

367

368

// Refresh by file path (for file-based tables)

369

catalog.refreshByPath("/path/to/parquet/files")

370

```

371

372

## Table Creation and Management

373

374

### Creating Tables

375

376

**Usage Examples:**

377

378

```scala

379

// Create table from DataFrame (Experimental API)

380

val writer = catalog.createTable("new_table")

381

df.write

382

.mode("overwrite")

383

.option("path", "/path/to/table")

384

.saveAsTable("new_table")

385

386

// Create external table

387

val externalWriter = catalog.createTable("external_table", "/external/path")

388

df.write

389

.mode("overwrite")

390

.format("parquet")

391

.save("/external/path")

392

393

// Create table with specific format

394

val formatWriter = catalog.createTable("json_table", "json")

395

df.write

396

.mode("overwrite")

397

.format("json")

398

.saveAsTable("json_table")

399

400

// Create partitioned table using SQL

401

spark.sql("""

402

CREATE TABLE partitioned_sales (

403

id BIGINT,

404

amount DOUBLE,

405

customer_id STRING,

406

sale_date DATE

407

)

408

USING PARQUET

409

PARTITIONED BY (year INT, month INT)

410

LOCATION '/data/sales'

411

""")

412

```

413

414

### Partition Recovery

415

416

```scala

417

// Recover partitions for external tables

418

catalog.recoverPartitions("external_partitioned_table")

419

420

// Workflow for partition management

421

def managePartitions(tableName: String): Unit = {

422

println(s"Recovering partitions for: $tableName")

423

424

try {

425

catalog.recoverPartitions(tableName)

426

println("Partition recovery completed successfully")

427

428

// Refresh the table to update metadata

429

catalog.refreshTable(tableName)

430

println("Table metadata refreshed")

431

432

} catch {

433

case e: Exception =>

434

println(s"Partition recovery failed: ${e.getMessage}")

435

}

436

}

437

438

managePartitions("my_partitioned_table")

439

```

440

441

## Metadata Introspection

442

443

### Schema Discovery

444

445

```scala

446

// Comprehensive metadata inspection

447

def inspectCatalog(): Unit = {

448

println("=== CATALOG INSPECTION ===")

449

450

// Current context

451

println(s"Current Database: ${catalog.currentDatabase}")

452

println()

453

454

// Database summary

455

val databases = catalog.listDatabases().collect()

456

println(s"Total Databases: ${databases.length}")

457

databases.foreach(db => println(s" - ${db.name}: ${db.description}"))

458

println()

459

460

// Table summary

461

val tables = catalog.listTables().collect()

462

val managedTables = tables.count(_.tableType == "MANAGED")

463

val externalTables = tables.count(_.tableType == "EXTERNAL")

464

val tempTables = tables.count(_.isTemporary)

465

466

println(s"Total Tables: ${tables.length}")

467

println(s" - Managed: $managedTables")

468

println(s" - External: $externalTables")

469

println(s" - Temporary: $tempTables")

470

println()

471

472

// Function summary

473

val functions = catalog.listFunctions().collect()

474

val builtinFuncs = functions.count(!_.isTemporary)

475

val udfs = functions.count(_.isTemporary)

476

477

println(s"Total Functions: ${functions.length}")

478

println(s" - Built-in: $builtinFuncs")

479

println(s" - UDFs: $udfs")

480

println()

481

482

// Cache summary

483

val cachedTables = tables.count(table => catalog.isCached(table.name))

484

println(s"Cached Tables: $cachedTables")

485

}

486

487

inspectCatalog()

488

```

489

490

### Table Lineage and Dependencies

491

492

```scala

493

// Find table dependencies

494

def findTableDependencies(tableName: String): Unit = {

495

try {

496

val table = catalog.getTable(tableName)

497

println(s"Table: ${table.name}")

498

println(s"Database: ${table.database}")

499

println(s"Type: ${table.tableType}")

500

501

// Get columns with their characteristics

502

val columns = catalog.listColumns(tableName).collect()

503

val partitionCols = columns.filter(_.isPartition).map(_.name)

504

val bucketCols = columns.filter(_.isBucket).map(_.name)

505

506

if (partitionCols.nonEmpty) {

507

println(s"Partitioned by: ${partitionCols.mkString(", ")}")

508

}

509

510

if (bucketCols.nonEmpty) {

511

println(s"Bucketed by: ${bucketCols.mkString(", ")}")

512

}

513

514

// Check if cached

515

if (catalog.isCached(tableName)) {

516

println("Status: CACHED")

517

}

518

519

} catch {

520

case e: Exception =>

521

println(s"Error inspecting table $tableName: ${e.getMessage}")

522

}

523

}

524

525

// Usage

526

findTableDependencies("my_important_table")

527

```

528

529

## Integration with SQL DDL

530

531

### DDL Operations through Catalog

532

533

```scala

534

// Create database

535

spark.sql("CREATE DATABASE IF NOT EXISTS analytics")

536

catalog.setCurrentDatabase("analytics")

537

538

// Verify creation

539

if (catalog.databaseExists("analytics")) {

540

println("Analytics database created successfully")

541

}

542

543

// Create managed table

544

spark.sql("""

545

CREATE TABLE user_analytics (

546

user_id STRING,

547

session_count BIGINT,

548

total_duration DOUBLE,

549

last_activity TIMESTAMP

550

)

551

USING DELTA

552

TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')

553

""")

554

555

// Verify table creation

556

if (catalog.tableExists("user_analytics")) {

557

val table = catalog.getTable("user_analytics")

558

println(s"Created table: ${table.name} (${table.tableType})")

559

560

// Show schema

561

catalog.listColumns("user_analytics").show()

562

}

563

564

// Create view

565

spark.sql("""

566

CREATE VIEW active_users AS

567

SELECT user_id, session_count

568

FROM user_analytics

569

WHERE last_activity > current_date() - INTERVAL 30 DAYS

570

""")

571

572

// List all objects in the database

573

println("Tables and Views:")

574

catalog.listTables("analytics").show()

575

```

576

577

## Error Handling and Best Practices

578

579

### Robust Catalog Operations

580

581

```scala

582

// Safe catalog operations with error handling

583

def safeTableOperation(tableName: String)(operation: => Unit): Unit = {

584

try {

585

if (catalog.tableExists(tableName)) {

586

operation

587

} else {

588

println(s"Table $tableName does not exist")

589

}

590

} catch {

591

case e: Exception =>

592

println(s"Error operating on table $tableName: ${e.getMessage}")

593

}

594

}

595

596

// Safe caching

597

def safeCacheTable(tableName: String): Unit = {

598

safeTableOperation(tableName) {

599

if (!catalog.isCached(tableName)) {

600

catalog.cacheTable(tableName)

601

println(s"Cached table: $tableName")

602

} else {

603

println(s"Table $tableName is already cached")

604

}

605

}

606

}

607

608

// Safe cleanup

609

def safeCleanup(tempViewName: String): Unit = {

610

try {

611

val dropped = catalog.dropTempView(tempViewName)

612

if (dropped) {

613

println(s"Dropped temporary view: $tempViewName")

614

} else {

615

println(s"Temporary view $tempViewName was not found")

616

}

617

} catch {

618

case e: Exception =>

619

println(s"Error dropping view $tempViewName: ${e.getMessage}")

620

}

621

}

622

623

// Batch operations with error handling

624

def batchTableOperations(tableNames: Seq[String]): Unit = {

625

tableNames.foreach { tableName =>

626

safeTableOperation(tableName) {

627

// Refresh and cache frequently used tables

628

catalog.refreshTable(tableName)

629

catalog.cacheTable(tableName)

630

println(s"Processed: $tableName")

631

}

632

}

633

}

634

```