Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
—
Spark SQL provides a programming interface for working with structured and semi-structured data. It allows querying data via SQL and the DataFrame API, with support for various data sources including JSON, Parquet, and Hive tables.
The main entry point for Spark SQL functionality.
class SQLContext(sparkContext: SparkContext) extends Serializable with Logging {
// Alternative constructor for existing HiveContext compatibility
def this(sparkContext: SparkContext, cacheManager: CacheManager) = this(sparkContext)
}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
val conf = new SparkConf().setAppName("SQL App").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// Import implicit conversions (in Spark 1.0, implicits such as createSchemaRDD
// are members of SQLContext itself; `sqlContext.implicits._` was only added in Spark 1.3)
import sqlContext._
jsonFile: Read JSON files as SchemaRDD
def jsonFile(path: String): SchemaRDD
// Read JSON file
val people = sqlContext.jsonFile("people.json")
// Show schema
people.printSchema()
// Show data
people.show()
// Example JSON structure:
// {"name":"Michael"}
// {"name":"Andy", "age":30}
// {"name":"Justin", "age":19}
jsonRDD: Create SchemaRDD from RDD of JSON strings
def jsonRDD(json: RDD[String]): SchemaRDD
// Create JSON RDD
val jsonStrings = sc.parallelize(Array(
"""{"name":"Alice", "age":25}""",
"""{"name":"Bob", "age":30}""",
"""{"name":"Charlie", "age":35}"""
))
val jsonDF = sqlContext.jsonRDD(jsonStrings)
parquetFile: Read Parquet files
def parquetFile(path: String): SchemaRDD
// Read Parquet file
val parquetData = sqlContext.parquetFile("users.parquet")
// Parquet automatically preserves schema
parquetData.printSchema()
createSchemaRDD: Convert RDD to SchemaRDD
def createSchemaRDD[A <: Product : TypeTag](rdd: RDD[A]): SchemaRDD
// Define case class for schema
case class Person(name: String, age: Int)
// Create RDD of case class instances
val peopleRDD = sc.parallelize(Seq(
Person("Alice", 25),
Person("Bob", 30),
Person("Charlie", 35)
))
// Convert to SchemaRDD
val peopleDF = sqlContext.createSchemaRDD(peopleRDD)
The main data structure for structured data in Spark SQL.
class SchemaRDD(sqlContext: SQLContext, logicalPlan: LogicalPlan) extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {
// Schema operations
def printSchema(): Unit
def schema: StructType
// Registration
def registerAsTable(tableName: String): Unit
def registerTempTable(tableName: String): Unit
// Transformations
def select(exprs: ColumnExpression*): SchemaRDD
def where(condition: ColumnExpression): SchemaRDD
def filter(condition: ColumnExpression): SchemaRDD
def groupBy(cols: ColumnExpression*): GroupedSchemaRDD
def orderBy(sortCol: ColumnExpression, sortCols: ColumnExpression*): SchemaRDD
def limit(n: Int): SchemaRDD
def unionAll(other: SchemaRDD): SchemaRDD
def intersect(other: SchemaRDD): SchemaRDD
def except(other: SchemaRDD): SchemaRDD
def join(right: SchemaRDD, joinExprs: ColumnExpression, joinType: String): SchemaRDD
// Actions
def show(): Unit
def collect(): Array[Row]
def count(): Long
def first(): Row
def take(n: Int): Array[Row]
// Save operations
def saveAsParquetFile(path: String): Unit
def saveAsTable(tableName: String): Unit
def insertInto(tableName: String): Unit
def insertInto(tableName: String, overwrite: Boolean): Unit
}
val people = sqlContext.jsonFile("people.json")
// Print schema in tree format
people.printSchema()
// Output:
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Access schema programmatically
val schema = people.schema
println(s"Schema has ${schema.fields.length} fields")
schema.fields.foreach { field =>
println(s"Field: ${field.name}, Type: ${field.dataType}, Nullable: ${field.nullable}")
}
// Show first 20 rows
people.show()
// Show custom number of rows
people.show(10)
// Collect all data (be careful with large datasets)
val allPeople = people.collect()
allPeople.foreach(println)
// Take first n rows
val firstFive = people.take(5)
// Count rows
val totalCount = people.count()
sql: Execute SQL queries
def sql(sqlText: String): SchemaRDD
registerAsTable: Register SchemaRDD as temporary table
def registerAsTable(tableName: String): Unit
val people = sqlContext.jsonFile("people.json")
// Register as temporary table
people.registerAsTable("people")
// Execute SQL queries
val adults = sqlContext.sql("SELECT name, age FROM people WHERE age >= 18")
adults.show()
// More complex queries
val summary = sqlContext.sql("""
SELECT
COUNT(*) as total_people,
AVG(age) as avg_age,
MIN(age) as min_age,
MAX(age) as max_age
FROM people
WHERE age IS NOT NULL
""")
summary.show()
// Joins between tables
val addresses = sqlContext.jsonFile("addresses.json")
addresses.registerAsTable("addresses")
val joined = sqlContext.sql("""
SELECT p.name, p.age, a.city
FROM people p
JOIN addresses a ON p.name = a.name
""")
// String functions
sqlContext.sql("SELECT name, UPPER(name), LENGTH(name) FROM people").show()
// Math functions
sqlContext.sql("SELECT age, SQRT(age), ROUND(age/10.0, 2) FROM people").show()
// Date functions (if date columns exist)
sqlContext.sql("SELECT name, YEAR(birth_date), MONTH(birth_date) FROM people").show()
// Aggregate functions
sqlContext.sql("""
SELECT
COUNT(*) as count,
SUM(age) as total_age,
AVG(age) as avg_age,
STDDEV(age) as stddev_age
FROM people
GROUP BY FLOOR(age/10) * 10
""").show()
Alternative to SQL for structured data operations.
import org.apache.spark.sql.catalyst.expressions._
// Access columns
val nameCol = people("name")
val ageCol = people("age")
// Column operations
val agePlus10 = people("age") + 10
val upperName = Upper(people("name"))
select: Choose columns and expressions
def select(exprs: ColumnExpression*): SchemaRDD
// Select specific columns
val namesAndAges = people.select(people("name"), people("age"))
// Select with expressions
val transformed = people.select(
people("name"),
people("age") + 1 as "age_next_year",
Upper(people("name")) as "upper_name"
)
where/filter: Filter rows based on conditions
def where(condition: ColumnExpression): SchemaRDD
def filter(condition: ColumnExpression): SchemaRDD // alias for where
// Filter adults
val adults = people.where(people("age") >= 18)
// Multiple conditions
val youngAdults = people.filter(
people("age") >= 18 && people("age") < 30
)
// String operations
val namesWithA = people.where(people("name").startsWith("A"))
groupBy: Group data for aggregation
def groupBy(cols: ColumnExpression*): GroupedSchemaRDD
// Group by age ranges
val ageGroups = people.groupBy(people("age") / 10 * 10)
val ageCounts = ageGroups.count()
// Multiple grouping columns (if available)
val grouped = people.groupBy(people("department"), people("age") / 10 * 10)
val summary = grouped.agg(
Count(people("name")) as "count",
Avg(people("age")) as "avg_age"
)
orderBy: Sort data
def orderBy(sortCol: ColumnExpression, sortCols: ColumnExpression*): SchemaRDD
// Sort by age ascending
val sortedByAge = people.orderBy(people("age"))
// Sort by age descending
val sortedByAgeDesc = people.orderBy(people("age").desc)
// Multiple sort columns
val sorted = people.orderBy(people("age").desc, people("name"))
join: Join with another SchemaRDD
def join(right: SchemaRDD, joinExprs: ColumnExpression, joinType: String = "inner"): SchemaRDD
// Assume we have addresses SchemaRDD
val addresses = sqlContext.jsonFile("addresses.json")
// Inner join
val joined = people.join(
addresses,
people("name") === addresses("name"),
"inner"
)
// Left outer join
val leftJoined = people.join(
addresses,
people("name") === addresses("name"),
"left_outer"
)
// Join types: "inner", "left_outer", "right_outer", "full_outer"
val people1 = sqlContext.jsonFile("people1.json")
val people2 = sqlContext.jsonFile("people2.json")
// Union (must have same schema)
val allPeople = people1.unionAll(people2)
// Intersection
val common = people1.intersect(people2)
// Difference
val unique = people1.except(people2)
import org.apache.spark.sql.catalyst.types._
// Primitive types
StringType // String
IntegerType // Int
LongType // Long
DoubleType // Double
FloatType // Float
BooleanType // Boolean
BinaryType // Array[Byte]
TimestampType // java.sql.Timestamp
// Complex types
ArrayType(elementType: DataType, containsNull: Boolean)
MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean)
StructType(fields: Array[StructField])
import org.apache.spark.sql.catalyst.types._
// Define schema manually
val schema = StructType(Array(
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true),
StructField("addresses", ArrayType(StringType), nullable = true)
))
// Create SchemaRDD with predefined schema
val rowRDD = sc.parallelize(Seq(
Row("Alice", 25, Array("123 Main St", "456 Oak Ave")),
Row("Bob", 30, Array("789 Pine St"))
))
val schemaRDD = sqlContext.applySchema(rowRDD, schema)
abstract class Row extends Serializable {
def length: Int
def get(i: Int): Any
def isNullAt(i: Int): Boolean
// Typed getters
def getString(i: Int): String
def getInt(i: Int): Int
def getLong(i: Int): Long
def getDouble(i: Int): Double
def getFloat(i: Int): Float
def getBoolean(i: Int): Boolean
def getAs[T](i: Int): T
def getAs[T](fieldName: String): T
}
val people = sqlContext.jsonFile("people.json")
val rows = people.collect()
rows.foreach { row =>
val name = row.getString(0) // First field
val age = row.getInt(1) // Second field
// Or by field name (if available)
val nameByField = row.getAs[String]("name")
val ageByField = row.getAs[Int]("age")
println(s"$name is $age years old")
}
// Safe access with null checking
rows.foreach { row =>
val name = if (row.isNullAt(0)) "Unknown" else row.getString(0)
val age = if (row.isNullAt(1)) 0 else row.getInt(1)
println(s"$name is $age years old")
}
saveAsParquetFile: Save as Parquet format
def saveAsParquetFile(path: String): Unit
saveAsTable: Save as persistent table
def saveAsTable(tableName: String): Unit
insertInto: Insert into existing table
def insertInto(tableName: String): Unit
def insertInto(tableName: String, overwrite: Boolean): Unit
val people = sqlContext.jsonFile("people.json")
// Save as Parquet (recommended for performance)
people.saveAsParquetFile("people.parquet")
// Save as persistent table (requires Hive support)
people.saveAsTable("people_table")
// Insert into existing table
people.insertInto("existing_people_table")
// Overwrite existing table
people.insertInto("existing_people_table", overwrite = true)
cacheTable: Cache table in memory
def cacheTable(tableName: String): Unit
uncacheTable: Remove table from cache
def uncacheTable(tableName: String): Unit
// Register and cache table
people.registerAsTable("people")
sqlContext.cacheTable("people")
// Now queries will use cached data
val adults = sqlContext.sql("SELECT * FROM people WHERE age >= 18")
val seniors = sqlContext.sql("SELECT * FROM people WHERE age >= 65")
// Remove from cache when done
sqlContext.uncacheTable("people")
// Cache frequently accessed SchemaRDDs
val cachedPeople = people.cache()
// Use Parquet for better performance
val parquetPeople = sqlContext.parquetFile("people.parquet")
// Repartition for better parallelism
val repartitioned = people.repartition(10)
// Coalesce to reduce small files
val coalesced = people.coalesce(1)
// Access SQL configuration
val sqlConf = sqlContext.conf
// Set configuration properties
sqlConf.setConf("spark.sql.shuffle.partitions", "200")
sqlConf.setConf("spark.sql.codegen", "true")
// Get configuration values
val shufflePartitions = sqlConf.getConf("spark.sql.shuffle.partitions")
val codegenEnabled = sqlConf.getConf("spark.sql.codegen")
// Complex analytical query
val analysis = sqlContext.sql("""
SELECT
CASE
WHEN age < 18 THEN 'Minor'
WHEN age < 65 THEN 'Adult'
ELSE 'Senior'
END as age_group,
COUNT(*) as count,
AVG(age) as avg_age
FROM people
WHERE age IS NOT NULL
GROUP BY
CASE
WHEN age < 18 THEN 'Minor'
WHEN age < 65 THEN 'Adult'
ELSE 'Senior'
END
ORDER BY avg_age
""")
analysis.show()
// Basic ranking within groups (NOTE: SQL window functions such as ROW_NUMBER()
// were not supported until Spark 1.4; this example will not run on Spark 1.0)
val ranked = sqlContext.sql("""
SELECT name, age, department,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY age DESC) as rank
FROM employees
""")
This comprehensive guide covers the Spark SQL API available in Spark 1.0.0 for working with structured data using both SQL and programmatic DataFrame-style operations.
Install with Tessl CLI
npx tessl i tessl/maven-apache-spark