Core data structures and operations for structured data manipulation. Dataset provides type-safe operations while DataFrame offers untyped flexibility. Both support functional and SQL-style operations with lazy evaluation and Catalyst optimization.
Strongly-typed collection of domain objects that can be transformed using functional and relational operations.
/**
* Strongly-typed collection of domain objects
* @tparam T The type of the objects in the dataset
*/
class Dataset[T] {
/** Dataset schema */
def schema: StructType
/** Column names */
def columns: Array[String]
/** Print schema to console */
def printSchema(): Unit
/** Convert to DataFrame (untyped) */
def toDF(): DataFrame
def toDF(colNames: String*): DataFrame
/** Convert to different type */
def as[U: Encoder]: Dataset[U]
/** Get write interface */
def write: DataFrameWriter[T]
/** Get write interface for streaming */
def writeStream: DataStreamWriter[T]
}
// DataFrame is type alias for Dataset[Row]
type DataFrame = Dataset[Row]
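A quick sketch of the structural members above; the people.json file, its field count, and the output path are assumptions made for the example.
// Inspect structure and move between the untyped and typed views
val df = spark.read.json("people.json")            // a DataFrame, i.e. Dataset[Row]
df.printSchema()                                   // prints the inferred schema as a tree
println(df.columns.mkString(", "))                 // column names
// Rename columns positionally (assumes the file has exactly these two fields)
val renamed = df.toDF("person_name", "person_age")
// Persist through the write interface
renamed.write.mode("overwrite").parquet("people_out.parquet")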
Lazy transformations that return new Datasets without triggering computation.
class Dataset[T] {
/** Select columns by name or expression */
def select(cols: Column*): DataFrame
def select(col: String, cols: String*): DataFrame
def selectExpr(exprs: String*): DataFrame
/** Filter rows based on condition */
def filter(condition: Column): Dataset[T]
def filter(conditionExpr: String): Dataset[T]
def where(condition: Column): Dataset[T]
/** Add or replace column */
def withColumn(colName: String, col: Column): DataFrame
/** Rename column */
def withColumnRenamed(existingName: String, newName: String): DataFrame
/** Drop columns */
def drop(colName: String): DataFrame
def drop(colNames: String*): DataFrame
def drop(col: Column): DataFrame
/** Remove duplicate rows */
def distinct(): Dataset[T]
def dropDuplicates(): Dataset[T]
def dropDuplicates(colNames: String*): Dataset[T]
/** Limit number of rows */
def limit(n: Int): Dataset[T]
/** Sample fraction of rows */
def sample(fraction: Double): Dataset[T]
def sample(withReplacement: Boolean, fraction: Double): Dataset[T]
def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
/** Sort rows */
def sort(sortCol: String, sortCols: String*): Dataset[T]
def sort(sortExprs: Column*): Dataset[T]
def orderBy(sortCol: String, sortCols: String*): Dataset[T]
def orderBy(sortExprs: Column*): Dataset[T]
/** Repartition */
def repartition(numPartitions: Int): Dataset[T]
def repartition(partitionExprs: Column*): Dataset[T]
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
def coalesce(numPartitions: Int): Dataset[T]
}
Eager operations that trigger computation and return results.
class Dataset[T] {
/** Display DataFrame contents */
def show(): Unit
def show(numRows: Int): Unit
def show(numRows: Int, truncate: Boolean): Unit
def show(numRows: Int, truncate: Int): Unit
def show(numRows: Int, truncate: Int, vertical: Boolean): Unit
/** Collect all rows to driver */
def collect(): Array[T]
def collectAsList(): java.util.List[T]
/** Take first N rows */
def take(n: Int): Array[T]
def takeAsList(n: Int): java.util.List[T]
/** Get first row */
def first(): T
def head(): T
def head(n: Int): Array[T]
/** Count rows */
def count(): Long
/** Check if Dataset is empty */
def isEmpty: Boolean
/** Apply function to each row */
def foreach(f: T => Unit): Unit
def foreachPartition(f: Iterator[T] => Unit): Unit
/** Reduce rows to single value */
def reduce(func: (T, T) => T): T
/** Cache Dataset in memory */
def cache(): Dataset[T]
def persist(): Dataset[T]
def persist(newLevel: StorageLevel): Dataset[T]
def unpersist(): Dataset[T]
def unpersist(blocking: Boolean): Dataset[T]
}
Usage Examples:
import org.apache.spark.sql.functions._
// Basic transformations
val people = spark.read.json("people.json")
val adults = people
  .filter(col("age") > 18)
  .select("name", "age")
  .orderBy(col("age").desc)
adults.show()
// Column operations
val enriched = people
  .withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
  .withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))
  .drop("first_name", "last_name")
// Actions
val totalCount = people.count()
val firstPerson = people.first()
val allPeople = people.collect()
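A few of the listed transformations and actions that the examples above do not touch, sketched against the same people DataFrame; the column names are assumptions carried over from it.
// Deduplication and sampling
val deduped = people.dropDuplicates("name")
val sampled = people.sample(withReplacement = false, fraction = 0.1, seed = 42L)
// Hash-partition into 8 partitions by age, then shrink without a full shuffle
val repartitioned = people.repartition(8, col("age"))
val narrowed = repartitioned.coalesce(2)
// cache() is persist() at the default storage level; an action materializes it
val cached = people.cache()
cached.count()
cached.unpersist()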
Operations for combining multiple Datasets.
class Dataset[T] {
/** Join with another Dataset */
def join(right: Dataset[_]): DataFrame
def join(right: Dataset[_], usingColumn: String): DataFrame
def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
def join(right: Dataset[_], joinExprs: Column): DataFrame
def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
/** Cross join (Cartesian product) */
def crossJoin(right: Dataset[_]): DataFrame
/** Union operations */
def union(other: Dataset[T]): Dataset[T]
def unionAll(other: Dataset[T]): Dataset[T]
def unionByName(other: Dataset[T]): Dataset[T]
/** Set operations */
def intersect(other: Dataset[T]): Dataset[T]
def intersectAll(other: Dataset[T]): Dataset[T]
def except(other: Dataset[T]): Dataset[T]
def exceptAll(other: Dataset[T]): Dataset[T]
}
Usage Examples:
val employees = spark.table("employees")
val departments = spark.table("departments")
// Inner join
val employeesWithDept = employees.join(departments,
  employees("dept_id") === departments("id"))
// Left outer join
val employeesWithOptionalDept = employees.join(departments,
  employees("dept_id") === departments("id"), "left_outer")
// Join on multiple columns
val result = employees.join(departments,
  Seq("dept_id", "location"), "inner")
// Union datasets
val currentEmployees = spark.table("current_employees")
val formerEmployees = spark.table("former_employees")
val allEmployees = currentEmployees.union(formerEmployees)
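The set operations and cross join from the listing, sketched against the same tables; union matches columns by position, so unionByName is shown as the safer variant when column order may differ.
// Rows present in both tables, and rows present only in the first
val returningEmployees = formerEmployees.intersect(currentEmployees)
val neverReturned = formerEmployees.except(currentEmployees)
// Union that matches columns by name rather than by position
val allByName = currentEmployees.unionByName(formerEmployees)
// Cartesian product: every employee paired with every department (use with care)
val allPairs = employees.crossJoin(departments)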
Group data and perform aggregate operations.
class Dataset[T] {
/** Group by columns */
def groupBy(cols: Column*): RelationalGroupedDataset
def groupBy(col1: String, cols: String*): RelationalGroupedDataset
/** Aggregate without grouping */
def agg(expr: Column, exprs: Column*): DataFrame
def agg(exprs: Map[String, String]): DataFrame
}
/**
* A set of methods for aggregations on a DataFrame, created by Dataset.groupBy
*/
class RelationalGroupedDataset {
/** Count rows in each group */
def count(): DataFrame
/** Sum columns for each group */
def sum(colNames: String*): DataFrame
/** Average columns for each group */
def avg(colNames: String*): DataFrame
def mean(colNames: String*): DataFrame
/** Min/Max columns for each group */
def min(colNames: String*): DataFrame
def max(colNames: String*): DataFrame
/** Aggregate with expressions */
def agg(expr: Column, exprs: Column*): DataFrame
def agg(exprs: Map[String, String]): DataFrame
/** Pivot on column values */
def pivot(pivotColumn: String): RelationalGroupedDataset
def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset
}
Usage Examples:
import org.apache.spark.sql.functions._
val sales = spark.table("sales")
// Simple aggregation
val totalSales = sales.agg(sum("amount").alias("total"))
// Group by with aggregation
val salesByRegion = sales
  .groupBy("region")
  .agg(
    sum("amount").alias("total_sales"),
    avg("amount").alias("avg_sale"),
    count("*").alias("num_transactions")
  )
// Multiple group by columns
val salesByRegionAndMonth = sales
  .groupBy("region", "month")
  .sum("amount")
// Pivot table
val salesPivot = sales
  .groupBy("region")
  .pivot("month")
  .sum("amount")
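Two variants the examples above skip: pivoting on an explicit value list, which avoids the extra pass Spark otherwise runs to discover the distinct values, and the Map-based agg; the month values and the quantity column are assumptions.
// Pivot only the listed months instead of scanning for distinct values
val q1Pivot = sales
  .groupBy("region")
  .pivot("month", Seq("Jan", "Feb", "Mar"))
  .sum("amount")
// Map-based aggregation: column name -> aggregate function name
val regionSummary = sales
  .groupBy("region")
  .agg(Map("amount" -> "sum", "quantity" -> "avg"))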
Type-safe functional operations on Datasets.
class Dataset[T] {
/** Transform each element */
def map[U: Encoder](func: T => U): Dataset[U]
/** Transform each element to zero or more elements */
def flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U]
/** Transform each partition */
def mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]
/** Group by key and apply function to groups */
def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]
}
/**
* Dataset that has been logically grouped by a user-specified grouping key
*/
class KeyValueGroupedDataset[K, V] {
/** Apply function to each group */
def mapGroups[U: Encoder](f: (K, Iterator[V]) => U): Dataset[U]
/** Transform values in each group */
def mapValues[U: Encoder](func: V => U): KeyValueGroupedDataset[K, U]
/** Aggregate each group */
def agg[U: Encoder](column: TypedColumn[V, U]): Dataset[(K, U)]
/** Reduce values in each group */
def reduceGroups(f: (V, V) => V): Dataset[(K, V)]
/** Count elements in each group */
def count(): Dataset[(K, Long)]
}
Usage Examples:
import org.apache.spark.sql.functions._
import spark.implicits._
case class Person(name: String, age: Int, city: String)
val people = spark.read.json("people.json").as[Person]
// Type-safe transformations
val adults = people.filter(_.age >= 18)
val names = people.map(_.name)
val cityAges = people.map(p => (p.city, p.age))
// Group by key
val peopleByCity = people.groupByKey(_.city)
val avgAgeByCity = peopleByCity.agg(avg(col("age")).as[Double])
// Reduce groups
val oldestByCity = peopleByCity.reduceGroups((p1, p2) =>
  if (p1.age > p2.age) p1 else p2
)
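A sketch of the remaining typed operations, reusing people and peopleByCity from above and assuming spark.implicits._ is in scope for the encoders.
// mapGroups: full control over each group's iterator (here, a count per city)
val cityCounts = peopleByCity.mapGroups { (city, persons) =>
  (city, persons.size)
}
// flatMap: zero or more output rows per input row
val nameTokens = people.flatMap(_.name.split(" "))
// mapPartitions: one call per partition, useful for per-partition setup costs
val greetings = people.mapPartitions(iter => iter.map(p => s"Hello, ${p.name}"))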