CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-apache-spark

Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R

Pending
Overview
Eval results
Files

sql.mddocs/

Spark SQL

Spark SQL provides a programming interface for working with structured and semi-structured data. It allows querying data via SQL and the DataFrame API, with support for various data sources including JSON, Parquet, and Hive tables.

SQLContext

The main entry point for Spark SQL functionality.

SQLContext Class

class SQLContext(sparkContext: SparkContext) extends Serializable with Logging {
  // Alternative constructor for existing HiveContext compatibility
  def this(sparkContext: SparkContext, cacheManager: CacheManager) = this(sparkContext)
}

Creating SQLContext

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

val conf = new SparkConf().setAppName("SQL App").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Import implicits for implicit conversions
import sqlContext.implicits._

Data Sources

JSON Files

jsonFile: Read JSON files as SchemaRDD

def jsonFile(path: String): SchemaRDD
// Read JSON file
val people = sqlContext.jsonFile("people.json")

// Show schema
people.printSchema()

// Show data
people.show()

// Example JSON structure:
// {"name":"Michael"}
// {"name":"Andy", "age":30}
// {"name":"Justin", "age":19}

jsonRDD: Create SchemaRDD from RDD of JSON strings

def jsonRDD(json: RDD[String]): SchemaRDD
// Create JSON RDD
val jsonStrings = sc.parallelize(Array(
  """{"name":"Alice", "age":25}""",
  """{"name":"Bob", "age":30}""",
  """{"name":"Charlie", "age":35}"""
))

val jsonDF = sqlContext.jsonRDD(jsonStrings)

Parquet Files

parquetFile: Read Parquet files

def parquetFile(path: String): SchemaRDD
// Read Parquet file
val parquetData = sqlContext.parquetFile("users.parquet")

// Parquet automatically preserves schema
parquetData.printSchema()

RDD to SchemaRDD Conversion

createSchemaRDD: Convert RDD to SchemaRDD

def createSchemaRDD[A <: Product : TypeTag](rdd: RDD[A]): SchemaRDD
// Define case class for schema
case class Person(name: String, age: Int)

// Create RDD of case class instances
val peopleRDD = sc.parallelize(Seq(
  Person("Alice", 25),
  Person("Bob", 30),
  Person("Charlie", 35)
))

// Convert to SchemaRDD
val peopleDF = sqlContext.createSchemaRDD(peopleRDD)

SchemaRDD

The main data structure for structured data in Spark SQL.

SchemaRDD Class

class SchemaRDD(sqlContext: SQLContext, logicalPlan: LogicalPlan) extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {
  // Schema operations
  def printSchema(): Unit
  def schema: StructType
  
  // Registration
  def registerAsTable(tableName: String): Unit
  def registerTempTable(tableName: String): Unit
  
  // Transformations  
  def select(exprs: ColumnExpression*): SchemaRDD
  def where(condition: ColumnExpression): SchemaRDD
  def filter(condition: ColumnExpression): SchemaRDD
  def groupBy(cols: ColumnExpression*): GroupedSchemaRDD
  def orderBy(sortCol: ColumnExpression, sortCols: ColumnExpression*): SchemaRDD
  def limit(n: Int): SchemaRDD
  def unionAll(other: SchemaRDD): SchemaRDD
  def intersect(other: SchemaRDD): SchemaRDD
  def except(other: SchemaRDD): SchemaRDD
  def join(right: SchemaRDD, joinExprs: ColumnExpression, joinType: String): SchemaRDD
  
  // Actions
  def show(): Unit
  def collect(): Array[Row]
  def count(): Long
  def first(): Row
  def take(n: Int): Array[Row]
  
  // Save operations
  def saveAsParquetFile(path: String): Unit
  def saveAsTable(tableName: String): Unit
  def insertInto(tableName: String): Unit
  def insertInto(tableName: String, overwrite: Boolean): Unit
}

Schema Operations

val people = sqlContext.jsonFile("people.json")

// Print schema in tree format
people.printSchema()
// Output:
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Access schema programmatically
val schema = people.schema
println(s"Schema has ${schema.fields.length} fields")

schema.fields.foreach { field =>
  println(s"Field: ${field.name}, Type: ${field.dataType}, Nullable: ${field.nullable}")
}

Basic Operations

// Show first 20 rows
people.show()

// Show custom number of rows
people.show(10)

// Collect all data (be careful with large datasets)
val allPeople = people.collect()
allPeople.foreach(println)

// Take first n rows
val firstFive = people.take(5)

// Count rows
val totalCount = people.count()

SQL Queries

Table Registration and Querying

sql: Execute SQL queries

def sql(sqlText: String): SchemaRDD

registerAsTable: Register SchemaRDD as temporary table

def registerAsTable(tableName: String): Unit
val people = sqlContext.jsonFile("people.json")

// Register as temporary table
people.registerAsTable("people")

// Execute SQL queries
val adults = sqlContext.sql("SELECT name, age FROM people WHERE age >= 18")
adults.show()

// More complex queries
val summary = sqlContext.sql("""
  SELECT 
    COUNT(*) as total_people,
    AVG(age) as avg_age,
    MIN(age) as min_age,
    MAX(age) as max_age
  FROM people
  WHERE age IS NOT NULL
""")

summary.show()

// Joins between tables
val addresses = sqlContext.jsonFile("addresses.json")
addresses.registerAsTable("addresses")

val joined = sqlContext.sql("""
  SELECT p.name, p.age, a.city
  FROM people p
  JOIN addresses a ON p.name = a.name
""")

Built-in Functions

// String functions
sqlContext.sql("SELECT name, UPPER(name), LENGTH(name) FROM people").show()

// Math functions
sqlContext.sql("SELECT age, SQRT(age), ROUND(age/10.0, 2) FROM people").show()

// Date functions (if date columns exist)
sqlContext.sql("SELECT name, YEAR(birth_date), MONTH(birth_date) FROM people").show()

// Aggregate functions
sqlContext.sql("""
  SELECT 
    COUNT(*) as count,
    SUM(age) as total_age,
    AVG(age) as avg_age,
    STDDEV(age) as stddev_age
  FROM people
  GROUP BY FLOOR(age/10) * 10
""").show()

DataFrame API (Programmatic)

Alternative to SQL for structured data operations.

Column Expressions

import org.apache.spark.sql.catalyst.expressions._

// Access columns
val nameCol = people("name")
val ageCol = people("age")

// Column operations
val agePlus10 = people("age") + 10
val upperName = Upper(people("name"))

Transformations

select: Choose columns and expressions

def select(exprs: ColumnExpression*): SchemaRDD
// Select specific columns
val namesAndAges = people.select(people("name"), people("age"))

// Select with expressions
val transformed = people.select(
  people("name"),
  people("age") + 1 as "age_next_year",
  Upper(people("name")) as "upper_name"
)

where/filter: Filter rows based on conditions

def where(condition: ColumnExpression): SchemaRDD
def filter(condition: ColumnExpression): SchemaRDD  // alias for where
// Filter adults
val adults = people.where(people("age") >= 18)

// Multiple conditions
val youngAdults = people.filter(
  people("age") >= 18 && people("age") < 30
)

// String operations
val namesWithA = people.where(people("name").startsWith("A"))

groupBy: Group data for aggregation

def groupBy(cols: ColumnExpression*): GroupedSchemaRDD
// Group by age ranges
val ageGroups = people.groupBy(people("age") / 10 * 10)
val ageCounts = ageGroups.count()

// Multiple grouping columns (if available)
val grouped = people.groupBy(people("department"), people("age") / 10 * 10)
val summary = grouped.agg(
  Count(people("name")) as "count",
  Avg(people("age")) as "avg_age"
)

orderBy: Sort data

def orderBy(sortCol: ColumnExpression, sortCols: ColumnExpression*): SchemaRDD
// Sort by age ascending
val sortedByAge = people.orderBy(people("age"))

// Sort by age descending
val sortedByAgeDesc = people.orderBy(people("age").desc)

// Multiple sort columns
val sorted = people.orderBy(people("age").desc, people("name"))

Joins

join: Join with another SchemaRDD

def join(right: SchemaRDD, joinExprs: ColumnExpression, joinType: String = "inner"): SchemaRDD
// Assume we have addresses SchemaRDD
val addresses = sqlContext.jsonFile("addresses.json")

// Inner join
val joined = people.join(
  addresses,
  people("name") === addresses("name"),
  "inner"
)

// Left outer join
val leftJoined = people.join(
  addresses,
  people("name") === addresses("name"),
  "left_outer"
)

// Join types: "inner", "left_outer", "right_outer", "full_outer"

Set Operations

val people1 = sqlContext.jsonFile("people1.json")
val people2 = sqlContext.jsonFile("people2.json")

// Union (must have same schema)
val allPeople = people1.unionAll(people2)

// Intersection
val common = people1.intersect(people2)

// Difference
val unique = people1.except(people2)

Data Types and Schema

Data Types

import org.apache.spark.sql.catalyst.types._

// Primitive types
StringType      // String
IntegerType     // Int
LongType        // Long
DoubleType      // Double
FloatType       // Float
BooleanType     // Boolean
BinaryType      // Array[Byte]
TimestampType   // java.sql.Timestamp

// Complex types
ArrayType(elementType: DataType, containsNull: Boolean)
MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean)
StructType(fields: Array[StructField])

Schema Definition

import org.apache.spark.sql.catalyst.types._

// Define schema manually
val schema = StructType(Array(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("addresses", ArrayType(StringType), nullable = true)
))

// Create SchemaRDD with predefined schema
val rowRDD = sc.parallelize(Seq(
  Row("Alice", 25, Array("123 Main St", "456 Oak Ave")),
  Row("Bob", 30, Array("789 Pine St"))
))

val schemaRDD = sqlContext.applySchema(rowRDD, schema)

Working with Row Objects

Row Class

abstract class Row extends Serializable {
  def length: Int
  def get(i: Int): Any
  def isNullAt(i: Int): Boolean
  
  // Typed getters
  def getString(i: Int): String
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getDouble(i: Int): Double
  def getFloat(i: Int): Float
  def getBoolean(i: Int): Boolean
  def getAs[T](i: Int): T
  def getAs[T](fieldName: String): T
}
val people = sqlContext.jsonFile("people.json")
val rows = people.collect()

rows.foreach { row =>
  val name = row.getString(0)  // First field
  val age = row.getInt(1)      // Second field
  
  // Or by field name (if available)
  val nameByField = row.getAs[String]("name")
  val ageByField = row.getAs[Int]("age")
  
  println(s"$name is $age years old")
}

// Safe access with null checking
rows.foreach { row =>
  val name = if (row.isNullAt(0)) "Unknown" else row.getString(0)
  val age = if (row.isNullAt(1)) 0 else row.getInt(1)
  
  println(s"$name is $age years old")
}

Save Operations

Saving Data

saveAsParquetFile: Save as Parquet format

def saveAsParquetFile(path: String): Unit

saveAsTable: Save as persistent table

def saveAsTable(tableName: String): Unit

insertInto: Insert into existing table

def insertInto(tableName: String): Unit
def insertInto(tableName: String, overwrite: Boolean): Unit
val people = sqlContext.jsonFile("people.json")

// Save as Parquet (recommended for performance)
people.saveAsParquetFile("people.parquet")

// Save as persistent table (requires Hive support)
people.saveAsTable("people_table")

// Insert into existing table
people.insertInto("existing_people_table")

// Overwrite existing table
people.insertInto("existing_people_table", overwrite = true)

Caching and Performance

Caching Tables

cacheTable: Cache table in memory

def cacheTable(tableName: String): Unit

uncacheTable: Remove table from cache

def uncacheTable(tableName: String): Unit
// Register and cache table
people.registerAsTable("people")
sqlContext.cacheTable("people")

// Now queries will use cached data
val adults = sqlContext.sql("SELECT * FROM people WHERE age >= 18")
val seniors = sqlContext.sql("SELECT * FROM people WHERE age >= 65")

// Remove from cache when done
sqlContext.uncacheTable("people")

Performance Optimization

// Cache frequently accessed SchemaRDDs
val cachedPeople = people.cache()

// Use Parquet for better performance
val parquetPeople = sqlContext.parquetFile("people.parquet")

// Repartition for better parallelism
val repartitioned = people.repartition(10)

// Coalesce to reduce small files
val coalesced = people.coalesce(1)

Configuration and Settings

// Access SQL configuration
val sqlConf = sqlContext.conf

// Set configuration properties
sqlConf.setConf("spark.sql.shuffle.partitions", "200")
sqlConf.setConf("spark.sql.codegen", "true")

// Get configuration values
val shufflePartitions = sqlConf.getConf("spark.sql.shuffle.partitions")
val codegenEnabled = sqlConf.getConf("spark.sql.codegen")

Advanced Usage Patterns

Complex Data Processing

// Complex analytical query
val analysis = sqlContext.sql("""
  SELECT 
    CASE 
      WHEN age < 18 THEN 'Minor'
      WHEN age < 65 THEN 'Adult'
      ELSE 'Senior'
    END as age_group,
    COUNT(*) as count,
    AVG(age) as avg_age
  FROM people
  WHERE age IS NOT NULL
  GROUP BY 
    CASE 
      WHEN age < 18 THEN 'Minor'
      WHEN age < 65 THEN 'Adult'
      ELSE 'Senior'
    END
  ORDER BY avg_age
""")

analysis.show()

Window Functions (Limited Support)

// Basic ranking within groups (limited in Spark 1.0)
val ranked = sqlContext.sql("""
  SELECT name, age, department,
    ROW_NUMBER() OVER (PARTITION BY department ORDER BY age DESC) as rank
  FROM employees
""")

This comprehensive guide covers the Spark SQL API available in Spark 1.0.0 for working with structured data using both SQL and programmatic DataFrame operations.

Install with Tessl CLI

npx tessl i tessl/maven-apache-spark

docs

caching-persistence.md

core-rdd.md

data-sources.md

graphx.md

index.md

java-api.md

key-value-operations.md

mllib.md

python-api.md

spark-context.md

sql.md

streaming.md

tile.json