# SQL and DataFrames

Spark SQL provides a programming interface for working with structured and semi-structured data through DataFrames, Datasets, and SQL queries. It includes the Catalyst optimizer for automatic query optimization and supports a wide range of data sources.

## Capabilities

### SparkSession

The unified entry point for working with structured data in Spark. It replaces the SQLContext and HiveContext of earlier versions.

```scala { .api }
/**
 * The entry point to programming Spark with the Dataset and DataFrame API.
 */
abstract class SparkSession extends Serializable with Closeable {

  // Session lifecycle
  def stop(): Unit
  def close(): Unit
  def version: String

  // Configuration and state
  def conf: RuntimeConfig
  def sparkContext: SparkContext      // Access to underlying SparkContext
  def catalog: Catalog                // Metadata management
  def udf: UDFRegistration            // User-defined function registration
  def streams: StreamingQueryManager  // Structured Streaming

  // DataFrame/Dataset creation from data
  def createDataFrame[A <: Product: TypeTag](data: Seq[A]): DataFrame
  def createDataFrame(rows: util.List[Row], schema: StructType): DataFrame
  def createDataFrame(data: util.List[_], beanClass: Class[_]): DataFrame
  def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]): DataFrame
  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
  def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame

  // SQL interface
  def sql(sqlText: String): DataFrame
  def sql(sqlText: String, args: Map[String, Any]): DataFrame
  def sql(sqlText: String, args: Array[_]): DataFrame
  def table(tableName: String): DataFrame

  // Data reading
  def read: DataFrameReader
  def readStream: DataStreamReader    // For Structured Streaming

  // Session management
  def newSession(): SparkSession
}

// SparkSession builder for configuration
object SparkSession {
  def builder(): Builder
  def active: SparkSession
  def getActiveSession: Option[SparkSession]
  def getDefaultSession: Option[SparkSession]

  class Builder {
    def appName(name: String): Builder
    def master(master: String): Builder
    def config(key: String, value: String): Builder
    def config(key: String, value: Long): Builder
    def config(key: String, value: Double): Builder
    def config(key: String, value: Boolean): Builder
    def config(conf: SparkConf): Builder
    def enableHiveSupport(): Builder
    def getOrCreate(): SparkSession
  }
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.SparkSession

// Create a SparkSession
val spark = SparkSession.builder()
  .appName("My SQL App")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .getOrCreate()

// Create a DataFrame from local data
import spark.implicits._

case class Person(name: String, age: Int, city: String)
val people = Seq(
  Person("Alice", 25, "New York"),
  Person("Bob", 30, "San Francisco"),
  Person("Charlie", 35, "Boston")
)
val df = spark.createDataFrame(people)

// Use SQL
df.createOrReplaceTempView("people")
val adults = spark.sql("SELECT name, age FROM people WHERE age >= 30")
adults.show()

// Access the catalog
spark.catalog.listTables().show()
spark.catalog.listColumns("people").show()

// Clean up
spark.stop()
```
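
The `sql(sqlText, args: Map[String, Any])` overload listed above accepts named parameter markers in the query text (available in recent Spark releases); a minimal sketch reusing the `people` view created above, where `:minAge` is an illustrative parameter name:

```scala
// Named-parameter query against the temp view created above (sketch)
val seniors = spark.sql(
  "SELECT name, age FROM people WHERE age >= :minAge",
  Map("minAge" -> 30)
)
seniors.show()
```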

### DataFrameReader

Interface for reading data from external sources into DataFrames.

```scala { .api }
/**
 * Interface used to load a DataFrame from external storage systems (e.g. file systems,
 * key-value stores, etc.).
 */
class DataFrameReader {

  // Format specification
  def format(source: String): DataFrameReader

  // Schema specification
  def schema(schema: StructType): DataFrameReader
  def schema(schemaString: String): DataFrameReader

  // Options
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader
  def options(options: Map[String, String]): DataFrameReader

  // Common format readers
  def csv(path: String): DataFrame
  def csv(paths: String*): DataFrame
  def json(path: String): DataFrame
  def json(paths: String*): DataFrame
  def parquet(path: String): DataFrame
  def parquet(paths: String*): DataFrame
  def orc(path: String): DataFrame
  def text(path: String): DataFrame
  def textFile(path: String): Dataset[String]

  // Generic load
  def load(): DataFrame
  def load(path: String): DataFrame
  def load(paths: String*): DataFrame

  // Database sources
  def jdbc(url: String, table: String, properties: Properties): DataFrame
  def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame

  // Table sources
  def table(tableName: String): DataFrame
}
```

**Usage Examples:**

```scala
val spark = SparkSession.builder().appName("DataReader").getOrCreate()

// Read CSV with schema inference
val df1 = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("path/to/file.csv")

// Read CSV with an explicit schema
import org.apache.spark.sql.types._
val schema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("salary", DoubleType, true)
))

val df2 = spark.read
  .schema(schema)
  .option("header", "true")
  .csv("path/to/file.csv")

// Read JSON
val jsonDF = spark.read.json("path/to/file.json")

// Read Parquet
val parquetDF = spark.read.parquet("path/to/file.parquet")

// Read from a database
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")

val jdbcDF = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Read with a custom format
val customDF = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "words", "keyspace" -> "test"))
  .load()
```
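
The `schema(schemaString: String)` overload shown in the API above accepts a DDL-formatted string instead of a `StructType`; a brief sketch equivalent to the explicit schema used earlier (the path is illustrative):

```scala
// Same schema as above, expressed as a DDL string (sketch)
val df3 = spark.read
  .schema("name STRING, age INT, salary DOUBLE")
  .option("header", "true")
  .csv("path/to/file.csv")
```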

### Dataset[T] and DataFrame

Dataset is a strongly typed distributed collection; DataFrame is an alias for `Dataset[Row]`.

```scala { .api }
/**
 * A Dataset is a strongly typed collection of domain-specific objects that can be transformed
 * in parallel using functional or relational operations.
 */
abstract class Dataset[T] extends Serializable {

  // Basic properties
  def sparkSession: SparkSession
  def encoder: Encoder[T]
  def schema: StructType
  def dtypes: Array[(String, String)]
  def columns: Array[String]
  def isEmpty: Boolean
  def isLocal: Boolean

  // Type conversions
  def toDF(): DataFrame
  def toDF(colNames: String*): DataFrame
  def as[U: Encoder]: Dataset[U]
  def to(schema: StructType): DataFrame

  // Schema and plan inspection
  def printSchema(): Unit
  def explain(): Unit
  def explain(extended: Boolean): Unit
  def explain(mode: String): Unit

  // Column operations and selection
  def apply(colName: String): Column
  def col(colName: String): Column
  def select(cols: Column*): DataFrame
  def select(col: String, cols: String*): DataFrame
  def selectExpr(exprs: String*): DataFrame
  def select[U1](c1: TypedColumn[T, U1]): Dataset[U1]
  def withColumn(colName: String, col: Column): DataFrame
  def withColumnRenamed(existingName: String, newName: String): DataFrame
  def drop(colName: String): DataFrame
  def drop(colNames: String*): DataFrame
  def drop(col: Column): DataFrame

  // Filtering
  def filter(condition: Column): Dataset[T]
  def filter(conditionExpr: String): Dataset[T]
  def filter(func: T => Boolean): Dataset[T]
  def where(condition: Column): Dataset[T]

  // Transformations
  def map[U: Encoder](func: T => U): Dataset[U]
  def flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U]
  def mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]
  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U]

  // Sampling and limiting
  def sample(fraction: Double): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
  def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
  def limit(n: Int): Dataset[T]

  // Set operations
  def union(other: Dataset[T]): Dataset[T]
  def unionAll(other: Dataset[T]): Dataset[T]  // Deprecated; use union
  def unionByName(other: Dataset[T]): Dataset[T]
  def intersect(other: Dataset[T]): Dataset[T]
  def intersectAll(other: Dataset[T]): Dataset[T]
  def except(other: Dataset[T]): Dataset[T]
  def exceptAll(other: Dataset[T]): Dataset[T]

  // Sorting
  def sort(sortCol: String, sortCols: String*): Dataset[T]
  def sort(sortExprs: Column*): Dataset[T]
  def orderBy(sortCol: String, sortCols: String*): Dataset[T]
  def orderBy(sortExprs: Column*): Dataset[T]

  // Grouping and aggregation
  def groupBy(cols: Column*): RelationalGroupedDataset
  def groupBy(col1: String, cols: String*): RelationalGroupedDataset
  def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]
  def rollup(cols: Column*): RelationalGroupedDataset
  def cube(cols: Column*): RelationalGroupedDataset
  def agg(expr: Column, exprs: Column*): DataFrame
  def agg(exprs: Map[String, String]): DataFrame

  // Joins
  def join(right: Dataset[_]): DataFrame
  def join(right: Dataset[_], joinExprs: Column): DataFrame
  def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
  def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
  def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
  def crossJoin(right: Dataset[_]): DataFrame

  // Event-time watermarking (streaming Datasets)
  def withWatermark(eventTime: String, delayThreshold: String): Dataset[T]

  // Actions
  def show(): Unit
  def show(numRows: Int): Unit
  def show(numRows: Int, truncate: Boolean): Unit
  def show(numRows: Int, truncate: Int): Unit
  def collect(): Array[T]
  def collectAsList(): util.List[T]
  def count(): Long
  def first(): T
  def head(): T
  def head(n: Int): Array[T]
  def take(n: Int): Array[T]
  def takeAsList(n: Int): util.List[T]
  def tail(n: Int): Array[T]
  def reduce(func: (T, T) => T): T
  def foreach(func: T => Unit): Unit
  def foreachPartition(func: Iterator[T] => Unit): Unit

  // Persistence
  def cache(): Dataset[T]
  def persist(): Dataset[T]
  def persist(newLevel: StorageLevel): Dataset[T]
  def unpersist(): Dataset[T]
  def unpersist(blocking: Boolean): Dataset[T]
  def storageLevel: StorageLevel

  // Checkpointing
  def checkpoint(): Dataset[T]
  def checkpoint(eager: Boolean): Dataset[T]
  def localCheckpoint(): Dataset[T]
  def localCheckpoint(eager: Boolean): Dataset[T]

  // RDD conversion
  def rdd: RDD[T]
  def toJavaRDD: JavaRDD[T]
  def javaRDD: JavaRDD[T]

  // Writing
  def write: DataFrameWriter[T]
  def writeStream: DataStreamWriter[T]
}

// DataFrame is an alias for Dataset[Row]
type DataFrame = Dataset[Row]
```

**Usage Examples:**

```scala
import org.apache.spark.sql.{SparkSession, functions => F}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("DataFrameExample").getOrCreate()
import spark.implicits._

// Create sample data
case class Employee(name: String, department: String, salary: Double, age: Int)
val employees = Seq(
  Employee("Alice", "Engineering", 80000, 25),
  Employee("Bob", "Engineering", 90000, 30),
  Employee("Charlie", "Sales", 70000, 35),
  Employee("David", "Sales", 75000, 28),
  Employee("Eve", "Marketing", 65000, 32)
).toDF()

// Basic operations
employees.show()
employees.printSchema()
println(s"Count: ${employees.count()}")

// Column operations
val withBonus = employees
  .withColumn("bonus", F.col("salary") * 0.1)
  .withColumn("total_comp", F.col("salary") + F.col("bonus"))

// Selection and filtering
val highEarners = employees
  .select("name", "department", "salary")
  .filter(F.col("salary") > 70000)
  .orderBy(F.desc("salary"))

// Grouping and aggregation
val deptStats = employees
  .groupBy("department")
  .agg(
    F.avg("salary").as("avg_salary"),
    F.max("salary").as("max_salary"),
    F.count("*").as("employee_count")
  )

// Joins
val departments = Seq(
  ("Engineering", "Tech"),
  ("Sales", "Business"),
  ("Marketing", "Business")
).toDF("dept_name", "division")

val joinedData = employees
  .join(departments, employees("department") === departments("dept_name"))
  .select("name", "department", "division", "salary")

// Window functions
import org.apache.spark.sql.expressions.Window

val windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
val rankedEmployees = employees
  .withColumn("rank", F.row_number().over(windowSpec))
  .withColumn("salary_percentile", F.percent_rank().over(windowSpec))

// Advanced transformations
val processedData = employees
  .filter(F.col("age") >= 25)
  .withColumn("salary_category",
    F.when(F.col("salary") >= 80000, "High")
      .when(F.col("salary") >= 70000, "Medium")
      .otherwise("Low"))
  .groupBy("department", "salary_category")
  .count()

// Actions
val topEarner = employees.orderBy(F.desc("salary")).first()
val salaryList = employees.select("salary").as[Double].collect()
val avgSalary = employees.agg(F.avg("salary")).head().getDouble(0)

// Persistence
val cachedEmployees = employees.cache()
cachedEmployees.count()  // Triggers caching
```
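
The typed operations listed in the `Dataset` API above (`as[U]`, `map`, `groupByKey`) work directly on the domain class instead of `Row`s; a short sketch reusing the `Employee` data defined above:

```scala
// Typed view of the same data: Dataset[Employee] instead of DataFrame (sketch)
val typedEmployees = employees.as[Employee]

// Functional transformations operate on Employee objects directly
val names = typedEmployees.map(e => e.name.toUpperCase)

// groupByKey yields a KeyValueGroupedDataset keyed by department
val headcountByDept = typedEmployees
  .groupByKey(_.department)
  .count()

headcountByDept.show()
```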

### Column Operations and Functions

A rich set of operators and built-in functions for working with columns and expressions.

```scala { .api }
// Column class for representing expressions
class Column extends Logging {
  // Arithmetic operators
  def +(other: Any): Column
  def -(other: Any): Column
  def *(other: Any): Column
  def /(other: Any): Column
  def %(other: Any): Column

  // Comparison operators
  def ===(other: Any): Column
  def =!=(other: Any): Column
  def >(other: Any): Column
  def <(other: Any): Column
  def >=(other: Any): Column
  def <=(other: Any): Column

  // Logical operators
  def &&(other: Column): Column
  def ||(other: Column): Column
  def unary_! : Column

  // String operations
  def contains(other: Any): Column
  def startsWith(other: Column): Column
  def endsWith(other: Column): Column
  def like(literal: String): Column
  def rlike(literal: String): Column
  def substr(startPos: Column, len: Column): Column

  // Null handling
  def isNull: Column
  def isNotNull: Column
  def isNaN: Column

  // Type operations
  def cast(to: DataType): Column
  def cast(to: String): Column

  // Sorting
  def asc: Column
  def desc: Column
  def asc_nulls_first: Column
  def asc_nulls_last: Column
  def desc_nulls_first: Column
  def desc_nulls_last: Column

  // Aliasing
  def as(alias: String): Column
  def alias(alias: String): Column
  def name(alias: String): Column
}

// Built-in functions (org.apache.spark.sql.functions)
object functions {
  // Aggregate functions
  def count(e: Column): Column
  def count(columnName: String): Column
  def countDistinct(expr: Column, exprs: Column*): Column
  def sum(e: Column): Column
  def avg(e: Column): Column
  def mean(e: Column): Column
  def min(e: Column): Column
  def max(e: Column): Column
  def first(e: Column): Column
  def last(e: Column): Column
  def stddev(e: Column): Column
  def variance(e: Column): Column
  def collect_list(e: Column): Column
  def collect_set(e: Column): Column

  // String functions
  def concat(exprs: Column*): Column
  def concat_ws(sep: String, exprs: Column*): Column
  def upper(e: Column): Column
  def lower(e: Column): Column
  def length(e: Column): Column
  def trim(e: Column): Column
  def ltrim(e: Column): Column
  def rtrim(e: Column): Column
  def regexp_replace(e: Column, pattern: String, replacement: String): Column
  def regexp_extract(e: Column, exp: String, groupIdx: Int): Column
  def split(str: Column, pattern: String): Column

  // Date/time functions
  def current_date(): Column
  def current_timestamp(): Column
  def date_add(start: Column, days: Int): Column
  def date_sub(start: Column, days: Int): Column
  def datediff(end: Column, start: Column): Column
  def date_format(dateExpr: Column, format: String): Column
  def year(e: Column): Column
  def month(e: Column): Column
  def dayofmonth(e: Column): Column
  def hour(e: Column): Column
  def minute(e: Column): Column
  def second(e: Column): Column

  // Mathematical functions
  def abs(e: Column): Column
  def sqrt(e: Column): Column
  def pow(l: Column, r: Column): Column
  def exp(e: Column): Column
  def log(e: Column): Column
  def round(e: Column, scale: Int): Column
  def floor(e: Column): Column
  def ceil(e: Column): Column
  def rand(): Column
  def randn(): Column

  // Conditional functions
  def when(condition: Column, value: Any): Column
  def coalesce(e: Column*): Column
  def isnull(e: Column): Column
  def isnan(e: Column): Column
  def greatest(exprs: Column*): Column
  def least(exprs: Column*): Column

  // Array functions
  def array(cols: Column*): Column
  def array_contains(column: Column, value: Any): Column
  def explode(e: Column): Column
  def posexplode(e: Column): Column
  def size(e: Column): Column
  def sort_array(e: Column): Column

  // Window functions
  def row_number(): Column
  def rank(): Column
  def dense_rank(): Column
  def percent_rank(): Column
  def ntile(n: Int): Column
  def lag(columnName: String, offset: Int): Column
  def lead(columnName: String, offset: Int): Column

  // UDF creation
  def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction
  def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction
  // ... more UDF variants

  // Column creation
  def col(colName: String): Column
  def column(colName: String): Column
  def lit(literal: Any): Column
  def expr(expr: String): Column
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.{functions => F}

val df = spark.read.option("header", "true").csv("data.csv")

// String operations
val cleaned = df
  .withColumn("name_upper", F.upper(F.col("name")))
  .withColumn("name_length", F.length(F.col("name")))
  .withColumn("email_domain",
    F.regexp_extract(F.col("email"), "@(.+)", 1))

// Date operations
val withDates = df
  .withColumn("current_date", F.current_date())
  .withColumn("age_years",
    F.datediff(F.current_date(), F.col("birth_date")) / 365)
  .withColumn("birth_year", F.year(F.col("birth_date")))

// Conditional logic
val categorized = df
  .withColumn("age_group",
    F.when(F.col("age") < 18, "Minor")
      .when(F.col("age") < 65, "Adult")
      .otherwise("Senior"))
  .withColumn("salary_adjusted",
    F.coalesce(F.col("salary"), F.lit(0.0)))

// Mathematical operations
val calculated = df
  .withColumn("salary_log", F.log(F.col("salary")))
  .withColumn("bonus", F.col("salary") * F.lit(0.1))
  .withColumn("total_comp", F.col("salary") + F.col("bonus"))

// Array operations (assuming "addresses" is an array column)
val arrayOps = df
  .withColumn("address_count", F.size(F.col("addresses")))
  .withColumn("first_address", F.col("addresses").getItem(0))
  .select(F.col("name"), F.explode(F.col("addresses")).as("address"))
```
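
The `udf` helpers listed in the API above wrap plain Scala functions for use in DataFrame expressions and in SQL; a minimal sketch, assuming the `name` column from the examples above and an illustrative view name `people`:

```scala
import org.apache.spark.sql.{functions => F}

// Wrap an ordinary Scala function as a UDF (sketch)
val initials = F.udf((name: String) => name.split("\\s+").map(_.head).mkString("."))

// Use it directly in a DataFrame expression...
val withInitials = df.withColumn("initials", initials(F.col("name")))

// ...or register it for use in SQL
spark.udf.register("initials", initials)
df.createOrReplaceTempView("people")
spark.sql("SELECT name, initials(name) AS initials FROM people").show()
```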

### SQL Interface

Direct SQL query execution with parameter binding and table management.

```scala { .api }
// SQL execution
def sql(sqlText: String): DataFrame
def sql(sqlText: String, args: Map[String, Any]): DataFrame

// Table operations
def table(tableName: String): DataFrame

// Catalog interface for metadata
trait Catalog {
  def currentDatabase: String
  def setCurrentDatabase(dbName: String): Unit
  def listDatabases(): Dataset[Database]
  def listTables(): Dataset[Table]
  def listTables(dbName: String): Dataset[Table]
  def listColumns(tableName: String): Dataset[Column]
  def listColumns(dbName: String, tableName: String): Dataset[Column]
  def listFunctions(): Dataset[Function]
  def listFunctions(dbName: String): Dataset[Function]
  def tableExists(tableName: String): Boolean
  def tableExists(dbName: String, tableName: String): Boolean
  def functionExists(functionName: String): Boolean
  def functionExists(dbName: String, functionName: String): Boolean

  // Table, view, and cache management
  def createTable(tableName: String, path: String): DataFrame
  def createTable(tableName: String, path: String, source: String): DataFrame
  def dropTempView(viewName: String): Boolean
  def dropGlobalTempView(viewName: String): Boolean
  def isCached(tableName: String): Boolean
  def cacheTable(tableName: String): Unit
  def uncacheTable(tableName: String): Unit
  def clearCache(): Unit
  def refreshTable(tableName: String): Unit
  def refreshByPath(path: String): Unit
}
```

**Usage Examples:**

```scala
val spark = SparkSession.builder().appName("SQLExample").getOrCreate()

// Create a temporary view
val employees = spark.read.parquet("employees.parquet")
employees.createOrReplaceTempView("employees")

// SQL queries
val highEarners = spark.sql("""
  SELECT name, department, salary
  FROM employees
  WHERE salary > 75000
  ORDER BY salary DESC
""")

// Parameterized queries (positional markers)
val minSalary = 80000
val deptFilter = "Engineering"
val filtered = spark.sql(
  "SELECT * FROM employees WHERE salary >= ? AND department = ?",
  Array(minSalary, deptFilter)
)

// Complex SQL with CTEs
val complexQuery = spark.sql("""
  WITH dept_stats AS (
    SELECT
      department,
      AVG(salary) AS avg_salary,
      COUNT(*) AS emp_count
    FROM employees
    GROUP BY department
  ),
  top_depts AS (
    SELECT department
    FROM dept_stats
    WHERE avg_salary > 70000
  )
  SELECT e.name, e.salary, e.department
  FROM employees e
  JOIN top_depts t ON e.department = t.department
  WHERE e.salary > (
    SELECT avg_salary * 0.9
    FROM dept_stats d
    WHERE d.department = e.department
  )
""")

// Catalog operations
spark.catalog.listTables().show()
spark.catalog.listColumns("employees").show()

// Cache management
spark.catalog.cacheTable("employees")
println(s"Is cached: ${spark.catalog.isCached("employees")}")
spark.catalog.uncacheTable("employees")
```
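
Global temporary views (dropped with `dropGlobalTempView` above) are registered in the reserved `global_temp` database and remain visible to other sessions of the same application; a brief sketch using the `employees` DataFrame from above:

```scala
// Register a global temporary view, visible to all sessions in this application
employees.createGlobalTempView("employees_global")

// Global temp views are queried through the reserved global_temp database
spark.sql("SELECT COUNT(*) FROM global_temp.employees_global").show()

// They are also visible from a new session of the same application
spark.newSession().sql("SELECT name FROM global_temp.employees_global").show()

// Clean up
spark.catalog.dropGlobalTempView("employees_global")
```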

## Data Sources and Formats

### Built-in Data Sources

Spark SQL supports reading from and writing to a variety of data sources (a short writing sketch follows the list):

- **Parquet**: Columnar storage format (the default, recommended for analytics)
- **JSON**: Semi-structured data format
- **CSV**: Comma-separated values with configurable parsing options
- **ORC**: Optimized Row Columnar format
- **Avro**: Row-based format with schema evolution support (via the spark-avro module)
- **Text**: Plain text files
- **JDBC**: Relational databases
- **Hive Tables**: Integration with the Hive metastore
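
A minimal writing sketch using the `DataFrameWriter` returned by `df.write`, covering Parquet, CSV, and catalog tables; the output paths, table name, and partition column are illustrative:

```scala
// Write Parquet, partitioned on disk by department (paths are illustrative)
employees.write
  .mode("overwrite")
  .partitionBy("department")
  .parquet("output/employees_parquet")

// Write CSV with a header row
employees.write
  .mode("overwrite")
  .option("header", "true")
  .csv("output/employees_csv")

// Save as a table registered in the catalog
employees.write
  .mode("overwrite")
  .saveAsTable("employees_table")
```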

### Performance Optimization

Key strategies for optimizing Spark SQL performance (a short sketch of items 4 and 5 follows the list):

1. **Use appropriate file formats**: Prefer Parquet for analytics workloads
2. **Partition data**: Use `partitionBy()` when writing large datasets
3. **Cache frequently accessed data**: Use `cache()` or `persist()`
4. **Optimize joins**: Use broadcast joins for small tables
5. **Configure adaptive query execution**: Enable AQE for automatic runtime optimization
6. **Use columnar operations**: Prefer the DataFrame/Dataset APIs over RDD operations
7. **Optimize predicates**: Push filters down toward the data source
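
A brief sketch of items 4 and 5, using the `broadcast` hint from `org.apache.spark.sql.functions` and the standard AQE configuration keys; the DataFrames are the illustrative `employees` and `departments` from earlier examples:

```scala
import org.apache.spark.sql.functions.broadcast

// Hint that the small dimension table should be broadcast to all executors
val joined = employees.join(broadcast(departments),
  employees("department") === departments("dept_name"))

// Adaptive Query Execution settings (enabled by default in recent Spark releases)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Size threshold (in bytes) below which the optimizer broadcasts a side automatically
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)
```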

The SQL module provides the primary interface for most Spark applications, offering both programmatic APIs and SQL syntax for data analysis and transformation tasks.