# SQL and DataFrames

Spark SQL provides structured data processing through DataFrames and Datasets, along with a SQL query engine. It integrates with a wide range of data sources and formats and supports both batch and streaming processing.

## Package Information

SQL and DataFrame functionality is available through:

```scala
import org.apache.spark.sql.{SparkSession, DataFrame, Dataset, Row, Column}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
```

## Basic Usage

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Create Spark session
val spark = SparkSession.builder()
  .appName("SQL Example")
  .master("local[*]")
  .getOrCreate()

// Read data
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("path/to/data.csv")

// DataFrame operations (keep "department" in the projection so groupBy can see it)
val result = df
  .select("name", "age", "salary", "department")
  .filter(col("age") > 25)
  .groupBy("department")
  .agg(avg("salary").alias("avg_salary"))
  .orderBy(desc("avg_salary"))

result.show()

// SQL queries
df.createOrReplaceTempView("employees")
val sqlResult = spark.sql("""
  SELECT department, AVG(salary) as avg_salary
  FROM employees
  WHERE age > 25
  GROUP BY department
  ORDER BY avg_salary DESC
""")
sqlResult.show()

spark.stop()
```

## Capabilities

### Spark Session

The unified entry point for Spark SQL functionality, replacing SQLContext and HiveContext.

```scala { .api }
class SparkSession private(sparkContext: SparkContext, existingSharedState: Option[SharedState]) {
  // Data reading
  def read: DataFrameReader
  def readStream: DataStreamReader

  // SQL execution
  def sql(sqlText: String): DataFrame
  def table(tableName: String): DataFrame

  // DataFrame creation
  def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
  def emptyDataFrame: DataFrame
  def range(end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]

  // Catalog and metadata
  def catalog: Catalog
  def conf: RuntimeConfig
  def sessionState: SessionState
  def sharedState: SharedState

  // Streaming
  def streams: StreamingQueryManager

  // Resources and control
  def sparkContext: SparkContext
  def version: String
  def stop(): Unit
  def close(): Unit
  def newSession(): SparkSession
}

object SparkSession {
  def builder(): Builder
  def active: SparkSession
  def getActiveSession: Option[SparkSession]
  def getDefaultSession: Option[SparkSession]
  def setActiveSession(session: SparkSession): Unit
  def clearActiveSession(): Unit
  def setDefaultSession(session: SparkSession): Unit
  def clearDefaultSession(): Unit
}

class Builder {
  def appName(name: String): Builder
  def master(master: String): Builder
  def config(key: String, value: String): Builder
  def config(key: String, value: Long): Builder
  def config(key: String, value: Double): Builder
  def config(key: String, value: Boolean): Builder
  def config(conf: SparkConf): Builder
  def enableHiveSupport(): Builder
  def getOrCreate(): SparkSession
}
```

Usage example:

```scala
val spark = SparkSession.builder()
  .appName("My Application")
  .master("local[4]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .enableHiveSupport()
  .getOrCreate()
```
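
Beyond configuration, the session is also the factory for DataFrames. A minimal sketch of `createDataFrame` and `range`, assuming a local session; the `Person` case class and its values are illustrative:

```scala
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int) // illustrative schema

val spark = SparkSession.builder()
  .appName("Create DataFrames")
  .master("local[*]")
  .getOrCreate()

// Schema is inferred from the case class (a Product type)
val people = spark.createDataFrame(Seq(Person("Ann", 31), Person("Bob", 24)))
people.printSchema()

// range produces a single-column Dataset of longs, handy for testing
val ids = spark.range(0L, 1000L)
ids.show(5)
```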

### DataFrames and Datasets

DataFrames are Datasets of Row objects, providing a programming abstraction and DSL for structured data manipulation.

```scala { .api }
abstract class Dataset[T] extends Serializable {
  // Column selection and projection
  def select(cols: Column*): DataFrame
  def select(col: String, cols: String*): DataFrame
  def selectExpr(exprs: String*): DataFrame
  def drop(colName: String): DataFrame
  def drop(colNames: String*): DataFrame
  def drop(col: Column): DataFrame
  def withColumn(colName: String, col: Column): DataFrame
  def withColumnRenamed(existingName: String, newName: String): DataFrame

  // Filtering and conditions
  def filter(condition: Column): Dataset[T]
  def filter(conditionExpr: String): Dataset[T]
  def where(condition: Column): Dataset[T]

  // Grouping and aggregation
  def groupBy(cols: Column*): RelationalGroupedDataset
  def groupBy(col1: String, cols: String*): RelationalGroupedDataset
  def rollup(cols: Column*): RelationalGroupedDataset
  def cube(cols: Column*): RelationalGroupedDataset
  def agg(expr: Column, exprs: Column*): DataFrame
  def agg(exprs: Map[String, String]): DataFrame

  // Joins
  def join(right: Dataset[_]): DataFrame
  def join(right: Dataset[_], usingColumn: String): DataFrame
  def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
  def join(right: Dataset[_], joinExprs: Column): DataFrame
  def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
  def crossJoin(right: Dataset[_]): DataFrame

  // Set operations
  def union(other: Dataset[T]): Dataset[T]
  def unionAll(other: Dataset[T]): Dataset[T]
  def unionByName(other: Dataset[T]): Dataset[T]
  def intersect(other: Dataset[T]): Dataset[T]
  def intersectAll(other: Dataset[T]): Dataset[T]
  def except(other: Dataset[T]): Dataset[T]
  def exceptAll(other: Dataset[T]): Dataset[T]

  // Sorting
  def sort(sortCol: String, sortCols: String*): Dataset[T]
  def sort(sortExprs: Column*): Dataset[T]
  def orderBy(sortCol: String, sortCols: String*): Dataset[T]
  def orderBy(sortExprs: Column*): Dataset[T]

  // Sampling and limiting
  def sample(fraction: Double): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
  def limit(n: Int): Dataset[T]

  // Actions
  def collect(): Array[T]
  def collectAsList(): java.util.List[T]
  def count(): Long
  def describe(cols: String*): DataFrame
  def first(): T
  def head(): T
  def head(n: Int): Array[T]
  def take(n: Int): Array[T]
  def takeAsList(n: Int): java.util.List[T]
  def tail(n: Int): Array[T]
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit

  // Display
  def show(): Unit
  def show(numRows: Int): Unit
  def show(numRows: Int, truncate: Boolean): Unit
  def show(numRows: Int, truncate: Int): Unit
  def show(numRows: Int, truncate: Int, vertical: Boolean): Unit

  // Schema and metadata
  def schema: StructType
  def printSchema(): Unit
  def dtypes: Array[(String, String)]
  def columns: Array[String]

  // Persistence
  def cache(): Dataset[T]
  def persist(): Dataset[T]
  def persist(newLevel: StorageLevel): Dataset[T]
  def unpersist(): Dataset[T]
  def unpersist(blocking: Boolean): Dataset[T]

  // Type conversion
  def as[U : Encoder]: Dataset[U]
  def alias(alias: String): Dataset[T]

  // I/O
  def write: DataFrameWriter[T]
  def writeStream: DataStreamWriter[T]

  // Advanced transformations
  def unpivot(ids: Array[Column], values: Array[Column], variableColumnName: String, valueColumnName: String): DataFrame
  def melt(ids: Array[Column], values: Array[Column], variableColumnName: String, valueColumnName: String): DataFrame
  def transpose(): DataFrame
  def observe(name: String, expr: Column, exprs: Column*): Dataset[T]
  def lateralJoin(tableFunctionCall: Column): DataFrame

  // Utility operations
  def exists(condition: Column): Boolean
  def scalar(): Any

  // SQL operations
  def createOrReplaceTempView(viewName: String): Unit
  def createGlobalTempView(viewName: String): Unit
  def createTempView(viewName: String): Unit
}

type DataFrame = Dataset[Row]
```
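
The typed and untyped halves of this API mix freely: typed transformations keep the element type, while joins and projections return DataFrames. A short sketch, assuming the illustrative `Employee` and `Department` case classes below:

```scala
import org.apache.spark.sql.SparkSession

case class Employee(name: String, deptId: Int, salary: Double) // illustrative
case class Department(id: Int, deptName: String)               // illustrative

val spark = SparkSession.builder().appName("Datasets").master("local[*]").getOrCreate()
import spark.implicits._

val employees   = Seq(Employee("Ann", 1, 90000), Employee("Bob", 2, 70000)).toDS()
val departments = Seq(Department(1, "Engineering"), Department(2, "Sales")).toDS()

// Typed filter (stays Dataset[Employee]), then an equi-join back to a DataFrame
val wellPaid = employees.filter($"salary" > 80000)
val joined = employees.join(departments, employees("deptId") === departments("id"), "inner")
joined.show()
```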

### Column Operations

Represents a column in a DataFrame and provides operators and methods for building column expressions.

```scala { .api }
class Column(expr: Expression) {
  // Comparison operators
  def ===(other: Any): Column
  def =!=(other: Any): Column
  def >(other: Any): Column
  def >=(other: Any): Column
  def <(other: Any): Column
  def <=(other: Any): Column
  def <=>(other: Any): Column

  // Logical operators
  def &&(other: Any): Column
  def ||(other: Any): Column
  def unary_! : Column

  // Arithmetic operators
  def +(other: Any): Column
  def -(other: Any): Column
  def *(other: Any): Column
  def /(other: Any): Column
  def %(other: Any): Column

  // Null handling
  def isNull: Column
  def isNotNull: Column
  def isNaN: Column

  // String operations
  def contains(other: Any): Column
  def startsWith(other: Column): Column
  def startsWith(literal: String): Column
  def endsWith(other: Column): Column
  def endsWith(literal: String): Column
  def substr(startPos: Column, len: Column): Column
  def substr(startPos: Int, len: Int): Column
  def like(literal: String): Column
  def rlike(literal: String): Column

  // Complex type access (arrays, maps, structs)
  def getItem(key: Any): Column
  def getField(fieldName: String): Column

  // Type conversion
  def cast(to: DataType): Column
  def cast(to: String): Column

  // Naming
  def alias(alias: String): Column
  def as(alias: String): Column
  def as(alias: String, metadata: Metadata): Column
  def name(alias: String): Column

  // Sorting
  def asc: Column
  def asc_nulls_first: Column
  def asc_nulls_last: Column
  def desc: Column
  def desc_nulls_first: Column
  def desc_nulls_last: Column

  // Window functions
  def over(): Column
  def over(window: WindowSpec): Column
}
```
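
These operators compose into arbitrary expressions. A brief sketch, assuming a DataFrame `df` with `name`, `age`, and `salary` columns (as in the earlier examples):

```scala
import org.apache.spark.sql.functions._

val shaped = df
  .filter(col("age") >= 18 && col("salary").isNotNull)          // logical + null-handling operators
  .withColumn("name_upper", upper(col("name")))
  .withColumn("age_bucket", (col("age") / 10).cast("int") * 10) // arithmetic + cast
  .select(col("name_upper").as("name"), col("age_bucket"), col("salary"))
```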

### Built-in Functions

The `functions` object provides a rich set of built-in functions for DataFrame operations.

```scala { .api }
object functions {
  // Column creation
  def col(colName: String): Column
  def column(colName: String): Column
  def lit(literal: Any): Column
  def typedLit[T : TypeTag](literal: T): Column

  // Conditional expressions
  def when(condition: Column, value: Any): Column
  def coalesce(cols: Column*): Column
  def isnull(col: Column): Column
  def nanvl(col1: Column, col2: Column): Column

  // Aggregate functions
  def count(e: Column): Column
  def count(columnName: String): TypedColumn[Any, Long]
  def countDistinct(expr: Column, exprs: Column*): Column
  def approx_count_distinct(e: Column): Column
  def sum(e: Column): Column
  def sum(columnName: String): Column
  def avg(e: Column): Column
  def avg(columnName: String): Column
  def mean(e: Column): Column
  def min(e: Column): Column
  def max(e: Column): Column
  def first(e: Column): Column
  def first(e: Column, ignoreNulls: Boolean): Column
  def last(e: Column): Column
  def last(e: Column, ignoreNulls: Boolean): Column
  def collect_list(e: Column): Column
  def collect_set(e: Column): Column

  // String functions
  def ascii(e: Column): Column
  def base64(e: Column): Column
  def concat(exprs: Column*): Column
  def concat_ws(sep: String, exprs: Column*): Column
  def length(e: Column): Column
  def lower(e: Column): Column
  def upper(e: Column): Column
  def ltrim(e: Column): Column
  def rtrim(e: Column): Column
  def trim(e: Column): Column
  def regexp_extract(e: Column, pattern: String, idx: Int): Column
  def regexp_replace(e: Column, pattern: String, replacement: String): Column
  def split(str: Column, pattern: String): Column
  def substring(str: Column, pos: Int, len: Int): Column

  // Math functions
  def abs(e: Column): Column
  def acos(e: Column): Column
  def asin(e: Column): Column
  def atan(e: Column): Column
  def atan2(y: Column, x: Column): Column
  def ceil(e: Column): Column
  def cos(e: Column): Column
  def exp(e: Column): Column
  def floor(e: Column): Column
  def log(e: Column): Column
  def log10(e: Column): Column
  def pow(l: Column, r: Column): Column
  def round(e: Column): Column
  def round(e: Column, scale: Int): Column
  def sin(e: Column): Column
  def sqrt(e: Column): Column
  def tan(e: Column): Column

  // Date and time functions
  def current_date(): Column
  def current_timestamp(): Column
  def date_add(start: Column, days: Int): Column
  def date_sub(start: Column, days: Int): Column
  def datediff(end: Column, start: Column): Column
  def date_format(dateExpr: Column, format: String): Column
  def dayofmonth(e: Column): Column
  def dayofweek(e: Column): Column
  def dayofyear(e: Column): Column
  def hour(e: Column): Column
  def minute(e: Column): Column
  def month(e: Column): Column
  def quarter(e: Column): Column
  def second(e: Column): Column
  def to_date(e: Column): Column
  def to_date(e: Column, fmt: String): Column
  def to_timestamp(s: Column): Column
  def to_timestamp(s: Column, fmt: String): Column
  def unix_timestamp(): Column
  def unix_timestamp(s: Column): Column
  def unix_timestamp(s: Column, p: String): Column
  def year(e: Column): Column

  // Array functions
  def array(cols: Column*): Column
  def array_contains(column: Column, value: Any): Column
  def array_distinct(e: Column): Column
  def array_max(e: Column): Column
  def array_min(e: Column): Column
  def array_position(column: Column, value: Any): Column
  def array_remove(column: Column, element: Any): Column
  def array_sort(e: Column): Column
  def arrays_overlap(a1: Column, a2: Column): Column
  def explode(e: Column): Column
  def explode_outer(e: Column): Column
  def posexplode(e: Column): Column
  def posexplode_outer(e: Column): Column
  def size(e: Column): Column
  def slice(x: Column, start: Int, length: Int): Column
  def sort_array(e: Column): Column
  def sort_array(e: Column, asc: Boolean): Column

  // Map functions
  def map(cols: Column*): Column
  def map_keys(e: Column): Column
  def map_values(e: Column): Column

  // Struct functions
  def struct(cols: Column*): Column

  // Window functions
  def row_number(): Column
  def rank(): Column
  def dense_rank(): Column
  def percent_rank(): Column
  def ntile(n: Int): Column
  def cume_dist(): Column
  def lag(e: Column, offset: Int): Column
  def lag(e: Column, offset: Int, defaultValue: Any): Column
  def lead(e: Column, offset: Int): Column
  def lead(e: Column, offset: Int, defaultValue: Any): Column

  // UDF creation
  def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction
  def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction
}
```
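
The window functions above are evaluated with `Column.over` and a `WindowSpec` built from `org.apache.spark.sql.expressions.Window`. A sketch, assuming `df` has `department` and `salary` columns:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Rank rows by salary within each department
val byDept = Window.partitionBy("department").orderBy(desc("salary"))

val ranked = df
  .withColumn("salary_rank", rank().over(byDept))
  .withColumn("prev_salary", lag(col("salary"), 1).over(byDept))

ranked.show()
```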

### Data Types

Spark SQL data type system for schema definition.

```scala { .api }
abstract class DataType extends AbstractDataType {
  def json: String
  def prettyJson: String
  def simpleString: String
  def catalogString: String
  def sql: String
}

object DataTypes {
  def createArrayType(elementType: DataType): ArrayType
  def createMapType(keyType: DataType, valueType: DataType): MapType
  def createStructField(name: String, dataType: DataType, nullable: Boolean): StructField
  def createStructType(fields: Array[StructField]): StructType

  val StringType: StringType
  val BinaryType: BinaryType
  val BooleanType: BooleanType
  val DateType: DateType
  val TimestampType: TimestampType
  val CalendarIntervalType: CalendarIntervalType
  val DoubleType: DoubleType
  val FloatType: FloatType
  val ByteType: ByteType
  val IntegerType: IntegerType
  val LongType: LongType
  val ShortType: ShortType
  val NullType: NullType
}

case class StructType(fields: Array[StructField]) extends DataType {
  def add(field: StructField): StructType
  def add(name: String, dataType: DataType): StructType
  def add(name: String, dataType: DataType, nullable: Boolean): StructType
  def add(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata): StructType
  def apply(name: String): StructField
  def apply(index: Int): StructField
  def fieldNames: Array[String]
  def length: Int
  def size: Int
}

case class StructField(
  name: String,
  dataType: DataType,
  nullable: Boolean = true,
  metadata: Metadata = Metadata.empty
) {
  def getComment(): Option[String]
}

case class ArrayType(elementType: DataType, containsNull: Boolean = true) extends DataType

case class MapType(
  keyType: DataType,
  valueType: DataType,
  valueContainsNull: Boolean = true
) extends DataType
```
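
Declaring a `StructType` explicitly avoids the extra pass and occasional surprises of schema inference. A minimal sketch; the column names and file path are illustrative:

```scala
import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = true),
  StructField("salary", DoubleType, nullable = true)
))

val df = spark.read
  .schema(schema)          // skip inference entirely
  .option("header", "true")
  .csv("path/to/data.csv") // illustrative path
```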

### Row

Represents a row in a DataFrame.

```scala { .api }
trait Row extends Serializable {
  def size: Int
  def length: Int
  def schema: StructType
  def apply(i: Int): Any
  def get(i: Int): Any
  def getAs[T](i: Int): T
  def getAs[T](fieldName: String): T
  def fieldIndex(name: String): Int

  // Type-specific getters
  def isNullAt(i: Int): Boolean
  def getBoolean(i: Int): Boolean
  def getByte(i: Int): Byte
  def getShort(i: Int): Short
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getFloat(i: Int): Float
  def getDouble(i: Int): Double
  def getString(i: Int): String
  def getDecimal(i: Int): java.math.BigDecimal
  def getDate(i: Int): java.sql.Date
  def getTimestamp(i: Int): java.sql.Timestamp
  def getSeq[T](i: Int): Seq[T]
  def getList[T](i: Int): java.util.List[T]
  def getMap[K, V](i: Int): scala.collection.Map[K, V]
  def getJavaMap[K, V](i: Int): java.util.Map[K, V]
  def getStruct(i: Int): Row

  // Conversion and copying
  def toSeq: Seq[Any]
  def copy(): Row
}

object Row {
  def empty: Row
  def apply(values: Any*): Row
  def fromSeq(values: Seq[Any]): Row
  def fromTuple(tuple: Product): Row
  def merge(rows: Row*): Row
  def unapplySeq(row: Row): Some[Seq[Any]]
}
```
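
Rows are usually produced by actions such as `collect()` or `first()`, and the typed getters extract individual fields. A short sketch, assuming the `df` from the earlier examples:

```scala
import org.apache.spark.sql.Row

// Constructing a Row directly (positional, untyped)
val row = Row("Ann", 31, 90000.0)
val name = row.getString(0)
val age  = row.getInt(1)

// Rows returned from a query carry a schema, enabling access by field name
val firstRow = df.select("name", "age").first()
val byName = firstRow.getAs[String]("name")
```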

### Data I/O

#### DataFrameReader

Interface for loading DataFrames from external storage systems.

```scala { .api }
class DataFrameReader {
  // Configuration
  def format(source: String): DataFrameReader
  def schema(schema: StructType): DataFrameReader
  def schema(schemaString: String): DataFrameReader
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader
  def options(options: scala.collection.Map[String, String]): DataFrameReader
  def options(options: java.util.Map[String, String]): DataFrameReader

  // Data sources
  def csv(path: String): DataFrame
  def csv(paths: String*): DataFrame
  def json(path: String): DataFrame
  def json(paths: String*): DataFrame
  def parquet(path: String): DataFrame
  def parquet(paths: String*): DataFrame
  def orc(path: String): DataFrame
  def orc(paths: String*): DataFrame
  def text(path: String): DataFrame
  def text(paths: String*): DataFrame
  def textFile(path: String): Dataset[String]
  def textFile(paths: String*): Dataset[String]
  def table(tableName: String): DataFrame
  def jdbc(url: String, table: String, properties: java.util.Properties): DataFrame

  // Generic load
  def load(): DataFrame
  def load(path: String): DataFrame
  def load(paths: String*): DataFrame
}
```
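
For JDBC sources, connection settings travel in a `java.util.Properties` object. A sketch; the URL, table name, and credentials are placeholders:

```scala
import java.util.Properties

val props = new Properties()
props.setProperty("user", "username")     // placeholder
props.setProperty("password", "password") // placeholder

val jdbcDf = spark.read.jdbc(
  "jdbc:postgresql://host:5432/db", // placeholder URL
  "public.employees",               // placeholder table
  props
)
```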

#### DataFrameWriter

Interface for saving DataFrames to external storage systems.

```scala { .api }
class DataFrameWriter[T] {
  // Configuration
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def mode(saveMode: String): DataFrameWriter[T]
  def format(source: String): DataFrameWriter[T]
  def option(key: String, value: String): DataFrameWriter[T]
  def option(key: String, value: Boolean): DataFrameWriter[T]
  def option(key: String, value: Long): DataFrameWriter[T]
  def option(key: String, value: Double): DataFrameWriter[T]
  def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
  def options(options: java.util.Map[String, String]): DataFrameWriter[T]
  def partitionBy(colNames: String*): DataFrameWriter[T]
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]

  // Data sources
  def csv(path: String): Unit
  def json(path: String): Unit
  def parquet(path: String): Unit
  def orc(path: String): Unit
  def text(path: String): Unit
  def jdbc(url: String, table: String, connectionProperties: java.util.Properties): Unit
  def insertInto(tableName: String): Unit
  def saveAsTable(tableName: String): Unit

  // Generic save
  def save(): Unit
  def save(path: String): Unit
}

// org.apache.spark.sql.SaveMode is a Java enum (not a Scala Enumeration) with values:
//   SaveMode.Append, SaveMode.Overwrite, SaveMode.ErrorIfExists, SaveMode.Ignore
```

Usage example:

```scala
// Reading data
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("path/to/file.csv")

// Writing data
df.write
  .format("parquet")
  .mode("overwrite")
  .option("compression", "snappy")
  .partitionBy("year", "month")
  .save("path/to/output")
```
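
Note that `bucketBy` and `sortBy` apply only to table-backed writes via `saveAsTable`. A sketch; the table name is illustrative:

```scala
import org.apache.spark.sql.SaveMode

df.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .bucketBy(8, "department") // bucketing requires saveAsTable
  .sortBy("salary")
  .saveAsTable("employees_bucketed")
```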

### Grouped Data Operations

```scala { .api }
class RelationalGroupedDataset protected(
  df: DataFrame,
  groupingExprs: Seq[Expression],
  groupType: RelationalGroupedDataset.GroupType
) {
  // Aggregation functions
  def agg(expr: Column, exprs: Column*): DataFrame
  def agg(exprs: Map[String, String]): DataFrame
  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
  def count(): DataFrame
  def mean(colNames: String*): DataFrame
  def max(colNames: String*): DataFrame
  def min(colNames: String*): DataFrame
  def sum(colNames: String*): DataFrame
  def avg(colNames: String*): DataFrame

  // Pivot operations
  def pivot(pivotColumn: String): RelationalGroupedDataset
  def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset
}
```

Usage example:

```scala
// Group by operations
val grouped = df.groupBy("department", "level")
  .agg(
    count("*").alias("employee_count"),
    avg("salary").alias("avg_salary"),
    max("salary").alias("max_salary")
  )

// Pivot operations
val pivoted = df.groupBy("department")
  .pivot("level")
  .agg(avg("salary"))
```
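
When the pivot values are known in advance, passing them to `pivot` skips the extra job Spark would otherwise run to discover the distinct values. A sketch; the level values are illustrative:

```scala
val pivotedFast = df.groupBy("department")
  .pivot("level", Seq("junior", "senior", "staff")) // illustrative values
  .agg(avg("salary"))
```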