# SQL and DataFrames

Spark SQL provides high-level APIs for working with structured data through DataFrames, Datasets, and SQL queries, and can also act as a distributed SQL query engine.

## SparkSession

The entry point for all DataFrame and Dataset functionality in Spark SQL.

```scala { .api }
class SparkSession private(
    @transient val sparkContext: SparkContext,
    @transient private val existingSharedState: Option[SharedState] = None,
    @transient private val parentSessionState: Option[SessionState] = None,
    @transient private[sql] val extensions: SparkSessionExtensions = new SparkSessionExtensions) {

  // SQL execution
  def sql(sqlText: String): DataFrame
  def table(tableName: String): DataFrame

  // DataFrame creation
  def read: DataFrameReader
  def readStream: DataStreamReader
  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
  def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
  def createDataset[T : Encoder](data: Seq[T]): Dataset[T]
  def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T]
  def emptyDataset[T : Encoder]: Dataset[T]
  def emptyDataFrame: DataFrame

  // Range creation
  def range(end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]

  // Session management
  def newSession(): SparkSession
  def cloneSession(): SparkSession
  def close(): Unit
  def stop(): Unit

  // Configuration and metadata
  def conf: RuntimeConfig
  def catalog: Catalog
  def udf: UDFRegistration
  def streams: StreamingQueryManager
  def version: String

  // Legacy compatibility
  def sqlContext: SQLContext
}

object SparkSession {
  def builder(): Builder
  def active: SparkSession
  def getActiveSession: Option[SparkSession]
  def getDefaultSession: Option[SparkSession]
  def setActiveSession(session: SparkSession): Unit
  def clearActiveSession(): Unit
  def setDefaultSession(session: SparkSession): Unit
  def clearDefaultSession(): Unit
}

class Builder {
  def appName(name: String): Builder
  def master(master: String): Builder
  def config(key: String, value: String): Builder
  def config(key: String, value: Long): Builder
  def config(key: String, value: Double): Builder
  def config(key: String, value: Boolean): Builder
  def config(conf: SparkConf): Builder
  def enableHiveSupport(): Builder
  def getOrCreate(): SparkSession
}
```

### Usage Examples

```scala
import org.apache.spark.sql.SparkSession

// Create a SparkSession
val spark = SparkSession.builder()
  .appName("SQL Example")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()

// Execute SQL directly against files
val df = spark.sql("SELECT * FROM parquet.`/path/to/file.parquet`")

// Access a table by name
val table = spark.table("my_table")

// Create a DataFrame from local data
val data = Seq(("Alice", 25), ("Bob", 30), ("Charlie", 35))
val peopleDF = spark.createDataFrame(data).toDF("name", "age")

spark.stop()
```
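
The constructors and session-management methods listed above compose naturally with the builder. A minimal sketch (value names like `ids`, `words`, and `scratch` are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._  // supplies Encoder instances for createDataset

// range yields a single-column Dataset named "id": 0, 2, 4, 6, 8
val ids = spark.range(0, 10, 2)

// createDataset requires an implicit Encoder, provided by spark.implicits._
val words = spark.createDataset(Seq("spark", "sql"))

// newSession shares the SparkContext but isolates conf, UDFs, and temp views
val scratch = spark.newSession()
scratch.conf.set("spark.sql.shuffle.partitions", "4")

println(spark.version)
```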

## Dataset and DataFrame

DataFrame is a type alias for `Dataset[Row]`. Dataset is the primary structured data abstraction in Spark SQL.

```scala { .api }
class Dataset[T] private[sql](
    @transient val sparkSession: SparkSession,
    @transient val queryExecution: QueryExecution,
    encoder: Encoder[T]) {

  // Column selection
  def select(cols: Column*): DataFrame
  def select(col: String, cols: String*): DataFrame
  def selectExpr(exprs: String*): DataFrame
  def drop(colName: String): DataFrame
  def drop(colNames: String*): DataFrame
  def drop(col: Column): DataFrame
  def withColumn(colName: String, col: Column): DataFrame
  def withColumnRenamed(existingName: String, newName: String): DataFrame

  // Filtering
  def filter(condition: Column): Dataset[T]
  def filter(conditionExpr: String): Dataset[T]
  def where(condition: Column): Dataset[T]

  // Grouping and aggregation
  def groupBy(cols: Column*): RelationalGroupedDataset
  def groupBy(col1: String, cols: String*): RelationalGroupedDataset
  def rollup(cols: Column*): RelationalGroupedDataset
  def cube(cols: Column*): RelationalGroupedDataset
  def agg(expr: Column, exprs: Column*): DataFrame
  def agg(exprs: Map[String, String]): DataFrame

  // Sorting
  def orderBy(sortExprs: Column*): Dataset[T]
  def sort(sortExprs: Column*): Dataset[T]
  def sortWithinPartitions(sortExprs: Column*): Dataset[T]

  // Joins
  def join(right: Dataset[_]): DataFrame
  def join(right: Dataset[_], usingColumn: String): DataFrame
  def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
  def join(right: Dataset[_], joinExprs: Column): DataFrame
  def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
  def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)]
  def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)]
  def crossJoin(right: Dataset[_]): DataFrame

  // Set operations
  def union(other: Dataset[T]): Dataset[T]
  def unionAll(other: Dataset[T]): Dataset[T]
  def unionByName(other: Dataset[T]): Dataset[T]
  def intersect(other: Dataset[T]): Dataset[T]
  def intersectAll(other: Dataset[T]): Dataset[T]
  def except(other: Dataset[T]): Dataset[T]
  def exceptAll(other: Dataset[T]): Dataset[T]

  // Deduplication
  def distinct(): Dataset[T]
  def dropDuplicates(): Dataset[T]
  def dropDuplicates(colNames: Array[String]): Dataset[T]
  def dropDuplicates(col1: String, cols: String*): Dataset[T]

  // Sampling and limiting
  def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double): Dataset[T]
  def limit(n: Int): Dataset[T]

  // Type conversion
  def as[U : Encoder]: Dataset[U]
  def alias(alias: String): Dataset[T]

  // Transformations
  def map[U : Encoder](func: T => U): Dataset[U]
  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]
  def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U]
  def filter(func: T => Boolean): Dataset[T]
  def foreach(func: T => Unit): Unit
  def foreachPartition(func: Iterator[T] => Unit): Unit
  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U]
  def reduce(func: (T, T) => T): T
  def groupByKey[K : Encoder](func: T => K): KeyValueGroupedDataset[K, T]

  // Partitioning
  def coalesce(numPartitions: Int): Dataset[T]
  def repartition(numPartitions: Int): Dataset[T]
  def repartition(partitionExprs: Column*): Dataset[T]
  def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
  def repartitionByRange(partitionExprs: Column*): Dataset[T]
  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]

  // Actions
  def collect(): Array[T]
  def collectAsList(): java.util.List[T]
  def count(): Long
  def first(): T
  def head(): T
  def head(n: Int): Array[T]
  def take(n: Int): Array[T]
  def takeAsList(n: Int): java.util.List[T]
  def tail(n: Int): Array[T]
  def isEmpty: Boolean

  // Display
  def show(): Unit
  def show(numRows: Int): Unit
  def show(numRows: Int, truncate: Boolean): Unit
  def show(numRows: Int, truncate: Int): Unit
  def show(numRows: Int, truncate: Int, vertical: Boolean): Unit

  // Persistence
  def persist(): this.type
  def persist(newLevel: StorageLevel): this.type
  def cache(): this.type
  def unpersist(): this.type
  def unpersist(blocking: Boolean): this.type
  def checkpoint(): Dataset[T]
  def checkpoint(eager: Boolean): Dataset[T]
  def localCheckpoint(): Dataset[T]
  def localCheckpoint(eager: Boolean): Dataset[T]

  // I/O
  def write: DataFrameWriter[T]
  def writeStream: DataStreamWriter[T]

  // Metadata
  def schema: StructType
  def printSchema(): Unit
  def printSchema(level: Int): Unit
  def explain(): Unit
  def explain(extended: Boolean): Unit
  def explain(mode: String): Unit
  def columns: Array[String]
  def dtypes: Array[(String, String)]
  def col(colName: String): Column
  def apply(colName: String): Column
  def isStreaming: Boolean
  def isLocal: Boolean
  def rdd: RDD[T]
  def toJavaRDD: JavaRDD[T]
  def javaRDD: JavaRDD[T]

  // Statistics and NA handling
  def stat: DataFrameStatFunctions
  def na: DataFrameNaFunctions
  def summary(statistics: String*): DataFrame
  def describe(cols: String*): DataFrame
}

// Type alias
type DataFrame = Dataset[Row]
```

### Usage Examples

```scala
import org.apache.spark.sql.{SparkSession, functions => F}

val spark = SparkSession.builder().appName("DataFrame Example").master("local[*]").getOrCreate()
import spark.implicits._

// Create a DataFrame
val df = Seq(
  ("Alice", 25, "Engineer"),
  ("Bob", 30, "Manager"),
  ("Charlie", 35, "Engineer"),
  ("Diana", 28, "Designer")
).toDF("name", "age", "role")

// Basic operations
val adults = df.filter($"age" >= 30)
val selected = df.select($"name", $"age")
val withNewCol = df.withColumn("age_category", F.when($"age" < 30, "Young").otherwise("Adult"))

// Grouping and aggregation
val roleStats = df.groupBy("role").agg(
  F.count("*").alias("count"),
  F.avg("age").alias("avg_age"),
  F.max("age").alias("max_age")
)

// Joins
val departments = Seq(
  ("Engineer", "Technology"),
  ("Manager", "Operations"),
  ("Designer", "Creative")
).toDF("role", "department")

val joined = df.join(departments, "role")

// SQL operations
df.createOrReplaceTempView("employees")
val sqlResult = spark.sql("SELECT role, AVG(age) as avg_age FROM employees GROUP BY role")

spark.stop()
```
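
The typed half of the API (`as`, `map`, `filter`, `groupByKey`) works against case-class fields rather than column names. A minimal sketch assuming a top-level `Person` case class (the class and value names are illustrative):

```scala
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int)

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Convert an untyped DataFrame into a typed Dataset
val people = Seq(("Alice", 25), ("Bob", 30)).toDF("name", "age").as[Person]

// Typed transformations compile against the case class, not column strings
val older = people.filter(_.age >= 30)       // Dataset[Person]
val names = people.map(_.name.toUpperCase)   // Dataset[String]

// groupByKey returns a KeyValueGroupedDataset for typed aggregation
val byDecade = people.groupByKey(_.age / 10).count()  // Dataset[(Int, Long)]

byDecade.show()
spark.stop()
```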

## DataFrameReader

Interface for loading DataFrames from external storage, obtained via `spark.read`.

```scala { .api }
class DataFrameReader private[sql](sparkSession: SparkSession) {
  // Format specification
  def format(source: String): DataFrameReader

  // Options
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader
  def options(options: scala.collection.Map[String, String]): DataFrameReader
  def options(options: java.util.Map[String, String]): DataFrameReader

  // Schema
  def schema(schema: StructType): DataFrameReader
  def schema(schemaString: String): DataFrameReader

  // Loading methods
  def load(): DataFrame
  def load(path: String): DataFrame
  def load(paths: String*): DataFrame

  // Format-specific methods
  def csv(path: String): DataFrame
  def csv(paths: String*): DataFrame
  def json(path: String): DataFrame
  def json(paths: String*): DataFrame
  def parquet(paths: String*): DataFrame
  def orc(paths: String*): DataFrame
  def text(paths: String*): DataFrame
  def textFile(paths: String*): Dataset[String]
  def table(tableName: String): DataFrame

  // JDBC
  def jdbc(url: String, table: String, properties: java.util.Properties): DataFrame
  def jdbc(url: String, table: String, predicates: Array[String],
           connectionProperties: java.util.Properties): DataFrame
  def jdbc(url: String, table: String, columnName: String, lowerBound: Long,
           upperBound: Long, numPartitions: Int,
           connectionProperties: java.util.Properties): DataFrame
}
```

### Usage Examples

```scala
import org.apache.spark.sql.types._

// Read various formats
val csvDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("path/to/file.csv")

val jsonDF = spark.read
  .option("multiline", "true")
  .json("path/to/file.json")

val parquetDF = spark.read.parquet("path/to/file.parquet")

// With a custom schema
val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false),
  StructField("salary", DoubleType, nullable = true)
))

val typedDF = spark.read
  .schema(schema)
  .csv("path/to/data.csv")

// JDBC
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "employees")
  .option("user", "username")
  .option("password", "password")
  .load()
```
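
The format-agnostic `load()` path and the DDL-string `schema` overload from the API above are equivalent to the shortcuts shown; a brief sketch (the paths are placeholders):

```scala
// Generic form: format + options + load (equivalent to the csv() shortcut)
val generic = spark.read
  .format("csv")
  .option("header", "true")
  .load("path/to/file.csv")

// Schema given as a DDL string instead of a StructType
val ddlDF = spark.read
  .schema("name STRING, age INT, salary DOUBLE")
  .csv("path/to/data.csv")

// textFile yields a Dataset[String], one element per line
val lines = spark.read.textFile("path/to/log.txt")
```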

## DataFrameWriter

Interface for saving DataFrames to external storage, obtained via `df.write`.

```scala { .api }
class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
  // Mode specification
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def mode(saveMode: String): DataFrameWriter[T]

  // Format specification
  def format(source: String): DataFrameWriter[T]

  // Options
  def option(key: String, value: String): DataFrameWriter[T]
  def option(key: String, value: Boolean): DataFrameWriter[T]
  def option(key: String, value: Long): DataFrameWriter[T]
  def option(key: String, value: Double): DataFrameWriter[T]
  def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
  def options(options: java.util.Map[String, String]): DataFrameWriter[T]

  // Partitioning and bucketing
  def partitionBy(colNames: String*): DataFrameWriter[T]
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]

  // Saving methods
  def save(): Unit
  def save(path: String): Unit
  def insertInto(tableName: String): Unit
  def saveAsTable(tableName: String): Unit

  // Format-specific methods
  def csv(path: String): Unit
  def json(path: String): Unit
  def parquet(path: String): Unit
  def orc(path: String): Unit
  def text(path: String): Unit

  // JDBC
  def jdbc(url: String, table: String, connectionProperties: java.util.Properties): Unit
}

// org.apache.spark.sql.SaveMode is a Java enum, not a Scala Enumeration:
// public enum SaveMode { Append, Overwrite, ErrorIfExists, Ignore }
```

### Usage Examples

```scala
import org.apache.spark.sql.SaveMode

// Save in different formats
df.write
  .mode(SaveMode.Overwrite)
  .option("header", "true")
  .csv("output/path")

df.write
  .mode("append")
  .parquet("output/parquet")

// Partitioned write
df.write
  .partitionBy("year", "month")
  .parquet("output/partitioned")

// Save as a table
df.write
  .mode(SaveMode.Overwrite)
  .saveAsTable("my_table")

// JDBC write
df.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "output_table")
  .option("user", "username")
  .option("password", "password")
  .save()
```
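
`bucketBy` and `sortBy` from the API above apply only to table-backed writes via `saveAsTable`; a plain path write rejects them. A short sketch (table names are illustrative):

```scala
import org.apache.spark.sql.SaveMode

// Bucketed, sorted table write; bucketBy/sortBy require saveAsTable
df.write
  .bucketBy(8, "role")
  .sortBy("age")
  .mode(SaveMode.Overwrite)
  .saveAsTable("bucketed_employees")

// insertInto appends into an existing table, resolving columns by position
df.write.insertInto("existing_table")
```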

## Column and Functions

Column expressions and built-in functions for DataFrame operations.

```scala { .api }
class Column(val expr: Expression) {
  // Comparison operators
  def ===(other: Any): Column
  def =!=(other: Any): Column
  def >(other: Any): Column
  def <(other: Any): Column
  def >=(other: Any): Column
  def <=(other: Any): Column
  def <=>(other: Any): Column

  // Logical operators
  def &&(other: Column): Column
  def ||(other: Column): Column
  def unary_! : Column

  // Null handling
  def isNull: Column
  def isNotNull: Column
  def isNaN: Column

  // Arithmetic operators
  def +(other: Any): Column
  def -(other: Any): Column
  def *(other: Any): Column
  def /(other: Any): Column
  def %(other: Any): Column

  // String operations
  def like(literal: String): Column
  def rlike(literal: String): Column
  def startsWith(literal: String): Column
  def endsWith(literal: String): Column
  def contains(other: Any): Column
  def substr(startPos: Column, len: Column): Column
  def substr(startPos: Int, len: Int): Column

  // Collection operations
  def apply(extraction: Any): Column
  def getField(fieldName: String): Column
  def getItem(key: Any): Column

  // Sorting
  def asc: Column
  def asc_nulls_first: Column
  def asc_nulls_last: Column
  def desc: Column
  def desc_nulls_first: Column
  def desc_nulls_last: Column

  // Windowing
  def over(window: WindowSpec): Column

  // Type casting
  def cast(to: DataType): Column
  def cast(to: String): Column

  // Aliasing
  def alias(alias: String): Column
  def as(alias: String): Column
  def as(alias: Symbol): Column
  def name(alias: String): Column

  // Others
  def when(condition: Column, value: Any): Column
  def otherwise(value: Any): Column
  def between(lowerBound: Any, upperBound: Any): Column
  def isin(list: Any*): Column
}
```
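
Column methods compose into lazy expression trees before any data is read. A small sketch, assuming the employees DataFrame `df` from the earlier example and `spark.implicits._` in scope:

```scala
import org.apache.spark.sql.functions._

// Predicates combine with && / || and stay lazy until an action runs
val eligible = df.filter($"age".between(25, 35) && $"role".isin("Engineer", "Designer"))

// Null handling, casting, and null-aware ordering
val cleaned = df
  .filter($"name".isNotNull)
  .withColumn("age_str", $"age".cast("string"))
  .orderBy($"age".desc_nulls_last)

// Chained when/otherwise on a Column
val labeled = df.select(
  $"name",
  when($"age" >= 30, "senior").otherwise("junior").alias("level")
)
```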

### Built-in Functions

```scala { .api }
// The functions object contains hundreds of built-in functions
object functions {
  // Column creation
  def col(colName: String): Column
  def column(colName: String): Column
  def lit(literal: Any): Column
  def typedLit[T : TypeTag](literal: T): Column

  // Aggregate functions
  def sum(e: Column): Column
  def avg(e: Column): Column
  def mean(e: Column): Column
  def max(e: Column): Column
  def min(e: Column): Column
  def count(e: Column): Column
  def countDistinct(expr: Column, exprs: Column*): Column
  def approx_count_distinct(e: Column): Column
  def first(e: Column): Column
  def last(e: Column): Column
  def collect_list(e: Column): Column
  def collect_set(e: Column): Column

  // String functions
  def ascii(e: Column): Column
  def base64(e: Column): Column
  def concat(exprs: Column*): Column
  def concat_ws(sep: String, exprs: Column*): Column
  def decode(value: Column, charset: String): Column
  def encode(value: Column, charset: String): Column
  def format_number(x: Column, d: Int): Column
  def format_string(format: String, arguments: Column*): Column
  def initcap(e: Column): Column
  def instr(str: Column, substring: String): Column
  def length(e: Column): Column
  def levenshtein(l: Column, r: Column): Column
  def locate(substr: String, str: Column): Column
  def lower(e: Column): Column
  def lpad(str: Column, len: Int, pad: String): Column
  def ltrim(e: Column): Column
  def regexp_extract(e: Column, exp: String, groupIdx: Int): Column
  def regexp_replace(e: Column, pattern: String, replacement: String): Column
  def repeat(str: Column, n: Int): Column
  def reverse(e: Column): Column  // reverses a string or an array
  def rpad(str: Column, len: Int, pad: String): Column
  def rtrim(e: Column): Column
  def soundex(e: Column): Column
  def split(str: Column, pattern: String): Column
  def substring(str: Column, pos: Int, len: Int): Column
  def substring_index(str: Column, delim: String, count: Int): Column
  def translate(src: Column, matchingString: String, replaceString: String): Column
  def trim(e: Column): Column
  def upper(e: Column): Column

  // Date and time functions
  def current_date(): Column
  def current_timestamp(): Column
  def date_add(start: Column, days: Int): Column
  def date_sub(start: Column, days: Int): Column
  def datediff(end: Column, start: Column): Column
  def date_format(dateExpr: Column, format: String): Column
  def dayofmonth(e: Column): Column
  def dayofweek(e: Column): Column
  def dayofyear(e: Column): Column
  def hour(e: Column): Column
  def last_day(e: Column): Column
  def minute(e: Column): Column
  def month(e: Column): Column
  def months_between(end: Column, start: Column): Column
  def next_day(date: Column, dayOfWeek: String): Column
  def quarter(e: Column): Column
  def second(e: Column): Column
  def to_date(e: Column): Column
  def to_date(e: Column, fmt: String): Column
  def to_timestamp(s: Column): Column
  def to_timestamp(s: Column, fmt: String): Column
  def trunc(date: Column, format: String): Column
  def unix_timestamp(): Column
  def unix_timestamp(s: Column): Column
  def unix_timestamp(s: Column, p: String): Column
  def from_unixtime(ut: Column): Column
  def from_unixtime(ut: Column, f: String): Column
  def weekofyear(e: Column): Column
  def year(e: Column): Column

  // Math functions
  def abs(e: Column): Column
  def acos(e: Column): Column
  def asin(e: Column): Column
  def atan(e: Column): Column
  def atan2(y: Column, x: Column): Column
  def ceil(e: Column): Column
  def cos(e: Column): Column
  def cosh(e: Column): Column
  def exp(e: Column): Column
  def expm1(e: Column): Column
  def floor(e: Column): Column
  def greatest(exprs: Column*): Column
  def least(exprs: Column*): Column
  def log(e: Column): Column
  def log10(e: Column): Column
  def log1p(e: Column): Column
  def log2(e: Column): Column
  def pow(l: Column, r: Column): Column
  def rint(e: Column): Column
  def round(e: Column): Column
  def round(e: Column, scale: Int): Column
  def signum(e: Column): Column
  def sin(e: Column): Column
  def sinh(e: Column): Column
  def sqrt(e: Column): Column
  def tan(e: Column): Column
  def tanh(e: Column): Column
  def toDegrees(e: Column): Column
  def toRadians(e: Column): Column

  // Conditional functions
  def when(condition: Column, value: Any): Column
  def coalesce(e: Column*): Column
  def isnull(e: Column): Column
  def isnan(e: Column): Column
  def nanvl(col1: Column, col2: Column): Column

  // Array functions
  def array(cols: Column*): Column
  def array_contains(column: Column, value: Any): Column
  def array_distinct(e: Column): Column
  def array_except(col1: Column, col2: Column): Column
  def array_intersect(col1: Column, col2: Column): Column
  def array_join(column: Column, delimiter: String): Column
  def array_max(e: Column): Column
  def array_min(e: Column): Column
  def array_position(col: Column, value: Any): Column
  def array_remove(column: Column, element: Any): Column
  def array_repeat(left: Column, right: Column): Column
  def array_sort(e: Column): Column
  def array_union(col1: Column, col2: Column): Column
  def arrays_overlap(a1: Column, a2: Column): Column
  def arrays_zip(e: Column*): Column
  def element_at(column: Column, extraction: Any): Column
  def explode(e: Column): Column
  def explode_outer(e: Column): Column
  def flatten(e: Column): Column
  def posexplode(e: Column): Column
  def posexplode_outer(e: Column): Column
  def sequence(start: Column, stop: Column): Column
  def shuffle(e: Column): Column
  def size(e: Column): Column
  def slice(x: Column, start: Int, length: Int): Column
  def sort_array(e: Column): Column
  def sort_array(e: Column, asc: Boolean): Column

  // Map functions
  def create_map(cols: Column*): Column
  def map_concat(cols: Column*): Column
  def map_from_arrays(keys: Column, values: Column): Column
  def map_from_entries(e: Column): Column
  def map_keys(e: Column): Column
  def map_values(e: Column): Column

  // JSON functions
  def get_json_object(e: Column, path: String): Column
  def json_tuple(json: Column, fields: String*): Column
  def from_json(e: Column, schema: DataType): Column
  def from_json(e: Column, schema: String): Column
  def to_json(e: Column): Column
  def schema_of_json(json: String): Column
  def schema_of_json(json: Column): Column

  // Window functions
  def row_number(): Column
  def dense_rank(): Column
  def rank(): Column
  def cume_dist(): Column
  def percent_rank(): Column
  def ntile(n: Int): Column
  def lag(e: Column, offset: Int): Column
  def lag(e: Column, offset: Int, defaultValue: Any): Column
  def lead(e: Column, offset: Int): Column
  def lead(e: Column, offset: Int, defaultValue: Any): Column
  def first_value(e: Column): Column
  def first_value(e: Column, ignoreNulls: Boolean): Column
  def last_value(e: Column): Column
  def last_value(e: Column, ignoreNulls: Boolean): Column
}
```

### Usage Examples

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// String, math, and date operations
val df = spark.range(1).select(
  lit("Hello World").alias("text"),
  lit(42).alias("number"),
  current_timestamp().alias("now")
)

val processed = df.select(
  upper($"text").alias("upper_text"),
  substring($"text", 1, 5).alias("first_five"),
  regexp_replace($"text", "World", "Spark").alias("replaced"),
  round($"number" / 7.0, 2).alias("divided"),
  date_format($"now", "yyyy-MM-dd").alias("date_only")
)

// Conditional logic
val categorized = df.withColumn("category",
  when($"number" > 50, "High")
    .when($"number" > 20, "Medium")
    .otherwise("Low"))

// Aggregations with grouping (categorized keeps the original columns)
val aggregated = categorized.groupBy("category").agg(
  count("*").alias("count"),
  avg("number").alias("avg_number"),
  collect_list("text").alias("texts")
)
```
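
The window functions at the end of the listing require a `WindowSpec` built from `org.apache.spark.sql.expressions.Window`. A minimal sketch with illustrative data:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val employees = Seq(
  ("Alice", 25, "Engineer"),
  ("Bob", 30, "Manager"),
  ("Charlie", 35, "Engineer")
).toDF("name", "age", "role")

// Rank rows by age within each role
val byRole = Window.partitionBy("role").orderBy($"age".desc)

val ranked = employees.select(
  $"name", $"role", $"age",
  row_number().over(byRole).alias("rank_in_role"),
  lag($"age", 1).over(byRole).alias("prev_age")  // null for the first row in each partition
)

ranked.show()
```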