Spec Registry


maven-apache-spark

Description
Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
Author
tessl

How to use

npx @tessl/cli registry install tessl/maven-apache-spark@1.0.0

docs/sql.md

# Spark SQL

Spark SQL provides a programming interface for working with structured and semi-structured data. It allows querying data via SQL and the DataFrame API, with support for various data sources including JSON, Parquet, and Hive tables.
## SQLContext

The main entry point for Spark SQL functionality.

### SQLContext Class

```scala { .api }
class SQLContext(sparkContext: SparkContext) extends Serializable with Logging {
  // Alternative constructor for existing HiveContext compatibility
  def this(sparkContext: SparkContext, cacheManager: CacheManager) = this(sparkContext)
}
```
### Creating SQLContext

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

val conf = new SparkConf().setAppName("SQL App").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Import implicit conversions (e.g., RDD to SchemaRDD)
import sqlContext.implicits._
```
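With a SQLContext in place, the typical workflow is to load structured data, register it as a table, and query it. A minimal sketch, assuming a local `people.json` file (each step is covered in detail in the sections below):

```scala
// Load structured data, register it as a temporary table, and query it with SQL
val people = sqlContext.jsonFile("people.json")   // illustrative input file
people.registerAsTable("people")

val adults = sqlContext.sql("SELECT name, age FROM people WHERE age >= 18")
adults.show()
```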
## Data Sources

### JSON Files

**jsonFile**: Read a JSON file as a SchemaRDD

```scala { .api }
def jsonFile(path: String): SchemaRDD
```

```scala
// Read a JSON file
val people = sqlContext.jsonFile("people.json")

// Show schema
people.printSchema()

// Show data
people.show()

// Example JSON structure (one JSON object per line):
// {"name":"Michael"}
// {"name":"Andy", "age":30}
// {"name":"Justin", "age":19}
```

**jsonRDD**: Create a SchemaRDD from an RDD of JSON strings

```scala { .api }
def jsonRDD(json: RDD[String]): SchemaRDD
```

```scala
// Create an RDD of JSON strings
val jsonStrings = sc.parallelize(Array(
  """{"name":"Alice", "age":25}""",
  """{"name":"Bob", "age":30}""",
  """{"name":"Charlie", "age":35}"""
))

val jsonDF = sqlContext.jsonRDD(jsonStrings)
```
### Parquet Files

**parquetFile**: Read Parquet files

```scala { .api }
def parquetFile(path: String): SchemaRDD
```

```scala
// Read a Parquet file
val parquetData = sqlContext.parquetFile("users.parquet")

// Parquet automatically preserves the schema
parquetData.printSchema()
```

### RDD to SchemaRDD Conversion

**createSchemaRDD**: Convert an RDD of case class instances (Products) to a SchemaRDD

```scala { .api }
def createSchemaRDD[A <: Product : TypeTag](rdd: RDD[A]): SchemaRDD
```

```scala
// Define a case class describing the schema
case class Person(name: String, age: Int)

// Create an RDD of case class instances
val peopleRDD = sc.parallelize(Seq(
  Person("Alice", 25),
  Person("Bob", 30),
  Person("Charlie", 35)
))

// Convert to a SchemaRDD
val peopleDF = sqlContext.createSchemaRDD(peopleRDD)
```
## SchemaRDD

The main data structure for structured data in Spark SQL.

### SchemaRDD Class

```scala { .api }
class SchemaRDD(sqlContext: SQLContext, logicalPlan: LogicalPlan) extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {
  // Schema operations
  def printSchema(): Unit
  def schema: StructType

  // Registration
  def registerAsTable(tableName: String): Unit
  def registerTempTable(tableName: String): Unit

  // Transformations
  def select(exprs: ColumnExpression*): SchemaRDD
  def where(condition: ColumnExpression): SchemaRDD
  def filter(condition: ColumnExpression): SchemaRDD
  def groupBy(cols: ColumnExpression*): GroupedSchemaRDD
  def orderBy(sortCol: ColumnExpression, sortCols: ColumnExpression*): SchemaRDD
  def limit(n: Int): SchemaRDD
  def unionAll(other: SchemaRDD): SchemaRDD
  def intersect(other: SchemaRDD): SchemaRDD
  def except(other: SchemaRDD): SchemaRDD
  def join(right: SchemaRDD, joinExprs: ColumnExpression, joinType: String): SchemaRDD

  // Actions
  def show(): Unit
  def collect(): Array[Row]
  def count(): Long
  def first(): Row
  def take(n: Int): Array[Row]

  // Save operations
  def saveAsParquetFile(path: String): Unit
  def saveAsTable(tableName: String): Unit
  def insertInto(tableName: String): Unit
  def insertInto(tableName: String, overwrite: Boolean): Unit
}
```
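Because transformations return SchemaRDDs, they compose by chaining. A minimal sketch using only the methods listed above (column names assume the `people` data from the earlier examples):

```scala
// Chain transformations, then trigger execution with an action
val topAdults = people
  .where(people("age") >= 18)
  .select(people("name"), people("age"))
  .orderBy(people("age").desc)
  .limit(10)

topAdults.show()
```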
### Schema Operations

```scala
val people = sqlContext.jsonFile("people.json")

// Print schema in tree format
people.printSchema()
// Output:
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Access schema programmatically
val schema = people.schema
println(s"Schema has ${schema.fields.length} fields")

schema.fields.foreach { field =>
  println(s"Field: ${field.name}, Type: ${field.dataType}, Nullable: ${field.nullable}")
}
```
### Basic Operations

```scala
// Show first 20 rows
people.show()

// Show custom number of rows
people.show(10)

// Collect all data (be careful with large datasets)
val allPeople = people.collect()
allPeople.foreach(println)

// Take first n rows
val firstFive = people.take(5)

// Count rows
val totalCount = people.count()
```
## SQL Queries

### Table Registration and Querying

**sql**: Execute SQL queries

```scala { .api }
def sql(sqlText: String): SchemaRDD
```

**registerAsTable**: Register a SchemaRDD as a temporary table

```scala { .api }
def registerAsTable(tableName: String): Unit
```

```scala
val people = sqlContext.jsonFile("people.json")

// Register as a temporary table
people.registerAsTable("people")

// Execute SQL queries
val adults = sqlContext.sql("SELECT name, age FROM people WHERE age >= 18")
adults.show()

// More complex queries
val summary = sqlContext.sql("""
  SELECT
    COUNT(*) as total_people,
    AVG(age) as avg_age,
    MIN(age) as min_age,
    MAX(age) as max_age
  FROM people
  WHERE age IS NOT NULL
""")

summary.show()

// Joins between tables
val addresses = sqlContext.jsonFile("addresses.json")
addresses.registerAsTable("addresses")

val joined = sqlContext.sql("""
  SELECT p.name, p.age, a.city
  FROM people p
  JOIN addresses a ON p.name = a.name
""")
```
### Built-in Functions

```scala
// String functions
sqlContext.sql("SELECT name, UPPER(name), LENGTH(name) FROM people").show()

// Math functions
sqlContext.sql("SELECT age, SQRT(age), ROUND(age/10.0, 2) FROM people").show()

// Date functions (if date columns exist)
sqlContext.sql("SELECT name, YEAR(birth_date), MONTH(birth_date) FROM people").show()

// Aggregate functions
sqlContext.sql("""
  SELECT
    COUNT(*) as count,
    SUM(age) as total_age,
    AVG(age) as avg_age,
    STDDEV(age) as stddev_age
  FROM people
  GROUP BY FLOOR(age/10) * 10
""").show()
```
## DataFrame API (Programmatic)

A programmatic alternative to SQL for structured data operations.

### Column Expressions

```scala
import org.apache.spark.sql.catalyst.expressions._

// Access columns
val nameCol = people("name")
val ageCol = people("age")

// Column operations
val agePlus10 = people("age") + 10
val upperName = Upper(people("name"))
```

### Transformations

**select**: Choose columns and expressions

```scala { .api }
def select(exprs: ColumnExpression*): SchemaRDD
```

```scala
// Select specific columns
val namesAndAges = people.select(people("name"), people("age"))

// Select with expressions
val transformed = people.select(
  people("name"),
  people("age") + 1 as "age_next_year",
  Upper(people("name")) as "upper_name"
)
```
**where/filter**: Filter rows based on conditions

```scala { .api }
def where(condition: ColumnExpression): SchemaRDD
def filter(condition: ColumnExpression): SchemaRDD // alias for where
```

```scala
// Filter adults
val adults = people.where(people("age") >= 18)

// Multiple conditions
val youngAdults = people.filter(
  people("age") >= 18 && people("age") < 30
)

// String operations
val namesWithA = people.where(people("name").startsWith("A"))
```
**groupBy**: Group data for aggregation

```scala { .api }
def groupBy(cols: ColumnExpression*): GroupedSchemaRDD
```

```scala
// Group by age ranges
val ageGroups = people.groupBy(people("age") / 10 * 10)
val ageCounts = ageGroups.count()

// Multiple grouping columns (if available)
val grouped = people.groupBy(people("department"), people("age") / 10 * 10)
val summary = grouped.agg(
  Count(people("name")) as "count",
  Avg(people("age")) as "avg_age"
)
```
**orderBy**: Sort data

```scala { .api }
def orderBy(sortCol: ColumnExpression, sortCols: ColumnExpression*): SchemaRDD
```

```scala
// Sort by age ascending
val sortedByAge = people.orderBy(people("age"))

// Sort by age descending
val sortedByAgeDesc = people.orderBy(people("age").desc)

// Multiple sort columns
val sorted = people.orderBy(people("age").desc, people("name"))
```
### Joins

**join**: Join with another SchemaRDD

```scala { .api }
def join(right: SchemaRDD, joinExprs: ColumnExpression, joinType: String = "inner"): SchemaRDD
```

```scala
// Assume we have an addresses SchemaRDD
val addresses = sqlContext.jsonFile("addresses.json")

// Inner join
val joined = people.join(
  addresses,
  people("name") === addresses("name"),
  "inner"
)

// Left outer join
val leftJoined = people.join(
  addresses,
  people("name") === addresses("name"),
  "left_outer"
)

// Join types: "inner", "left_outer", "right_outer", "full_outer"
```
### Set Operations

```scala
val people1 = sqlContext.jsonFile("people1.json")
val people2 = sqlContext.jsonFile("people2.json")

// Union (must have the same schema)
val allPeople = people1.unionAll(people2)

// Intersection
val common = people1.intersect(people2)

// Difference
val unique = people1.except(people2)
```
## Data Types and Schema

### Data Types

```scala { .api }
import org.apache.spark.sql.catalyst.types._

// Primitive types
StringType    // String
IntegerType   // Int
LongType      // Long
DoubleType    // Double
FloatType     // Float
BooleanType   // Boolean
BinaryType    // Array[Byte]
TimestampType // java.sql.Timestamp

// Complex types
ArrayType(elementType: DataType, containsNull: Boolean)
MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean)
StructType(fields: Array[StructField])
```
### Schema Definition

```scala
import org.apache.spark.sql.catalyst.types._

// Define schema manually
val schema = StructType(Array(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("addresses", ArrayType(StringType), nullable = true)
))

// Create a SchemaRDD with the predefined schema
val rowRDD = sc.parallelize(Seq(
  Row("Alice", 25, Array("123 Main St", "456 Oak Ave")),
  Row("Bob", 30, Array("789 Pine St"))
))

val schemaRDD = sqlContext.applySchema(rowRDD, schema)
```
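The example above uses StructType and ArrayType; as a sketch of the remaining complex types, a schema mixing MapType and a nested StructType might look like this (field names are illustrative, signatures follow the Data Types listing above):

```scala
import org.apache.spark.sql.catalyst.types._

// A nested struct for an address, a string-to-string map of attributes, and an array of tags
val addressType = StructType(Array(
  StructField("street", StringType, nullable = true),
  StructField("city", StringType, nullable = true)
))

val personType = StructType(Array(
  StructField("name", StringType, nullable = false),
  StructField("address", addressType, nullable = true),
  StructField("attributes", MapType(StringType, StringType, valueContainsNull = true), nullable = true),
  StructField("tags", ArrayType(StringType, containsNull = false), nullable = true)
))
```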
## Working with Row Objects

### Row Class

```scala { .api }
abstract class Row extends Serializable {
  def length: Int
  def get(i: Int): Any
  def isNullAt(i: Int): Boolean

  // Typed getters
  def getString(i: Int): String
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getDouble(i: Int): Double
  def getFloat(i: Int): Float
  def getBoolean(i: Int): Boolean
  def getAs[T](i: Int): T
  def getAs[T](fieldName: String): T
}
```

```scala
val people = sqlContext.jsonFile("people.json")
val rows = people.collect()

rows.foreach { row =>
  val name = row.getString(0) // First field
  val age = row.getInt(1)     // Second field

  // Or by field name (if available)
  val nameByField = row.getAs[String]("name")
  val ageByField = row.getAs[Int]("age")

  println(s"$name is $age years old")
}

// Safe access with null checking
rows.foreach { row =>
  val name = if (row.isNullAt(0)) "Unknown" else row.getString(0)
  val age = if (row.isNullAt(1)) 0 else row.getInt(1)

  println(s"$name is $age years old")
}
```
## Save Operations

### Saving Data

**saveAsParquetFile**: Save in Parquet format

```scala { .api }
def saveAsParquetFile(path: String): Unit
```

**saveAsTable**: Save as a persistent table

```scala { .api }
def saveAsTable(tableName: String): Unit
```

**insertInto**: Insert into an existing table

```scala { .api }
def insertInto(tableName: String): Unit
def insertInto(tableName: String, overwrite: Boolean): Unit
```

```scala
val people = sqlContext.jsonFile("people.json")

// Save as Parquet (recommended for performance)
people.saveAsParquetFile("people.parquet")

// Save as a persistent table (requires Hive support)
people.saveAsTable("people_table")

// Insert into an existing table
people.insertInto("existing_people_table")

// Overwrite an existing table
people.insertInto("existing_people_table", overwrite = true)
```
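To tie this back to the Parquet reader from the Data Sources section, a small round-trip sketch (the output path is illustrative):

```scala
// Write the data out as Parquet, then read it back with parquetFile
people.saveAsParquetFile("people_out.parquet")

val reloaded = sqlContext.parquetFile("people_out.parquet")
reloaded.printSchema()                        // schema is preserved by Parquet
println(reloaded.count() == people.count())   // expect the same number of rows
```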
## Caching and Performance

### Caching Tables

**cacheTable**: Cache a table in memory

```scala { .api }
def cacheTable(tableName: String): Unit
```

**uncacheTable**: Remove a table from the cache

```scala { .api }
def uncacheTable(tableName: String): Unit
```

```scala
// Register and cache the table
people.registerAsTable("people")
sqlContext.cacheTable("people")

// Queries now use the cached data
val adults = sqlContext.sql("SELECT * FROM people WHERE age >= 18")
val seniors = sqlContext.sql("SELECT * FROM people WHERE age >= 65")

// Remove from the cache when done
sqlContext.uncacheTable("people")
```
### Performance Optimization

```scala
// Cache frequently accessed SchemaRDDs
val cachedPeople = people.cache()

// Use Parquet for better performance
val parquetPeople = sqlContext.parquetFile("people.parquet")

// Repartition for better parallelism
val repartitioned = people.repartition(10)

// Coalesce to reduce small files
val coalesced = people.coalesce(1)
```
## Configuration and Settings

```scala
// Access the SQL configuration
val sqlConf = sqlContext.conf

// Set configuration properties
sqlConf.setConf("spark.sql.shuffle.partitions", "200")
sqlConf.setConf("spark.sql.codegen", "true")

// Get configuration values
val shufflePartitions = sqlConf.getConf("spark.sql.shuffle.partitions")
val codegenEnabled = sqlConf.getConf("spark.sql.codegen")
```
## Advanced Usage Patterns

### Complex Data Processing

```scala
// Complex analytical query
val analysis = sqlContext.sql("""
  SELECT
    CASE
      WHEN age < 18 THEN 'Minor'
      WHEN age < 65 THEN 'Adult'
      ELSE 'Senior'
    END as age_group,
    COUNT(*) as count,
    AVG(age) as avg_age
  FROM people
  WHERE age IS NOT NULL
  GROUP BY
    CASE
      WHEN age < 18 THEN 'Minor'
      WHEN age < 65 THEN 'Adult'
      ELSE 'Senior'
    END
  ORDER BY avg_age
""")

analysis.show()
```
### Window Functions (Limited Support)

```scala
// Basic ranking within groups (limited support in Spark 1.0)
val ranked = sqlContext.sql("""
  SELECT name, age, department,
         ROW_NUMBER() OVER (PARTITION BY department ORDER BY age DESC) as rank
  FROM employees
""")
```
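Where `ROW_NUMBER() OVER (...)` is not available, a plain GROUP BY aggregate (already shown under Built-in Functions) covers simple per-group summaries. A sketch, assuming an `employees` table has been registered as in the examples above:

```scala
// Per-group summary without window functions: one row per department
val perDept = sqlContext.sql("""
  SELECT department, MAX(age) as max_age, COUNT(*) as headcount
  FROM employees
  GROUP BY department
""")
perDept.show()
```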
This guide covers the Spark SQL API available in Spark 1.0.0 for working with structured data using both SQL and programmatic DataFrame operations.