or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregations.md catalog.md data-io.md dataframe-dataset.md functions-expressions.md index.md session-management.md streaming.md

docs/functions-expressions.md

0

# SQL Functions and Expressions

1

2

Spark SQL provides an extensive library of built-in functions for data manipulation, accessible through the `org.apache.spark.sql.functions` object and the `Column` class. These functions cover mathematical operations, string manipulation, date/time processing, aggregations, and complex data type operations.

3

4

## Column Class

5

6

```scala { .api }

7

class Column {

8

// Arithmetic operations

9

def +(other: Any): Column

10

def -(other: Any): Column

11

def *(other: Any): Column

12

def /(other: Any): Column

13

def %(other: Any): Column

14

def unary_- : Column // Negation; space before ':' is required for symbolic unary methods

15

16

// Comparison operations

17

def ===(other: Any): Column

18

def =!=(other: Any): Column // Inequality (preferred; !== is a deprecated alias)

19

def >(other: Any): Column

20

def >=(other: Any): Column

21

def <(other: Any): Column

22

def <=(other: Any): Column

23

def <=>(other: Any): Column // Null-safe equality

24

25

// Logical operations

26

def &&(other: Column): Column

27

def ||(other: Column): Column

28

def unary_! : Column // Logical NOT; space before ':' is required for symbolic unary methods

29

30

// String operations

31

def contains(other: Any): Column

32

def startsWith(other: Column): Column

33

def startsWith(literal: String): Column

34

def endsWith(other: Column): Column

35

def endsWith(literal: String): Column

36

def rlike(literal: String): Column

37

def like(literal: String): Column

38

39

// Null operations

40

def isNull: Column

41

def isNotNull: Column

42

def isNaN: Column

43

44

// Type operations

45

def cast(to: DataType): Column

46

def cast(to: String): Column

47

def as(alias: String): Column

48

def as(alias: Symbol): Column

49

def name(alias: String): Column

50

51

// Collection operations

52

def getItem(key: Any): Column

53

def getField(fieldName: String): Column

54

55

// Sorting

56

def asc: Column

57

def desc: Column

58

def asc_nulls_first: Column

59

def asc_nulls_last: Column

60

def desc_nulls_first: Column

61

def desc_nulls_last: Column

62

63

// SQL expressions

64

def when(condition: Column, value: Any): Column

65

def otherwise(value: Any): Column

66

def over(window: WindowSpec): Column

67

def isin(list: Any*): Column

68

def between(lowerBound: Any, upperBound: Any): Column

69

}

70

```

71

72

## Core Functions

73

74

### Column Creation

75

76

```scala { .api }

77

// Column references

78

def col(colName: String): Column

79

def column(colName: String): Column

80

81

// Literal values

82

def lit(literal: Any): Column

83

84

// Input metadata

85

def input_file_name(): Column

86

def monotonically_increasing_id(): Column

87

def spark_partition_id(): Column

88

```

89

90

**Usage Examples:**

91

92

```scala

93

import org.apache.spark.sql.functions._

94

95

// Column references

96

val nameCol = col("name")

97

val ageCol = column("age")

98

99

// Literals

100

val constantValue = lit(42)

101

val stringLiteral = lit("Hello")

102

val dateLiteral = lit(java.sql.Date.valueOf("2023-01-01"))

103

104

// Metadata functions

105

val withFilename = df.withColumn("source_file", input_file_name())

106

val withId = df.withColumn("unique_id", monotonically_increasing_id())

107

```

108

109

## Mathematical Functions

110

111

```scala { .api }

112

// Basic math

113

def abs(e: Column): Column

114

def ceil(e: Column): Column

115

def floor(e: Column): Column

116

def round(e: Column): Column

117

def round(e: Column, scale: Int): Column

118

def signum(e: Column): Column

119

120

// Exponential and logarithmic

121

def exp(e: Column): Column

122

def expm1(e: Column): Column

123

def log(e: Column): Column

124

def log10(e: Column): Column

125

def log2(e: Column): Column

126

def log1p(e: Column): Column

127

def pow(l: Column, r: Column): Column

128

def pow(l: Column, r: Double): Column

129

def sqrt(e: Column): Column

130

131

// Trigonometric

132

def sin(e: Column): Column

133

def cos(e: Column): Column

134

def tan(e: Column): Column

135

def asin(e: Column): Column

136

def acos(e: Column): Column

137

def atan(e: Column): Column

138

def atan2(l: Column, r: Column): Column

139

def sinh(e: Column): Column

140

def cosh(e: Column): Column

141

def tanh(e: Column): Column

142

143

// Angle conversion

144

def degrees(e: Column): Column

145

def radians(e: Column): Column

146

147

// Random functions

148

def rand(): Column

149

def rand(seed: Long): Column

150

def randn(): Column

151

def randn(seed: Long): Column

152

```

153

154

**Usage Examples:**

155

156

```scala

157

// Basic calculations

158

val absValues = df.withColumn("abs_value", abs(col("amount")))

159

val rounded = df.withColumn("rounded_price", round(col("price"), 2))

160

161

// Power and logarithms

162

val squared = df.withColumn("squared", pow(col("value"), 2))

163

val logValues = df.withColumn("log_amount", log(col("amount")))

164

165

// Trigonometry for calculations

166

val distances = df.withColumn("distance",

167

sqrt(pow(col("x2") - col("x1"), 2) + pow(col("y2") - col("y1"), 2)))

168

169

// Random sampling

170

val withRandom = df.withColumn("random_score", rand(42))

171

```

172

173

## String Functions

174

175

```scala { .api }

176

// String manipulation

177

def concat(exprs: Column*): Column

178

def concat_ws(sep: String, exprs: Column*): Column

179

def format_string(format: String, arguments: Column*): Column

180

def length(e: Column): Column

181

def lower(e: Column): Column

182

def upper(e: Column): Column

183

def initcap(e: Column): Column

184

185

// Trimming and padding

186

def ltrim(e: Column): Column

187

def rtrim(e: Column): Column

188

def trim(e: Column): Column

189

def lpad(str: Column, len: Int, pad: String): Column

190

def rpad(str: Column, len: Int, pad: String): Column

191

192

// Substring operations

193

def substring(str: Column, pos: Int, len: Int): Column

194

def substring_index(str: Column, delim: String, count: Int): Column

195

def left(str: Column, len: Int): Column

196

def right(str: Column, len: Int): Column

197

198

// Regular expressions

199

def regexp_extract(e: Column, exp: String, groupIdx: Int): Column

200

def regexp_replace(e: Column, pattern: String, replacement: String): Column

201

def rlike(str: Column, regexp: Column): Column

202

203

// String testing

204

def ascii(e: Column): Column

205

def base64(e: Column): Column

206

def unbase64(e: Column): Column

207

def encode(value: Column, charset: String): Column

208

def decode(value: Column, charset: String): Column

209

210

// String splitting and parsing

211

def split(str: Column, pattern: String): Column

212

def split(str: Column, pattern: String, limit: Int): Column

213

214

// Hashing

215

def md5(e: Column): Column

216

def sha1(e: Column): Column

217

def sha2(e: Column, numBits: Int): Column

218

def crc32(e: Column): Column

219

def hash(cols: Column*): Column

220

def xxhash64(cols: Column*): Column

221

```

222

223

**Usage Examples:**

224

225

```scala

226

// String concatenation

227

val fullName = df.withColumn("full_name",

228

concat(col("first_name"), lit(" "), col("last_name")))

229

230

val csvData = df.withColumn("csv_row",

231

concat_ws(",", col("id"), col("name"), col("email")))

232

233

// String formatting

234

val formatted = df.withColumn("description",

235

format_string("User %s (ID: %d)", col("name"), col("id")))

236

237

// Text processing

238

val cleaned = df

239

.withColumn("trimmed", trim(col("description")))

240

.withColumn("upper_name", upper(col("name")))

241

.withColumn("name_length", length(col("name")))

242

243

// Regular expressions

244

val phoneExtract = df.withColumn("area_code",

245

regexp_extract(col("phone"), """(\d{3})-\d{3}-\d{4}""", 1))

246

247

val cleanedText = df.withColumn("clean_text",

248

regexp_replace(col("text"), "[^a-zA-Z0-9 ]", ""))

249

250

// String splitting

251

val nameParts = df.withColumn("name_parts", split(col("full_name"), " "))

252

253

// Hashing for data masking

254

val hashedEmail = df.withColumn("email_hash", sha2(col("email"), 256))

255

```

256

257

## Date and Time Functions

258

259

```scala { .api }

260

// Current date/time

261

def current_date(): Column

262

def current_timestamp(): Column

263

def now(): Column

264

265

// Date arithmetic

266

def date_add(start: Column, days: Int): Column

267

def date_sub(start: Column, days: Int): Column

268

def datediff(end: Column, start: Column): Column

269

def add_months(start: Column, numMonths: Int): Column

270

def months_between(end: Column, start: Column): Column

271

def months_between(end: Column, start: Column, roundOff: Boolean): Column

272

273

// Date extraction

274

def year(e: Column): Column

275

def quarter(e: Column): Column

276

def month(e: Column): Column

277

def dayofmonth(e: Column): Column

278

def dayofweek(e: Column): Column

279

def dayofyear(e: Column): Column

280

def hour(e: Column): Column

281

def minute(e: Column): Column

282

def second(e: Column): Column

283

def weekofyear(e: Column): Column

284

285

// Date formatting and parsing

286

def date_format(dateExpr: Column, format: String): Column

287

def from_unixtime(ut: Column): Column

288

def from_unixtime(ut: Column, f: String): Column

289

def unix_timestamp(): Column

290

def unix_timestamp(s: Column): Column

291

def unix_timestamp(s: Column, p: String): Column

292

def to_timestamp(s: Column): Column

293

def to_timestamp(s: Column, fmt: String): Column

294

def to_date(e: Column): Column

295

def to_date(e: Column, fmt: String): Column

296

297

// Date/time truncation

298

def trunc(date: Column, format: String): Column

299

def date_trunc(format: String, timestamp: Column): Column

300

301

// Time zones

302

def from_utc_timestamp(ts: Column, tz: String): Column

303

def to_utc_timestamp(ts: Column, tz: String): Column

304

```

305

306

**Usage Examples:**

307

308

```scala

309

// Current date and time

310

val withTimestamp = df.withColumn("processed_at", current_timestamp())

311

val withDate = df.withColumn("processed_date", current_date())

312

313

// Date calculations

314

val daysUntilDeadline = df.withColumn("days_left",

315

datediff(col("deadline"), current_date()))

316

317

val futureDate = df.withColumn("review_date",

318

add_months(col("created_date"), 6))

319

320

// Date part extraction

321

val withDateParts = df

322

.withColumn("year", year(col("created_date")))

323

.withColumn("month", month(col("created_date")))

324

.withColumn("day_of_week", dayofweek(col("created_date")))

325

326

// Date formatting

327

val formatted = df.withColumn("formatted_date",

328

date_format(col("timestamp"), "yyyy-MM-dd HH:mm"))

329

330

// Unix timestamp conversion

331

val unixTime = df.withColumn("unix_ts",

332

unix_timestamp(col("date_string"), "yyyy-MM-dd"))

333

334

val fromUnix = df.withColumn("readable_date",

335

from_unixtime(col("unix_timestamp")))

336

337

// Date parsing

338

val parsedDate = df.withColumn("parsed_date",

339

to_date(col("date_string"), "MM/dd/yyyy"))

340

341

// Time zone conversion

342

val utcTime = df.withColumn("utc_time",

343

to_utc_timestamp(col("local_time"), "America/New_York"))

344

```

345

346

## Aggregate Functions

347

348

```scala { .api }

349

// Basic aggregations

350

def count(e: Column): Column

351

def sum(e: Column): Column

352

def avg(e: Column): Column

353

def mean(e: Column): Column

354

def max(e: Column): Column

355

def min(e: Column): Column

356

357

// Statistical functions

358

def stddev(e: Column): Column

359

def stddev_pop(e: Column): Column

360

def stddev_samp(e: Column): Column

361

def variance(e: Column): Column

362

def var_pop(e: Column): Column

363

def var_samp(e: Column): Column

364

def skewness(e: Column): Column

365

def kurtosis(e: Column): Column

366

367

// Collection aggregations

368

def collect_list(e: Column): Column

369

def collect_set(e: Column): Column

370

371

// Distinct counting

372

def countDistinct(expr: Column, exprs: Column*): Column

373

def approx_count_distinct(e: Column): Column

374

def approx_count_distinct(e: Column, rsd: Double): Column

375

376

// First/last values

377

def first(e: Column): Column

378

def first(e: Column, ignoreNulls: Boolean): Column

379

def last(e: Column): Column

380

def last(e: Column, ignoreNulls: Boolean): Column

381

382

// Percentiles

383

def expr(expr: String): Column // For percentile_approx, etc.

384

```

385

386

**Usage Examples:**

387

388

```scala

389

// Basic aggregations

390

val summary = df.agg(

391

count(col("id")).alias("total_rows"),

392

sum(col("amount")).alias("total_amount"),

393

avg(col("amount")).alias("avg_amount"),

394

max(col("created_date")).alias("latest_date"),

395

min(col("created_date")).alias("earliest_date")

396

)

397

398

// Statistical analysis

399

val stats = df.agg(

400

stddev(col("score")).alias("std_dev"),

401

variance(col("score")).alias("variance"),

402

skewness(col("score")).alias("skewness"),

403

kurtosis(col("score")).alias("kurtosis")

404

)

405

406

// Collect values

407

val categories = df.agg(

408

collect_set(col("category")).alias("unique_categories"),

409

collect_list(col("name")).alias("all_names")

410

)

411

412

// Distinct counts

413

val uniqueCounts = df.agg(

414

countDistinct(col("user_id")).alias("unique_users"),

415

approx_count_distinct(col("session_id"), 0.05).alias("approx_sessions")

416

)

417

418

// First/last values (useful with ordering)

419

val firstLast = df

420

.orderBy(col("timestamp"))

421

.agg(

422

first(col("status")).alias("first_status"),

423

last(col("status")).alias("last_status")

424

)

425

```

426

427

## Conditional Functions

428

429

```scala { .api }

430

def when(condition: Column, value: Any): Column

431

def coalesce(e: Column*): Column

432

def isnull(e: Column): Column

433

def isnan(e: Column): Column

434

def nanvl(col1: Column, col2: Column): Column

435

def greatest(exprs: Column*): Column

436

def least(exprs: Column*): Column

437

```

438

439

**Usage Examples:**

440

441

```scala

442

// Conditional logic

443

val categorized = df.withColumn("age_group",

444

when(col("age") < 18, "Minor")

445

.when(col("age") < 65, "Adult")

446

.otherwise("Senior")

447

)

448

449

// Handle nulls

450

val withDefaults = df.withColumn("description",

451

coalesce(col("description"), lit("No description available")))

452

453

// Handle NaN values

454

val cleanNaN = df.withColumn("clean_score",

455

nanvl(col("score"), lit(0.0)))

456

457

// Find extreme values

458

val ranges = df.withColumn("max_value",

459

greatest(col("value1"), col("value2"), col("value3")))

460

```

461

462

## Array and Map Functions

463

464

```scala { .api }

465

// Array creation and manipulation

466

def array(cols: Column*): Column

467

def array_contains(column: Column, value: Any): Column

468

def array_distinct(e: Column): Column

469

def array_except(col1: Column, col2: Column): Column

470

def array_intersect(col1: Column, col2: Column): Column

471

def array_join(column: Column, delimiter: String): Column

472

def array_max(e: Column): Column

473

def array_min(e: Column): Column

474

def array_position(col: Column, value: Any): Column

475

def array_remove(col: Column, element: Any): Column

476

def array_repeat(col: Column, count: Int): Column

477

def array_sort(e: Column): Column

478

def array_union(col1: Column, col2: Column): Column

479

def arrays_overlap(a1: Column, a2: Column): Column

480

def arrays_zip(e: Column*): Column

481

def size(e: Column): Column

482

def slice(x: Column, start: Int, length: Int): Column

483

def sort_array(e: Column): Column

484

def sort_array(e: Column, asc: Boolean): Column

485

486

// Array explosion

487

def explode(e: Column): Column

488

def explode_outer(e: Column): Column

489

def posexplode(e: Column): Column

490

def posexplode_outer(e: Column): Column

491

492

// Map operations

493

def map(cols: Column*): Column

494

def map_keys(e: Column): Column

495

def map_values(e: Column): Column

496

def map_from_arrays(keys: Column, values: Column): Column

497

def map_from_entries(e: Column): Column

498

```

499

500

**Usage Examples:**

501

502

```scala

503

// Create arrays

504

val arrayCol = df.withColumn("scores_array",

505

array(col("score1"), col("score2"), col("score3")))

506

507

// Array operations

508

val arrayOps = df

509

.withColumn("has_zero", array_contains(col("scores"), 0))

510

.withColumn("unique_scores", array_distinct(col("scores")))

511

.withColumn("max_score", array_max(col("scores")))

512

.withColumn("array_size", size(col("scores")))

513

514

// Array to string

515

val joined = df.withColumn("scores_csv",

516

array_join(col("scores"), ","))

517

518

// Explode arrays to rows

519

val exploded = df.select(col("id"), explode(col("tags")).alias("tag"))

520

521

// Position-based explosion

522

val withPosition = df.select(col("id"),

523

posexplode(col("values")).as(Seq("pos", "value")))

524

525

// Map operations

526

val mapCol = df.withColumn("properties",

527

map(lit("name"), col("name"), lit("age"), col("age")))

528

529

val keys = df.withColumn("prop_keys", map_keys(col("properties")))

530

val values = df.withColumn("prop_values", map_values(col("properties")))

531

```

532

533

## Window Functions

534

535

```scala { .api }

536

// Ranking functions

537

def row_number(): Column

538

def rank(): Column

539

def dense_rank(): Column

540

def percent_rank(): Column

541

def ntile(n: Int): Column

542

def cume_dist(): Column

543

544

// Offset functions

545

def lag(e: Column, offset: Int): Column

546

def lag(e: Column, offset: Int, defaultValue: Any): Column

547

def lead(e: Column, offset: Int): Column

548

def lead(e: Column, offset: Int, defaultValue: Any): Column

549

```

550

551

**Usage Examples:**

552

553

```scala

554

import org.apache.spark.sql.expressions.Window

555

556

// Window specifications

557

val windowSpec = Window.partitionBy("department").orderBy(col("salary").desc)

558

val rowsWindow = Window.partitionBy("category").orderBy("date")

559

.rowsBetween(Window.unboundedPreceding, Window.currentRow)

560

561

// Ranking

562

val ranked = df.withColumn("salary_rank",

563

rank().over(windowSpec))

564

565

val rowNumbers = df.withColumn("row_num",

566

row_number().over(windowSpec))

567

568

val percentiles = df.withColumn("salary_percentile",

569

percent_rank().over(windowSpec))

570

571

// Lag/Lead for time series

572

val withLag = df.withColumn("prev_value",

573

lag(col("value"), 1).over(Window.partitionBy("id").orderBy("timestamp")))

574

575

val withLead = df.withColumn("next_value",

576

lead(col("value"), 1, 0).over(Window.partitionBy("id").orderBy("timestamp")))

577

578

// Running aggregations

579

val runningSum = df.withColumn("running_total",

580

sum(col("amount")).over(rowsWindow))

581

```

582

583

## Type Conversion Functions

584

585

```scala { .api }

586

// Note: cast is a method on the Column class (e.g. col("x").cast(...)),
// not a standalone function in org.apache.spark.sql.functions:
def cast(to: DataType): Column

587

def cast(to: String): Column

588

```

589

590

**Usage Examples:**

591

592

```scala

593

import org.apache.spark.sql.types._

594

595

// Type casting

596

val converted = df

597

.withColumn("age_int", col("age").cast(IntegerType))

598

.withColumn("score_double", col("score").cast("double"))

599

.withColumn("created_date", col("created_timestamp").cast(DateType))

600

601

// String to numeric conversions

602

val numeric = df

603

.withColumn("amount_decimal", col("amount_str").cast(DecimalType(10, 2)))

604

.withColumn("count_long", col("count_str").cast(LongType))

605

606

// Date/time conversions

607

val dates = df

608

.withColumn("date_from_string", to_date(col("date_str"), "yyyy-MM-dd"))

609

.withColumn("timestamp_from_string", to_timestamp(col("ts_str"), "yyyy-MM-dd HH:mm:ss"))

610

```

611

612

## JSON Functions

613

614

```scala { .api }

615

def from_json(e: Column, schema: DataType): Column

616

def from_json(e: Column, schema: String): Column

617

def to_json(e: Column): Column

618

def json_tuple(json: Column, fields: String*): Column

619

def get_json_object(e: Column, path: String): Column

620

```

621

622

**Usage Examples:**

623

624

```scala

625

import org.apache.spark.sql.types._

626

627

// JSON parsing

628

val jsonSchema = StructType(Array(

629

StructField("name", StringType, true),

630

StructField("age", IntegerType, true)

631

))

632

633

val parsed = df.withColumn("parsed_json",

634

from_json(col("json_string"), jsonSchema))

635

636

// Extract JSON fields

637

val name = df.withColumn("name",

638

get_json_object(col("json_data"), "$.name"))

639

640

// Convert to JSON

641

val jsonified = df.withColumn("row_as_json", to_json(struct("*")))

642

643

// JSON tuple extraction

644

val extracted = df.select(col("id"),

645

json_tuple(col("json_data"), "name", "age").as(Seq("name", "age")))

646

```