or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core.md, exceptions.md, graphx.md, index.md, logging.md, mllib.md, sql.md, storage.md, streaming.md, utils.md

docs/sql.md

0

# SQL and DataFrames

1

2

Spark SQL provides a programming interface for working with structured and semi-structured data through DataFrames, Datasets, and SQL queries. It includes the Catalyst optimizer for automatic query optimization and supports various data sources.

3

4

## Capabilities

5

6

### SparkSession

7

8

The unified entry point for working with structured data in Spark. Replaces SQLContext and HiveContext from earlier versions.

9

10

```scala { .api }

11

/**

12

* The entry point to programming Spark with the Dataset and DataFrame API.

13

*/

14

abstract class SparkSession extends Serializable with Closeable {

15

16

// Session lifecycle

17

def stop(): Unit

18

def close(): Unit

19

def version: String

20

21

// Configuration and state

22

def conf: RuntimeConfig

23

def sparkContext: SparkContext // Access to underlying SparkContext

24

def catalog: Catalog // Metadata management

25

def udf: UDFRegistration // User-defined function registration

26

def streams: StreamingQueryManager // Structured streaming

27

28

// DataFrame/Dataset creation from data

29

def createDataFrame[A <: Product: TypeTag](data: Seq[A]): DataFrame

30

def createDataFrame(rows: util.List[Row], schema: StructType): DataFrame

31

def createDataFrame(data: util.List[_], beanClass: Class[_]): DataFrame

32

def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]): DataFrame

33

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

34

def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame

35

36

// SQL interface

37

def sql(sqlText: String): DataFrame

38

def sql(sqlText: String, args: Map[String, Any]): DataFrame

39

def sql(sqlText: String, args: Array[_]): DataFrame

40

def table(tableName: String): DataFrame

41

42

// Data reading

43

def read: DataFrameReader

44

def readStream: DataStreamReader // For structured streaming

45

46

// Session management

47

def newSession(): SparkSession

48

}

49

50

// SparkSession builder for configuration

51

object SparkSession {

52

def builder(): Builder

53

def active: SparkSession

54

def getActiveSession: Option[SparkSession]

55

def getDefaultSession: Option[SparkSession]

56

57

class Builder {

58

def appName(name: String): Builder

59

def master(master: String): Builder

60

def config(key: String, value: String): Builder

61

def config(key: String, value: Long): Builder

62

def config(key: String, value: Double): Builder

63

def config(key: String, value: Boolean): Builder

64

def config(conf: SparkConf): Builder

65

def enableHiveSupport(): Builder

66

def getOrCreate(): SparkSession

67

}

68

}

69

```

70

71

**Usage Examples:**

72

73

```scala

74

import org.apache.spark.sql.SparkSession

75

76

// Create SparkSession

77

val spark = SparkSession.builder()

78

.appName("My SQL App")

79

.master("local[*]")

80

.config("spark.sql.adaptive.enabled", "true")

81

.config("spark.sql.adaptive.coalescePartitions.enabled", "true")

82

.getOrCreate()

83

84

// Create DataFrame from data

85

import spark.implicits._

86

87

case class Person(name: String, age: Int, city: String)

88

val people = Seq(

89

Person("Alice", 25, "New York"),

90

Person("Bob", 30, "San Francisco"),

91

Person("Charlie", 35, "Boston")

92

)

93

val df = spark.createDataFrame(people)

94

95

// Use SQL

96

df.createOrReplaceTempView("people")

97

val adults = spark.sql("SELECT name, age FROM people WHERE age >= 30")

98

adults.show()

99

100

// Access catalog

101

spark.catalog.listTables().show()

102

spark.catalog.listColumns("people").show()

103

104

// Clean up

105

spark.stop()

106

```

107

108

### DataFrameReader

109

110

Interface for reading data from external sources into DataFrames.

111

112

```scala { .api }

113

/**

114

* Interface used to load a DataFrame from external storage systems (e.g. file systems,

115

* key-value stores, etc).

116

*/

117

class DataFrameReader {

118

119

// Format specification

120

def format(source: String): DataFrameReader

121

122

// Schema specification

123

def schema(schema: StructType): DataFrameReader

124

def schema(schemaString: String): DataFrameReader

125

126

// Options

127

def option(key: String, value: String): DataFrameReader

128

def option(key: String, value: Boolean): DataFrameReader

129

def option(key: String, value: Long): DataFrameReader

130

def option(key: String, value: Double): DataFrameReader

131

def options(options: Map[String, String]): DataFrameReader

132

133

// Common format readers

134

def csv(path: String): DataFrame

135

def csv(paths: String*): DataFrame

136

def json(path: String): DataFrame

137

def json(paths: String*): DataFrame

138

def parquet(path: String): DataFrame

139

def parquet(paths: String*): DataFrame

140

def orc(path: String): DataFrame

141

def text(path: String): DataFrame

142

def textFile(path: String): Dataset[String]

143

144

// Generic load

145

def load(): DataFrame

146

def load(path: String): DataFrame

147

def load(paths: String*): DataFrame

148

149

// Database sources

150

def jdbc(url: String, table: String, properties: Properties): DataFrame

151

def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame

152

153

// Table sources

154

def table(tableName: String): DataFrame

155

}

156

```

157

158

**Usage Examples:**

159

160

```scala

161

val spark = SparkSession.builder().appName("DataReader").getOrCreate()

162

163

// Read CSV with schema inference

164

val df1 = spark.read

165

.option("header", "true")

166

.option("inferSchema", "true")

167

.csv("path/to/file.csv")

168

169

// Read CSV with explicit schema

170

import org.apache.spark.sql.types._

171

val schema = StructType(Array(

172

StructField("name", StringType, true),

173

StructField("age", IntegerType, true),

174

StructField("salary", DoubleType, true)

175

))

176

177

val df2 = spark.read

178

.schema(schema)

179

.option("header", "true")

180

.csv("path/to/file.csv")

181

182

// Read JSON

183

val jsonDF = spark.read.json("path/to/file.json")

184

185

// Read Parquet

186

val parquetDF = spark.read.parquet("path/to/file.parquet")

187

188

// Read from database

189

import java.util.Properties

190

val connectionProperties = new Properties()

191

connectionProperties.put("user", "username")

192

connectionProperties.put("password", "password")

193

194

val jdbcDF = spark.read

195

.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

196

197

// Read with custom format

198

val customDF = spark.read

199

.format("org.apache.spark.sql.cassandra")

200

.options(Map("table" -> "words", "keyspace" -> "test"))

201

.load()

202

```

203

204

### Dataset[T] and DataFrame

205

206

Dataset is a strongly-typed distributed collection. DataFrame is an alias for Dataset[Row].

207

208

```scala { .api }

209

/**

210

* A Dataset is a strongly typed collection of domain-specific objects that can be transformed

211

* in parallel using functional or relational operations.

212

*/

213

abstract class Dataset[T] extends Serializable {

214

215

// Basic properties

216

def sparkSession: SparkSession

217

def encoder: Encoder[T]

218

def schema: StructType

219

def dtypes: Array[(String, String)]

220

def columns: Array[String]

221

def isEmpty: Boolean

222

def isLocal: Boolean

223

224

// Type conversions

225

def toDF(): DataFrame

226

def toDF(colNames: String*): DataFrame

227

def as[U: Encoder]: Dataset[U]

228

def to(schema: StructType): DataFrame

229

230

// Schema operations

231

def printSchema(): Unit

232

def explain(): Unit

233

def explain(extended: Boolean): Unit

234

def explain(mode: String): Unit

235

236

// Column operations and selection

237

def apply(colName: String): Column

238

def col(colName: String): Column

239

def select(cols: Column*): DataFrame

240

def select(col: String, cols: String*): DataFrame

241

def selectExpr(exprs: String*): DataFrame

242

def select[U1](c1: TypedColumn[T, U1]): Dataset[U1]

243

def withColumn(colName: String, col: Column): DataFrame

244

def withColumnRenamed(existingName: String, newName: String): DataFrame

245

def drop(colName: String): DataFrame

246

def drop(colNames: String*): DataFrame

247

def drop(col: Column): DataFrame

248

249

// Filtering

250

def filter(condition: Column): Dataset[T]

251

def filter(conditionExpr: String): Dataset[T]

252

def filter(func: T => Boolean): Dataset[T]

253

def where(condition: Column): Dataset[T]

254

255

// Transformations

256

def map[U: Encoder](func: T => U): Dataset[U]

257

def flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U]

258

def mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]

259

def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U]

260

261

// Sampling and limiting

262

def sample(fraction: Double): Dataset[T]

263

def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]

264

def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]

265

def limit(n: Int): Dataset[T]

266

267

// Set operations

268

def union(other: Dataset[T]): Dataset[T]

269

def unionAll(other: Dataset[T]): Dataset[T] // Deprecated, use union

270

def unionByName(other: Dataset[T]): Dataset[T]

271

def intersect(other: Dataset[T]): Dataset[T]

272

def intersectAll(other: Dataset[T]): Dataset[T]

273

def except(other: Dataset[T]): Dataset[T]

274

def exceptAll(other: Dataset[T]): Dataset[T]

275

276

// Sorting

277

def sort(sortCol: String, sortCols: String*): Dataset[T]

278

def sort(sortExprs: Column*): Dataset[T]

279

def orderBy(sortCol: String, sortCols: String*): Dataset[T]

280

def orderBy(sortExprs: Column*): Dataset[T]

281

282

// Grouping and aggregation

283

def groupBy(cols: Column*): RelationalGroupedDataset

284

def groupBy(col1: String, cols: String*): RelationalGroupedDataset

285

def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]

286

def rollup(cols: Column*): RelationalGroupedDataset

287

def cube(cols: Column*): RelationalGroupedDataset

288

def agg(expr: Column, exprs: Column*): DataFrame

289

def agg(exprs: Map[String, String]): DataFrame

290

291

// Joins

292

def join(right: Dataset[_]): DataFrame

293

def join(right: Dataset[_], joinExprs: Column): DataFrame

294

def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

295

def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame

296

def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame

297

def crossJoin(right: Dataset[_]): DataFrame

298

299

// Window operations

300

def withWatermark(eventTime: String, delayThreshold: String): Dataset[T]

301

302

// Actions

303

def show(): Unit

304

def show(numRows: Int): Unit

305

def show(numRows: Int, truncate: Boolean): Unit

306

def show(numRows: Int, truncate: Int): Unit

307

def collect(): Array[T]

308

def collectAsList(): util.List[T]

309

def count(): Long

310

def first(): T

311

def head(): T

312

def head(n: Int): Array[T]

313

def take(n: Int): Array[T]

314

def takeAsList(n: Int): util.List[T]

315

def tail(n: Int): Array[T]

316

def reduce(func: (T, T) => T): T

317

def foreach(func: T => Unit): Unit

318

def foreachPartition(func: Iterator[T] => Unit): Unit

319

320

// Persistence

321

def cache(): Dataset[T]

322

def persist(): Dataset[T]

323

def persist(newLevel: StorageLevel): Dataset[T]

324

def unpersist(): Dataset[T]

325

def unpersist(blocking: Boolean): Dataset[T]

326

def storageLevel: StorageLevel

327

328

// Checkpointing

329

def checkpoint(): Dataset[T]

330

def checkpoint(eager: Boolean): Dataset[T]

331

def localCheckpoint(): Dataset[T]

332

def localCheckpoint(eager: Boolean): Dataset[T]

333

334

// RDD conversion

335

def rdd: RDD[T]

336

def toJavaRDD: JavaRDD[T]

337

def javaRDD: JavaRDD[T]

338

339

// Writing

340

def write: DataFrameWriter[T]

341

def writeStream: DataStreamWriter[T]

342

}

343

344

// DataFrame is an alias for Dataset[Row]

345

type DataFrame = Dataset[Row]

346

```

347

348

**Usage Examples:**

349

350

```scala

351

import org.apache.spark.sql.{SparkSession, functions => F}

352

import org.apache.spark.sql.types._

353

354

val spark = SparkSession.builder().appName("DataFrameExample").getOrCreate()

355

import spark.implicits._

356

357

// Create sample data

358

case class Employee(name: String, department: String, salary: Double, age: Int)

359

val employees = Seq(

360

Employee("Alice", "Engineering", 80000, 25),

361

Employee("Bob", "Engineering", 90000, 30),

362

Employee("Charlie", "Sales", 70000, 35),

363

Employee("David", "Sales", 75000, 28),

364

Employee("Eve", "Marketing", 65000, 32)

365

).toDF()

366

367

// Basic operations

368

employees.show()

369

employees.printSchema()

370

println(s"Count: ${employees.count()}")

371

372

// Column operations

373

val withBonus = employees

374

.withColumn("bonus", F.col("salary") * 0.1)

375

.withColumn("total_comp", F.col("salary") + F.col("bonus"))

376

377

// Selection and filtering

378

val highEarners = employees

379

.select("name", "department", "salary")

380

.filter(F.col("salary") > 70000)

381

.orderBy(F.desc("salary"))

382

383

// Grouping and aggregation

384

val deptStats = employees

385

.groupBy("department")

386

.agg(

387

F.avg("salary").as("avg_salary"),

388

F.max("salary").as("max_salary"),

389

F.count("*").as("employee_count")

390

)

391

392

// Joins

393

val departments = Seq(

394

("Engineering", "Tech"),

395

("Sales", "Business"),

396

("Marketing", "Business")

397

).toDF("dept_name", "division")

398

399

val joinedData = employees

400

.join(departments, employees("department") === departments("dept_name"))

401

.select("name", "department", "division", "salary")

402

403

// Window functions

404

import org.apache.spark.sql.expressions.Window

405

406

val windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))

407

val rankedEmployees = employees

408

.withColumn("rank", F.row_number().over(windowSpec))

409

.withColumn("salary_percentile", F.percent_rank().over(windowSpec))

410

411

// Advanced transformations

412

val processedData = employees

413

.filter(F.col("age") >= 25)

414

.withColumn("salary_category",

415

F.when(F.col("salary") >= 80000, "High")

416

.when(F.col("salary") >= 70000, "Medium")

417

.otherwise("Low"))

418

.groupBy("department", "salary_category")

419

.count()

420

421

// Actions

422

val topEarner = employees.orderBy(F.desc("salary")).first()

423

val salaryList = employees.select("salary").as[Double].collect()

424

val avgSalary = employees.agg(F.avg("salary")).head().getDouble(0)

425

426

// Persistence

427

val cachedEmployees = employees.cache()

428

cachedEmployees.count() // Triggers caching

429

```

430

431

### Column Operations and Functions

432

433

The `Column` class and the built-in `org.apache.spark.sql.functions` object provide a rich set of operators and functions for building column expressions.

434

435

```scala { .api }

436

// Column class for representing expressions

437

class Column extends Logging {

438

// Arithmetic operations

439

def +(other: Any): Column

440

def -(other: Any): Column

441

def *(other: Any): Column

442

def /(other: Any): Column

443

def %(other: Any): Column

444

445

// Comparison operations

446

def ===(other: Any): Column

447

def =!=(other: Any): Column

448

def >(other: Any): Column

449

def <(other: Any): Column

450

def >=(other: Any): Column

451

def <=(other: Any): Column

452

453

// Logical operations

454

def &&(other: Column): Column

455

def ||(other: Column): Column

456

def unary_!(): Column

457

458

// String operations

459

def contains(other: Any): Column

460

def startsWith(other: Column): Column

461

def endsWith(other: Column): Column

462

def like(literal: String): Column

463

def rlike(literal: String): Column

464

def substr(startPos: Column, len: Column): Column

465

466

// Null handling

467

def isNull: Column

468

def isNotNull: Column

469

def isNaN: Column

470

471

// Type operations

472

def cast(to: DataType): Column

473

def cast(to: String): Column

474

475

// Sorting

476

def asc: Column

477

def desc: Column

478

def asc_nulls_first: Column

479

def asc_nulls_last: Column

480

def desc_nulls_first: Column

481

def desc_nulls_last: Column

482

483

// Aliasing

484

def as(alias: String): Column

485

def alias(alias: String): Column

486

def name(alias: String): Column

487

}

488

489

// Built-in functions (org.apache.spark.sql.functions)

490

object functions {

491

// Aggregate functions

492

def count(e: Column): Column

493

def count(columnName: String): Column

494

def countDistinct(expr: Column, exprs: Column*): Column

495

def sum(e: Column): Column

496

def avg(e: Column): Column

497

def mean(e: Column): Column

498

def min(e: Column): Column

499

def max(e: Column): Column

500

def first(e: Column): Column

501

def last(e: Column): Column

502

def stddev(e: Column): Column

503

def variance(e: Column): Column

504

def collect_list(e: Column): Column

505

def collect_set(e: Column): Column

506

507

// String functions

508

def concat(exprs: Column*): Column

509

def concat_ws(sep: String, exprs: Column*): Column

510

def upper(e: Column): Column

511

def lower(e: Column): Column

512

def length(e: Column): Column

513

def trim(e: Column): Column

514

def ltrim(e: Column): Column

515

def rtrim(e: Column): Column

516

def regexp_replace(e: Column, pattern: String, replacement: String): Column

517

def regexp_extract(e: Column, exp: String, groupIdx: Int): Column

518

def split(str: Column, pattern: String): Column

519

520

// Date/time functions

521

def current_date(): Column

522

def current_timestamp(): Column

523

def date_add(start: Column, days: Int): Column

524

def date_sub(start: Column, days: Int): Column

525

def datediff(end: Column, start: Column): Column

526

def date_format(dateExpr: Column, format: String): Column

527

def year(e: Column): Column

528

def month(e: Column): Column

529

def dayofmonth(e: Column): Column

530

def hour(e: Column): Column

531

def minute(e: Column): Column

532

def second(e: Column): Column

533

534

// Mathematical functions

535

def abs(e: Column): Column

536

def sqrt(e: Column): Column

537

def pow(l: Column, r: Column): Column

538

def exp(e: Column): Column

539

def log(e: Column): Column

540

def round(e: Column, scale: Int): Column

541

def floor(e: Column): Column

542

def ceil(e: Column): Column

543

def rand(): Column

544

def randn(): Column

545

546

// Conditional functions

547

def when(condition: Column, value: Any): Column

548

def coalesce(e: Column*): Column

549

def isnull(e: Column): Column

550

def isnan(e: Column): Column

551

def greatest(exprs: Column*): Column

552

def least(exprs: Column*): Column

553

554

// Array functions

555

def array(cols: Column*): Column

556

def array_contains(column: Column, value: Any): Column

557

def explode(e: Column): Column

558

def posexplode(e: Column): Column

559

def size(e: Column): Column

560

def sort_array(e: Column): Column

561

562

// Window functions

563

def row_number(): Column

564

def rank(): Column

565

def dense_rank(): Column

566

def percent_rank(): Column

567

def ntile(n: Int): Column

568

def lag(columnName: String, offset: Int): Column

569

def lead(columnName: String, offset: Int): Column

570

571

// UDF creation

572

def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction

573

def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction

574

// ... more UDF variants

575

576

// Column creation

577

def col(colName: String): Column

578

def column(colName: String): Column

579

def lit(literal: Any): Column

580

def expr(expr: String): Column

581

}

582

```

583

584

**Usage Examples:**

585

586

```scala

587

import org.apache.spark.sql.{functions => F}

588

589

val df = spark.read.option("header", "true").csv("data.csv")

590

591

// String operations

592

val cleaned = df

593

.withColumn("name_upper", F.upper(F.col("name")))

594

.withColumn("name_length", F.length(F.col("name")))

595

.withColumn("email_domain",

596

F.regexp_extract(F.col("email"), "@(.+)", 1))

597

598

// Date operations

599

val withDates = df

600

.withColumn("current_date", F.current_date())

601

.withColumn("age_years",

602

F.datediff(F.current_date(), F.col("birth_date")) / 365)

603

.withColumn("birth_year", F.year(F.col("birth_date")))

604

605

// Conditional logic

606

val categorized = df

607

.withColumn("age_group",

608

F.when(F.col("age") < 18, "Minor")

609

.when(F.col("age") < 65, "Adult")

610

.otherwise("Senior"))

611

.withColumn("salary_adjusted",

612

F.coalesce(F.col("salary"), F.lit(0.0)))

613

614

// Mathematical operations

615

val calculated = df

616

.withColumn("salary_log", F.log(F.col("salary")))

617

.withColumn("bonus", F.col("salary") * F.lit(0.1))

618

.withColumn("total_comp", F.col("salary") + F.col("bonus"))

619

620

// Array operations (assuming address is an array)

621

val arrayOps = df

622

.withColumn("address_count", F.size(F.col("addresses")))

623

.withColumn("first_address", F.col("addresses").getItem(0))

624

.select(F.col("name"), F.explode(F.col("addresses")).as("address"))

625

```

626

627

### SQL Interface

628

629

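Execute SQL queries directly, with optional parameter binding, and manage tables, views, and caching through the `Catalog`. Parameters can be bound by name (`:name` markers with a `Map`, Spark 3.4+) or by position (`?` markers with an `Array`, Spark 3.5+).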

630

631

```scala { .api }

632

// SQL execution

633

def sql(sqlText: String): DataFrame

634

def sql(sqlText: String, args: Map[String, Any]): DataFrame

635

636

// Table operations

637

def table(tableName: String): DataFrame

638

639

// Catalog interface for metadata

640

trait Catalog {

641

def currentDatabase: String

642

def setCurrentDatabase(dbName: String): Unit

643

def listDatabases(): Dataset[Database]

644

def listTables(): Dataset[Table]

645

def listTables(dbName: String): Dataset[Table]

646

def listColumns(tableName: String): Dataset[Column]

647

def listColumns(dbName: String, tableName: String): Dataset[Column]

648

def listFunctions(): Dataset[Function]

649

def listFunctions(dbName: String): Dataset[Function]

650

def tableExists(tableName: String): Boolean

651

def tableExists(dbName: String, tableName: String): Boolean

652

def functionExists(functionName: String): Boolean

653

def functionExists(dbName: String, functionName: String): Boolean

654

655

// Temporary view management

656

def createTable(tableName: String, path: String): DataFrame

657

def createTable(tableName: String, path: String, source: String): DataFrame

658

def dropTempView(viewName: String): Boolean

659

def dropGlobalTempView(viewName: String): Boolean

660

def isCached(tableName: String): Boolean

661

def cacheTable(tableName: String): Unit

662

def uncacheTable(tableName: String): Unit

663

def clearCache(): Unit

664

def refreshTable(tableName: String): Unit

665

def refreshByPath(path: String): Unit

666

}

667

```

668

669

**Usage Examples:**

670

671

```scala

672

val spark = SparkSession.builder().appName("SQLExample").getOrCreate()

673

674

// Create temporary view

675

val employees = spark.read.parquet("employees.parquet")

676

employees.createOrReplaceTempView("employees")

677

678

// SQL queries

679

val highEarners = spark.sql("""

680

SELECT name, department, salary

681

FROM employees

682

WHERE salary > 75000

683

ORDER BY salary DESC

684

""")

685

686

// Parameterized queries

687

val minSalary = 80000

688

val deptFilter = "Engineering"

689

val filtered = spark.sql(

690

"SELECT * FROM employees WHERE salary >= ? AND department = ?",

691

Array(minSalary, deptFilter)

692

)

693

694

// Complex SQL with CTEs

695

val complexQuery = spark.sql("""

696

WITH dept_stats AS (

697

SELECT

698

department,

699

AVG(salary) as avg_salary,

700

COUNT(*) as emp_count

701

FROM employees

702

GROUP BY department

703

),

704

top_depts AS (

705

SELECT department

706

FROM dept_stats

707

WHERE avg_salary > 70000

708

)

709

SELECT e.name, e.salary, e.department

710

FROM employees e

711

JOIN top_depts t ON e.department = t.department

712

WHERE e.salary > (

713

SELECT avg_salary * 0.9

714

FROM dept_stats d

715

WHERE d.department = e.department

716

)

717

""")

718

719

// Catalog operations

720

spark.catalog.listTables().show()

721

spark.catalog.listColumns("employees").show()

722

723

// Cache management

724

spark.catalog.cacheTable("employees")

725

println(s"Is cached: ${spark.catalog.isCached("employees")}")

726

spark.catalog.uncacheTable("employees")

727

```

728

729

## Data Sources and Formats

730

731

### Built-in Data Sources

732

733

Spark SQL supports reading from and writing to various data sources:

734

735

- **Parquet**: Columnar storage format; the default format for `load()`/`save()` (`spark.sql.sources.default`) and the recommended choice for analytics

736

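- **JSON**: Semi-structured data. By default it expects one self-contained JSON object per line (JSON Lines); set the `multiLine` option for regular multi-line JSON files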

737

- **CSV**: Comma-separated values with configurable options

738

- **ORC**: Optimized row columnar format

739

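- **Avro**: Row-based format with schema evolution support. Built-in since Spark 2.4, but the `spark-avro` module is external and must be added to the classpath (e.g. via `--packages`)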

740

- **Text**: Plain text files; each line becomes one row in a single string column named `value`

741

- **JDBC**: Relational databases

742

- **Hive Tables**: Integration with Hive metastore

743

744

### Performance Optimization

745

746

Key strategies for optimizing Spark SQL performance:

747

748

1. **Use appropriate file formats**: Prefer Parquet for analytics workloads

749

2. **Partition data**: Use `partitionBy()` when writing large datasets

750

3. **Cache frequently accessed data**: Use `cache()` or `persist()` for data reused across multiple actions, and call `unpersist()` once it is no longer needed

751

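4. **Optimize joins**: Use broadcast joins for small tables, either explicitly with the `broadcast()` hint or automatically below `spark.sql.autoBroadcastJoinThreshold`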

752

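5. **Keep adaptive query execution enabled**: AQE (`spark.sql.adaptive.enabled`, on by default since Spark 3.2) re-optimizes plans at runtime, for example by coalescing shuffle partitions and splitting skewed joins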

753

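6. **Prefer DataFrame/Dataset APIs over RDD operations**: These APIs are optimized by the Catalyst optimizer and the Tungsten execution engine, whereas RDD lambdas are opaque to the optimizer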

754

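7. **Optimize predicates**: Filter as early as possible so Catalyst can push predicates down to the data source (e.g. Parquet, ORC, JDBC). Predicates wrapped in UDFs cannot be pushed down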

755

756

The SQL module provides the primary interface for most Spark applications, offering both programmatic APIs and SQL syntax for data analysis and transformation tasks.