# SQL Functions and Expressions

Spark SQL provides an extensive library of built-in functions for data manipulation, accessible through the `org.apache.spark.sql.functions` object and the `Column` class. These functions cover mathematical operations, string manipulation, date/time processing, aggregations, and complex data type operations.

## Column Class

```scala { .api }
class Column {
  // Arithmetic operations
  def +(other: Any): Column
  def -(other: Any): Column
  def *(other: Any): Column
  def /(other: Any): Column
  def %(other: Any): Column
  def unary_- : Column

  // Comparison operations
  def ===(other: Any): Column
  def =!=(other: Any): Column // Inequality (!== is deprecated since Spark 2.0)
  def >(other: Any): Column
  def >=(other: Any): Column
  def <(other: Any): Column
  def <=(other: Any): Column
  def <=>(other: Any): Column // Null-safe equality

  // Logical operations
  def &&(other: Column): Column
  def ||(other: Column): Column
  def unary_! : Column

  // String operations
  def contains(other: Any): Column
  def startsWith(other: Column): Column
  def startsWith(literal: String): Column
  def endsWith(other: Column): Column
  def endsWith(literal: String): Column
  def rlike(literal: String): Column
  def like(literal: String): Column

  // Null operations
  def isNull: Column
  def isNotNull: Column
  def isNaN: Column

  // Type operations
  def cast(to: DataType): Column
  def cast(to: String): Column
  def as(alias: String): Column
  def as(alias: Symbol): Column
  def name(alias: String): Column

  // Collection operations
  def getItem(key: Any): Column
  def getField(fieldName: String): Column

  // Sorting
  def asc: Column
  def desc: Column
  def asc_nulls_first: Column
  def asc_nulls_last: Column
  def desc_nulls_first: Column
  def desc_nulls_last: Column

  // SQL expressions
  def when(condition: Column, value: Any): Column
  def otherwise(value: Any): Column
  def over(window: WindowSpec): Column
  def isin(list: Any*): Column
  def between(lowerBound: Any, upperBound: Any): Column
}
```

## Core Functions

### Column Creation

```scala { .api }
// Column references
def col(colName: String): Column
def column(colName: String): Column

// Literal values
def lit(literal: Any): Column

// Input metadata
def input_file_name(): Column
def monotonically_increasing_id(): Column
def spark_partition_id(): Column
```

**Usage Examples:**

```scala
import org.apache.spark.sql.functions._

// Column references
val nameCol = col("name")
val ageCol = column("age")

// Literals
val constantValue = lit(42)
val stringLiteral = lit("Hello")
val dateLiteral = lit(java.sql.Date.valueOf("2023-01-01"))

// Metadata functions
val withFilename = df.withColumn("source_file", input_file_name())
val withId = df.withColumn("unique_id", monotonically_increasing_id())
```

## Mathematical Functions

```scala { .api }
// Basic math
def abs(e: Column): Column
def ceil(e: Column): Column
def floor(e: Column): Column
def round(e: Column): Column
def round(e: Column, scale: Int): Column
def signum(e: Column): Column

// Exponential and logarithmic
def exp(e: Column): Column
def expm1(e: Column): Column
def log(e: Column): Column
def log10(e: Column): Column
def log2(e: Column): Column
def log1p(e: Column): Column
def pow(l: Column, r: Column): Column
def pow(l: Column, r: Double): Column
def sqrt(e: Column): Column

// Trigonometric
def sin(e: Column): Column
def cos(e: Column): Column
def tan(e: Column): Column
def asin(e: Column): Column
def acos(e: Column): Column
def atan(e: Column): Column
def atan2(l: Column, r: Column): Column
def sinh(e: Column): Column
def cosh(e: Column): Column
def tanh(e: Column): Column

// Angle conversion
def degrees(e: Column): Column
def radians(e: Column): Column

// Random functions
def rand(): Column
def rand(seed: Long): Column
def randn(): Column
def randn(seed: Long): Column
```

**Usage Examples:**

```scala
// Basic calculations
val absValues = df.withColumn("abs_value", abs(col("amount")))
val rounded = df.withColumn("rounded_price", round(col("price"), 2))

// Power and logarithms
val squared = df.withColumn("squared", pow(col("value"), 2))
val logValues = df.withColumn("log_amount", log(col("amount")))

// Trigonometry for calculations
val distances = df.withColumn("distance",
  sqrt(pow(col("x2") - col("x1"), 2) + pow(col("y2") - col("y1"), 2)))

// Random sampling
val withRandom = df.withColumn("random_score", rand(42))
```

## String Functions

```scala { .api }
// String manipulation
def concat(exprs: Column*): Column
def concat_ws(sep: String, exprs: Column*): Column
def format_string(format: String, arguments: Column*): Column
def length(e: Column): Column
def lower(e: Column): Column
def upper(e: Column): Column
def initcap(e: Column): Column

// Trimming and padding
def ltrim(e: Column): Column
def rtrim(e: Column): Column
def trim(e: Column): Column
def lpad(str: Column, len: Int, pad: String): Column
def rpad(str: Column, len: Int, pad: String): Column

// Substring operations
def substring(str: Column, pos: Int, len: Int): Column
def substring_index(str: Column, delim: String, count: Int): Column
def left(str: Column, len: Int): Column
def right(str: Column, len: Int): Column

// Regular expressions
def regexp_extract(e: Column, exp: String, groupIdx: Int): Column
def regexp_replace(e: Column, pattern: String, replacement: String): Column
def rlike(str: Column, regexp: Column): Column

// String testing
def ascii(e: Column): Column
def base64(e: Column): Column
def unbase64(e: Column): Column
def encode(value: Column, charset: String): Column
def decode(value: Column, charset: String): Column

// String splitting and parsing
def split(str: Column, pattern: String): Column
def split(str: Column, pattern: String, limit: Int): Column

// Hashing
def md5(e: Column): Column
def sha1(e: Column): Column
def sha2(e: Column, numBits: Int): Column
def crc32(e: Column): Column
def hash(cols: Column*): Column
def xxhash64(cols: Column*): Column
```

**Usage Examples:**

```scala
// String concatenation
val fullName = df.withColumn("full_name",
  concat(col("first_name"), lit(" "), col("last_name")))

val csvData = df.withColumn("csv_row",
  concat_ws(",", col("id"), col("name"), col("email")))

// String formatting
val formatted = df.withColumn("description",
  format_string("User %s (ID: %d)", col("name"), col("id")))

// Text processing
val cleaned = df
  .withColumn("trimmed", trim(col("description")))
  .withColumn("upper_name", upper(col("name")))
  .withColumn("name_length", length(col("name")))

// Regular expressions
val phoneExtract = df.withColumn("area_code",
  regexp_extract(col("phone"), """(\d{3})-\d{3}-\d{4}""", 1))

val cleanedText = df.withColumn("clean_text",
  regexp_replace(col("text"), "[^a-zA-Z0-9 ]", ""))

// String splitting
val nameParts = df.withColumn("name_parts", split(col("full_name"), " "))

// Hashing for data masking
val hashedEmail = df.withColumn("email_hash", sha2(col("email"), 256))
```

## Date and Time Functions

```scala { .api }
// Current date/time
def current_date(): Column
def current_timestamp(): Column
def now(): Column

// Date arithmetic
def date_add(start: Column, days: Int): Column
def date_sub(start: Column, days: Int): Column
def datediff(end: Column, start: Column): Column
def add_months(start: Column, numMonths: Int): Column
def months_between(end: Column, start: Column): Column
def months_between(end: Column, start: Column, roundOff: Boolean): Column

// Date extraction
def year(e: Column): Column
def quarter(e: Column): Column
def month(e: Column): Column
def dayofmonth(e: Column): Column
def dayofweek(e: Column): Column
def dayofyear(e: Column): Column
def hour(e: Column): Column
def minute(e: Column): Column
def second(e: Column): Column
def weekofyear(e: Column): Column

// Date formatting and parsing
def date_format(dateExpr: Column, format: String): Column
def from_unixtime(ut: Column): Column
def from_unixtime(ut: Column, f: String): Column
def unix_timestamp(): Column
def unix_timestamp(s: Column): Column
def unix_timestamp(s: Column, p: String): Column
def to_timestamp(s: Column): Column
def to_timestamp(s: Column, fmt: String): Column
def to_date(e: Column): Column
def to_date(e: Column, fmt: String): Column

// Date/time truncation
def trunc(date: Column, format: String): Column
def date_trunc(format: String, timestamp: Column): Column

// Time zones
def from_utc_timestamp(ts: Column, tz: String): Column
def to_utc_timestamp(ts: Column, tz: String): Column
```

**Usage Examples:**

```scala
// Current date and time
val withTimestamp = df.withColumn("processed_at", current_timestamp())
val withDate = df.withColumn("processed_date", current_date())

// Date calculations
val daysUntilDeadline = df.withColumn("days_left",
  datediff(col("deadline"), current_date()))

val futureDate = df.withColumn("review_date",
  add_months(col("created_date"), 6))

// Date part extraction
val withDateParts = df
  .withColumn("year", year(col("created_date")))
  .withColumn("month", month(col("created_date")))
  .withColumn("day_of_week", dayofweek(col("created_date")))

// Date formatting
val formatted = df.withColumn("formatted_date",
  date_format(col("timestamp"), "yyyy-MM-dd HH:mm"))

// Unix timestamp conversion
val unixTime = df.withColumn("unix_ts",
  unix_timestamp(col("date_string"), "yyyy-MM-dd"))

val fromUnix = df.withColumn("readable_date",
  from_unixtime(col("unix_timestamp")))

// Date parsing
val parsedDate = df.withColumn("parsed_date",
  to_date(col("date_string"), "MM/dd/yyyy"))

// Time zone conversion
val utcTime = df.withColumn("utc_time",
  to_utc_timestamp(col("local_time"), "America/New_York"))
```

## Aggregate Functions

```scala { .api }
// Basic aggregations
def count(e: Column): Column
def sum(e: Column): Column
def avg(e: Column): Column
def mean(e: Column): Column
def max(e: Column): Column
def min(e: Column): Column

// Statistical functions
def stddev(e: Column): Column
def stddev_pop(e: Column): Column
def stddev_samp(e: Column): Column
def variance(e: Column): Column
def var_pop(e: Column): Column
def var_samp(e: Column): Column
def skewness(e: Column): Column
def kurtosis(e: Column): Column

// Collection aggregations
def collect_list(e: Column): Column
def collect_set(e: Column): Column

// Distinct counting
def countDistinct(expr: Column, exprs: Column*): Column
def approx_count_distinct(e: Column): Column
def approx_count_distinct(e: Column, rsd: Double): Column

// First/last values
def first(e: Column): Column
def first(e: Column, ignoreNulls: Boolean): Column
def last(e: Column): Column
def last(e: Column, ignoreNulls: Boolean): Column

// Percentiles
def expr(expr: String): Column // For percentile_approx, etc.
```

**Usage Examples:**

```scala
// Basic aggregations
val summary = df.agg(
  count(col("id")).alias("total_rows"),
  sum(col("amount")).alias("total_amount"),
  avg(col("amount")).alias("avg_amount"),
  max(col("created_date")).alias("latest_date"),
  min(col("created_date")).alias("earliest_date")
)

// Statistical analysis
val stats = df.agg(
  stddev(col("score")).alias("std_dev"),
  variance(col("score")).alias("variance"),
  skewness(col("score")).alias("skewness"),
  kurtosis(col("score")).alias("kurtosis")
)

// Collect values
val categories = df.agg(
  collect_set(col("category")).alias("unique_categories"),
  collect_list(col("name")).alias("all_names")
)

// Distinct counts
val uniqueCounts = df.agg(
  countDistinct(col("user_id")).alias("unique_users"),
  approx_count_distinct(col("session_id"), 0.05).alias("approx_sessions")
)

// First/last values (useful with ordering)
val firstLast = df
  .orderBy(col("timestamp"))
  .agg(
    first(col("status")).alias("first_status"),
    last(col("status")).alias("last_status")
  )
```

## Conditional Functions

```scala { .api }
def when(condition: Column, value: Any): Column
def coalesce(e: Column*): Column
def isnull(e: Column): Column
def isnan(e: Column): Column
def nanvl(col1: Column, col2: Column): Column
def greatest(exprs: Column*): Column
def least(exprs: Column*): Column
```

**Usage Examples:**

```scala
// Conditional logic
val categorized = df.withColumn("age_group",
  when(col("age") < 18, "Minor")
    .when(col("age") < 65, "Adult")
    .otherwise("Senior")
)

// Handle nulls
val withDefaults = df.withColumn("description",
  coalesce(col("description"), lit("No description available")))

// Handle NaN values
val cleanNaN = df.withColumn("clean_score",
  nanvl(col("score"), lit(0.0)))

// Find extreme values
val ranges = df.withColumn("max_value",
  greatest(col("value1"), col("value2"), col("value3")))
```

## Array and Map Functions

```scala { .api }
// Array creation and manipulation
def array(cols: Column*): Column
def array_contains(column: Column, value: Any): Column
def array_distinct(e: Column): Column
def array_except(col1: Column, col2: Column): Column
def array_intersect(col1: Column, col2: Column): Column
def array_join(column: Column, delimiter: String): Column
def array_max(e: Column): Column
def array_min(e: Column): Column
def array_position(col: Column, value: Any): Column
def array_remove(col: Column, element: Any): Column
def array_repeat(col: Column, count: Int): Column
def array_sort(e: Column): Column
def array_union(col1: Column, col2: Column): Column
def arrays_overlap(a1: Column, a2: Column): Column
def arrays_zip(e: Column*): Column
def size(e: Column): Column
def slice(x: Column, start: Int, length: Int): Column
def sort_array(e: Column): Column
def sort_array(e: Column, asc: Boolean): Column

// Array explosion
def explode(e: Column): Column
def explode_outer(e: Column): Column
def posexplode(e: Column): Column
def posexplode_outer(e: Column): Column

// Map operations
def map(cols: Column*): Column
def map_keys(e: Column): Column
def map_values(e: Column): Column
def map_from_arrays(keys: Column, values: Column): Column
def map_from_entries(e: Column): Column
```

**Usage Examples:**

```scala
// Create arrays
val arrayCol = df.withColumn("scores_array",
  array(col("score1"), col("score2"), col("score3")))

// Array operations
val arrayOps = df
  .withColumn("has_zero", array_contains(col("scores"), 0))
  .withColumn("unique_scores", array_distinct(col("scores")))
  .withColumn("max_score", array_max(col("scores")))
  .withColumn("array_size", size(col("scores")))

// Array to string
val joined = df.withColumn("scores_csv",
  array_join(col("scores"), ","))

// Explode arrays to rows
val exploded = df.select(col("id"), explode(col("tags")).alias("tag"))

// Position-based explosion (multi-column alias uses as(Seq(...)),
// since Column.alias accepts only a single String)
val withPosition = df.select(col("id"),
  posexplode(col("values")).as(Seq("pos", "value")))

// Map operations
val mapCol = df.withColumn("properties",
  map(lit("name"), col("name"), lit("age"), col("age")))

val keys = df.withColumn("prop_keys", map_keys(col("properties")))
val values = df.withColumn("prop_values", map_values(col("properties")))
```

## Window Functions

```scala { .api }
// Ranking functions
def row_number(): Column
def rank(): Column
def dense_rank(): Column
def percent_rank(): Column
def ntile(n: Int): Column
def cume_dist(): Column

// Offset functions
def lag(e: Column, offset: Int): Column
def lag(e: Column, offset: Int, defaultValue: Any): Column
def lead(e: Column, offset: Int): Column
def lead(e: Column, offset: Int, defaultValue: Any): Column
```

**Usage Examples:**

```scala
import org.apache.spark.sql.expressions.Window

// Window specifications
val windowSpec = Window.partitionBy("department").orderBy(col("salary").desc)
val rowsWindow = Window.partitionBy("category").orderBy("date")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

// Ranking
val ranked = df.withColumn("salary_rank",
  rank().over(windowSpec))

val rowNumbers = df.withColumn("row_num",
  row_number().over(windowSpec))

val percentiles = df.withColumn("salary_percentile",
  percent_rank().over(windowSpec))

// Lag/Lead for time series
val withLag = df.withColumn("prev_value",
  lag(col("value"), 1).over(Window.partitionBy("id").orderBy("timestamp")))

val withLead = df.withColumn("next_value",
  lead(col("value"), 1, 0).over(Window.partitionBy("id").orderBy("timestamp")))

// Running aggregations
val runningSum = df.withColumn("running_total",
  sum(col("amount")).over(rowsWindow))
```

## Type Conversion Functions

```scala { .api }
// Casting is done via methods on Column — there is no standalone
// cast function in org.apache.spark.sql.functions.
def cast(to: DataType): Column
def cast(to: String): Column
```

**Usage Examples:**

```scala
import org.apache.spark.sql.types._

// Type casting
val converted = df
  .withColumn("age_int", col("age").cast(IntegerType))
  .withColumn("score_double", col("score").cast("double"))
  .withColumn("created_date", col("created_timestamp").cast(DateType))

// String to numeric conversions
val numeric = df
  .withColumn("amount_decimal", col("amount_str").cast(DecimalType(10, 2)))
  .withColumn("count_long", col("count_str").cast(LongType))

// Date/time conversions
val dates = df
  .withColumn("date_from_string", to_date(col("date_str"), "yyyy-MM-dd"))
  .withColumn("timestamp_from_string", to_timestamp(col("ts_str"), "yyyy-MM-dd HH:mm:ss"))
```

## JSON Functions

```scala { .api }
def from_json(e: Column, schema: DataType): Column
def from_json(e: Column, schema: String): Column
def to_json(e: Column): Column
def json_tuple(json: Column, fields: String*): Column
def get_json_object(e: Column, path: String): Column
```

**Usage Examples:**

```scala
import org.apache.spark.sql.types._

// JSON parsing
val jsonSchema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)
))

val parsed = df.withColumn("parsed_json",
  from_json(col("json_string"), jsonSchema))

// Extract JSON fields
val name = df.withColumn("name",
  get_json_object(col("json_data"), "$.name"))

// Convert to JSON
val jsonified = df.withColumn("row_as_json", to_json(struct("*")))

// JSON tuple extraction (multi-column alias uses as(Seq(...)),
// since Column.alias accepts only a single String)
val extracted = df.select(col("id"),
  json_tuple(col("json_data"), "name", "age").as(Seq("name", "age")))
```