Dataset and DataFrame Operations

Core data structures and operations for structured data manipulation. Dataset provides type-safe operations over domain objects, while DataFrame (a type alias for Dataset[Row]) offers untyped flexibility. Both support functional and SQL-style operations, are evaluated lazily, and are optimized by the Catalyst query optimizer.
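
For example (a minimal sketch; the file path and column names are hypothetical), transformations only build a logical plan, and nothing executes until an action runs:

import org.apache.spark.sql.functions.col

// Transformations are lazy: these lines only build a logical plan
val events   = spark.read.parquet("events.parquet")    // hypothetical input
val filtered = events.filter(col("status") === "ok")   // no job runs yet

// The action triggers Catalyst optimization and execution
val numOk = filtered.count()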

Capabilities

Dataset[T]

Strongly-typed collection of domain objects that can be transformed using functional and relational operations.

/**
 * Strongly-typed collection of domain objects
 * @tparam T The type of the objects in the dataset
 */
class Dataset[T] {
  /** Dataset schema */
  def schema: StructType
  
  /** Column names */
  def columns: Array[String]
  
  /** Print schema to console */
  def printSchema(): Unit
  
  /** Convert to DataFrame (untyped) */
  def toDF(): DataFrame
  def toDF(colNames: String*): DataFrame
  
  /** Convert to different type */
  def as[U: Encoder]: Dataset[U]
  
  /** Get write interface */
  def write: DataFrameWriter[T]
  
  /** Get write interface for streaming */
  def writeStream: DataStreamWriter[T]
}

// DataFrame is a type alias for Dataset[Row]
type DataFrame = Dataset[Row]
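
A brief usage sketch of these members (the User case class and the people.json schema are assumptions for illustration):

import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._

case class User(name: String, age: Long)   // hypothetical schema for people.json

// Untyped view
val df: DataFrame = spark.read.json("people.json")
df.printSchema()
println(df.columns.mkString(", "))

// Typed view (requires an Encoder, provided by spark.implicits._)
val ds: Dataset[User] = df.as[User]

// Back to an untyped DataFrame, optionally renaming columns
// (toDF(names) assumes the column count matches)
val renamed: DataFrame = ds.toDF("user_name", "user_age")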

Dataset Transformations

Lazy transformations that return new Datasets without triggering computation.

class Dataset[T] {
  /** Select columns by name or expression */
  def select(cols: Column*): DataFrame
  def select(col: String, cols: String*): DataFrame
  def selectExpr(exprs: String*): DataFrame
  
  /** Filter rows based on condition */
  def filter(condition: Column): Dataset[T]  
  def filter(conditionExpr: String): Dataset[T]
  def where(condition: Column): Dataset[T]
  
  /** Add or replace column */
  def withColumn(colName: String, col: Column): DataFrame
  
  /** Rename column */
  def withColumnRenamed(existingName: String, newName: String): DataFrame
  
  /** Drop columns */
  def drop(colName: String): DataFrame
  def drop(colNames: String*): DataFrame
  def drop(col: Column): DataFrame
  
  /** Remove duplicate rows */
  def distinct(): Dataset[T]
  def dropDuplicates(): Dataset[T]
  def dropDuplicates(colNames: String*): Dataset[T]
  
  /** Limit number of rows */
  def limit(n: Int): Dataset[T]
  
  /** Sample fraction of rows */
  def sample(fraction: Double): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
  
  /** Sort rows */
  def sort(sortCol: String, sortCols: String*): Dataset[T]
  def sort(sortExprs: Column*): Dataset[T]
  def orderBy(sortCol: String, sortCols: String*): Dataset[T]
  def orderBy(sortExprs: Column*): Dataset[T]
  
  /** Repartition */
  def repartition(numPartitions: Int): Dataset[T]
  def repartition(partitionExprs: Column*): Dataset[T]
  def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
  def coalesce(numPartitions: Int): Dataset[T]
}
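
A short sketch of the deduplication, sampling, and partitioning transformations (the people DataFrame and its columns are hypothetical):

import org.apache.spark.sql.functions.col

val deduped  = people.dropDuplicates("name", "city")   // keep one row per (name, city)
val tenPct   = people.sample(withReplacement = false, fraction = 0.1, seed = 42L)
val firstTen = people.limit(10)

// Repartition by a column for downstream joins/aggregations,
// or coalesce to fewer partitions before writing output
val byCity  = people.repartition(col("city"))
val onePart = people.coalesce(1)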

Dataset Actions

Eager operations that trigger computation and return results. The caching methods listed last are lazy markers that take effect when the next action runs.

class Dataset[T] {
  /** Display Dataset contents in tabular form */
  def show(): Unit
  def show(numRows: Int): Unit
  def show(numRows: Int, truncate: Boolean): Unit
  def show(numRows: Int, truncate: Int): Unit
  def show(numRows: Int, truncate: Int, vertical: Boolean): Unit
  
  /** Collect all rows to driver */
  def collect(): Array[T]
  def collectAsList(): java.util.List[T]
  
  /** Take first N rows */
  def take(n: Int): Array[T]
  def takeAsList(n: Int): java.util.List[T]
  
  /** Get first row */
  def first(): T
  def head(): T
  def head(n: Int): Array[T]
  
  /** Count rows */
  def count(): Long
  
  /** Check if Dataset is empty */
  def isEmpty: Boolean
  
  /** Apply function to each row */
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit
  
  /** Reduce rows to single value */
  def reduce(func: (T, T) => T): T
  
  /** Mark the Dataset for caching in memory (materialized on the next action) */
  def cache(): Dataset[T]
  def persist(): Dataset[T]
  def persist(newLevel: StorageLevel): Dataset[T]
  def unpersist(): Dataset[T]
  def unpersist(blocking: Boolean): Dataset[T]
}

Usage Examples:

import org.apache.spark.sql.functions._

// Basic transformations
val people = spark.read.json("people.json")

val adults = people
  .filter(col("age") > 18)
  .select("name", "age")
  .orderBy(col("age").desc)

adults.show()

// Column operations (assuming the source also has first_name and last_name columns)
val enriched = people
  .withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
  .withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))
  .drop("first_name", "last_name")

// Actions
val totalCount = people.count()
val firstPerson = people.first()
val allPeople = people.collect()
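
When the same Dataset feeds several actions, cache()/persist() avoids recomputing it; a minimal sketch (the storage level is just one reasonable choice):

import org.apache.spark.storage.StorageLevel

people.persist(StorageLevel.MEMORY_AND_DISK)   // lazy marker; materialized by the next action

val adultCount = people.filter(col("age") >= 18).count()   // first action populates the cache
people.agg(avg("age")).show()                               // reuses the cached data

people.unpersist()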

Joins and Set Operations

Operations for combining multiple Datasets.

class Dataset[T] {
  /** Join with another Dataset */
  def join(right: Dataset[_]): DataFrame
  def join(right: Dataset[_], usingColumn: String): DataFrame
  def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
  def join(right: Dataset[_], joinExprs: Column): DataFrame
  def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
  
  /** Cross join (Cartesian product) */
  def crossJoin(right: Dataset[_]): DataFrame
  
  /** Union operations */
  def union(other: Dataset[T]): Dataset[T]
  def unionAll(other: Dataset[T]): Dataset[T]
  def unionByName(other: Dataset[T]): Dataset[T]
  
  /** Set operations */
  def intersect(other: Dataset[T]): Dataset[T]
  def intersectAll(other: Dataset[T]): Dataset[T]
  def except(other: Dataset[T]): Dataset[T]
  def exceptAll(other: Dataset[T]): Dataset[T]
}

Usage Examples:

val employees = spark.table("employees")
val departments = spark.table("departments")

// Inner join
val employeesWithDept = employees.join(departments, 
  employees("dept_id") === departments("id"))

// Left outer join: keep every employee, even without a matching department
val employeesWithOptionalDept = employees.join(departments, 
  employees("dept_id") === departments("id"), "left_outer")

// Join on multiple columns
val result = employees.join(departments, 
  Seq("dept_id", "location"), "inner")

// Union datasets
val currentEmployees = spark.table("current_employees")
val formerEmployees = spark.table("former_employees")
val allEmployees = currentEmployees.union(formerEmployees)
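
The set operations follow the same pattern; a sketch assuming the two employee tables share an identical schema:

// Employees appearing in both tables (duplicates removed)
val inBoth = currentEmployees.intersect(formerEmployees)

// Employees only in the current table
val onlyCurrent = currentEmployees.except(formerEmployees)

// unionByName matches columns by name rather than position,
// which is safer if the two schemas list columns in a different order
val combinedByName = currentEmployees.unionByName(formerEmployees)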

Aggregations and Grouping

Group data and perform aggregate operations.

class Dataset[T] {
  /** Group by columns */
  def groupBy(cols: Column*): RelationalGroupedDataset
  def groupBy(col1: String, cols: String*): RelationalGroupedDataset
  
  /** Aggregate without grouping */
  def agg(expr: Column, exprs: Column*): DataFrame
  def agg(exprs: Map[String, String]): DataFrame
}

/**
 * A Dataset that has been logically grouped by user-specified grouping columns
 */
class RelationalGroupedDataset {
  /** Count rows in each group */
  def count(): DataFrame
  
  /** Sum columns for each group */
  def sum(colNames: String*): DataFrame
  
  /** Average columns for each group */
  def avg(colNames: String*): DataFrame
  def mean(colNames: String*): DataFrame
  
  /** Min/Max columns for each group */
  def min(colNames: String*): DataFrame
  def max(colNames: String*): DataFrame
  
  /** Aggregate with expressions */
  def agg(expr: Column, exprs: Column*): DataFrame
  def agg(exprs: Map[String, String]): DataFrame
  
  /** Pivot on column values */
  def pivot(pivotColumn: String): RelationalGroupedDataset
  def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset
}

Usage Examples:

import org.apache.spark.sql.functions._

val sales = spark.table("sales")

// Simple aggregation
val totalSales = sales.agg(sum("amount").alias("total"))

// Group by with aggregation
val salesByRegion = sales
  .groupBy("region")
  .agg(
    sum("amount").alias("total_sales"),
    avg("amount").alias("avg_sale"),
    count("*").alias("num_transactions")
  )

// Multiple group by columns
val salesByRegionAndMonth = sales
  .groupBy("region", "month")
  .sum("amount")

// Pivot table
val salesPivot = sales
  .groupBy("region")
  .pivot("month")
  .sum("amount")

Typed Dataset Operations

Type-safe functional operations on Datasets.

class Dataset[T] {
  /** Transform each element */
  def map[U: Encoder](func: T => U): Dataset[U]
  
  /** Transform each element to zero or more elements */
  def flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U]
  
  /** Transform each partition */  
  def mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]
  
  /** Group by key and apply function to groups */
  def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]
}

/**
 * A Dataset that has been logically grouped by a user-specified grouping key
 */
class KeyValueGroupedDataset[K, V] {
  /** Apply function to each group */  
  def mapGroups[U: Encoder](f: (K, Iterator[V]) => U): Dataset[U]
  
  /** Transform values in each group */
  def mapValues[U: Encoder](func: V => U): KeyValueGroupedDataset[K, U]
  
  /** Aggregate each group */
  def agg[U: Encoder](column: TypedColumn[V, U]): Dataset[(K, U)]
  
  /** Reduce values in each group */
  def reduceGroups(f: (V, V) => V): Dataset[(K, V)]
  
  /** Count elements in each group */
  def count(): Dataset[(K, Long)]
}

Usage Examples:

case class Person(name: String, age: Long, city: String)   // age as Long: spark.read.json infers whole numbers as LongType

import org.apache.spark.sql.functions._
import spark.implicits._   // Encoders for Person, tuples, and primitives

val people = spark.read.json("people.json").as[Person]

// Type-safe transformations
val adults = people.filter(_.age >= 18)
val names = people.map(_.name)
val cityAges = people.map(p => (p.city, p.age))

// Group by key
val peopleByCity = people.groupByKey(_.city)
val avgAgeByCity = peopleByCity.agg(avg(col("age")).as[Double])

// Reduce groups
val oldestByCity = peopleByCity.reduceGroups((p1, p2) => 
  if (p1.age > p2.age) p1 else p2
)
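
mapGroups and flatMap compose with the same typed API; a short sketch continuing from the Person dataset above (CitySummary is an illustrative case class):

// Per-group summary with mapGroups: city, head count, and oldest age
case class CitySummary(city: String, numPeople: Int, maxAge: Long)

val citySummaries = peopleByCity.mapGroups { (city, persons) =>
  val ps = persons.toSeq
  CitySummary(city, ps.size, ps.map(_.age).max)
}

// flatMap: zero or more output elements per input element
val nameTokens = people.flatMap(_.name.split(" "))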