0
# SQL and DataFrames
1
2
High-level APIs for working with structured data using DataFrames and Datasets. They are built on Spark SQL, which uses the Catalyst optimizer for query optimization and code generation, and they integrate with a wide range of data sources and formats.
3
4
## Capabilities
5
6
### SparkSession
7
8
Entry point for the DataFrame and Dataset APIs. Since Spark 2.0, `SparkSession` supersedes `SQLContext` and `HiveContext` as the unified way to access Spark SQL functionality.
9
10
```scala { .api }
11
/**
12
* Entry point for DataFrame and Dataset APIs
13
*/
14
class SparkSession {
15
/** Get DataFrameReader for loading data */
16
def read: DataFrameReader
17
/** Get DataStreamReader for streaming data */
18
def readStream: DataStreamReader
19
/** Execute SQL query */
20
def sql(sqlText: String): DataFrame
21
/** Access table as DataFrame */
22
def table(tableName: String): DataFrame
23
/** Create Dataset of numbers */
24
def range(end: Long): Dataset[Long]
25
def range(start: Long, end: Long): Dataset[Long]
26
/** Create DataFrame from sequence */
27
def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
28
/** Create Dataset from sequence */
29
def createDataset[T : Encoder](data: Seq[T]): Dataset[T]
30
/** Create empty DataFrame */
31
def emptyDataFrame: DataFrame
32
/** Create empty Dataset */
33
def emptyDataset[T: Encoder]: Dataset[T]
34
/** Access catalog functions */
35
def catalog: Catalog
36
/** Access runtime configuration */
37
def conf: RuntimeConfig
38
/** Register user-defined functions */
39
def udf: UDFRegistration
40
/** Manage streaming queries */
41
def streams: StreamingQueryManager
42
/** Stop SparkSession */
43
def stop(): Unit
44
}
45
46
/**
47
* Builder for SparkSession
48
*/
49
object SparkSession {
50
def builder(): Builder
51
52
class Builder {
53
def master(master: String): Builder
54
def appName(name: String): Builder
55
def config(key: String, value: String): Builder
56
def config(conf: SparkConf): Builder
57
def enableHiveSupport(): Builder
58
def getOrCreate(): SparkSession
59
}
60
}
61
```
62
63
**Usage Examples:**
64
65
```scala
66
import org.apache.spark.sql.SparkSession
67
68
// Create SparkSession
69
val spark = SparkSession.builder()
70
.appName("MyApp")
71
.master("local[*]")
72
.config("spark.sql.adaptive.enabled", "true")
73
.getOrCreate()
74
75
// Load data
76
val df = spark.read
77
.option("header", "true")
78
.csv("people.csv")
79
80
// SQL queries
81
df.createOrReplaceTempView("people")  // register the DataFrame so SQL can refer to it by name
spark.sql("SELECT name, age FROM people WHERE age > 21").show()
82
83
// Create from data
84
val data = Seq(("Alice", 25), ("Bob", 30))
85
val df2 = spark.createDataFrame(data).toDF("name", "age")
86
87
spark.stop()
88
```
89
90
**Python SparkSession API:**
91
92
```python { .api }
93
class SparkSession:
94
"""
95
Entry point for DataFrame and SQL APIs in Python
96
"""
97
@property
98
def read(self) -> DataFrameReader
99
@property
100
def readStream(self) -> DataStreamReader
101
def sql(self, sqlQuery: str) -> DataFrame
102
def table(self, tableName: str) -> DataFrame
103
def range(self, start: int, end: int = None, step: int = 1, numPartitions: int = None) -> DataFrame
104
def createDataFrame(self, data: List, schema: Optional[Union[List[str], StructType]] = None) -> DataFrame
105
@property
106
def catalog(self) -> Catalog
107
@property
108
def conf(self) -> RuntimeConfig
109
@property
110
def udf(self) -> UDFRegistration
111
@property
112
def streams(self) -> StreamingQueryManager
113
def stop(self) -> None
114
115
class SparkSession:
116
# `builder` is a class-level attribute, not a method: use `SparkSession.builder` (no parentheses)
117
builder: Builder
118
119
class Builder:
120
def master(self, master: str) -> Builder
121
def appName(self, name: str) -> Builder
122
def config(self, key: str, value: str) -> Builder
123
def enableHiveSupport(self) -> Builder
124
def getOrCreate(self) -> SparkSession
125
```
126
127
**Python Usage Examples:**
128
129
```python
130
from pyspark.sql import SparkSession
131
132
# Create SparkSession
133
spark = SparkSession.builder \
134
.appName("MyApp") \
135
.master("local[*]") \
136
.config("spark.sql.adaptive.enabled", "true") \
137
.getOrCreate()
138
139
# Load data
140
df = spark.read \
141
.option("header", "true") \
142
.csv("people.csv")
143
144
# SQL queries
145
df.createOrReplaceTempView("people")  # register the DataFrame so SQL can refer to it by name
spark.sql("SELECT name, age FROM people WHERE age > 21").show()
146
147
# Create from data
148
data = [("Alice", 25), ("Bob", 30)]
149
df2 = spark.createDataFrame(data, ["name", "age"])
150
151
spark.stop()
152
```
153
154
### Dataset[T] and DataFrame
155
156
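`Dataset[T]` is a distributed collection of typed objects with compile-time type safety. `DataFrame` is a type alias for `Dataset[Row]`, so its column names and types are checked only when the query is analyzed at runtime.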
157
158
```scala { .api }
159
/**
160
* Distributed collection of data with schema
161
*/
162
class Dataset[T] {
163
/** Display data in tabular format */
164
def show(numRows: Int = 20, truncate: Boolean = true): Unit
165
/** Print schema to console */
166
def printSchema(): Unit
167
/** Show query execution plan */
168
def explain(extended: Boolean = false): Unit
169
170
/** Select columns */
171
def select(cols: Column*): DataFrame
172
def select(col: String, cols: String*): DataFrame
173
/** Filter rows */
174
def filter(condition: Column): Dataset[T]
175
def where(condition: Column): Dataset[T]
176
/** Group by columns */
177
def groupBy(cols: Column*): RelationalGroupedDataset
178
def groupBy(col1: String, cols: String*): RelationalGroupedDataset
179
/** Aggregate expressions */
180
def agg(expr: Column, exprs: Column*): DataFrame
181
182
/** Join with another Dataset */
183
def join(right: Dataset[_]): DataFrame
184
def join(right: Dataset[_], joinExprs: Column): DataFrame
185
def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
186
/** Union with another Dataset */
187
def union(other: Dataset[T]): Dataset[T]
188
def unionAll(other: Dataset[T]): Dataset[T]
189
/** Intersection */
190
def intersect(other: Dataset[T]): Dataset[T]
191
/** Difference */
192
def except(other: Dataset[T]): Dataset[T]
193
194
/** Sort by columns */
195
def orderBy(sortExprs: Column*): Dataset[T]
196
def sort(sortExprs: Column*): Dataset[T]
197
/** Limit number of rows */
198
def limit(n: Int): Dataset[T]
199
/** Remove duplicates */
200
def distinct(): Dataset[T]
201
def dropDuplicates(): Dataset[T]
202
def dropDuplicates(colNames: Array[String]): Dataset[T]
203
204
/** Add or replace column */
205
def withColumn(colName: String, col: Column): DataFrame
206
/** Rename column */
207
def withColumnRenamed(existingName: String, newName: String): DataFrame
208
/** Drop column */
209
def drop(colName: String): DataFrame
210
def drop(col: Column): DataFrame
211
212
/** Collect all rows to driver */
213
def collect(): Array[T]
214
/** Collect as Java List */
215
def collectAsList(): java.util.List[T]
216
/** Count rows */
217
def count(): Long
218
/** Get first row */
219
def first(): T
220
def head(): T
221
/** Get first n rows */
222
def head(n: Int): Array[T]
223
def take(n: Int): Array[T]
224
225
/** Get writer for saving data */
226
def write: DataFrameWriter[T]
227
/** Get writer for streaming */
228
def writeStream: DataStreamWriter[T]
229
230
/** Cache Dataset */
231
def cache(): Dataset[T]
232
/** Persist with storage level */
233
def persist(newLevel: StorageLevel): Dataset[T]
234
/** Remove from cache */
235
def unpersist(blocking: Boolean = false): Dataset[T]
236
237
/** Convert to different type */
238
def as[U](implicit encoder: Encoder[U]): Dataset[U]
239
/** Map function with encoder */
240
def map[U](func: T => U)(implicit encoder: Encoder[U]): Dataset[U]
241
/** FlatMap with encoder */
242
def flatMap[U](func: T => TraversableOnce[U])(implicit encoder: Encoder[U]): Dataset[U]
243
/** Apply function to each element */
244
def foreach(f: T => Unit): Unit
245
/** Apply function to each partition */
246
def foreachPartition(f: Iterator[T] => Unit): Unit
247
}
248
249
/** DataFrame is Dataset[Row] */
250
type DataFrame = Dataset[Row]
251
```
252
253
**Usage Examples:**
254
255
```scala
256
import org.apache.spark.sql.functions._
import spark.implicits._  // enables the $"col" syntax and the encoders needed by .as[Person]
257
258
// Load and explore data
259
val df = spark.read.json("people.json")
260
df.show()
261
df.printSchema()
262
df.explain()
263
264
// Transformations
265
val adults = df
266
.select("name", "age")
267
.filter($"age" > 18)
268
.orderBy($"age".desc)
269
270
// Aggregations
271
val avgAge = df
272
.groupBy("department")
273
.agg(avg("age").as("avg_age"), count("*").as("count"))
274
275
// Joins
276
val departments = spark.read.json("departments.json")
277
val joined = df.join(departments, "dept_id")
278
279
// Window functions
280
import org.apache.spark.sql.expressions.Window
281
val windowSpec = Window.partitionBy("department").orderBy("salary")
282
val ranked = df.withColumn("rank", row_number().over(windowSpec))
283
284
// Type-safe operations with case classes
285
case class Person(name: String, age: Long)  // JSON integers are inferred as bigint, so use Long
286
val people = df.as[Person]
287
val names = people.map(_.name.toUpperCase)
288
```
289
290
### Column Expressions
291
292
Column expressions for building queries and transformations.
293
294
```scala { .api }
295
/**
296
* Column expression for DataFrame operations
297
*/
298
class Column {
299
/** Create alias */
300
def alias(alias: String): Column
301
def as(alias: String): Column
302
/** Cast to different type */
303
def cast(to: DataType): Column
304
def cast(to: String): Column
305
306
/** Null checks */
307
def isNull: Column
308
def isNotNull: Column
309
def isNaN: Column
310
311
/** Logical operators */
312
def &&(other: Any): Column
313
def ||(other: Any): Column
314
def unary_! : Column
315
316
/** Comparison operators */
317
def ===(other: Any): Column
318
def =!=(other: Any): Column
319
def >(other: Any): Column
320
def <(other: Any): Column
321
def >=(other: Any): Column
322
def <=(other: Any): Column
323
324
/** Arithmetic operators */
325
def +(other: Any): Column
326
def -(other: Any): Column
327
def *(other: Any): Column
328
def /(other: Any): Column
329
def %(other: Any): Column
330
331
/** String operations */
332
def contains(other: Any): Column
333
def startsWith(other: Column): Column
334
def endsWith(other: Column): Column
335
def like(literal: String): Column
336
def rlike(literal: String): Column
337
338
/** Sorting */
339
def asc: Column
340
def desc: Column
341
def asc_nulls_first: Column
342
def desc_nulls_last: Column
343
344
/** Array operations */
345
def getItem(key: Any): Column
346
def getField(fieldName: String): Column
347
}
348
```
349
350
### Built-in Functions
351
352
Comprehensive set of built-in functions for data manipulation.
353
354
```scala { .api }
355
/**
356
* Built-in functions for DataFrame operations
357
*/
358
object functions {
359
/** Column references */
360
def col(colName: String): Column
361
def column(colName: String): Column
362
def lit(literal: Any): Column
363
364
/** Conditional expressions */
365
def when(condition: Column, value: Any): Column
366
def coalesce(cols: Column*): Column
367
def isnull(col: Column): Column
368
def isnan(col: Column): Column
369
370
/** String functions */
371
def upper(e: Column): Column
372
def lower(e: Column): Column
373
def trim(e: Column): Column
374
def ltrim(e: Column): Column
375
def rtrim(e: Column): Column
376
def length(e: Column): Column
377
def substring(str: Column, pos: Int, len: Int): Column
378
def concat(exprs: Column*): Column
379
def concat_ws(sep: String, exprs: Column*): Column
380
def split(str: Column, pattern: String): Column
381
def regexp_replace(e: Column, pattern: String, replacement: String): Column
382
def regexp_extract(e: Column, exp: String, groupIdx: Int): Column
383
384
/** Math functions */
385
def abs(e: Column): Column
386
def sqrt(e: Column): Column
387
def pow(l: Column, r: Column): Column
388
def round(e: Column, scale: Int): Column
389
def ceil(e: Column): Column
390
def floor(e: Column): Column
391
def sin(e: Column): Column
392
def cos(e: Column): Column
393
def tan(e: Column): Column
394
def log(e: Column): Column
395
def exp(e: Column): Column
396
def greatest(exprs: Column*): Column
397
def least(exprs: Column*): Column
398
399
/** Aggregate functions */
400
def sum(e: Column): Column
401
def avg(e: Column): Column
402
def mean(e: Column): Column
403
def count(e: Column): Column
404
def countDistinct(expr: Column, exprs: Column*): Column
405
def min(e: Column): Column
406
def max(e: Column): Column
407
def first(e: Column): Column
408
def last(e: Column): Column
409
def stddev(e: Column): Column
410
def variance(e: Column): Column
411
def collect_list(e: Column): Column
412
def collect_set(e: Column): Column
413
414
/** Date/Time functions */
415
def current_date(): Column
416
def current_timestamp(): Column
417
def date_add(start: Column, days: Int): Column
418
def date_sub(start: Column, days: Int): Column
419
def date_format(dateExpr: Column, format: String): Column
420
def year(e: Column): Column
421
def month(e: Column): Column
422
def dayofmonth(e: Column): Column
423
def hour(e: Column): Column
424
def minute(e: Column): Column
425
def second(e: Column): Column
426
def unix_timestamp(): Column
427
def from_unixtime(ut: Column): Column
428
429
/** Array functions */
430
def array(cols: Column*): Column
431
def array_contains(column: Column, value: Any): Column
432
def explode(e: Column): Column
433
def posexplode(e: Column): Column
434
def size(e: Column): Column
435
def sort_array(e: Column): Column
436
def reverse(e: Column): Column
437
def array_distinct(e: Column): Column
438
439
/** Map functions */
440
def map(cols: Column*): Column
441
def map_keys(e: Column): Column
442
def map_values(e: Column): Column
443
444
/** Window functions */
445
def row_number(): Column
446
def rank(): Column
447
def dense_rank(): Column
448
def percent_rank(): Column
449
def ntile(n: Int): Column
450
def lag(e: Column, offset: Int): Column
451
def lead(e: Column, offset: Int): Column
452
def first_value(e: Column): Column
453
def last_value(e: Column): Column
454
}
455
```
456
457
### Data I/O
458
459
Reading and writing data from various sources.
460
461
```scala { .api }
462
/**
463
* Interface for loading DataFrames from external storage
464
*/
465
class DataFrameReader {
466
/** Specify data source format */
467
def format(source: String): DataFrameReader
468
/** Add input option */
469
def option(key: String, value: String): DataFrameReader
470
def option(key: String, value: Boolean): DataFrameReader
471
def option(key: String, value: Long): DataFrameReader
472
def option(key: String, value: Double): DataFrameReader
473
/** Add multiple options */
474
def options(options: Map[String, String]): DataFrameReader
475
/** Set expected schema */
476
def schema(schema: StructType): DataFrameReader
477
def schema(schemaString: String): DataFrameReader
478
479
/** Load data */
480
def load(): DataFrame
481
def load(path: String): DataFrame
482
def load(paths: String*): DataFrame
483
484
/** Format-specific methods */
485
def json(path: String): DataFrame
486
def json(jsonRDD: RDD[String]): DataFrame
487
def json(jsonDataset: Dataset[String]): DataFrame
488
def parquet(paths: String*): DataFrame
489
def text(paths: String*): DataFrame
490
def textFile(paths: String*): Dataset[String]
491
def csv(paths: String*): DataFrame
492
def orc(paths: String*): DataFrame
493
def jdbc(url: String, table: String, properties: Properties): DataFrame
494
def table(tableName: String): DataFrame
495
}
496
497
/**
498
* Interface for saving DataFrames to external storage
499
*/
500
class DataFrameWriter[T] {
501
/** Set save mode */
502
def mode(saveMode: SaveMode): DataFrameWriter[T]
503
def mode(saveMode: String): DataFrameWriter[T]
504
/** Specify output format */
505
def format(source: String): DataFrameWriter[T]
506
/** Add output option */
507
def option(key: String, value: String): DataFrameWriter[T]
508
def option(key: String, value: Boolean): DataFrameWriter[T]
509
def option(key: String, value: Long): DataFrameWriter[T]
510
def option(key: String, value: Double): DataFrameWriter[T]
511
/** Add multiple options */
512
def options(options: Map[String, String]): DataFrameWriter[T]
513
/** Partition output by columns */
514
def partitionBy(colNames: String*): DataFrameWriter[T]
515
/** Bucket output by columns */
516
def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
517
/** Sort within buckets */
518
def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
519
520
/** Save data */
521
def save(): Unit
522
def save(path: String): Unit
523
/** Insert into existing table */
524
def insertInto(tableName: String): Unit
525
/** Save as table */
526
def saveAsTable(tableName: String): Unit
527
528
/** Format-specific methods */
529
def json(path: String): Unit
530
def parquet(path: String): Unit
531
def text(path: String): Unit
532
def csv(path: String): Unit
533
def orc(path: String): Unit
534
def jdbc(url: String, table: String, connectionProperties: Properties): Unit
535
}
536
537
/**
538
* Save modes for DataFrameWriter
539
*/
540
object SaveMode extends Enumeration {
541
val Append, Overwrite, ErrorIfExists, Ignore = Value
542
}
543
```
544
545
**Usage Examples:**
546
547
```scala
548
// Reading data
549
val df = spark.read
550
.format("csv")
551
.option("header", "true")
552
.option("inferSchema", "true")
553
.load("data.csv")
554
555
// Reading with schema
556
import org.apache.spark.sql.types._
557
val schema = StructType(Array(
558
StructField("name", StringType, true),
559
StructField("age", IntegerType, true)
560
))
561
val df2 = spark.read.schema(schema).csv("data.csv")
562
563
// Writing data
564
df.write
565
.mode(SaveMode.Overwrite)
566
.option("header", "true")
567
.csv("output")
568
569
// Partitioned output
570
df.write
571
.partitionBy("year", "month")
572
.parquet("partitioned_output")
573
574
// Database operations
575
df.write
576
.format("jdbc")
577
.option("url", "jdbc:postgresql://localhost/test")
578
.option("dbtable", "people")
579
.option("user", "username")
580
.option("password", "password")
581
.save()
582
```
583
584
### Data Types and Schema
585
586
Schema definition and data type system.
587
588
```scala { .api }
589
/**
590
* Base class for data types
591
*/
592
abstract class DataType
593
594
/** Primitive types */
595
object StringType extends DataType
596
object IntegerType extends DataType
597
object LongType extends DataType
598
object DoubleType extends DataType
599
object FloatType extends DataType
600
object BooleanType extends DataType
601
object DateType extends DataType
602
object TimestampType extends DataType
603
object BinaryType extends DataType
604
605
/** Complex types */
606
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType
607
case class MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) extends DataType
608
case class StructType(fields: Array[StructField]) extends DataType
609
610
/**
611
* Field in a struct
612
*/
613
case class StructField(name: String, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty)
614
615
/**
616
* Row in a DataFrame
617
*/
618
trait Row {
619
def length: Int
620
def size: Int
621
def get(i: Int): Any
622
def getString(i: Int): String
623
def getBoolean(i: Int): Boolean
624
def getInt(i: Int): Int
625
def getLong(i: Int): Long
626
def getFloat(i: Int): Float
627
def getDouble(i: Int): Double
628
def getAs[T](i: Int): T
629
def getAs[T](fieldName: String): T
630
def isNullAt(i: Int): Boolean
631
def toSeq: Seq[Any]
632
}
633
```
634
635
## Error Handling
636
637
Common SQL exceptions:
638
639
- `AnalysisException` - SQL analysis errors (invalid columns, type mismatches)
640
- `ParseException` - SQL parsing errors
641
- `StreamingQueryException` - Streaming query failures
642
- `SparkSQLException` - General SQL execution errors