# SQL and DataFrames

Spark SQL provides structured data processing through DataFrames and Datasets, together with a SQL query engine. It integrates with a wide range of data sources and formats and supports both batch and streaming processing.

## Package Information

SQL and DataFrame functionality is available through the following imports:

```scala
// Entry point and core data abstractions
import org.apache.spark.sql.{SparkSession, DataFrame, Dataset, Row, Column}
// Built-in column functions (col, lit, avg, ...)
import org.apache.spark.sql.functions._
// Schema types (StructType, StringType, ...)
import org.apache.spark.sql.types._
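```

## Basic Usage

The example below computes the average salary per department for employees older than 25, first with the DataFrame API and then with SQL.

```scala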
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Create (or reuse) a local Spark session that uses all available cores
val spark = SparkSession.builder()
  .appName("SQL Example")
  .master("local[*]")
  .getOrCreate()

// Read a CSV file with a header row, letting Spark infer the column types
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("path/to/data.csv")

// DataFrame operations: average salary per department for employees older than 25.
// "department" must be kept in the projection, otherwise groupBy cannot resolve it.
val result = df
  .select("department", "age", "salary")
  .filter(col("age") > 25)
  .groupBy("department")
  .agg(avg("salary").alias("avg_salary"))
  .orderBy(desc("avg_salary"))

result.show()

// The same query expressed in SQL against a temporary view
df.createOrReplaceTempView("employees")
val sqlResult = spark.sql("""
  SELECT department, AVG(salary) as avg_salary
  FROM employees
  WHERE age > 25
  GROUP BY department
  ORDER BY avg_salary DESC
""")

sqlResult.show()

spark.stop()
```

## Capabilities

### Spark Session

`SparkSession` is the unified entry point for Spark SQL functionality. Introduced in Spark 2.0, it replaces the older `SQLContext` and `HiveContext`.

```scala { .api }
/**
 * Entry point to Spark SQL: reads data, runs SQL queries, creates DataFrames/Datasets
 * and exposes the catalog and runtime configuration. Obtain one via `SparkSession.builder()`.
 */
class SparkSession private(sparkContext: SparkContext, existingSharedState: Option[SharedState]) {
  // Data reading
  def read: DataFrameReader                 // batch sources
  def readStream: DataStreamReader          // streaming sources

  // SQL execution
  def sql(sqlText: String): DataFrame
  def table(tableName: String): DataFrame

  // DataFrame creation
  def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame   // e.g. a Seq of case classes or tuples
  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
  def emptyDataFrame: DataFrame
  // range(...) produces a single column named "id"; `end` is exclusive
  def range(end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]

  // Catalog and metadata
  def catalog: Catalog                      // databases, tables, functions and views
  def conf: RuntimeConfig                   // runtime SQL configuration (spark.conf.get/set)
  def sessionState: SessionState            // unstable, internal developer API
  def sharedState: SharedState              // unstable, internal developer API

  // Streaming
  def streams: StreamingQueryManager        // manages the streaming queries of this session

  // Resources and control
  def sparkContext: SparkContext
  def version: String                       // version of Spark this session runs on
  def stop(): Unit                          // stops the underlying SparkContext
  def close(): Unit                         // synonym for stop()
  def newSession(): SparkSession            // shares SparkContext and cache; separate conf, temp views and UDFs
}

/** Companion object: the builder plus management of the active (thread-local) and default (global) sessions. */
object SparkSession {
  def builder(): Builder
  def active: SparkSession                          // active session, else the default one; throws if neither exists
  def getActiveSession: Option[SparkSession]        // session set as active for the current thread
  def getDefaultSession: Option[SparkSession]       // global default session
  def setActiveSession(session: SparkSession): Unit
  def clearActiveSession(): Unit
  def setDefaultSession(session: SparkSession): Unit
  def clearDefaultSession(): Unit
}

/** Builder for SparkSession (declared as `SparkSession.Builder`); every setter returns the builder for chaining. */
class Builder {
  def appName(name: String): Builder
  def master(master: String): Builder               // e.g. "local[*]", "yarn", "spark://host:7077"
  def config(key: String, value: String): Builder
  def config(key: String, value: Long): Builder
  def config(key: String, value: Double): Builder
  def config(key: String, value: Boolean): Builder
  def config(conf: SparkConf): Builder
  def enableHiveSupport(): Builder                  // requires Hive classes on the classpath
  // Returns the existing active or default session if there is one, otherwise creates a new one
  def getOrCreate(): SparkSession
}
```

Usage example:

```scala
val spark = SparkSession.builder()
  .appName("My Application")
  .master("local[4]")                                               // local mode with 4 threads
  .config("spark.sql.adaptive.enabled", "true")                     // adaptive query execution (default since 3.2)
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .enableHiveSupport()                                              // fails if Hive classes are not on the classpath
  .getOrCreate()
```

### DataFrames and Datasets

A `Dataset[T]` is a strongly typed, distributed collection of objects. A `DataFrame` is a `Dataset[Row]`, that is, a Dataset of generic `Row` objects. Both provide a programming abstraction and DSL for structured data manipulation.

```scala { .api }
/**
 * A strongly typed, distributed collection of objects of type `T`.
 * Transformations are lazy; the methods listed under "Actions" trigger execution.
 */
abstract class Dataset[T] extends Serializable {
  // Column selection and projection
  def select(cols: Column*): DataFrame
  def select(col: String, cols: String*): DataFrame
  def selectExpr(exprs: String*): DataFrame
  def drop(colName: String): DataFrame
  def drop(colNames: String*): DataFrame
  def drop(col: Column): DataFrame
  def withColumn(colName: String, col: Column): DataFrame          // adds or replaces a column
  def withColumnRenamed(existingName: String, newName: String): DataFrame

  // Filtering and conditions
  def filter(condition: Column): Dataset[T]
  def filter(conditionExpr: String): Dataset[T]                    // SQL expression, e.g. "age > 25"
  def where(condition: Column): Dataset[T]                         // alias for filter
  def where(conditionExpr: String): Dataset[T]

  // Grouping and aggregation
  def groupBy(cols: Column*): RelationalGroupedDataset
  def groupBy(col1: String, cols: String*): RelationalGroupedDataset
  def rollup(cols: Column*): RelationalGroupedDataset
  def cube(cols: Column*): RelationalGroupedDataset
  def agg(expr: Column, exprs: Column*): DataFrame                 // aggregates over the whole Dataset
  def agg(exprs: Map[String, String]): DataFrame                   // column name -> aggregate function name

  // Joins; joinType is e.g. "inner" (default), "outer", "left", "right", "left_semi", "left_anti", "cross"
  def join(right: Dataset[_]): DataFrame
  def join(right: Dataset[_], usingColumn: String): DataFrame
  def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
  def join(right: Dataset[_], joinExprs: Column): DataFrame
  def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
  def crossJoin(right: Dataset[_]): DataFrame

  // Set operations
  def union(other: Dataset[T]): Dataset[T]                         // keeps duplicates; columns matched by position
  def unionAll(other: Dataset[T]): Dataset[T]                      // alias for union
  def unionByName(other: Dataset[T]): Dataset[T]                   // columns matched by name
  def intersect(other: Dataset[T]): Dataset[T]                     // removes duplicates
  def intersectAll(other: Dataset[T]): Dataset[T]                  // preserves duplicates
  def except(other: Dataset[T]): Dataset[T]                        // removes duplicates
  def exceptAll(other: Dataset[T]): Dataset[T]                     // preserves duplicates

  // Sorting
  def sort(sortCol: String, sortCols: String*): Dataset[T]
  def sort(sortExprs: Column*): Dataset[T]
  def orderBy(sortCol: String, sortCols: String*): Dataset[T]      // alias for sort
  def orderBy(sortExprs: Column*): Dataset[T]

  // Sampling and limiting
  def sample(fraction: Double): Dataset[T]                         // fraction is approximate, not exact
  def sample(withReplacement: Boolean, fraction: Double): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
  def limit(n: Int): Dataset[T]

  // Actions
  def collect(): Array[T]                                          // loads every row into driver memory
  def collectAsList(): java.util.List[T]
  def count(): Long
  def describe(cols: String*): DataFrame                           // count, mean, stddev, min, max
  def first(): T
  def head(): T
  def head(n: Int): Array[T]
  def take(n: Int): Array[T]
  def takeAsList(n: Int): java.util.List[T]
  def tail(n: Int): Array[T]                                       // Spark 3.0+
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit

  // Display
  def show(): Unit                                                 // 20 rows, long strings truncated to 20 chars
  def show(numRows: Int): Unit
  def show(numRows: Int, truncate: Boolean): Unit
  def show(numRows: Int, truncate: Int): Unit                      // truncate strings to this many characters
  def show(numRows: Int, truncate: Int, vertical: Boolean): Unit

  // Schema and metadata
  def schema: StructType
  def printSchema(): Unit
  def dtypes: Array[(String, String)]                              // (column name, type name) pairs
  def columns: Array[String]

  // Persistence
  def cache(): Dataset[T]                                          // same as persist()
  def persist(): Dataset[T]
  def persist(newLevel: StorageLevel): Dataset[T]
  def unpersist(): Dataset[T]
  def unpersist(blocking: Boolean): Dataset[T]

  // Type conversion
  def as[U : Encoder]: Dataset[U]
  def alias(alias: String): Dataset[T]                             // qualifier for columns, e.g. in self-joins

  // I/O
  def write: DataFrameWriter[T]
  def writeStream: DataStreamWriter[T]

  // Advanced transformations
  def unpivot(ids: Array[Column], values: Array[Column], variableColumnName: String, valueColumnName: String): DataFrame
  def melt(ids: Array[Column], values: Array[Column], variableColumnName: String, valueColumnName: String): DataFrame  // alias for unpivot
  def transpose(): DataFrame                                       // Spark 4.0+
  def observe(name: String, expr: Column, exprs: Column*): Dataset[T]
  // Lateral joins with another Dataset (Spark 4.0+)
  def lateralJoin(right: Dataset[_]): DataFrame
  def lateralJoin(right: Dataset[_], joinExprs: Column): DataFrame
  def lateralJoin(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

  // Subquery expressions (Spark 4.0+): use this Dataset as a subquery inside another query
  def exists(): Column                                             // EXISTS subquery
  def scalar(): Column                                             // scalar subquery; exactly one row and one column

  // SQL operations
  def createOrReplaceTempView(viewName: String): Unit
  def createGlobalTempView(viewName: String): Unit                 // queried as global_temp.<viewName>
  def createTempView(viewName: String): Unit                       // throws if the view already exists
}

/** Untyped view of a Dataset; declared in the `org.apache.spark.sql` package object. */
type DataFrame = Dataset[Row]
```

### Column Operations

`Column` represents a column expression in a DataFrame. It provides the operators and methods used to build new expressions from it.

```scala { .api }
/** A column expression in a DataFrame, built from column references, literals and operators. */
class Column(expr: Expression) {
  // Comparison operators
  def ===(other: Any): Column                    // equality
  def =!=(other: Any): Column                    // inequality (`!==` is deprecated since Spark 2.0)
  def >(other: Any): Column
  def >=(other: Any): Column
  def <(other: Any): Column
  def <=(other: Any): Column
  def <=>(other: Any): Column                    // null-safe equality: true when both sides are null
  def isin(list: Any*): Column                   // true if the value equals one of the arguments

  // Logical operators
  def &&(other: Any): Column
  def ||(other: Any): Column
  def unary_! : Column                           // logical NOT, written as !col (space before ':' is required)

  // Conditional expressions (chained after functions.when)
  def when(condition: Column, value: Any): Column
  def otherwise(value: Any): Column

  // Arithmetic operators
  def +(other: Any): Column
  def -(other: Any): Column
  def *(other: Any): Column
  def /(other: Any): Column
  def %(other: Any): Column

  // Null handling
  def isNull: Column
  def isNotNull: Column
  def isNaN: Column

  // String operations
  def contains(other: Any): Column
  def startsWith(other: Column): Column
  def startsWith(literal: String): Column
  def endsWith(other: Column): Column
  def endsWith(literal: String): Column
  def substr(startPos: Column, len: Column): Column
  def substr(startPos: Int, len: Int): Column    // startPos is 1-based
  def like(literal: String): Column              // SQL LIKE pattern (% and _ wildcards)
  def rlike(literal: String): Column             // regular-expression match

  // Array, map and struct access
  def getItem(key: Any): Column                  // array element by 0-based ordinal, or map value by key
  def getField(fieldName: String): Column        // struct field by name

  // Type conversion
  def cast(to: DataType): Column
  def cast(to: String): Column                   // type name, e.g. "int", "string", "date"

  // Naming
  def alias(alias: String): Column
  def as(alias: String): Column
  def as(alias: String, metadata: Metadata): Column
  def name(alias: String): Column

  // Sorting
  def asc: Column
  def asc_nulls_first: Column
  def asc_nulls_last: Column
  def desc: Column
  def desc_nulls_first: Column
  def desc_nulls_last: Column

  // Window functions
  def over(): Column                             // empty window specification: all rows
  def over(window: WindowSpec): Column
}
```

### Built-in Functions

The `functions` object (`org.apache.spark.sql.functions`) provides a rich set of built-in functions for DataFrame operations. The listing below is a representative subset.

```scala { .api }
/** Built-in functions for DataFrame operations (`org.apache.spark.sql.functions`); a representative subset. */
object functions {
  // Column creation
  def col(colName: String): Column
  def column(colName: String): Column                    // alias for col
  def lit(literal: Any): Column                          // literal value
  def typedLit[T : TypeTag](literal: T): Column          // also handles parameterized types such as Seq and Map

  // Conditional expressions
  def when(condition: Column, value: Any): Column        // continue with .when(...) / .otherwise(...)
  def coalesce(cols: Column*): Column                    // first non-null argument
  def isnull(col: Column): Column
  def nanvl(col1: Column, col2: Column): Column          // col1 unless it is NaN, otherwise col2

  // Aggregate functions
  def count(e: Column): Column
  def count(columnName: String): TypedColumn[Any, Long]
  def countDistinct(expr: Column, exprs: Column*): Column
  def approx_count_distinct(e: Column): Column
  def sum(e: Column): Column
  def sum(columnName: String): Column
  def avg(e: Column): Column
  def avg(columnName: String): Column
  def mean(e: Column): Column                            // alias for avg
  def min(e: Column): Column
  def min(columnName: String): Column
  def max(e: Column): Column
  def max(columnName: String): Column
  def first(e: Column): Column                           // non-deterministic unless row order is fixed
  def first(e: Column, ignoreNulls: Boolean): Column
  def last(e: Column): Column                            // non-deterministic unless row order is fixed
  def last(e: Column, ignoreNulls: Boolean): Column
  def collect_list(e: Column): Column                    // keeps duplicates
  def collect_set(e: Column): Column                     // removes duplicates

  // String functions
  def ascii(e: Column): Column
  def base64(e: Column): Column
  def concat(exprs: Column*): Column
  def concat_ws(sep: String, exprs: Column*): Column
  def length(e: Column): Column
  def lower(e: Column): Column
  def upper(e: Column): Column
  def ltrim(e: Column): Column
  def rtrim(e: Column): Column
  def trim(e: Column): Column
  def regexp_extract(e: Column, pattern: String, idx: Int): Column
  def regexp_replace(e: Column, pattern: String, replacement: String): Column
  def split(str: Column, pattern: String): Column        // pattern is a regular expression
  def substring(str: Column, pos: Int, len: Int): Column // pos is 1-based

  // Math functions
  def abs(e: Column): Column
  def acos(e: Column): Column
  def asin(e: Column): Column
  def atan(e: Column): Column
  def atan2(y: Column, x: Column): Column
  def ceil(e: Column): Column
  def cos(e: Column): Column
  def exp(e: Column): Column
  def floor(e: Column): Column
  def log(e: Column): Column                             // natural logarithm
  def log10(e: Column): Column
  def pow(l: Column, r: Column): Column
  def round(e: Column): Column                           // HALF_UP rounding to 0 decimal places
  def round(e: Column, scale: Int): Column
  def sin(e: Column): Column
  def sqrt(e: Column): Column
  def tan(e: Column): Column

  // Date and time functions
  def current_date(): Column
  def current_timestamp(): Column
  def date_add(start: Column, days: Int): Column
  def date_sub(start: Column, days: Int): Column
  def datediff(end: Column, start: Column): Column       // number of days from start to end
  def date_format(dateExpr: Column, format: String): Column
  def dayofmonth(e: Column): Column
  def dayofweek(e: Column): Column                       // 1 = Sunday ... 7 = Saturday
  def dayofyear(e: Column): Column
  def hour(e: Column): Column
  def minute(e: Column): Column
  def month(e: Column): Column
  def quarter(e: Column): Column
  def second(e: Column): Column
  def to_date(e: Column): Column
  def to_date(e: Column, fmt: String): Column
  def to_timestamp(s: Column): Column
  def to_timestamp(s: Column, fmt: String): Column
  def unix_timestamp(): Column                           // current time in seconds
  def unix_timestamp(s: Column): Column
  def unix_timestamp(s: Column, p: String): Column
  def year(e: Column): Column

  // Array functions
  def array(cols: Column*): Column
  def array_contains(column: Column, value: Any): Column
  def array_distinct(e: Column): Column
  def array_max(e: Column): Column
  def array_min(e: Column): Column
  def array_position(column: Column, value: Any): Column // 1-based; 0 when not found
  def array_remove(column: Column, element: Any): Column
  def array_sort(e: Column): Column
  def arrays_overlap(a1: Column, a2: Column): Column
  def explode(e: Column): Column                         // one output row per element
  def explode_outer(e: Column): Column                   // like explode, but a null/empty input yields a null row
  def posexplode(e: Column): Column                      // like explode, plus the element position
  def posexplode_outer(e: Column): Column
  def size(e: Column): Column
  def slice(x: Column, start: Int, length: Int): Column  // start is 1-based
  def sort_array(e: Column): Column
  def sort_array(e: Column, asc: Boolean): Column

  // Map functions
  def map(cols: Column*): Column                         // alternating key and value columns
  def map_keys(e: Column): Column
  def map_values(e: Column): Column

  // Struct functions
  def struct(cols: Column*): Column

  // Window functions (used with Column.over)
  def row_number(): Column
  def rank(): Column
  def dense_rank(): Column
  def percent_rank(): Column
  def ntile(n: Int): Column
  def cume_dist(): Column
  def lag(e: Column, offset: Int): Column                // null when there is no row `offset` before
  def lag(e: Column, offset: Int, defaultValue: Any): Column
  def lead(e: Column, offset: Int): Column               // null when there is no row `offset` after
  def lead(e: Column, offset: Int, defaultValue: Any): Column

  // UDF creation
  def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction
  def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction
}
```

### Data Types

The Spark SQL data type system (package `org.apache.spark.sql.types`), used to define schemas.

```scala { .api }
/** Base class of all Spark SQL data types. */
abstract class DataType extends AbstractDataType {
  def json: String             // compact JSON representation
  def prettyJson: String       // pretty-printed JSON representation
  def simpleString: String     // short readable form, e.g. "int"
  def catalogString: String
  def sql: String              // SQL type name, e.g. "INT"
}

/**
 * Factory methods and singleton type instances. Declared in Java as static members, so it is
 * mostly used from Java; Scala code usually refers to the case objects (StringType, IntegerType, ...)
 * in `org.apache.spark.sql.types` directly.
 */
object DataTypes {
  def createArrayType(elementType: DataType): ArrayType
  def createMapType(keyType: DataType, valueType: DataType): MapType
  def createStructField(name: String, dataType: DataType, nullable: Boolean): StructField
  def createStructType(fields: Array[StructField]): StructType

  val StringType: StringType
  val BinaryType: BinaryType
  val BooleanType: BooleanType
  val DateType: DateType
  val TimestampType: TimestampType
  val CalendarIntervalType: CalendarIntervalType
  val DoubleType: DoubleType
  val FloatType: FloatType
  val ByteType: ByteType
  val IntegerType: IntegerType
  val LongType: LongType
  val ShortType: ShortType
  val NullType: NullType
}

/** Schema of a DataFrame or struct column: an ordered sequence of StructFields. */
case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
  // Each add(...) returns a new StructType; the original is not modified
  def add(field: StructField): StructType
  def add(name: String, dataType: DataType): StructType          // adds a nullable field
  def add(name: String, dataType: DataType, nullable: Boolean): StructType
  def add(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata): StructType
  def apply(name: String): StructField                           // throws if no field has this name
  def apply(index: Int): StructField
  def fieldNames: Array[String]
  def length: Int
  def size: Int
}

/** A named, typed field of a StructType. */
case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty) {
  def getComment(): Option[String]   // the "comment" entry of `metadata`, if present
}

/** Array whose elements have type `elementType`; `containsNull` states whether elements may be null. */
case class ArrayType(elementType: DataType, containsNull: Boolean = true) extends DataType

/** Map from `keyType` to `valueType`; keys can never be null, values may be null if `valueContainsNull`. */
case class MapType(
    keyType: DataType,
    valueType: DataType,
    valueContainsNull: Boolean = true) extends DataType
```

### Row

`Row` represents a single row of a DataFrame. Fields can be read by position, or by name when the row carries a schema.

```scala { .api }
/** One row of a DataFrame; fields are accessed by 0-based ordinal or, with a schema, by name. */
trait Row extends Serializable {
  def size: Int                                   // number of fields (same as length)
  def length: Int
  def schema: StructType                          // null for rows created without a schema
  def apply(i: Int): Any                          // same as get(i)
  def get(i: Int): Any
  def getAs[T](i: Int): T
  def getAs[T](fieldName: String): T              // requires a schema
  def fieldIndex(name: String): Int               // requires a schema

  // Type-specific getters; primitive getters throw NullPointerException on null, so check isNullAt first
  def isNullAt(i: Int): Boolean
  def getBoolean(i: Int): Boolean
  def getByte(i: Int): Byte
  def getShort(i: Int): Short
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getFloat(i: Int): Float
  def getDouble(i: Int): Double
  def getString(i: Int): String
  def getDecimal(i: Int): java.math.BigDecimal
  def getDate(i: Int): java.sql.Date
  def getTimestamp(i: Int): java.sql.Timestamp
  def getSeq[T](i: Int): Seq[T]
  def getList[T](i: Int): java.util.List[T]
  def getMap[K, V](i: Int): scala.collection.Map[K, V]
  def getJavaMap[K, V](i: Int): java.util.Map[K, V]
  def getStruct(i: Int): Row

  // Conversion and copying
  def toSeq: Seq[Any]
  def copy(): Row
}

/** Factory methods and pattern-matching support for Row. */
object Row {
  def empty: Row                                  // row with no fields
  def apply(values: Any*): Row                    // e.g. Row(1, "a", true)
  def fromSeq(values: Seq[Any]): Row
  def fromTuple(tuple: Product): Row
  def merge(rows: Row*): Row                      // deprecated since Spark 3.0.0
  def unapplySeq(row: Row): Some[Seq[Any]]        // enables `case Row(a, b) => ...`
}
```

### Data I/O

#### DataFrameReader

Interface for loading DataFrames from external storage systems, obtained via `spark.read`.

```scala { .api }
/** Loads DataFrames from external storage systems; obtained via `spark.read`. */
class DataFrameReader {
  // Configuration
  def format(source: String): DataFrameReader                   // e.g. "csv", "json", "parquet", "orc", "jdbc"
  def schema(schema: StructType): DataFrameReader               // lets the source skip schema inference
  def schema(schemaString: String): DataFrameReader             // DDL string, e.g. "name STRING, age INT"
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader
  def options(options: scala.collection.Map[String, String]): DataFrameReader
  def options(options: java.util.Map[String, String]): DataFrameReader

  // Data sources
  def csv(path: String): DataFrame
  def csv(paths: String*): DataFrame
  def json(path: String): DataFrame
  def json(paths: String*): DataFrame
  def parquet(path: String): DataFrame
  def parquet(paths: String*): DataFrame
  def orc(path: String): DataFrame
  def orc(paths: String*): DataFrame
  def text(path: String): DataFrame                             // one string column named "value"
  def text(paths: String*): DataFrame
  def textFile(path: String): Dataset[String]
  def textFile(paths: String*): Dataset[String]
  def table(tableName: String): DataFrame
  def jdbc(url: String, table: String, properties: java.util.Properties): DataFrame

  // Generic load: uses the source set by format(), else spark.sql.sources.default (parquet by default)
  def load(): DataFrame
  def load(path: String): DataFrame
  def load(paths: String*): DataFrame
}
```

#### DataFrameWriter

Interface for saving DataFrames to external storage systems, obtained via `df.write`.

```scala { .api }
/** Saves a Dataset to external storage systems; obtained via `df.write`. */
class DataFrameWriter[T] {
  // Configuration
  def mode(saveMode: SaveMode): DataFrameWriter[T]               // default: SaveMode.ErrorIfExists
  def mode(saveMode: String): DataFrameWriter[T]                 // "append", "overwrite", "ignore", "error"/"errorifexists"
  def format(source: String): DataFrameWriter[T]
  def option(key: String, value: String): DataFrameWriter[T]
  def option(key: String, value: Boolean): DataFrameWriter[T]
  def option(key: String, value: Long): DataFrameWriter[T]
  def option(key: String, value: Double): DataFrameWriter[T]
  def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
  def options(options: java.util.Map[String, String]): DataFrameWriter[T]
  def partitionBy(colNames: String*): DataFrameWriter[T]         // Hive-style col=value directory layout
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]  // saveAsTable only
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]                     // sorts within each bucket

  // Data sources
  def csv(path: String): Unit
  def json(path: String): Unit
  def parquet(path: String): Unit
  def orc(path: String): Unit
  def text(path: String): Unit
  def jdbc(url: String, table: String, connectionProperties: java.util.Properties): Unit
  def insertInto(tableName: String): Unit                        // matches columns by position, not by name
  def saveAsTable(tableName: String): Unit

  // Generic save
  def save(): Unit
  def save(path: String): Unit
}

// org.apache.spark.sql.SaveMode is a Java enum, not a scala.Enumeration. The constants are referenced
// as SaveMode.Append, SaveMode.Overwrite, ... and have the static type SaveMode (not SaveMode.Value):
//
//   public enum SaveMode {
//     Append,         // append to existing data
//     Overwrite,      // replace existing data
//     ErrorIfExists,  // throw if data already exists (DataFrameWriter's default)
//     Ignore          // silently skip the save if data already exists
//   }
```

Usage example:

```scala
// Reading data with the generic format/load form (equivalent to spark.read.option(...).csv(path))
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("path/to/file.csv")

// Writing data: replace any existing output with Snappy-compressed Parquet,
// laid out in year=.../month=... subdirectories
df.write
  .format("parquet")
  .mode("overwrite")
  .option("compression", "snappy")
  .partitionBy("year", "month")
  .save("path/to/output")
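```

### Grouped Data Operations

`RelationalGroupedDataset` is returned by `groupBy`, `rollup`, and `cube` on a Dataset. It provides aggregation and pivot operations over each group.

```scala { .api }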
/** Aggregations over the groups produced by Dataset.groupBy, rollup or cube. */
class RelationalGroupedDataset protected(
    df: DataFrame,
    groupingExprs: Seq[Expression],
    groupType: RelationalGroupedDataset.GroupType) {
  // Aggregation functions
  def agg(expr: Column, exprs: Column*): DataFrame
  def agg(exprs: Map[String, String]): DataFrame                 // column name -> aggregate, e.g. "salary" -> "avg"
  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
  def count(): DataFrame                                         // adds a "count" column per group
  // The following apply to numeric columns; with no names given, to every numeric column
  def mean(colNames: String*): DataFrame
  def max(colNames: String*): DataFrame
  def min(colNames: String*): DataFrame
  def sum(colNames: String*): DataFrame
  def avg(colNames: String*): DataFrame

  // Pivot operations
  def pivot(pivotColumn: String): RelationalGroupedDataset                     // distinct values found by an extra job
  def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset   // explicit values avoid that job
}
```

Usage example:

```scala
// Several aggregates per (department, level) group
val grouped = df.groupBy("department", "level")
  .agg(
    count("*").alias("employee_count"),
    avg("salary").alias("avg_salary"),
    max("salary").alias("max_salary")
  )

// Pivot: one row per department and one column per distinct "level" value,
// each holding that level's average salary
val pivoted = df.groupBy("department")
  .pivot("level")
  .agg(avg("salary"))
```