# Spark SQL

Spark SQL provides a programming interface for working with structured and semi-structured data. It allows querying data via SQL and the DataFrame API, with support for various data sources including JSON, Parquet, and Hive tables.

## SQLContext

The main entry point for Spark SQL functionality.

### SQLContext Class

```scala { .api }
class SQLContext(sparkContext: SparkContext) extends Serializable with Logging
```

### Creating SQLContext

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

val conf = new SparkConf().setAppName("SQL App").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Bring the implicit RDD-to-SchemaRDD conversion into scope (Spark 1.0)
import sqlContext.createSchemaRDD
```
## Data Sources

### JSON Files

**jsonFile**: Read JSON files as a SchemaRDD

```scala { .api }
def jsonFile(path: String): SchemaRDD
```

```scala
// Read a JSON file (one JSON object per line)
val people = sqlContext.jsonFile("people.json")

// Show schema
people.printSchema()

// Show data
people.show()

// Example JSON structure:
// {"name":"Michael"}
// {"name":"Andy", "age":30}
// {"name":"Justin", "age":19}
```

**jsonRDD**: Create a SchemaRDD from an RDD of JSON strings

```scala { .api }
def jsonRDD(json: RDD[String]): SchemaRDD
```

```scala
// Create an RDD of JSON strings
val jsonStrings = sc.parallelize(Array(
  """{"name":"Alice", "age":25}""",
  """{"name":"Bob", "age":30}""",
  """{"name":"Charlie", "age":35}"""
))

val jsonDF = sqlContext.jsonRDD(jsonStrings)
```

### Parquet Files

**parquetFile**: Read Parquet files

```scala { .api }
def parquetFile(path: String): SchemaRDD
```

```scala
// Read a Parquet file
val parquetData = sqlContext.parquetFile("users.parquet")

// Parquet automatically preserves the schema
parquetData.printSchema()
```

### RDD to SchemaRDD Conversion

**createSchemaRDD**: Convert an RDD of case classes to a SchemaRDD

```scala { .api }
def createSchemaRDD[A <: Product : TypeTag](rdd: RDD[A]): SchemaRDD
```

```scala
// Define a case class; its fields define the schema
case class Person(name: String, age: Int)

// Create an RDD of case class instances
val peopleRDD = sc.parallelize(Seq(
  Person("Alice", 25),
  Person("Bob", 30),
  Person("Charlie", 35)
))

// Convert to SchemaRDD
val peopleDF = sqlContext.createSchemaRDD(peopleRDD)
```
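In Spark 1.0 the same conversion is also exposed as an implicit, via `import sqlContext.createSchemaRDD`, so an RDD of case classes can be used directly wherever a SchemaRDD is expected. A minimal sketch, reusing `peopleRDD` from above:

```scala
// With the implicit conversion in scope, no explicit call is needed
import sqlContext.createSchemaRDD

// The RDD of Person is converted to a SchemaRDD implicitly
peopleRDD.registerAsTable("people")
val adults = sqlContext.sql("SELECT name FROM people WHERE age >= 18")
```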
## SchemaRDD

The main data structure for structured data in Spark SQL.

### SchemaRDD Class

```scala { .api }
class SchemaRDD(sqlContext: SQLContext, logicalPlan: LogicalPlan)
  extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {

  // Schema operations
  def printSchema(): Unit
  def schema: StructType

  // Registration
  def registerAsTable(tableName: String): Unit
  def registerTempTable(tableName: String): Unit

  // Transformations
  def select(exprs: ColumnExpression*): SchemaRDD
  def where(condition: ColumnExpression): SchemaRDD
  def filter(condition: ColumnExpression): SchemaRDD
  def groupBy(cols: ColumnExpression*): GroupedSchemaRDD
  def orderBy(sortCol: ColumnExpression, sortCols: ColumnExpression*): SchemaRDD
  def limit(n: Int): SchemaRDD
  def unionAll(other: SchemaRDD): SchemaRDD
  def intersect(other: SchemaRDD): SchemaRDD
  def except(other: SchemaRDD): SchemaRDD
  def join(right: SchemaRDD, joinExprs: ColumnExpression, joinType: String): SchemaRDD

  // Actions
  def show(): Unit
  def collect(): Array[Row]
  def count(): Long
  def first(): Row
  def take(n: Int): Array[Row]

  // Save operations
  def saveAsParquetFile(path: String): Unit
  def saveAsTable(tableName: String): Unit
  def insertInto(tableName: String): Unit
  def insertInto(tableName: String, overwrite: Boolean): Unit
}
```

### Schema Operations

```scala
val people = sqlContext.jsonFile("people.json")

// Print the schema in tree format
people.printSchema()
// Output:
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Access the schema programmatically
val schema = people.schema
println(s"Schema has ${schema.fields.length} fields")

schema.fields.foreach { field =>
  println(s"Field: ${field.name}, Type: ${field.dataType}, Nullable: ${field.nullable}")
}
```

### Basic Operations

```scala
// Show the first 20 rows
people.show()

// Show a custom number of rows
people.show(10)

// Collect all data (be careful with large datasets)
val allPeople = people.collect()
allPeople.foreach(println)

// Take the first n rows
val firstFive = people.take(5)

// Count rows
val totalCount = people.count()
```
## SQL Queries

### Table Registration and Querying

**sql**: Execute SQL queries

```scala { .api }
def sql(sqlText: String): SchemaRDD
```

**registerAsTable**: Register a SchemaRDD as a temporary table

```scala { .api }
def registerAsTable(tableName: String): Unit
```

```scala
val people = sqlContext.jsonFile("people.json")

// Register as a temporary table
people.registerAsTable("people")

// Execute SQL queries
val adults = sqlContext.sql("SELECT name, age FROM people WHERE age >= 18")
adults.show()

// More complex queries
val summary = sqlContext.sql("""
  SELECT
    COUNT(*) as total_people,
    AVG(age) as avg_age,
    MIN(age) as min_age,
    MAX(age) as max_age
  FROM people
  WHERE age IS NOT NULL
""")

summary.show()

// Joins between tables
val addresses = sqlContext.jsonFile("addresses.json")
addresses.registerAsTable("addresses")

val joined = sqlContext.sql("""
  SELECT p.name, p.age, a.city
  FROM people p
  JOIN addresses a ON p.name = a.name
""")
```

### Built-in Functions

```scala
// String functions
sqlContext.sql("SELECT name, UPPER(name), LENGTH(name) FROM people").show()

// Math functions
sqlContext.sql("SELECT age, SQRT(age), ROUND(age/10.0, 2) FROM people").show()

// Date functions (if date columns exist)
sqlContext.sql("SELECT name, YEAR(birth_date), MONTH(birth_date) FROM people").show()

// Aggregate functions
sqlContext.sql("""
  SELECT
    COUNT(*) as count,
    SUM(age) as total_age,
    AVG(age) as avg_age,
    STDDEV(age) as stddev_age
  FROM people
  GROUP BY FLOOR(age/10) * 10
""").show()
```
## DataFrame API (Programmatic)

An alternative to SQL for structured data operations.

### Column Expressions

```scala
import org.apache.spark.sql.catalyst.expressions._

// Access columns
val nameCol = people("name")
val ageCol = people("age")

// Column operations
val agePlus10 = people("age") + 10
val upperName = Upper(people("name"))
```

### Transformations

**select**: Choose columns and expressions

```scala { .api }
def select(exprs: ColumnExpression*): SchemaRDD
```

```scala
// Select specific columns
val namesAndAges = people.select(people("name"), people("age"))

// Select with expressions
val transformed = people.select(
  people("name"),
  people("age") + 1 as "age_next_year",
  Upper(people("name")) as "upper_name"
)
```

**where/filter**: Filter rows based on conditions

```scala { .api }
def where(condition: ColumnExpression): SchemaRDD
def filter(condition: ColumnExpression): SchemaRDD // alias for where
```

```scala
// Filter adults
val adults = people.where(people("age") >= 18)

// Multiple conditions
val youngAdults = people.filter(
  people("age") >= 18 && people("age") < 30
)

// String operations
val namesWithA = people.where(people("name").startsWith("A"))
```

**groupBy**: Group data for aggregation

```scala { .api }
def groupBy(cols: ColumnExpression*): GroupedSchemaRDD
```

```scala
// Group by age ranges
val ageGroups = people.groupBy(people("age") / 10 * 10)
val ageCounts = ageGroups.count()

// Multiple grouping columns (if available)
val grouped = people.groupBy(people("department"), people("age") / 10 * 10)
val summary = grouped.agg(
  Count(people("name")) as "count",
  Avg(people("age")) as "avg_age"
)
```

**orderBy**: Sort data

```scala { .api }
def orderBy(sortCol: ColumnExpression, sortCols: ColumnExpression*): SchemaRDD
```

```scala
// Sort by age ascending
val sortedByAge = people.orderBy(people("age"))

// Sort by age descending
val sortedByAgeDesc = people.orderBy(people("age").desc)

// Multiple sort columns
val sorted = people.orderBy(people("age").desc, people("name"))
```
### Joins

**join**: Join with another SchemaRDD

```scala { .api }
def join(right: SchemaRDD, joinExprs: ColumnExpression, joinType: String = "inner"): SchemaRDD
```

```scala
// Assume we have an addresses SchemaRDD
val addresses = sqlContext.jsonFile("addresses.json")

// Inner join
val joined = people.join(
  addresses,
  people("name") === addresses("name"),
  "inner"
)

// Left outer join
val leftJoined = people.join(
  addresses,
  people("name") === addresses("name"),
  "left_outer"
)

// Join types: "inner", "left_outer", "right_outer", "full_outer"
```

### Set Operations

```scala
val people1 = sqlContext.jsonFile("people1.json")
val people2 = sqlContext.jsonFile("people2.json")

// Union (inputs must have the same schema)
val allPeople = people1.unionAll(people2)

// Intersection
val common = people1.intersect(people2)

// Difference
val unique = people1.except(people2)
```

## Data Types and Schema

### Data Types

```scala { .api }
import org.apache.spark.sql.catalyst.types._

// Primitive types
StringType    // String
IntegerType   // Int
LongType      // Long
DoubleType    // Double
FloatType     // Float
BooleanType   // Boolean
BinaryType    // Array[Byte]
TimestampType // java.sql.Timestamp

// Complex types
ArrayType(elementType: DataType, containsNull: Boolean)
MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean)
StructType(fields: Array[StructField])
```
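Complex types can nest arbitrarily inside a `StructType`. A small sketch combining the complex types above (the field names here are illustrative, not part of any API):

```scala
import org.apache.spark.sql.catalyst.types._

// A schema mixing complex types: an array of string tags and a
// map from string keys to integer scores (illustrative fields)
val profileSchema = StructType(Array(
  StructField("name", StringType, nullable = false),
  StructField("tags", ArrayType(StringType, containsNull = false), nullable = true),
  StructField("scores", MapType(StringType, IntegerType, valueContainsNull = true), nullable = true)
))
```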
### Schema Definition

```scala
import org.apache.spark.sql.catalyst.types._

// Define a schema manually
val schema = StructType(Array(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("addresses", ArrayType(StringType), nullable = true)
))

// Create a SchemaRDD with the predefined schema
val rowRDD = sc.parallelize(Seq(
  Row("Alice", 25, Array("123 Main St", "456 Oak Ave")),
  Row("Bob", 30, Array("789 Pine St"))
))

val schemaRDD = sqlContext.applySchema(rowRDD, schema)
```

## Working with Row Objects

### Row Class

```scala { .api }
abstract class Row extends Serializable {
  def length: Int
  def get(i: Int): Any
  def isNullAt(i: Int): Boolean

  // Typed getters
  def getString(i: Int): String
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getDouble(i: Int): Double
  def getFloat(i: Int): Float
  def getBoolean(i: Int): Boolean
  def getAs[T](i: Int): T
  def getAs[T](fieldName: String): T
}
```

```scala
val people = sqlContext.jsonFile("people.json")
val rows = people.collect()

rows.foreach { row =>
  val name = row.getString(0) // First field
  val age = row.getInt(1)     // Second field

  // Or by field name (if available)
  val nameByField = row.getAs[String]("name")
  val ageByField = row.getAs[Int]("age")

  println(s"$name is $age years old")
}

// Safe access with null checking
rows.foreach { row =>
  val name = if (row.isNullAt(0)) "Unknown" else row.getString(0)
  val age = if (row.isNullAt(1)) 0 else row.getInt(1)

  println(s"$name is $age years old")
}
```
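The null checks above can be factored into a small helper that wraps positional access in `Option`. This is a sketch, not part of the Row API:

```scala
// Hypothetical helper: null-safe field access returning Option
def getOption[T](row: Row, i: Int): Option[T] =
  if (row.isNullAt(i)) None else Some(row.get(i).asInstanceOf[T])

rows.foreach { row =>
  val name = getOption[String](row, 0).getOrElse("Unknown")
  val age  = getOption[Long](row, 1).getOrElse(0L)
  println(s"$name is $age years old")
}
```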
## Save Operations

### Saving Data

**saveAsParquetFile**: Save in Parquet format

```scala { .api }
def saveAsParquetFile(path: String): Unit
```

**saveAsTable**: Save as a persistent table

```scala { .api }
def saveAsTable(tableName: String): Unit
```

**insertInto**: Insert into an existing table

```scala { .api }
def insertInto(tableName: String): Unit
def insertInto(tableName: String, overwrite: Boolean): Unit
```

```scala
val people = sqlContext.jsonFile("people.json")

// Save as Parquet (recommended for performance)
people.saveAsParquetFile("people.parquet")

// Save as a persistent table (requires Hive support)
people.saveAsTable("people_table")

// Insert into an existing table
people.insertInto("existing_people_table")

// Overwrite an existing table
people.insertInto("existing_people_table", overwrite = true)
```
## Caching and Performance

### Caching Tables

**cacheTable**: Cache a table in memory

```scala { .api }
def cacheTable(tableName: String): Unit
```

**uncacheTable**: Remove a table from the cache

```scala { .api }
def uncacheTable(tableName: String): Unit
```

```scala
// Register and cache the table
people.registerAsTable("people")
sqlContext.cacheTable("people")

// Subsequent queries use the cached data
val adults = sqlContext.sql("SELECT * FROM people WHERE age >= 18")
val seniors = sqlContext.sql("SELECT * FROM people WHERE age >= 65")

// Remove from the cache when done
sqlContext.uncacheTable("people")
```

### Performance Optimization

```scala
// Cache frequently accessed SchemaRDDs
val cachedPeople = people.cache()

// Use Parquet for better performance
val parquetPeople = sqlContext.parquetFile("people.parquet")

// Repartition for better parallelism
val repartitioned = people.repartition(10)

// Coalesce to reduce small files
val coalesced = people.coalesce(1)
```
## Configuration and Settings

SQL configuration is set and read directly on the SQLContext:

```scala
// Set configuration properties
sqlContext.setConf("spark.sql.shuffle.partitions", "200")
sqlContext.setConf("spark.sql.codegen", "true")

// Get configuration values
val shufflePartitions = sqlContext.getConf("spark.sql.shuffle.partitions")
val codegenEnabled = sqlContext.getConf("spark.sql.codegen")
```
## Advanced Usage Patterns

### Complex Data Processing

```scala
// Complex analytical query
val analysis = sqlContext.sql("""
  SELECT
    CASE
      WHEN age < 18 THEN 'Minor'
      WHEN age < 65 THEN 'Adult'
      ELSE 'Senior'
    END as age_group,
    COUNT(*) as count,
    AVG(age) as avg_age
  FROM people
  WHERE age IS NOT NULL
  GROUP BY
    CASE
      WHEN age < 18 THEN 'Minor'
      WHEN age < 65 THEN 'Adult'
      ELSE 'Senior'
    END
  ORDER BY avg_age
""")

analysis.show()
```
### Window Functions (Not Supported in 1.0)

Window functions such as `ROW_NUMBER() OVER (...)` are not supported by the SQL parser in Spark 1.0; they were introduced in later releases. The following query illustrates the syntax that newer versions accept:

```scala
// Requires a newer Spark version; fails to parse in Spark 1.0
val ranked = sqlContext.sql("""
  SELECT name, age, department,
         ROW_NUMBER() OVER (PARTITION BY department ORDER BY age DESC) as rank
  FROM employees
""")
```
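Without the OVER clause, per-group rankings can often be emulated with a self-join against a grouped aggregate. A sketch, assuming an `employees` table with `name`, `age`, and `department` columns, that finds each department's oldest employee:

```scala
// Workaround without window functions: join each row against
// its department's maximum age computed by a grouped subquery
val oldestPerDept = sqlContext.sql("""
  SELECT e.name, e.age, e.department
  FROM employees e
  JOIN (
    SELECT department, MAX(age) AS max_age
    FROM employees
    GROUP BY department
  ) m ON e.department = m.department AND e.age = m.max_age
""")
```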
This comprehensive guide covers the Spark SQL API available in Spark 1.0.0 for working with structured data using both SQL and programmatic DataFrame operations.