DataFrames and Datasets are the core distributed data structures in Spark SQL. A DataFrame is simply a type alias for Dataset[Row], providing an untyped, row-based interface, while Dataset[T] adds compile-time type safety through encoders. Both share the same underlying API and are optimized by the Catalyst query optimizer.
type DataFrame = Dataset[Row]
class Dataset[T] extends Serializable {
def schema: StructType
def dtypes: Array[(String, String)]
def columns: Array[String]
def count(): Long
def isEmpty: Boolean
def isLocal: Boolean
def isStreaming: Boolean
}
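Usage example (a minimal sketch; assumes an active SparkSession named spark):
import spark.implicits._
val df = Seq(("Alice", 29), ("Bob", 31)).toDF("name", "age") // a DataFrame is just Dataset[Row]
val rowCount = df.count()
val empty = df.isEmpty
val local = df.isLocal         // true when collect/take can run without executors (e.g. a local relation)
val streaming = df.isStreaming // true only for datasets backed by a streaming source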
trait Row extends Serializable {
def length: Int
def size: Int
def get(i: Int): Any
def getAs[T](i: Int): T
def getAs[T](fieldName: String): T
def getString(i: Int): String
def getInt(i: Int): Int
def getLong(i: Int): Long
def getDouble(i: Int): Double
def getFloat(i: Int): Float
def getBoolean(i: Int): Boolean
def getDate(i: Int): java.sql.Date
def getTimestamp(i: Int): java.sql.Timestamp
}
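Usage example (a sketch; assumes df is a DataFrame whose first two columns are name: String and age: Int):
val row = df.first()
val numFields = row.length
val name = row.getString(0)     // positional access
val age = row.getAs[Int]("age") // access by field name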
class Dataset[T] {
def schema: StructType
def dtypes: Array[(String, String)]
def columns: Array[String]
def printSchema(): Unit
def printSchema(level: Int): Unit
}
Usage Examples:
import org.apache.spark.sql.types._
// Examine schema
val df = spark.read.json("people.json")
df.printSchema()
df.schema.foreach(field => println(s"${field.name}: ${field.dataType}"))
// Get column information
val columnNames = df.columns
val columnTypes = df.dtypes
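// Define a schema explicitly with StructType/StructField (a sketch; the field names are illustrative)
val explicitSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))
val typedRead = spark.read.schema(explicitSchema).json("people.json")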
// Check schema programmatically
val hasNameColumn = df.schema.exists(_.name == "name")
val nameField = df.schema.find(_.name == "name")
class Dataset[T] {
def select(cols: Column*): DataFrame
def select(col: String, cols: String*): DataFrame
def selectExpr(exprs: String*): DataFrame
def drop(colNames: String*): Dataset[T]
def drop(col: Column): DataFrame
def withColumn(colName: String, col: Column): DataFrame
def withColumnRenamed(existingName: String, newName: String): DataFrame
}
Usage Examples:
import org.apache.spark.sql.functions._
// Select columns
val selected = df.select("name", "age")
val selectedWithCol = df.select(col("name"), col("age") + 1)
// Select with expressions
val computed = df.selectExpr("name", "age + 1 as next_age", "upper(name) as upper_name")
// Add columns
val withFullName = df.withColumn("full_name", concat(col("first"), lit(" "), col("last")))
// Rename columns
val renamed = df.withColumnRenamed("old_name", "new_name")
// Drop columns
val dropped = df.drop("unwanted_column", "another_column")
val droppedByCol = df.drop(col("age"))
class Dataset[T] {
def filter(condition: Column): Dataset[T]
def filter(conditionExpr: String): Dataset[T]
def where(condition: Column): Dataset[T]
def where(conditionExpr: String): Dataset[T]
}
Usage Examples:
// Filter with Column expressions
val adults = df.filter(col("age") >= 18)
val activeUsers = df.filter(col("active") === true && col("last_login").isNotNull)
// Filter with SQL expressions
val filtered = df.filter("age >= 18 AND active = true")
val complex = df.where("age BETWEEN 25 AND 65 AND city IN ('New York', 'San Francisco')")
// Multiple conditions
val result = df
.filter(col("age") > 21)
.filter(col("country") === "US")
.filter(col("score").isNotNull)
class Dataset[T] {
def sort(sortCol: String, sortCols: String*): Dataset[T]
def sort(sortExprs: Column*): Dataset[T]
def orderBy(sortCol: String, sortCols: String*): Dataset[T]
def orderBy(sortExprs: Column*): Dataset[T]
}
Usage Examples:
// Sort by column names
val sorted = df.sort("age", "name")
// Sort with Column expressions and directions
val ordered = df.orderBy(col("age").desc, col("name").asc)
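// Control where nulls sort using Column's null-ordering variants (a sketch)
val nullsLast = df.orderBy(col("age").desc_nulls_last, col("name").asc_nulls_first)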
// Complex sorting
val complexSort = df.orderBy(
col("department"),
col("salary").desc,
col("hire_date").asc
)
class Dataset[T] {
def join(right: Dataset[_]): DataFrame
def join(right: Dataset[_], usingColumn: String): DataFrame
def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
def join(right: Dataset[_], joinExprs: Column): DataFrame
def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
def crossJoin(right: Dataset[_]): DataFrame
}
Join Types: "inner", "cross", "outer", "full", "full_outer", "left", "left_outer", "right", "right_outer", "left_semi", "left_anti"
Usage Examples:
val users = spark.table("users")
val orders = spark.table("orders")
// Inner join (default)
val userOrders = users.join(orders, users("id") === orders("user_id"))
// Left outer join
val allUsers = users.join(orders, users("id") === orders("user_id"), "left_outer")
// Join on multiple columns
val joined = users.join(orders,
users("id") === orders("user_id") && users("region") === orders("region"))
// Join using column names (when columns have same names)
val simple = users.join(orders, "user_id")
val multiple = users.join(orders, Seq("user_id", "region"))
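// Semi and anti joins return only left-side columns (a sketch using the same users/orders tables):
// left_semi keeps users that have at least one matching order, left_anti keeps users with none
val usersWithOrders = users.join(orders, users("id") === orders("user_id"), "left_semi")
val usersWithoutOrders = users.join(orders, users("id") === orders("user_id"), "left_anti")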
// Cross join
val cartesian = users.crossJoin(orders)
class Dataset[T] {
def union(other: Dataset[T]): Dataset[T]
def unionAll(other: Dataset[T]): Dataset[T]
def unionByName(other: Dataset[T]): Dataset[T]
def intersect(other: Dataset[T]): Dataset[T]
def intersectAll(other: Dataset[T]): Dataset[T]
def except(other: Dataset[T]): Dataset[T]
def exceptAll(other: Dataset[T]): Dataset[T]
}
Usage Examples:
val df1 = spark.range(1, 5).toDF("id")
val df2 = spark.range(3, 8).toDF("id")
// Union keeps duplicates (equivalent to SQL UNION ALL); call distinct() to remove them
val combined = df1.union(df2)
// Union by column names (handles different column orders)
val byName = df1.unionByName(df2)
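// Deduplicate after a union when SQL UNION semantics are needed (sketch)
val deduped = df1.union(df2).distinct()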
// Set operations
val intersection = df1.intersect(df2) // Values in both
val difference = df1.except(df2) // Values in df1 but not df2
class Dataset[T] {
def distinct(): Dataset[T]
def dropDuplicates(): Dataset[T]
def dropDuplicates(colNames: Array[String]): Dataset[T]
def dropDuplicates(colNames: Seq[String]): Dataset[T]
def dropDuplicates(col1: String, cols: String*): Dataset[T]
}
Usage Examples:
// Remove all duplicate rows
val unique = df.distinct()
// Remove duplicates based on specific columns
val uniqueUsers = df.dropDuplicates("user_id")
val uniqueByMultiple = df.dropDuplicates("user_id", "email")
val uniqueBySeq = df.dropDuplicates(Seq("user_id", "email"))
class Dataset[T] {
def limit(n: Int): Dataset[T]
def sample(fraction: Double): Dataset[T]
def sample(fraction: Double, seed: Long): Dataset[T]
def sample(withReplacement: Boolean, fraction: Double): Dataset[T]
def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
def sampleBy[K](col: String, fractions: Map[K, Double], seed: Long): Dataset[T]
}
Usage Examples:
// Limit results
val top100 = df.limit(100)
// Random sampling
val sample10Percent = df.sample(0.1)
val sampleWithSeed = df.sample(false, 0.2, seed = 42)
// Stratified sampling by column values
val stratified = df.sampleBy("category", Map("A" -> 0.1, "B" -> 0.2), seed = 123)
class Dataset[T] {
def count(): Long
def collect(): Array[T]
def collectAsList(): java.util.List[T]
def first(): T
def head(): T
def head(n: Int): Array[T]
def take(n: Int): Array[T]
def takeAsList(n: Int): java.util.List[T]
def tail(n: Int): Array[T]
def show(): Unit
def show(numRows: Int): Unit
def show(numRows: Int, truncate: Boolean): Unit
def show(numRows: Int, truncate: Int): Unit
def show(numRows: Int, truncate: Int, vertical: Boolean): Unit
}
Usage Examples:
// Count rows
val totalRows = df.count()
// Collect data (use with caution on large datasets)
val allData = df.collect()
val firstRow = df.first()
val top10 = df.take(10)
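// head(n) is equivalent to take(n); tail(n) (Spark 3.0+) returns the last n rows to the driver (sketch)
val first5 = df.head(5)
val last5 = df.tail(5)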
// Display data
df.show() // Show 20 rows, truncate at 20 chars
df.show(50) // Show 50 rows
df.show(20, false) // Don't truncate strings
df.show(10, 100, true) // Vertical format
class Dataset[T] {
def cache(): Dataset[T]
def persist(): Dataset[T]
def persist(newLevel: StorageLevel): Dataset[T]
def unpersist(): Dataset[T]
def unpersist(blocking: Boolean): Dataset[T]
def storageLevel: StorageLevel
def isStreaming: Boolean
}
Usage Examples:
import org.apache.spark.storage.StorageLevel
// Cache with the default storage level (MEMORY_AND_DISK for Datasets)
val cached = df.cache()
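// cache()/persist() are lazy; running an action such as count() materializes the cached data (sketch)
cached.count()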
// Persist with specific storage level
val persisted = df.persist(StorageLevel.MEMORY_ONLY)
val diskOnly = df.persist(StorageLevel.DISK_ONLY)
val memoryAndDiskSer = df.persist(StorageLevel.MEMORY_AND_DISK_SER)
// Check storage level
val level = df.storageLevel
// Remove from cache
df.unpersist()
df.unpersist(blocking = true) // Wait for removal to complete
class Dataset[T] {
def foreach(f: T => Unit): Unit
def foreachPartition(f: Iterator[T] => Unit): Unit
def map[U : Encoder](func: T => U): Dataset[U]
def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]
def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U]
def filter(func: T => Boolean): Dataset[T]
def reduce(func: (T, T) => T): T
}
Usage Examples:
// Type-safe operations (the result type needs an implicit Encoder)
import spark.implicits._
case class Person(name: String, age: Int)
val peopleSeq = Seq(Person("Alice", 29), Person("Bob", 31))
val people: Dataset[Person] = spark.createDataFrame(peopleSeq).as[Person]
// Map transformation
val names = people.map(_.name)
val ages = people.map(_.age)
// Filter with function
val adults = people.filter(_.age >= 18)
// FlatMap
val words = people.flatMap(_.name.split(" "))
// Reduce
val totalAge = people.map(_.age).reduce(_ + _)
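// mapPartitions processes a whole partition at once, useful for per-partition setup (sketch)
val tagged = people.mapPartitions { iter =>
  iter.map(p => s"${p.name}:${p.age}")
}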
// Side effects
people.foreach(person => println(s"Person: ${person.name}"))
people.foreachPartition { iter =>
// Process partition
iter.foreach(println)
}
class Dataset[T] {
def as[U : Encoder]: Dataset[U]
def toDF(): DataFrame
def toDF(colNames: String*): DataFrame
def rdd: RDD[T]
def javaRDD: JavaRDD[T]
def toJavaRDD: JavaRDD[T]
}
Usage Examples:
// Convert to a typed Dataset (needs an implicit Encoder)
import spark.implicits._
case class User(id: Long, name: String, age: Int)
val typedUsers = df.as[User]
// Convert back to a DataFrame
val dataFrame = typedUsers.toDF()
val renamedDF = typedUsers.toDF("col1", "col2", "col3")
// Convert to RDD
val rdd = df.rdd
val userRDD = typedUsers.rdd
// Java interop
val javaRDD = df.toJavaRDD
class Dataset[T] {
def repartition(numPartitions: Int): Dataset[T]
def repartition(partitionExprs: Column*): Dataset[T]
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
def coalesce(numPartitions: Int): Dataset[T]
def repartitionByRange(partitionExprs: Column*): Dataset[T]
def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
}
Usage Examples:
// Change number of partitions
val repartitioned = df.repartition(200)
val coalesced = df.coalesce(10) // Reduce partitions without shuffle
// Partition by column values
val byDept = df.repartition(col("department"))
val byMultiple = df.repartition(col("year"), col("month"))
// Range partitioning (for ordered data)
val rangePartitioned = df.repartitionByRange(col("timestamp"))
val rangeWithCount = df.repartitionByRange(100, col("id"))
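// Inspect the resulting partition count via the underlying RDD (sketch)
val numPartitions = repartitioned.rdd.getNumPartitions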