or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog.md, columns-functions.md, data-io.md, dataset-dataframe.md, index.md, session-management.md, streaming.md, types-encoders.md, udfs.md

docs/catalog.md

0

# Metadata and Catalog Operations

1

2

Database, table, and metadata management through the catalog interface. Provides programmatic access to metastore operations, table management, and schema inspection capabilities.

3

4

## Capabilities

5

6

### Catalog Interface

7

8

Main interface for catalog operations and metadata management.

9

10

```scala { .api }

11

/**

12

* Interface for catalog operations (databases, tables, functions, columns)

13

*/

14

trait Catalog {

15

/** Get current database name */

16

def currentDatabase(): String

17

18

/** Set current database */

19

def setCurrentDatabase(dbName: String): Unit

20

21

/** List all available databases */

22

def listDatabases(): Dataset[Database]

23

24

/** List tables in current database */

25

def listTables(): Dataset[Table]

26

27

/** List tables in specified database */

28

def listTables(dbName: String): Dataset[Table]

29

30

/** List columns for specified table */

31

def listColumns(tableName: String): Dataset[Column]

32

def listColumns(dbName: String, tableName: String): Dataset[Column]

33

34

/** List all available functions */

35

def listFunctions(): Dataset[Function]

36

37

/** List functions in specified database */

38

def listFunctions(dbName: String): Dataset[Function]

39

40

/** Check if database exists */

41

def databaseExists(dbName: String): Boolean

42

43

/** Check if table exists */

44

def tableExists(tableName: String): Boolean

45

def tableExists(dbName: String, tableName: String): Boolean

46

47

/** Check if function exists */

48

def functionExists(functionName: String): Boolean

49

def functionExists(dbName: String, functionName: String): Boolean

50

51

/** Get table metadata */

52

def getTable(tableName: String): Table

53

def getTable(dbName: String, tableName: String): Table

54

55

/** Get function metadata */

56

def getFunction(functionName: String): Function

57

def getFunction(dbName: String, functionName: String): Function

58

59

/** Cache table (default storage level MEMORY_AND_DISK) */

60

def cacheTable(tableName: String): Unit

61

def cacheTable(tableName: String, storageLevel: StorageLevel): Unit

62

63

/** Uncache table from memory */

64

def uncacheTable(tableName: String): Unit

65

66

/** Check if table is cached */

67

def isCached(tableName: String): Boolean

68

69

/** Clear all cached tables */

70

def clearCache(): Unit

71

72

/** Refresh table metadata */

73

def refreshTable(tableName: String): Unit

74

75

/** Refresh function metadata */

76

def refreshFunction(functionName: String): Unit

77

78

/** Create database */

79

def createDatabase(dbName: String, ignoreIfExists: Boolean): Unit

80

def createDatabase(dbName: String, ignoreIfExists: Boolean, path: String): Unit

81

82

/** Drop database */

83

def dropDatabase(dbName: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit

84

85

/** Create table */

86

def createTable(tableName: String, path: String): DataFrame

87

def createTable(tableName: String, path: String, source: String): DataFrame

88

def createTable(tableName: String, source: String, options: Map[String, String]): DataFrame

89

def createTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame

90

91

/** Drop temporary views */

92

def dropTempView(viewName: String): Boolean

93

def dropGlobalTempView(viewName: String): Boolean

94

95

/** Recover partitions for table */

96

def recoverPartitions(tableName: String): Unit

97

}

98

```

99

100

### Database Metadata

101

102

Represents database information in the catalog.

103

104

```scala { .api }

105

/**

106

* Database metadata from the catalog

107

*/

108

case class Database(

109

name: String, // Database name

110

description: String, // Database description

111

locationUri: String // Database location URI

112

)

113

```

114

115

### Table Metadata

116

117

Represents table information in the catalog.

118

119

```scala { .api }

120

/**

121

* Table metadata from the catalog

122

*/

123

case class Table(

124

name: String, // Table name

125

database: String, // Database name

126

description: String, // Table description

127

tableType: String, // Table type (MANAGED, EXTERNAL, VIEW, etc.)

128

isTemporary: Boolean // Whether table is temporary

129

)

130

```

131

132

### Column Metadata

133

134

Represents column information in the catalog.

135

136

```scala { .api }

137

/**

138

* Column metadata from the catalog

139

*/

140

case class Column(

141

name: String, // Column name

142

description: String, // Column description

143

dataType: String, // Column data type

144

nullable: Boolean, // Whether column can be null

145

isPartition: Boolean, // Whether column is partition key

146

isBucket: Boolean // Whether column is bucket key

147

)

148

```

149

150

### Function Metadata

151

152

Represents function information in the catalog.

153

154

```scala { .api }

155

/**

156

* Function metadata from the catalog

157

*/

158

case class Function(

159

name: String, // Function name

160

database: String, // Database name

161

description: String, // Function description

162

className: String, // Implementation class name

163

isTemporary: Boolean // Whether function is temporary

164

)

165

```

166

167

**Usage Examples:**

168

169

```scala

170

val catalog = spark.catalog

171

172

// Database operations

173

println(s"Current database: ${catalog.currentDatabase()}")

174

175

val databases = catalog.listDatabases().collect()

176

databases.foreach(db => println(s"Database: ${db.name} at ${db.locationUri}"))

177

178

catalog.setCurrentDatabase("my_database")

179

180

// Table operations

181

val tables = catalog.listTables().collect()

182

tables.foreach(table =>

183

println(s"Table: ${table.name} (${table.tableType}) in ${table.database}")

184

)

185

186

// Check existence

187

if (catalog.tableExists("employees")) {

188

val tableInfo = catalog.getTable("employees")

189

println(s"Table type: ${tableInfo.tableType}")

190

191

// Get column information

192

val columns = catalog.listColumns("employees").collect()

193

columns.foreach(col =>

194

println(s"Column: ${col.name} (${col.dataType}, nullable: ${col.nullable})")

195

)

196

}

197

198

// Function operations

199

val functions = catalog.listFunctions().collect()

200

functions.foreach(func =>

201

println(s"Function: ${func.name} (${func.className})")

202

)

203

204

// Caching operations

205

catalog.cacheTable("frequently_used_table")

206

println(s"Table cached: ${catalog.isCached("frequently_used_table")}")

207

208

catalog.uncacheTable("frequently_used_table")

209

```

210

211

### Table Management Operations

212

213

Advanced table management and DDL operations.

214

215

**Creating and dropping tables:**

216

217

```scala

218

// Create external table

219

catalog.createTable(

220

tableName = "external_data",

221

path = "s3://bucket/path/to/data",

222

source = "parquet"

223

)

224

225

// Create table with schema and options

226

val schema = StructType(Seq(

227

StructField("id", LongType, nullable = false),

228

StructField("name", StringType, nullable = true),

229

StructField("created_date", DateType, nullable = true)

230

))

231

232

catalog.createTable(

233

tableName = "structured_data",

234

source = "delta",

235

schema = schema,

236

options = Map(

237

"path" -> "s3://bucket/delta/table",

238

"overwriteSchema" -> "true"

239

)

240

)

241

242

// Create database

243

catalog.createDatabase("analytics", ignoreIfExists = true)

244

catalog.createDatabase("data_lake", ignoreIfExists = true, path = "s3://bucket/databases/data_lake")

245

246

// Drop database (cascade removes all tables)

247

catalog.dropDatabase("old_database", ignoreIfNotExists = true, cascade = true)

248

249

// Create temporary views

250

spark.sql("""

251

CREATE OR REPLACE TEMPORARY VIEW active_users AS

252

SELECT * FROM users WHERE status = 'active'

253

""")

254

255

// Drop temporary views

256

catalog.dropTempView("active_users")

257

```

258

259

### Caching and Performance

260

261

Table caching for improved query performance.

262

263

```scala

264

import org.apache.spark.storage.StorageLevel

265

266

// Cache with default storage level (MEMORY_AND_DISK)

267

catalog.cacheTable("hot_data")

268

269

// Cache with specific storage level

270

catalog.cacheTable("lookup_table", StorageLevel.MEMORY_ONLY)

271

272

// Check cache status

273

val cachedTables = catalog.listTables()

274

.filter(_.isTemporary == false)

275

.collect()

276

.filter(table => catalog.isCached(table.name))

277

278

cachedTables.foreach(table =>

279

println(s"Cached table: ${table.name}")

280

)

281

282

// Clear specific table from cache

283

catalog.uncacheTable("hot_data")

284

285

// Clear all cached tables

286

catalog.clearCache()

287

288

// Refresh table metadata after external changes

289

catalog.refreshTable("external_table")

290

```

291

292

### Partition Management

293

294

Working with partitioned tables.

295

296

```scala

297

// Recover partitions for external tables

298

catalog.recoverPartitions("partitioned_external_table")

299

300

// List partitions (using SQL for detailed partition info)

301

val partitions = spark.sql("SHOW PARTITIONS partitioned_table").collect()

302

partitions.foreach(row => println(s"Partition: ${row.getString(0)}"))

303

304

// Add specific partitions

305

spark.sql("""

306

ALTER TABLE partitioned_table

307

ADD PARTITION (year=2023, month=12)

308

LOCATION 's3://bucket/data/year=2023/month=12'

309

""")

310

311

// Drop partition

312

spark.sql("""

313

ALTER TABLE partitioned_table

314

DROP PARTITION (year=2022, month=01)

315

""")

316

```

317

318

### Advanced Catalog Queries

319

320

Complex metadata queries and analysis.

321

322

```scala

323

// Find non-view tables (views are excluded below; no size info is available here)

324

val largeTables = catalog.listTables()

325

.select("name", "database", "tableType")

326

.collect()

327

.filter(_.getString(2) != "VIEW") // Exclude views

328

329

// Analyze table schemas

330

def analyzeTableSchema(tableName: String): Unit = {

331

val columns = catalog.listColumns(tableName).collect()

332

333

println(s"Schema analysis for $tableName:")

334

println(s"Total columns: ${columns.length}")

335

336

val partitionCols = columns.filter(_.isPartition)

337

if (partitionCols.nonEmpty) {

338

println(s"Partition columns: ${partitionCols.map(_.name).mkString(", ")}")

339

}

340

341

val bucketCols = columns.filter(_.isBucket)

342

if (bucketCols.nonEmpty) {

343

println(s"Bucket columns: ${bucketCols.map(_.name).mkString(", ")}")

344

}

345

346

val dataTypes = columns.groupBy(_.dataType).mapValues(_.length)

347

println("Data type distribution:")

348

dataTypes.foreach { case (dataType, count) =>

349

println(s" $dataType: $count columns")

350

}

351

}

352

353

// Find tables with specific patterns

354

val userTables = catalog.listTables()

355

.filter(col("name").like("%user%"))

356

.collect()

357

358

// Cross-database analysis

359

val allDatabases = catalog.listDatabases().collect()

360

allDatabases.foreach { db =>

361

val tableCount = catalog.listTables(db.name).count()

362

println(s"Database ${db.name}: $tableCount tables")

363

}

364

```

365

366

### Integration with External Metastores

367

368

Working with Hive metastore and external catalog systems.

369

370

```scala

371

// Enable Hive support for metastore integration

372

val spark = SparkSession.builder()

373

.appName("Catalog Operations")

374

.enableHiveSupport()

375

.getOrCreate()

376

377

// Set Hive metastore location

378

spark.conf.set("spark.sql.warehouse.dir", "/path/to/warehouse")

379

spark.conf.set("hive.metastore.uris", "thrift://metastore-host:9083")

380

381

// Create Hive-managed table

382

spark.sql("""

383

CREATE TABLE IF NOT EXISTS hive_table (

384

id BIGINT,

385

name STRING,

386

created_date DATE

387

)

388

USING HIVE

389

PARTITIONED BY (year INT, month INT)

390

STORED AS PARQUET

391

""")

392

393

// Register external table in Hive metastore

394

spark.sql("""

395

CREATE TABLE external_parquet_table (

396

id BIGINT,

397

name STRING,

398

value DOUBLE

399

)

400

USING PARQUET

401

LOCATION 's3://bucket/path/to/parquet'

402

""")

403

404

// Work with Hive databases

405

spark.sql("CREATE DATABASE IF NOT EXISTS hive_db")

406

catalog.setCurrentDatabase("hive_db")

407

408

// MSCK repair for Hive tables

409

spark.sql("MSCK REPAIR TABLE partitioned_hive_table")

410

```

411

412

### Catalog Utility Functions

413

414

Helper functions for common catalog operations.

415

416

```scala

417

object CatalogUtils {

418

def tableExists(spark: SparkSession, dbName: String, tableName: String): Boolean = {

419

try {

420

spark.catalog.getTable(dbName, tableName)

421

true

422

} catch {

423

case _: AnalysisException => false

424

}

425

}

426

427

def createDatabaseIfNotExists(spark: SparkSession, dbName: String, location: Option[String] = None): Unit = {

428

if (!spark.catalog.databaseExists(dbName)) {

429

location match {

430

case Some(path) => spark.catalog.createDatabase(dbName, ignoreIfExists = true, path)

431

case None => spark.catalog.createDatabase(dbName, ignoreIfExists = true)

432

}

433

println(s"Created database: $dbName")

434

} else {

435

println(s"Database already exists: $dbName")

436

}

437

}

438

439

def getTableStatistics(spark: SparkSession, tableName: String): Map[String, Any] = {

440

val table = spark.catalog.getTable(tableName)

441

val columns = spark.catalog.listColumns(tableName).collect()

442

443

Map(

444

"tableName" -> table.name,

445

"database" -> table.database,

446

"tableType" -> table.tableType,

447

"isTemporary" -> table.isTemporary,

448

"columnCount" -> columns.length,

449

"partitionColumns" -> columns.filter(_.isPartition).map(_.name),

450

"bucketColumns" -> columns.filter(_.isBucket).map(_.name),

451

"dataTypes" -> columns.map(_.dataType).distinct

452

)

453

}

454

455

def copyTableSchema(spark: SparkSession, sourceTable: String, targetTable: String): Unit = {

456

val sourceColumns = spark.catalog.listColumns(sourceTable).collect()

457

val schema = StructType(sourceColumns.map { col =>

458

val dataType = DataType.fromDDL(col.dataType)

459

StructField(col.name, dataType, col.nullable)

460

})

461

462

// Create empty table with same schema

463

val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

464

emptyDf.write.saveAsTable(targetTable)

465

}

466

}

467

468

// Usage examples

469

CatalogUtils.createDatabaseIfNotExists(spark, "analytics", Some("s3://bucket/analytics"))

470

471

val stats = CatalogUtils.getTableStatistics(spark, "employees")

472

println(s"Table statistics: $stats")

473

474

if (CatalogUtils.tableExists(spark, "default", "source_table")) {

475

CatalogUtils.copyTableSchema(spark, "source_table", "target_table")

476

}

477

```