Apache Spark SQL is a distributed SQL query engine that provides DataFrame and Dataset APIs for working with structured data.
DataFrames and Datasets are the core distributed data structures in Spark SQL. DataFrame is a type alias for Dataset[Row], providing an untyped interface, while Dataset[T] offers compile-time type safety. Both share the same underlying API and are optimized by the Catalyst query optimizer.
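For example, here is a minimal sketch contrasting the two views over the same data (assuming an active SparkSession named spark with its implicits imported):
import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._
case class Person(name: String, age: Int)
// Untyped view: columns are resolved at runtime and rows are generic Row objects
val peopleDF: DataFrame = Seq(Person("Alice", 29), Person("Bob", 17)).toDF()
// Typed view: fields are checked at compile time, backed by the same Catalyst plan
val peopleDS: Dataset[Person] = peopleDF.as[Person]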
type DataFrame = Dataset[Row]
class Dataset[T] extends Serializable {
def schema: StructType
def dtypes: Array[(String, String)]
def columns: Array[String]
def count(): Long
def isEmpty: Boolean
def isLocal: Boolean
def isStreaming: Boolean
}
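Usage sketch for these inspection methods (df stands for any DataFrame or Dataset):
val rowCount = df.count()       // Total number of rows (runs a job)
val noRows = df.isEmpty         // True when the Dataset contains no rows
val local = df.isLocal          // True when collect()/take() can run without Spark executors
val streaming = df.isStreaming  // True when the Dataset reads from a streaming source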
class Row extends Serializable {
def length: Int
def size: Int
def get(i: Int): Any
def getAs[T](i: Int): T
def getAs[T](fieldName: String): T
def getString(i: Int): String
def getInt(i: Int): Int
def getLong(i: Int): Long
def getDouble(i: Int): Double
def getFloat(i: Int): Float
def getBoolean(i: Int): Boolean
def getDate(i: Int): java.sql.Date
def getTimestamp(i: Int): java.sql.Timestamp
}
class Dataset[T] {
def schema: StructType
def dtypes: Array[(String, String)]
def columns: Array[String]
def printSchema(): Unit
def printSchema(level: Int): Unit
}
Usage Examples:
import org.apache.spark.sql.types._
// Examine schema
val df = spark.read.json("people.json")
df.printSchema()
df.schema.foreach(field => println(s"${field.name}: ${field.dataType}"))
// Get column information
val columnNames = df.columns
val columnTypes = df.dtypes
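// Declare a schema up front instead of inferring it (field names here are illustrative)
val explicitSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))
val explicitDf = spark.read.schema(explicitSchema).json("people.json")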
// Check schema programmatically
val hasNameColumn = df.schema.exists(_.name == "name")
val nameField = df.schema.find(_.name == "name")
class Dataset[T] {
def select(cols: Column*): DataFrame
def select(col: String, cols: String*): DataFrame
def selectExpr(exprs: String*): DataFrame
def drop(colNames: String*): Dataset[T]
def drop(col: Column): DataFrame
def withColumn(colName: String, col: Column): DataFrame
def withColumnRenamed(existingName: String, newName: String): DataFrame
}
Usage Examples:
import org.apache.spark.sql.functions._
// Select columns
val selected = df.select("name", "age")
val selectedWithCol = df.select(col("name"), col("age") + 1)
// Select with expressions
val computed = df.selectExpr("name", "age + 1 as next_age", "upper(name) as upper_name")
// Add columns
val withFullName = df.withColumn("full_name", concat(col("first"), lit(" "), col("last")))
// Rename columns
val renamed = df.withColumnRenamed("old_name", "new_name")
// Drop columns
val dropped = df.drop("unwanted_column", "another_column")
val droppedByCol = df.drop(col("age"))
class Dataset[T] {
def filter(condition: Column): Dataset[T]
def filter(conditionExpr: String): Dataset[T]
def where(condition: Column): Dataset[T]
def where(conditionExpr: String): Dataset[T]
}
Usage Examples:
// Filter with Column expressions
val adults = df.filter(col("age") >= 18)
val activeUsers = df.filter(col("active") === true && col("last_login").isNotNull)
// Filter with SQL expressions
val filtered = df.filter("age >= 18 AND active = true")
val complex = df.where("age BETWEEN 25 AND 65 AND city IN ('New York', 'San Francisco')")
// Multiple conditions
val result = df
.filter(col("age") > 21)
.filter(col("country") === "US")
.filter(col("score").isNotNull)class Dataset[T] {
def sort(sortCol: String, sortCols: String*): Dataset[T]
def sort(sortExprs: Column*): Dataset[T]
def orderBy(sortCol: String, sortCols: String*): Dataset[T]
def orderBy(sortExprs: Column*): Dataset[T]
}
Usage Examples:
// Sort by column names
val sorted = df.sort("age", "name")
// Sort with Column expressions and directions
val ordered = df.orderBy(col("age").desc, col("name").asc)
// Complex sorting
val complexSort = df.orderBy(
col("department"),
col("salary").desc,
col("hire_date").asc
)
class Dataset[T] {
def join(right: Dataset[_]): DataFrame
def join(right: Dataset[_], usingColumn: String): DataFrame
def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
def join(right: Dataset[_], joinExprs: Column): DataFrame
def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
def crossJoin(right: Dataset[_]): DataFrame
}
Join Types: "inner", "cross", "outer", "full", "full_outer", "left", "left_outer", "right", "right_outer", "left_semi", "left_anti"
Usage Examples:
val users = spark.table("users")
val orders = spark.table("orders")
// Inner join (default)
val userOrders = users.join(orders, users("id") === orders("user_id"))
// Left outer join
val allUsers = users.join(orders, users("id") === orders("user_id"), "left_outer")
// Join on multiple columns
val joined = users.join(orders,
users("id") === orders("user_id") && users("region") === orders("region"))
// Join using column names (when columns have same names)
val simple = users.join(orders, "user_id")
val multiple = users.join(orders, Seq("user_id", "region"))
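// Semi and anti joins keep only the left side's columns
val usersWithOrders = users.join(orders, users("id") === orders("user_id"), "left_semi")
val usersWithoutOrders = users.join(orders, users("id") === orders("user_id"), "left_anti")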
// Cross join
val cartesian = users.crossJoin(orders)
class Dataset[T] {
def union(other: Dataset[T]): Dataset[T]
def unionAll(other: Dataset[T]): Dataset[T]
def unionByName(other: Dataset[T]): Dataset[T]
def intersect(other: Dataset[T]): Dataset[T]
def intersectAll(other: Dataset[T]): Dataset[T]
def except(other: Dataset[T]): Dataset[T]
def exceptAll(other: Dataset[T]): Dataset[T]
}
Usage Examples:
val df1 = spark.range(1, 5).toDF("id")
val df2 = spark.range(3, 8).toDF("id")
// Union (keeps duplicates, like SQL UNION ALL; unionAll is a deprecated alias since Spark 2.0)
val combined = df1.union(df2)
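val deduped = df1.union(df2).distinct() // Add distinct() for SQL UNION (deduplicating) semantics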
// Union by column names (handles different column orders)
val byName = df1.unionByName(df2)
// Set operations
val intersection = df1.intersect(df2) // Values in both
val difference = df1.except(df2) // Values in df1 but not df2
class Dataset[T] {
def distinct(): Dataset[T]
def dropDuplicates(): Dataset[T]
def dropDuplicates(colNames: Array[String]): Dataset[T]
def dropDuplicates(colNames: Seq[String]): Dataset[T]
def dropDuplicates(col1: String, cols: String*): Dataset[T]
}
Usage Examples:
// Remove all duplicate rows
val unique = df.distinct()
// Remove duplicates based on specific columns
val uniqueUsers = df.dropDuplicates("user_id")
val uniqueByMultiple = df.dropDuplicates("user_id", "email")
val uniqueBySeq = df.dropDuplicates(Seq("user_id", "email"))
class Dataset[T] {
def limit(n: Int): Dataset[T]
def sample(fraction: Double): Dataset[T]
def sample(fraction: Double, seed: Long): Dataset[T]
def sample(withReplacement: Boolean, fraction: Double): Dataset[T]
def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
def sampleBy[K](col: String, fractions: Map[K, Double], seed: Long): DataFrame // defined on DataFrameStatFunctions, accessed as df.stat.sampleBy
}
Usage Examples:
// Limit results
val top100 = df.limit(100)
// Random sampling
val sample10Percent = df.sample(0.1)
val sampleWithSeed = df.sample(false, 0.2, seed = 42)
// Stratified sampling by column values
val stratified = df.stat.sampleBy("category", Map("A" -> 0.1, "B" -> 0.2), seed = 123)
class Dataset[T] {
def count(): Long
def collect(): Array[T]
def collectAsList(): java.util.List[T]
def first(): T
def head(): T
def head(n: Int): Array[T]
def take(n: Int): Array[T]
def takeAsList(n: Int): java.util.List[T]
def tail(n: Int): Array[T]
def show(): Unit
def show(numRows: Int): Unit
def show(numRows: Int, truncate: Boolean): Unit
def show(numRows: Int, truncate: Int): Unit
def show(numRows: Int, truncate: Int, vertical: Boolean): Unit
}
Usage Examples:
// Count rows
val totalRows = df.count()
// Collect data (use with caution on large datasets)
val allData = df.collect()
val firstRow = df.first()
val top10 = df.take(10)
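// Read fields from a collected Row (assumes df is a DataFrame with a string "name" column)
val firstName = firstRow.getAs[String]("name")
val firstValue = firstRow.get(0)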
// Display data
df.show() // Show 20 rows, truncate at 20 chars
df.show(50) // Show 50 rows
df.show(20, false) // Don't truncate strings
df.show(10, 100, true) // Vertical format
class Dataset[T] {
def cache(): Dataset[T]
def persist(): Dataset[T]
def persist(newLevel: StorageLevel): Dataset[T]
def unpersist(): Dataset[T]
def unpersist(blocking: Boolean): Dataset[T]
def storageLevel: StorageLevel
def isStreaming: Boolean
}
Usage Examples:
import org.apache.spark.storage.StorageLevel
// Cache in memory (default: MEMORY_AND_DISK)
val cached = df.cache()
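cached.count() // Caching is lazy; the first action materializes the cached data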
// Persist with specific storage level
val persisted = df.persist(StorageLevel.MEMORY_ONLY)
val diskOnly = df.persist(StorageLevel.DISK_ONLY)
val memoryAndDiskSer = df.persist(StorageLevel.MEMORY_AND_DISK_SER)
// Check storage level
val level = df.storageLevel
// Remove from cache
df.unpersist()
df.unpersist(blocking = true) // Wait for removal to complete
class Dataset[T] {
def foreach(f: T => Unit): Unit
def foreachPartition(f: Iterator[T] => Unit): Unit
def map[U : Encoder](func: T => U): Dataset[U]
def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]
def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U]
def filter(func: T => Boolean): Dataset[T]
def reduce(func: (T, T) => T): T
}
Usage Examples:
// Type-safe operations (require Encoder for result type)
case class Person(name: String, age: Int)
import spark.implicits._ // Provides Encoders for case classes and primitive result types
val peopleSeq = Seq(Person("Alice", 29), Person("Bob", 17))
val people: Dataset[Person] = spark.createDataFrame(peopleSeq).as[Person]
// Map transformation
val names = people.map(_.name)
val ages = people.map(_.age)
// Filter with function
val adults = people.filter(_.age >= 18)
// FlatMap
val words = people.flatMap(_.name.split(" "))
// Reduce
val totalAge = people.map(_.age).reduce(_ + _)
// Side effects
people.foreach(person => println(s"Person: ${person.name}"))
people.foreachPartition { iter =>
// Process partition
iter.foreach(println)
}
class Dataset[T] {
def as[U : Encoder]: Dataset[U]
def toDF(): DataFrame
def toDF(colNames: String*): DataFrame
def rdd: RDD[T]
def javaRDD: JavaRDD[T]
def toJavaRDD: JavaRDD[T]
}
Usage Examples:
// Convert to typed Dataset (requires an implicit Encoder, e.g. via import spark.implicits._)
case class User(id: Long, name: String, age: Int)
val typedUsers = df.as[User]
// Convert to DataFrame
val dataFrame = typedUsers.toDF()
val renamedDF = typedUsers.toDF("col1", "col2", "col3")
// Convert to RDD
val rdd = df.rdd
val userRDD = typedUsers.rdd
// Java interop
val javaRDD = df.toJavaRDD
class Dataset[T] {
def repartition(numPartitions: Int): Dataset[T]
def repartition(partitionExprs: Column*): Dataset[T]
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
def coalesce(numPartitions: Int): Dataset[T]
def repartitionByRange(partitionExprs: Column*): Dataset[T]
def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
}
Usage Examples:
// Change number of partitions
val repartitioned = df.repartition(200)
val coalesced = df.coalesce(10) // Reduce partitions without shuffle
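// Inspect the current number of partitions
val numPartitions = df.rdd.getNumPartitions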
// Partition by column values
val byDept = df.repartition(col("department"))
val byMultiple = df.repartition(col("year"), col("month"))
// Range partitioning (for ordered data)
val rangePartitioned = df.repartitionByRange(col("timestamp"))
val rangeWithCount = df.repartitionByRange(100, col("id"))
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-sql-2-11