# Spark SQL

Spark SQL provides a programming interface for working with structured and semi-structured data. It allows querying data via SQL and the DataFrame API, with support for various data sources including JSON, Parquet, and Hive tables.

## SQLContext

The main entry point for Spark SQL functionality.

### SQLContext Class

```scala { .api }
class SQLContext(sparkContext: SparkContext) extends Serializable with Logging {
  // Alternative constructor for existing HiveContext compatibility
  def this(sparkContext: SparkContext, cacheManager: CacheManager) = this(sparkContext)
}
```

### Creating SQLContext

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

val conf = new SparkConf().setAppName("SQL App").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Bring implicit conversions into scope
import sqlContext.implicits._
```

## Data Sources

### JSON Files

**jsonFile**: Read JSON files as a SchemaRDD

```scala { .api }
def jsonFile(path: String): SchemaRDD
```

```scala
// Read a JSON file (one JSON object per line)
val people = sqlContext.jsonFile("people.json")

// Show the inferred schema
people.printSchema()

// Show data
people.show()

// Example JSON structure:
// {"name":"Michael"}
// {"name":"Andy", "age":30}
// {"name":"Justin", "age":19}
```

**jsonRDD**: Create a SchemaRDD from an RDD of JSON strings

```scala { .api }
def jsonRDD(json: RDD[String]): SchemaRDD
```

```scala
// Create an RDD of JSON strings
val jsonStrings = sc.parallelize(Array(
  """{"name":"Alice", "age":25}""",
  """{"name":"Bob", "age":30}""",
  """{"name":"Charlie", "age":35}"""
))

val jsonDF = sqlContext.jsonRDD(jsonStrings)
```

### Parquet Files

**parquetFile**: Read Parquet files

```scala { .api }
def parquetFile(path: String): SchemaRDD
```

```scala
// Read a Parquet file
val parquetData = sqlContext.parquetFile("users.parquet")

// Parquet automatically preserves the schema
parquetData.printSchema()
```

### RDD to SchemaRDD Conversion

**createSchemaRDD**: Convert an RDD of case classes to a SchemaRDD

```scala { .api }
def createSchemaRDD[A <: Product : TypeTag](rdd: RDD[A]): SchemaRDD
```

```scala
// Define a case class; its fields become the schema
case class Person(name: String, age: Int)

// Create an RDD of case class instances
val peopleRDD = sc.parallelize(Seq(
  Person("Alice", 25),
  Person("Bob", 30),
  Person("Charlie", 35)
))

// Convert to SchemaRDD
val peopleDF = sqlContext.createSchemaRDD(peopleRDD)
```

## SchemaRDD

The main data structure for structured data in Spark SQL.

### SchemaRDD Class

```scala { .api }
class SchemaRDD(sqlContext: SQLContext, logicalPlan: LogicalPlan)
  extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {

  // Schema operations
  def printSchema(): Unit
  def schema: StructType

  // Registration
  def registerAsTable(tableName: String): Unit
  def registerTempTable(tableName: String): Unit

  // Transformations
  def select(exprs: ColumnExpression*): SchemaRDD
  def where(condition: ColumnExpression): SchemaRDD
  def filter(condition: ColumnExpression): SchemaRDD
  def groupBy(cols: ColumnExpression*): GroupedSchemaRDD
  def orderBy(sortCol: ColumnExpression, sortCols: ColumnExpression*): SchemaRDD
  def limit(n: Int): SchemaRDD
  def unionAll(other: SchemaRDD): SchemaRDD
  def intersect(other: SchemaRDD): SchemaRDD
  def except(other: SchemaRDD): SchemaRDD
  def join(right: SchemaRDD, joinExprs: ColumnExpression, joinType: String): SchemaRDD

  // Actions
  def show(): Unit
  def collect(): Array[Row]
  def count(): Long
  def first(): Row
  def take(n: Int): Array[Row]

  // Save operations
  def saveAsParquetFile(path: String): Unit
  def saveAsTable(tableName: String): Unit
  def insertInto(tableName: String): Unit
  def insertInto(tableName: String, overwrite: Boolean): Unit
}
```

### Schema Operations

```scala
val people = sqlContext.jsonFile("people.json")

// Print the schema in tree format
people.printSchema()
// Output:
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Access the schema programmatically
val schema = people.schema
println(s"Schema has ${schema.fields.length} fields")

schema.fields.foreach { field =>
  println(s"Field: ${field.name}, Type: ${field.dataType}, Nullable: ${field.nullable}")
}
```

### Basic Operations

```scala
// Show the first rows
people.show()

// Show a custom number of rows (show() above takes no argument, so use take)
people.take(10).foreach(println)

// Collect all data (be careful with large datasets)
val allPeople = people.collect()
allPeople.foreach(println)

// Take the first n rows
val firstFive = people.take(5)

// Count rows
val totalCount = people.count()
```

## SQL Queries

### Table Registration and Querying

**sql**: Execute SQL queries

```scala { .api }
def sql(sqlText: String): SchemaRDD
```

**registerAsTable**: Register a SchemaRDD as a temporary table

```scala { .api }
def registerAsTable(tableName: String): Unit
```

```scala
val people = sqlContext.jsonFile("people.json")

// Register as a temporary table
people.registerAsTable("people")

// Execute SQL queries
val adults = sqlContext.sql("SELECT name, age FROM people WHERE age >= 18")
adults.show()

// More complex queries
val summary = sqlContext.sql("""
  SELECT
    COUNT(*) as total_people,
    AVG(age) as avg_age,
    MIN(age) as min_age,
    MAX(age) as max_age
  FROM people
  WHERE age IS NOT NULL
""")

summary.show()

// Joins between tables
val addresses = sqlContext.jsonFile("addresses.json")
addresses.registerAsTable("addresses")

val joined = sqlContext.sql("""
  SELECT p.name, p.age, a.city
  FROM people p
  JOIN addresses a ON p.name = a.name
""")
```

### Built-in Functions

```scala
// String functions
sqlContext.sql("SELECT name, UPPER(name), LENGTH(name) FROM people").show()

// Math functions
sqlContext.sql("SELECT age, SQRT(age), ROUND(age/10.0, 2) FROM people").show()

// Date functions (if date columns exist)
sqlContext.sql("SELECT name, YEAR(birth_date), MONTH(birth_date) FROM people").show()

// Aggregate functions
sqlContext.sql("""
  SELECT
    COUNT(*) as count,
    SUM(age) as total_age,
    AVG(age) as avg_age,
    STDDEV(age) as stddev_age
  FROM people
  GROUP BY FLOOR(age/10) * 10
""").show()
```
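The `GROUP BY FLOOR(age/10) * 10` clause above buckets rows by decade. The bucketing arithmetic itself can be sketched in plain Scala, without Spark (the sample ages here are made up for illustration):

```scala
// Decade bucketing: the same arithmetic as FLOOR(age/10) * 10 in the SQL above.
object DecadeBuckets extends App {
  val ages = Seq(19, 25, 29, 30, 67)

  // Integer division truncates, so (age / 10) * 10 rounds down to the decade.
  val buckets = ages.groupBy(age => (age / 10) * 10)

  // 25 and 29 share the 20 bucket; 19 lands in 10, 67 in 60.
  buckets.toSeq.sortBy(_._1).foreach { case (decade, members) =>
    println(s"$decade: ${members.mkString(", ")}")
  }
}
```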

## DataFrame API (Programmatic)

An alternative to SQL for structured data operations.

### Column Expressions

```scala
import org.apache.spark.sql.catalyst.expressions._

// Access columns
val nameCol = people("name")
val ageCol = people("age")

// Column operations
val agePlus10 = people("age") + 10
val upperName = Upper(people("name"))
```

### Transformations

**select**: Choose columns and expressions

```scala { .api }
def select(exprs: ColumnExpression*): SchemaRDD
```

```scala
// Select specific columns
val namesAndAges = people.select(people("name"), people("age"))

// Select with expressions
val transformed = people.select(
  people("name"),
  people("age") + 1 as "age_next_year",
  Upper(people("name")) as "upper_name"
)
```

**where/filter**: Filter rows based on conditions

```scala { .api }
def where(condition: ColumnExpression): SchemaRDD
def filter(condition: ColumnExpression): SchemaRDD // alias for where
```

```scala
// Filter adults
val adults = people.where(people("age") >= 18)

// Multiple conditions
val youngAdults = people.filter(
  people("age") >= 18 && people("age") < 30
)

// String operations
val namesWithA = people.where(people("name").startsWith("A"))
```

**groupBy**: Group data for aggregation

```scala { .api }
def groupBy(cols: ColumnExpression*): GroupedSchemaRDD
```

```scala
// Group by age ranges
val ageGroups = people.groupBy(people("age") / 10 * 10)
val ageCounts = ageGroups.count()

// Multiple grouping columns (if available)
val grouped = people.groupBy(people("department"), people("age") / 10 * 10)
val summary = grouped.agg(
  Count(people("name")) as "count",
  Avg(people("age")) as "avg_age"
)
```

**orderBy**: Sort data

```scala { .api }
def orderBy(sortCol: ColumnExpression, sortCols: ColumnExpression*): SchemaRDD
```

```scala
// Sort by age ascending
val sortedByAge = people.orderBy(people("age"))

// Sort by age descending
val sortedByAgeDesc = people.orderBy(people("age").desc)

// Multiple sort columns
val sorted = people.orderBy(people("age").desc, people("name"))
```

### Joins

**join**: Join with another SchemaRDD

```scala { .api }
def join(right: SchemaRDD, joinExprs: ColumnExpression, joinType: String = "inner"): SchemaRDD
```

```scala
// Assume we have an addresses SchemaRDD
val addresses = sqlContext.jsonFile("addresses.json")

// Inner join
val joined = people.join(
  addresses,
  people("name") === addresses("name"),
  "inner"
)

// Left outer join
val leftJoined = people.join(
  addresses,
  people("name") === addresses("name"),
  "left_outer"
)

// Join types: "inner", "left_outer", "right_outer", "full_outer"
```
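The difference between the join types can be illustrated with plain Scala collections, where map keys stand in for the join expression (this is a semantics sketch only; the sample data is made up and none of it involves Spark):

```scala
object JoinSemantics extends App {
  val people = Map("Alice" -> 25, "Bob" -> 30)
  val addresses = Map("Alice" -> "NYC")

  // Inner join: only names present on both sides survive.
  val inner = for {
    (name, age) <- people.toSeq
    city <- addresses.get(name)
  } yield (name, age, city)

  // Left outer join: every left row survives; a missing right side becomes None.
  val leftOuter = people.toSeq.map { case (name, age) =>
    (name, age, addresses.get(name))
  }

  println(inner)     // Alice only
  println(leftOuter) // Alice and Bob; Bob's city is None
}
```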

### Set Operations

```scala
val people1 = sqlContext.jsonFile("people1.json")
val people2 = sqlContext.jsonFile("people2.json")

// Union (inputs must have the same schema; duplicates are kept)
val allPeople = people1.unionAll(people2)

// Intersection
val common = people1.intersect(people2)

// Difference
val unique = people1.except(people2)
```
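As a rough analogy outside Spark, the three operations behave like the following on plain Scala sequences; note that `unionAll`, like SQL's `UNION ALL`, does not deduplicate:

```scala
object SetOps extends App {
  val a = Seq(1, 2, 2, 3)
  val b = Seq(2, 3, 4)

  val unionAll = a ++ b                // keeps duplicates, like UNION ALL
  val common = a.distinct.intersect(b) // rows present in both inputs
  val unique = a.distinct.diff(b)      // rows in a but not in b

  println(unionAll) // List(1, 2, 2, 3, 2, 3, 4)
  println(common)   // List(2, 3)
  println(unique)   // List(1)
}
```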

## Data Types and Schema

### Data Types

```scala { .api }
import org.apache.spark.sql.catalyst.types._

// Primitive types
StringType     // String
IntegerType    // Int
LongType       // Long
DoubleType     // Double
FloatType      // Float
BooleanType    // Boolean
BinaryType     // Array[Byte]
TimestampType  // java.sql.Timestamp

// Complex types
ArrayType(elementType: DataType, containsNull: Boolean)
MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean)
StructType(fields: Array[StructField])
```

### Schema Definition

```scala
import org.apache.spark.sql.catalyst.types._

// Define a schema manually
val schema = StructType(Array(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("addresses", ArrayType(StringType), nullable = true)
))

// Create a SchemaRDD with the predefined schema
val rowRDD = sc.parallelize(Seq(
  Row("Alice", 25, Array("123 Main St", "456 Oak Ave")),
  Row("Bob", 30, Array("789 Pine St"))
))

val schemaRDD = sqlContext.applySchema(rowRDD, schema)
```

## Working with Row Objects

### Row Class

```scala { .api }
abstract class Row extends Serializable {
  def length: Int
  def get(i: Int): Any
  def isNullAt(i: Int): Boolean

  // Typed getters
  def getString(i: Int): String
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getDouble(i: Int): Double
  def getFloat(i: Int): Float
  def getBoolean(i: Int): Boolean
  def getAs[T](i: Int): T
  def getAs[T](fieldName: String): T
}
```

```scala
val people = sqlContext.jsonFile("people.json")
val rows = people.collect()

rows.foreach { row =>
  val name = row.getString(0) // First field
  val age = row.getInt(1)     // Second field

  // Or by field name (if available)
  val nameByField = row.getAs[String]("name")
  val ageByField = row.getAs[Int]("age")

  println(s"$name is $age years old")
}

// Safe access with null checking
rows.foreach { row =>
  val name = if (row.isNullAt(0)) "Unknown" else row.getString(0)
  val age = if (row.isNullAt(1)) 0 else row.getInt(1)

  println(s"$name is $age years old")
}
```

## Save Operations

### Saving Data

**saveAsParquetFile**: Save in Parquet format

```scala { .api }
def saveAsParquetFile(path: String): Unit
```

**saveAsTable**: Save as a persistent table

```scala { .api }
def saveAsTable(tableName: String): Unit
```

**insertInto**: Insert into an existing table

```scala { .api }
def insertInto(tableName: String): Unit
def insertInto(tableName: String, overwrite: Boolean): Unit
```

```scala
val people = sqlContext.jsonFile("people.json")

// Save as Parquet (recommended for performance)
people.saveAsParquetFile("people.parquet")

// Save as a persistent table (requires Hive support)
people.saveAsTable("people_table")

// Insert into an existing table
people.insertInto("existing_people_table")

// Overwrite an existing table
people.insertInto("existing_people_table", overwrite = true)
```

## Caching and Performance

### Caching Tables

**cacheTable**: Cache a table in memory

```scala { .api }
def cacheTable(tableName: String): Unit
```

**uncacheTable**: Remove a table from the cache

```scala { .api }
def uncacheTable(tableName: String): Unit
```

```scala
// Register and cache the table
people.registerAsTable("people")
sqlContext.cacheTable("people")

// Subsequent queries use the cached data
val adults = sqlContext.sql("SELECT * FROM people WHERE age >= 18")
val seniors = sqlContext.sql("SELECT * FROM people WHERE age >= 65")

// Remove from the cache when done
sqlContext.uncacheTable("people")
```

### Performance Optimization

```scala
// Cache frequently accessed SchemaRDDs
val cachedPeople = people.cache()

// Use Parquet for better scan performance
val parquetPeople = sqlContext.parquetFile("people.parquet")

// Repartition for better parallelism
val repartitioned = people.repartition(10)

// Coalesce to reduce the number of small output files
val coalesced = people.coalesce(1)
```

567

568

## Configuration and Settings

569

570

```scala

571

// Access SQL configuration

572

val sqlConf = sqlContext.conf

573

574

// Set configuration properties

575

sqlConf.setConf("spark.sql.shuffle.partitions", "200")

576

sqlConf.setConf("spark.sql.codegen", "true")

577

578

// Get configuration values

579

val shufflePartitions = sqlConf.getConf("spark.sql.shuffle.partitions")

580

val codegenEnabled = sqlConf.getConf("spark.sql.codegen")

581

```

582

583

## Advanced Usage Patterns

584

585

### Complex Data Processing

586

587

```scala

588

// Complex analytical query

589

val analysis = sqlContext.sql("""

590

SELECT

591

CASE

592

WHEN age < 18 THEN 'Minor'

593

WHEN age < 65 THEN 'Adult'

594

ELSE 'Senior'

595

END as age_group,

596

COUNT(*) as count,

597

AVG(age) as avg_age

598

FROM people

599

WHERE age IS NOT NULL

600

GROUP BY

601

CASE

602

WHEN age < 18 THEN 'Minor'

603

WHEN age < 65 THEN 'Adult'

604

ELSE 'Senior'

605

END

606

ORDER BY avg_age

607

""")

608

609

analysis.show()

610

```
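The `CASE` expression above is just a three-way bucketing function. Outside Spark, the same logic can be expressed (and unit-tested) in plain Scala:

```scala
object AgeGroups extends App {
  // Same bucketing as the SQL CASE expression above.
  def ageGroup(age: Int): String =
    if (age < 18) "Minor"
    else if (age < 65) "Adult"
    else "Senior"

  // Boundary behavior: 17 is still a Minor, 18 is an Adult, 65 is a Senior.
  Seq(12, 17, 18, 30, 65, 70).foreach(a => println(s"$a -> ${ageGroup(a)}"))
}
```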

### Window Functions (Limited Support)

Window functions such as `ROW_NUMBER() OVER (...)` are not handled by the basic SQL parser in Spark 1.0; queries like the one below generally require Hive support.

```scala
// Basic ranking within groups (limited in Spark 1.0)
val ranked = sqlContext.sql("""
  SELECT name, age, department,
         ROW_NUMBER() OVER (PARTITION BY department ORDER BY age DESC) as rank
  FROM employees
""")
```

This guide covers the Spark SQL API available in Spark 1.0.0 for working with structured data using both SQL and programmatic operations.