or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core.mddeployment.mdgraphx.mdindex.mdml.mdsql.mdstreaming.md

sql.mddocs/

0

# SQL and DataFrames

1

2

High-level APIs for working with structured data through DataFrames and Datasets. They are built on Spark SQL, whose Catalyst optimizer rewrites each query plan and generates code for it. The same APIs read from and write to many data sources and file formats.

3

4

## Capabilities

5

6

### SparkSession

7

8

Entry point for the DataFrame and Dataset APIs. Since Spark 2.0 it replaces `SQLContext` and `HiveContext` as the single way to access Spark SQL functionality.

9

10

```scala { .api }

11

/**
 * Entry point for DataFrame and Dataset APIs.
 *
 * Obtain an instance with `SparkSession.builder()...getOrCreate()`.
 */
class SparkSession {
  /** Underlying SparkContext, for RDD-level APIs */
  def sparkContext: SparkContext

  /**
   * Scala implicits: `$"colName"` syntax, `toDF`/`toDS` on local collections,
   * and Encoders for primitives and case classes.
   * Bring them into scope with `import spark.implicits._`.
   */
  object implicits extends SQLImplicits

  /** Get DataFrameReader for loading data */
  def read: DataFrameReader

  /** Get DataStreamReader for streaming data */
  def readStream: DataStreamReader

  /**
   * Execute SQL query.
   * Every table or view it references must be known to the catalog,
   * e.g. registered with `Dataset.createOrReplaceTempView`.
   */
  def sql(sqlText: String): DataFrame

  /** Access a table or view as a DataFrame */
  def table(tableName: String): DataFrame

  /**
   * Create a Dataset with a single column named `id` holding
   * [0, end) or [start, end), advancing by `step` (default 1).
   * Elements are boxed `java.lang.Long`, not `scala.Long`.
   */
  def range(end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]

  /**
   * Create DataFrame from a local sequence of case classes or tuples.
   * Tuple columns are named _1, _2, ...; rename them with `toDF(...)`.
   */
  def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame

  /** Create DataFrame from generic Rows plus an explicit schema */
  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame

  /** Create Dataset from sequence (Encoder usually comes from `spark.implicits._`) */
  def createDataset[T : Encoder](data: Seq[T]): Dataset[T]

  /** Create empty DataFrame */
  def emptyDataFrame: DataFrame

  /** Create empty Dataset */
  def emptyDataset[T: Encoder]: Dataset[T]

  /** Access catalog functions (databases, tables, views, functions) */
  def catalog: Catalog

  /** Access runtime configuration */
  def conf: RuntimeConfig

  /** Register user-defined functions for use in SQL */
  def udf: UDFRegistration

  /** Manage streaming queries */
  def streams: StreamingQueryManager

  /** Start a new session sharing the SparkContext but with separate SQL conf, temp views and UDFs */
  def newSession(): SparkSession

  /** Stop SparkSession (and its underlying SparkContext) */
  def stop(): Unit
}

45

46

/**
 * Companion object: session builder and access to already-running sessions.
 */
object SparkSession {
  /** Start configuring a session */
  def builder(): Builder

  /** Session bound to the current thread, if any */
  def getActiveSession: Option[SparkSession]

  /** Process-wide default session, if any */
  def getDefaultSession: Option[SparkSession]

  /** Fluent configuration; every setter returns the same Builder */
  class Builder {
    /** Cluster manager URL, e.g. "local[*]", "yarn" or "spark://host:7077" */
    def master(master: String): Builder

    /** Application name shown in the Spark UI */
    def appName(name: String): Builder

    /** Set one config option; typed overloads mirror DataFrameReader.option */
    def config(key: String, value: String): Builder
    def config(key: String, value: Boolean): Builder
    def config(key: String, value: Long): Builder
    def config(key: String, value: Double): Builder

    /** Copy all options from an existing SparkConf */
    def config(conf: SparkConf): Builder

    /** Enable Hive metastore connectivity, Hive SerDes and Hive UDFs (Hive classes must be on the classpath) */
    def enableHiveSupport(): Builder

    /**
     * Return an existing session if one is available (thread-local first, then the
     * global default); otherwise create a new one and make it the global default.
     * When an existing session is returned, the non-static options set on this
     * builder are applied to it.
     */
    def getOrCreate(): SparkSession
  }
}

61

```

62

63

**Usage Examples:**

64

65

```scala

66

import org.apache.spark.sql.SparkSession

// Create SparkSession
val spark = SparkSession.builder()
  .appName("MyApp")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()

// Load data; without inferSchema every CSV column is read as a string
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("people.csv")

// SQL queries: the DataFrame must be registered as a view before SQL can refer to it by name
df.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 21").show()

// Create from data; tuple columns default to _1, _2, so rename them with toDF
val data = Seq(("Alice", 25), ("Bob", 30))
val df2 = spark.createDataFrame(data).toDF("name", "age")

spark.stop()

88

```

89

90

**Python SparkSession API:**

91

92

```python { .api }

93

class SparkSession:
    """
    Entry point for DataFrame and SQL APIs in Python.

    Create or fetch a session with ``SparkSession.builder ... .getOrCreate()``.
    """

    class Builder:
        """Fluent session configuration; every setter returns the builder."""

        def master(self, master: str) -> "SparkSession.Builder": ...
        def appName(self, name: str) -> "SparkSession.Builder": ...
        def config(self, key: str, value: str) -> "SparkSession.Builder": ...
        def enableHiveSupport(self) -> "SparkSession.Builder": ...
        def getOrCreate(self) -> "SparkSession": ...

    # `builder` is a class-level Builder instance, not a method:
    # write `SparkSession.builder.appName(...)`, with no parentheses after `builder`.
    builder: "SparkSession.Builder"

    @property
    def read(self) -> DataFrameReader: ...

    @property
    def readStream(self) -> DataStreamReader: ...

    def sql(self, sqlQuery: str) -> DataFrame: ...

    def table(self, tableName: str) -> DataFrame: ...

    # range(n) yields 0..n-1; range(start, end) yields start..end-1 (single column `id`)
    def range(self, start: int, end: Optional[int] = None, step: int = 1,
              numPartitions: Optional[int] = None) -> DataFrame: ...

    # schema may be a list of column names, a StructType, or a DDL string like "name STRING, age INT"
    def createDataFrame(self, data: List,
                        schema: Optional[Union[List[str], StructType, str]] = None) -> DataFrame: ...

    @property
    def catalog(self) -> Catalog: ...

    @property
    def conf(self) -> RuntimeConfig: ...

    @property
    def udf(self) -> UDFRegistration: ...

    @property
    def streams(self) -> StreamingQueryManager: ...

    def stop(self) -> None: ...

125

```

126

127

**Python Usage Examples:**

128

129

```python

130

from pyspark.sql import SparkSession

# Create SparkSession (`builder` is an attribute, so it takes no parentheses)
spark = (
    SparkSession.builder
    .appName("MyApp")
    .master("local[*]")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

# Load data; without inferSchema every CSV column is read as a string
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("people.csv")
)

# SQL queries: register the DataFrame as a view so SQL can refer to it by name
df.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 21").show()

# Create from data, naming the columns explicitly
data = [("Alice", 25), ("Bob", 30)]
df2 = spark.createDataFrame(data, ["name", "age"])

spark.stop()

152

```

153

154

### Dataset[T] and DataFrame

155

156

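`Dataset[T]` is a distributed collection of typed objects; operations on `T` are checked at compile time. `DataFrame` is a type alias for `Dataset[Row]`. It is an untyped view whose column references are resolved only when the query is analyzed at runtime.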

157

158

```scala { .api }

159

/**
 * Distributed collection of data with a schema.
 *
 * Transformations (select, filter, join, ...) are lazy and only build a query plan.
 * Actions (show, collect, count, write...save, ...) trigger execution.
 */
class Dataset[T] {
  // ---------- Inspection ----------

  /** Display the first `numRows` rows; strings longer than 20 characters are cut when `truncate` is true */
  def show(numRows: Int = 20, truncate: Boolean = true): Unit

  /** Print schema to console as a tree */
  def printSchema(): Unit

  /** Print the physical plan, plus the logical plans when `extended` is true */
  def explain(extended: Boolean = false): Unit

  // ---------- Projection and filtering ----------

  /** Select columns */
  def select(cols: Column*): DataFrame
  def select(col: String, cols: String*): DataFrame

  /** Filter rows; `where` is an alias of `filter`. The String form takes a SQL predicate such as "age > 18" */
  def filter(condition: Column): Dataset[T]
  def filter(conditionExpr: String): Dataset[T]
  def where(condition: Column): Dataset[T]

  // ---------- Aggregation ----------

  /** Group by columns; follow with .agg(...), .count(), .avg(...), ... */
  def groupBy(cols: Column*): RelationalGroupedDataset
  def groupBy(col1: String, cols: String*): RelationalGroupedDataset

  /** Aggregate over the whole Dataset without grouping */
  def agg(expr: Column, exprs: Column*): DataFrame

  // ---------- Joins ----------

  /** Inner join with no join condition (a Cartesian product unless a later filter supplies a predicate) */
  def join(right: Dataset[_]): DataFrame

  /** Equi-join on column(s) present in both sides; each join column appears only once in the output (like SQL JOIN ... USING) */
  def join(right: Dataset[_], usingColumn: String): DataFrame
  def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
  def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame

  /**
   * Join on an arbitrary boolean expression.
   * joinType is one of "inner", "cross", "outer"/"full", "left", "right", "left_semi", "left_anti", ...
   */
  def join(right: Dataset[_], joinExprs: Column): DataFrame
  def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

  // ---------- Set operations ----------

  /**
   * Union with another Dataset.
   * Equivalent to SQL UNION ALL: duplicates are kept (follow with distinct() to remove them),
   * and columns are matched by position, not by name.
   */
  def union(other: Dataset[T]): Dataset[T]

  /** Alias for union */
  def unionAll(other: Dataset[T]): Dataset[T]

  /** Union that matches columns by name instead of position */
  def unionByName(other: Dataset[T]): Dataset[T]

  /** Rows present in both Datasets (SQL INTERSECT; result has no duplicates) */
  def intersect(other: Dataset[T]): Dataset[T]

  /** Rows in this Dataset but not in `other` (SQL EXCEPT DISTINCT) */
  def except(other: Dataset[T]): Dataset[T]

  // ---------- Ordering, limiting, deduplication ----------

  /** Sort by columns; `sort` is an alias of `orderBy` */
  def orderBy(sortExprs: Column*): Dataset[T]
  def orderBy(sortCol: String, sortCols: String*): Dataset[T]
  def sort(sortExprs: Column*): Dataset[T]

  /** Limit number of rows */
  def limit(n: Int): Dataset[T]

  /** Remove duplicate rows; distinct() is an alias of dropDuplicates() */
  def distinct(): Dataset[T]
  def dropDuplicates(): Dataset[T]

  /** Remove rows that are duplicates with respect to the given columns only */
  def dropDuplicates(colNames: Array[String]): Dataset[T]
  def dropDuplicates(colNames: Seq[String]): Dataset[T]

  // ---------- Column manipulation ----------

  /** Add or replace column */
  def withColumn(colName: String, col: Column): DataFrame

  /** Rename column; a no-op if `existingName` is not in the schema */
  def withColumnRenamed(existingName: String, newName: String): DataFrame

  /** Drop column; a no-op if the column does not exist */
  def drop(colName: String): DataFrame
  def drop(col: Column): DataFrame

  // ---------- SQL integration ----------

  /** Register this Dataset as a session-scoped temporary view usable from spark.sql(...) */
  def createOrReplaceTempView(viewName: String): Unit

  // ---------- Actions ----------

  /** Collect all rows to the driver; on large data this can exhaust driver memory */
  def collect(): Array[T]

  /** Collect as Java List (same driver-memory caveat as collect) */
  def collectAsList(): java.util.List[T]

  /** Count rows */
  def count(): Long

  /** Get first row; first() is an alias of head() */
  def first(): T
  def head(): T

  /** Get first n rows; take(n) is an alias of head(n) */
  def head(n: Int): Array[T]
  def take(n: Int): Array[T]

  // ---------- Output ----------

  /** Get writer for saving data */
  def write: DataFrameWriter[T]

  /** Get writer for streaming */
  def writeStream: DataStreamWriter[T]

  // ---------- Caching ----------

  /** Cache with the default storage level (MEMORY_AND_DISK); same as persist() */
  def cache(): Dataset[T]

  /** Persist with the default or an explicit storage level */
  def persist(): Dataset[T]
  def persist(newLevel: StorageLevel): Dataset[T]

  /** Remove from cache; with blocking = true, wait until all blocks are removed */
  def unpersist(blocking: Boolean = false): Dataset[T]

  // ---------- Typed operations ----------

  /** Reinterpret each record as type U; columns are matched to U's fields by name */
  def as[U](implicit encoder: Encoder[U]): Dataset[U]

  /** Map function with encoder */
  def map[U](func: T => U)(implicit encoder: Encoder[U]): Dataset[U]

  /** FlatMap with encoder */
  def flatMap[U](func: T => TraversableOnce[U])(implicit encoder: Encoder[U]): Dataset[U]

  /** Apply function to each element; runs on the executors, not the driver */
  def foreach(f: T => Unit): Unit

  /** Apply function to each partition (useful for per-partition setup such as opening a connection) */
  def foreachPartition(f: Iterator[T] => Unit): Unit
}

/** DataFrame is the untyped form of Dataset: each element is a generic Row */
type DataFrame = Dataset[Row]

251

```

252

253

**Usage Examples:**

254

255

```scala

256

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// Needed for the $"col" syntax and for the encoders used by as[...] and map
import spark.implicits._

// Load and explore data
val df = spark.read.json("people.json")
df.show()
df.printSchema()
df.explain()

// Transformations
val adults = df
  .select("name", "age")
  .filter($"age" > 18)
  .orderBy($"age".desc)

// Aggregations
val avgAge = df
  .groupBy("department")
  .agg(avg("age").as("avg_age"), count("*").as("count"))

// Joins (single shared key column: appears once in the result)
val departments = spark.read.json("departments.json")
val joined = df.join(departments, "dept_id")

// Window functions
val windowSpec = Window.partitionBy("department").orderBy("salary")
val ranked = df.withColumn("rank", row_number().over(windowSpec))

// Type-safe operations with case classes.
// The JSON reader infers whole numbers as bigint, so `age` must be Long:
// an Int field fails with "Cannot up cast age from bigint to int".
// In compiled code, define the case class at top level so an Encoder can be derived.
case class Person(name: String, age: Long)
val people = df.as[Person]
val names = people.map(_.name.toUpperCase)

288

```

289

290

### Column Expressions

291

292

Column expressions for building queries and transformations.

293

294

```scala { .api }

295

/**
 * Column expression for DataFrame operations.
 *
 * Build one with functions.col("x"), $"x" (needs spark.implicits._), or df("x").
 */
class Column {
  /** Name the result column; `as` is equivalent to `alias` */
  def alias(alias: String): Column
  def as(alias: String): Column

  /** Cast to a DataType or to a type name such as "int", "string", "date" */
  def cast(to: DataType): Column
  def cast(to: String): Column

  /** Null / NaN checks */
  def isNull: Column
  def isNotNull: Column
  def isNaN: Column

  /** Logical operators */
  def &&(other: Any): Column
  def ||(other: Any): Column
  def unary_! : Column

  /**
   * Comparison operators.
   * Comparing with null yields null, not false; use <=> for null-safe equality.
   */
  def ===(other: Any): Column
  def =!=(other: Any): Column
  def <=>(other: Any): Column
  def >(other: Any): Column
  def <(other: Any): Column
  def >=(other: Any): Column
  def <=(other: Any): Column

  /** Range and membership tests (between is inclusive on both ends) */
  def between(lowerBound: Any, upperBound: Any): Column
  def isin(list: Any*): Column

  /** Arithmetic operators */
  def +(other: Any): Column
  def -(other: Any): Column
  def *(other: Any): Column
  def /(other: Any): Column
  def %(other: Any): Column

  /** Conditional chain started by functions.when(...); unmatched rows are null unless otherwise() is given */
  def when(condition: Column, value: Any): Column
  def otherwise(value: Any): Column

  /** String operations */
  def contains(other: Any): Column
  def startsWith(other: Column): Column
  def startsWith(literal: String): Column
  def endsWith(other: Column): Column
  def endsWith(literal: String): Column

  /** SQL LIKE pattern, with % and _ wildcards */
  def like(literal: String): Column

  /** Regular-expression match (SQL RLIKE) */
  def rlike(literal: String): Column

  /** Sort expressions; by default asc places nulls first and desc places nulls last */
  def asc: Column
  def asc_nulls_first: Column
  def asc_nulls_last: Column
  def desc: Column
  def desc_nulls_first: Column
  def desc_nulls_last: Column

  /** Nested data access: getItem indexes an array or looks up a map key; getField reads a struct field */
  def getItem(key: Any): Column
  def getField(fieldName: String): Column

  /** Evaluate this (window/aggregate) expression over a window specification */
  def over(window: WindowSpec): Column
}

348

```

349

350

### Built-in Functions

351

352

Comprehensive set of built-in functions for data manipulation.

353

354

```scala { .api }

355

/**
 * Built-in functions for DataFrame operations.
 *
 * Import with `import org.apache.spark.sql.functions._`.
 * Many functions also accept a column name (String) in place of a Column.
 */
object functions {
  /** Column references and literals */
  def col(colName: String): Column
  def column(colName: String): Column
  def lit(literal: Any): Column

  /** Parse a SQL expression string, e.g. expr("age + 1") */
  def expr(expr: String): Column

  /**
   * Conditional expressions.
   * when(...) starts a CASE WHEN chain continued with Column.when / Column.otherwise;
   * rows matching no branch get null if otherwise() is not given.
   */
  def when(condition: Column, value: Any): Column

  /** First non-null argument */
  def coalesce(cols: Column*): Column
  def isnull(col: Column): Column
  def isnan(col: Column): Column

  /** String functions */
  def upper(e: Column): Column
  def lower(e: Column): Column
  def trim(e: Column): Column
  def ltrim(e: Column): Column
  def rtrim(e: Column): Column
  def length(e: Column): Column

  /** Substring starting at position `pos`, which is 1-based, not 0-based */
  def substring(str: Column, pos: Int, len: Int): Column
  def concat(exprs: Column*): Column
  def concat_ws(sep: String, exprs: Column*): Column

  /** `pattern` / `exp` below are regular expressions */
  def split(str: Column, pattern: String): Column
  def regexp_replace(e: Column, pattern: String, replacement: String): Column
  def regexp_extract(e: Column, exp: String, groupIdx: Int): Column

  /** Math functions */
  def abs(e: Column): Column
  def sqrt(e: Column): Column
  def pow(l: Column, r: Column): Column

  /** Round to `scale` decimal places using HALF_UP rounding */
  def round(e: Column, scale: Int): Column
  def ceil(e: Column): Column
  def floor(e: Column): Column
  def sin(e: Column): Column
  def cos(e: Column): Column
  def tan(e: Column): Column

  /** Natural logarithm */
  def log(e: Column): Column
  def exp(e: Column): Column
  def greatest(exprs: Column*): Column
  def least(exprs: Column*): Column

  /** Aggregate functions (Column and column-name forms) */
  def sum(e: Column): Column
  def sum(columnName: String): Column
  def avg(e: Column): Column
  def avg(columnName: String): Column
  def mean(e: Column): Column

  /** count("*") counts all rows; count(col) skips nulls */
  def count(e: Column): Column
  def count(columnName: String): Column
  def countDistinct(expr: Column, exprs: Column*): Column
  def min(e: Column): Column
  def min(columnName: String): Column
  def max(e: Column): Column
  def max(columnName: String): Column
  def first(e: Column): Column
  def last(e: Column): Column
  def stddev(e: Column): Column
  def variance(e: Column): Column

  /**
   * Gather values into an array: collect_list keeps duplicates, collect_set removes them.
   * Element order is not guaranteed, because it depends on row order after a shuffle.
   */
  def collect_list(e: Column): Column
  def collect_set(e: Column): Column

  /** Date/Time functions */
  def current_date(): Column
  def current_timestamp(): Column
  def date_add(start: Column, days: Int): Column
  def date_sub(start: Column, days: Int): Column
  def date_format(dateExpr: Column, format: String): Column
  def year(e: Column): Column
  def month(e: Column): Column
  def dayofmonth(e: Column): Column
  def hour(e: Column): Column
  def minute(e: Column): Column
  def second(e: Column): Column

  /** Current time as seconds since the Unix epoch */
  def unix_timestamp(): Column

  /** Epoch seconds to a "yyyy-MM-dd HH:mm:ss" string */
  def from_unixtime(ut: Column): Column

  /** Array functions */
  def array(cols: Column*): Column
  def array_contains(column: Column, value: Any): Column

  /** One output row per array element (or map entry); null or empty inputs produce no rows */
  def explode(e: Column): Column
  def posexplode(e: Column): Column
  def size(e: Column): Column
  def sort_array(e: Column): Column
  def reverse(e: Column): Column
  def array_distinct(e: Column): Column

  /** Map functions; map(k1, v1, k2, v2, ...) takes alternating keys and values */
  def map(cols: Column*): Column
  def map_keys(e: Column): Column
  def map_values(e: Column): Column

  /** Window functions; use with .over(windowSpec) */
  def row_number(): Column
  def rank(): Column
  def dense_rank(): Column
  def percent_rank(): Column
  def ntile(n: Int): Column

  /** Value `offset` rows before/after the current row; null when out of range unless a default is given */
  def lag(e: Column, offset: Int): Column
  def lag(e: Column, offset: Int, defaultValue: Any): Column
  def lead(e: Column, offset: Int): Column
  def lead(e: Column, offset: Int, defaultValue: Any): Column

  /** Only in newer Spark releases; check your version (first/last work on older ones) */
  def first_value(e: Column): Column
  def last_value(e: Column): Column
}

455

```

456

457

### Data I/O

458

459

Reading and writing data from various sources.

460

461

```scala { .api }

462

/**
 * Interface for loading DataFrames from external storage.
 *
 * Obtained from `spark.read`; configure it with format/option/schema, then call load or a
 * format-specific method.
 */
class DataFrameReader {
  /** Specify data source format ("csv", "json", "parquet", "orc", "jdbc", ...); defaults to parquet */
  def format(source: String): DataFrameReader

  /** Add input option; option keys are case-insensitive */
  def option(key: String, value: String): DataFrameReader
  def option(key: String, value: Boolean): DataFrameReader
  def option(key: String, value: Long): DataFrameReader
  def option(key: String, value: Double): DataFrameReader

  /** Add multiple options */
  def options(options: Map[String, String]): DataFrameReader

  /**
   * Set expected schema, which skips schema inference.
   * The String form is a DDL string, e.g. "name STRING, age INT".
   */
  def schema(schema: StructType): DataFrameReader
  def schema(schemaString: String): DataFrameReader

  /** Load data; the no-path form is for sources configured purely by options (e.g. jdbc) */
  def load(): DataFrame
  def load(path: String): DataFrame
  def load(paths: String*): DataFrame

  /** Format-specific methods */
  def json(path: String): DataFrame
  def json(paths: String*): DataFrame
  @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
  def json(jsonRDD: RDD[String]): DataFrame
  def json(jsonDataset: Dataset[String]): DataFrame
  def parquet(paths: String*): DataFrame

  /** text yields a DataFrame with a single string column "value"; textFile yields Dataset[String] */
  def text(paths: String*): DataFrame
  def textFile(paths: String*): Dataset[String]

  /** Without inferSchema=true (or an explicit schema) every CSV column is a string */
  def csv(paths: String*): DataFrame
  def orc(paths: String*): DataFrame

  /** `properties` carries connection settings such as "user" and "password" */
  def jdbc(url: String, table: String, properties: Properties): DataFrame
  def table(tableName: String): DataFrame
}

496

497

/**
 * Interface for saving DataFrames to external storage.
 *
 * Obtained from `ds.write`; nothing is written until a terminal method
 * (save, saveAsTable, insertInto, or a format-specific method) is called.
 */
class DataFrameWriter[T] {
  /**
   * Set save mode; the default is SaveMode.ErrorIfExists.
   * String form (case-insensitive): "append", "overwrite", "ignore", "error"/"errorifexists".
   */
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def mode(saveMode: String): DataFrameWriter[T]

  /** Specify output format; defaults to parquet */
  def format(source: String): DataFrameWriter[T]

  /** Add output option */
  def option(key: String, value: String): DataFrameWriter[T]
  def option(key: String, value: Boolean): DataFrameWriter[T]
  def option(key: String, value: Long): DataFrameWriter[T]
  def option(key: String, value: Double): DataFrameWriter[T]

  /** Add multiple options */
  def options(options: Map[String, String]): DataFrameWriter[T]

  /** Partition output by columns, laid out as col=value/ directories on file systems */
  def partitionBy(colNames: String*): DataFrameWriter[T]

  /** Bucket output by columns; only supported together with saveAsTable, not save */
  def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]

  /** Sort within buckets; requires bucketBy */
  def sortBy(colName: String, colNames: String*): DataFrameWriter[T]

  /** Save data */
  def save(): Unit
  def save(path: String): Unit

  /**
   * Insert into an existing table.
   * Columns are matched by position, not by name, so column order must match the table.
   */
  def insertInto(tableName: String): Unit

  /** Save as a catalog table; columns are matched by name when appending to an existing table */
  def saveAsTable(tableName: String): Unit

  /** Format-specific methods */
  def json(path: String): Unit
  def parquet(path: String): Unit

  /** Requires a single string column */
  def text(path: String): Unit
  def csv(path: String): Unit
  def orc(path: String): Unit
  def jdbc(url: String, table: String, connectionProperties: Properties): Unit
}

536

537

/**
 * Save modes for DataFrameWriter.
 *
 * In Spark this is a Java enum, org.apache.spark.sql.SaveMode.
 * Import it with `import org.apache.spark.sql.SaveMode` and refer to SaveMode.Overwrite etc.
 * It is modelled here as a closed set of values so that `SaveMode` is a usable type,
 * as in DataFrameWriter.mode(saveMode: SaveMode).
 */
sealed trait SaveMode

object SaveMode {
  /** Add the new data to any existing data */
  case object Append extends SaveMode

  /** Replace any existing data */
  case object Overwrite extends SaveMode

  /** Fail if data already exists (the writer's default) */
  case object ErrorIfExists extends SaveMode

  /** If data already exists, write nothing and leave the existing data unchanged */
  case object Ignore extends SaveMode
}

543

```

544

545

**Usage Examples:**

546

547

```scala

548

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types._

// Reading data
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data.csv")

// Reading with an explicit schema (no inference pass over the data)
val schema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)
))
val df2 = spark.read.schema(schema).csv("data.csv")

// Writing data
df.write
  .mode(SaveMode.Overwrite)
  .option("header", "true")
  .csv("output")

// Partitioned output (year and month must be columns of df)
df.write
  .partitionBy("year", "month")
  .parquet("partitioned_output")

// Database operations: the JDBC driver must be on the classpath.
// Keep credentials out of source code; here they come from the environment.
// The default mode (ErrorIfExists) fails if the table already exists.
df.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/test")
  .option("dbtable", "people")
  .option("user", sys.env("DB_USER"))
  .option("password", sys.env("DB_PASSWORD"))
  .save()

582

```

583

584

### Data Types and Schema

585

586

Schema definition and data type system.

587

588

```scala { .api }

589

/**
 * Base class for data types.
 *
 * Types live in org.apache.spark.sql.types (`import org.apache.spark.sql.types._`).
 */
abstract class DataType

/** Primitive types */
object StringType extends DataType
object ByteType extends DataType
object ShortType extends DataType
object IntegerType extends DataType
object LongType extends DataType
object DoubleType extends DataType
object FloatType extends DataType
object BooleanType extends DataType
object DateType extends DataType
object TimestampType extends DataType
object BinaryType extends DataType
object NullType extends DataType

/** Fixed-precision decimal: `precision` total digits, `scale` of them after the point */
case class DecimalType(precision: Int, scale: Int) extends DataType

/** Complex types; element/value nullability defaults to true */
case class ArrayType(elementType: DataType, containsNull: Boolean = true) extends DataType
case class MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean = true) extends DataType

/** Row schema: an ordered list of named fields */
case class StructType(fields: Array[StructField]) extends DataType {
  /** Return a new StructType with one more (nullable by default) field appended */
  def add(name: String, dataType: DataType): StructType
  def add(name: String, dataType: DataType, nullable: Boolean): StructType
}

609

610

/**
 * Field in a struct: column name, data type, nullability and optional metadata.
 *
 * `nullable` defaults to true and `metadata` to Metadata.empty, so
 * StructField("age", IntegerType) and StructField("age", IntegerType, true) are both valid.
 */
case class StructField(
  name: String,
  dataType: DataType,
  nullable: Boolean = true,
  metadata: Metadata = Metadata.empty)

614

615

/**
 * Row in a DataFrame: an ordered, generic record accessed by position or by field name.
 */
trait Row {
  /** Number of fields; `size` is an alias of `length` */
  def length: Int
  def size: Int

  /** Schema of this row; rows created with Row(...) have none */
  def schema: StructType

  /** Position of the named field (requires a schema) */
  def fieldIndex(name: String): Int

  /** Value at position i as Any; a null field is returned as null */
  def get(i: Int): Any

  /**
   * Typed getters.
   * The primitive getters (getBoolean, getInt, getLong, getFloat, getDouble) throw if the
   * value is null or of another type, so check isNullAt(i) first for nullable columns.
   */
  def getString(i: Int): String
  def getBoolean(i: Int): Boolean
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getFloat(i: Int): Float
  def getDouble(i: Int): Double

  /** Generic typed access by position */
  def getAs[T](i: Int): T

  /** Generic typed access by field name; requires the row to carry a schema */
  def getAs[T](fieldName: String): T

  /** Whether the value at position i is null */
  def isNullAt(i: Int): Boolean

  /** All values in field order */
  def toSeq: Seq[Any]
}

/** Row construction, e.g. Row("Alice", 25) */
object Row {
  def apply(values: Any*): Row
  def fromSeq(values: Seq[Any]): Row
}

633

```

634

635

## Error Handling

636

637

Common SQL exceptions:

638

639

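- `AnalysisException` (`org.apache.spark.sql`) - raised while a query is analyzed, e.g. unknown tables or columns, or type mismatches
- `ParseException` (`org.apache.spark.sql.catalyst.parser`) - SQL syntax errors; a subclass of `AnalysisException`
- `StreamingQueryException` (`org.apache.spark.sql.streaming`) - a streaming query terminated because of an error
- `SparkException` (`org.apache.spark`) - failures while jobs execute, surfaced by actions such as `collect()`, `count()` or `save()`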