DataFrame and Dataset Operations

DataFrames and Datasets are the core distributed data structures in Spark SQL. DataFrame is simply an alias for Dataset[Row], providing an untyped interface, while Dataset[T] offers compile-time type safety. Both share the same underlying API and are optimized by the Catalyst query optimizer.
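
A minimal sketch of the contrast (assumes a hypothetical people.json file, a Person case class, and spark.implicits._ in scope):

// Untyped: rows are generic Row objects, columns are resolved at analysis time
val df: DataFrame = spark.read.json("people.json")
val names = df.select("name")

// Typed: rows are Person instances, field access is checked at compile time
import spark.implicits._
case class Person(name: String, age: Long)
val ds: Dataset[Person] = df.as[Person]
val adults = ds.filter(_.age >= 18)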

Core Types

type DataFrame = Dataset[Row]

class Dataset[T] extends Serializable {
  def schema: StructType
  def dtypes: Array[(String, String)]
  def columns: Array[String]
  def count(): Long
  def isEmpty: Boolean
  def isLocal: Boolean
  def isStreaming: Boolean
}

trait Row extends Serializable {
  def length: Int
  def size: Int
  def get(i: Int): Any
  def getAs[T](i: Int): T
  def getAs[T](fieldName: String): T
  def getString(i: Int): String
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getDouble(i: Int): Double
  def getFloat(i: Int): Float
  def getBoolean(i: Int): Boolean
  def getDate(i: Int): java.sql.Date
  def getTimestamp(i: Int): java.sql.Timestamp
}
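
Usage Examples (a brief sketch; a people.json file with name and age fields is assumed):

val df = spark.read.json("people.json")

// Dataset-level metadata
val rowCount = df.count()
val empty = df.isEmpty
val streaming = df.isStreaming

// Row-level access; positional getters take a field index
val firstRow = df.head()
val name = firstRow.getAs[String]("name")
val age = firstRow.getLong(firstRow.fieldIndex("age"))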

Schema Operations

class Dataset[T] {
  def schema: StructType
  def dtypes: Array[(String, String)]
  def columns: Array[String]
  def printSchema(): Unit
  def printSchema(level: Int): Unit
}

Usage Examples:

import org.apache.spark.sql.types._

// Examine schema
val df = spark.read.json("people.json")
df.printSchema()
df.schema.foreach(field => println(s"${field.name}: ${field.dataType}"))

// Get column information
val columnNames = df.columns
val columnTypes = df.dtypes

// Check schema programmatically
val hasNameColumn = df.schema.exists(_.name == "name")
val nameField = df.schema.find(_.name == "name")

Column Selection and Projection

class Dataset[T] {
  def select(cols: Column*): DataFrame
  def select(col: String, cols: String*): DataFrame  
  def selectExpr(exprs: String*): DataFrame
  def drop(colNames: String*): DataFrame
  def drop(col: Column): DataFrame
  def withColumn(colName: String, col: Column): DataFrame
  def withColumnRenamed(existingName: String, newName: String): DataFrame
}

Usage Examples:

import org.apache.spark.sql.functions._

// Select columns
val selected = df.select("name", "age")
val selectedWithCol = df.select(col("name"), col("age") + 1)

// Select with expressions
val computed = df.selectExpr("name", "age + 1 as next_age", "upper(name) as upper_name")

// Add columns
val withFullName = df.withColumn("full_name", concat(col("first"), lit(" "), col("last")))

// Rename columns
val renamed = df.withColumnRenamed("old_name", "new_name")

// Drop columns
val dropped = df.drop("unwanted_column", "another_column")
val droppedByCol = df.drop(col("age"))

Filtering and Conditions

class Dataset[T] {
  def filter(condition: Column): Dataset[T]
  def filter(conditionExpr: String): Dataset[T]
  def where(condition: Column): Dataset[T]
  def where(conditionExpr: String): Dataset[T]
}

Usage Examples:

// Filter with Column expressions
val adults = df.filter(col("age") >= 18)
val activeUsers = df.filter(col("active") === true && col("last_login").isNotNull)

// Filter with SQL expressions
val filtered = df.filter("age >= 18 AND active = true")
val complex = df.where("age BETWEEN 25 AND 65 AND city IN ('New York', 'San Francisco')")

// Multiple conditions
val result = df
  .filter(col("age") > 21)
  .filter(col("country") === "US")
  .filter(col("score").isNotNull)

Sorting and Ordering

class Dataset[T] {
  def sort(sortCol: String, sortCols: String*): Dataset[T]
  def sort(sortExprs: Column*): Dataset[T]
  def orderBy(sortCol: String, sortCols: String*): Dataset[T]  
  def orderBy(sortExprs: Column*): Dataset[T]
}

Usage Examples:

// Sort by column names
val sorted = df.sort("age", "name")

// Sort with Column expressions and directions
val ordered = df.orderBy(col("age").desc, col("name").asc)

// Complex sorting
val complexSort = df.orderBy(
  col("department"),
  col("salary").desc,
  col("hire_date").asc
)

Joins

class Dataset[T] {
  def join(right: Dataset[_]): DataFrame
  def join(right: Dataset[_], usingColumn: String): DataFrame
  def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
  def join(right: Dataset[_], joinExprs: Column): DataFrame
  def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
  
  def crossJoin(right: Dataset[_]): DataFrame
}

Join Types: "inner", "cross", "outer", "full", "full_outer", "left", "left_outer", "right", "right_outer", "left_semi", "left_anti"

Usage Examples:

val users = spark.table("users")  
val orders = spark.table("orders")

// Inner join (default)
val userOrders = users.join(orders, users("id") === orders("user_id"))

// Left outer join
val allUsers = users.join(orders, users("id") === orders("user_id"), "left_outer")

// Join on multiple columns
val joined = users.join(orders, 
  users("id") === orders("user_id") && users("region") === orders("region"))

// Join using column names (when columns have same names)
val simple = users.join(orders, "user_id")
val multiple = users.join(orders, Seq("user_id", "region"))

// Cross join
val cartesian = users.crossJoin(orders)
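
The join-type list above also includes semi and anti joins, which return only the left side's columns; a brief sketch using the same users and orders tables:

// Users that have at least one matching order
val usersWithOrders = users.join(orders, users("id") === orders("user_id"), "left_semi")

// Users with no matching order
val usersWithoutOrders = users.join(orders, users("id") === orders("user_id"), "left_anti")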

Set Operations

class Dataset[T] {
  def union(other: Dataset[T]): Dataset[T]
  def unionAll(other: Dataset[T]): Dataset[T]  // alias for union
  def unionByName(other: Dataset[T]): Dataset[T]
  def intersect(other: Dataset[T]): Dataset[T]
  def intersectAll(other: Dataset[T]): Dataset[T]
  def except(other: Dataset[T]): Dataset[T]
  def exceptAll(other: Dataset[T]): Dataset[T]
}

Usage Examples:

val df1 = spark.range(1, 5).toDF("id")
val df2 = spark.range(3, 8).toDF("id")

// union keeps duplicate rows (like SQL UNION ALL); chain distinct() to remove them
val combined = df1.union(df2)
val deduplicated = df1.union(df2).distinct()

// Union by column names (handles different column orders)
val byName = df1.unionByName(df2)

// Set operations
val intersection = df1.intersect(df2)  // Values in both
val difference = df1.except(df2)       // Values in df1 but not df2
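
unionByName aligns columns by name rather than by position; a small sketch with two hypothetical frames whose columns are in different order (assumes spark.implicits._ in scope):

val left  = Seq((1, "a")).toDF("id", "label")
val right = Seq(("b", 2)).toDF("label", "id")

// union would pair columns positionally; unionByName matches "id" with "id" and "label" with "label"
val aligned = left.unionByName(right)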

Deduplication

class Dataset[T] {
  def distinct(): Dataset[T]
  def dropDuplicates(): Dataset[T]
  def dropDuplicates(colNames: Array[String]): Dataset[T]
  def dropDuplicates(colNames: Seq[String]): Dataset[T]
  def dropDuplicates(col1: String, cols: String*): Dataset[T]
}

Usage Examples:

// Remove all duplicate rows
val unique = df.distinct()

// Remove duplicates based on specific columns
val uniqueUsers = df.dropDuplicates("user_id")
val uniqueByMultiple = df.dropDuplicates("user_id", "email")
val uniqueBySeq = df.dropDuplicates(Seq("user_id", "email"))

Sampling and Limiting

class Dataset[T] {
  def limit(n: Int): Dataset[T]
  def sample(fraction: Double): Dataset[T]
  def sample(fraction: Double, seed: Long): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double): Dataset[T]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
}

Stratified sampling is exposed through DataFrameStatFunctions, reached via df.stat:

class DataFrameStatFunctions {
  def sampleBy[T](col: String, fractions: Map[T, Double], seed: Long): DataFrame
}

Usage Examples:

// Limit results
val top100 = df.limit(100)

// Random sampling
val sample10Percent = df.sample(0.1)
val sampleWithSeed = df.sample(false, 0.2, seed = 42)

// Stratified sampling by column values, via df.stat
val stratified = df.stat.sampleBy("category", Map("A" -> 0.1, "B" -> 0.2), seed = 123)

Action Operations

class Dataset[T] {
  def count(): Long
  def collect(): Array[T]
  def collectAsList(): java.util.List[T]
  def first(): T
  def head(): T
  def head(n: Int): Array[T]
  def take(n: Int): Array[T]
  def takeAsList(n: Int): java.util.List[T]
  def tail(n: Int): Array[T]
  def show(): Unit
  def show(numRows: Int): Unit
  def show(numRows: Int, truncate: Boolean): Unit
  def show(numRows: Int, truncate: Int): Unit
  def show(numRows: Int, truncate: Int, vertical: Boolean): Unit
}

Usage Examples:

// Count rows
val totalRows = df.count()

// Collect data (use with caution on large datasets)
val allData = df.collect()
val firstRow = df.first()
val top10 = df.take(10)

// Display data
df.show()                    // Show 20 rows, truncate at 20 chars
df.show(50)                  // Show 50 rows
df.show(20, false)           // Don't truncate strings
df.show(10, 100, true)       // 10 rows, truncate at 100 chars, vertical format

Persistence and Caching

class Dataset[T] {
  def cache(): Dataset[T]
  def persist(): Dataset[T]
  def persist(newLevel: StorageLevel): Dataset[T]
  def unpersist(): Dataset[T]
  def unpersist(blocking: Boolean): Dataset[T]
  def storageLevel: StorageLevel
  def isStreaming: Boolean
}

Usage Examples:

import org.apache.spark.storage.StorageLevel

// Cache in memory (default: MEMORY_AND_DISK)
val cached = df.cache()

// Persist with specific storage level
val persisted = df.persist(StorageLevel.MEMORY_ONLY)
val diskOnly = df.persist(StorageLevel.DISK_ONLY)
val memoryAndDiskSer = df.persist(StorageLevel.MEMORY_AND_DISK_SER)

// Check storage level
val level = df.storageLevel

// Remove from cache
df.unpersist()
df.unpersist(blocking = true)  // Wait for removal to complete

Iteration and Functional Operations

class Dataset[T] {
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit
  def map[U : Encoder](func: T => U): Dataset[U]
  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]
  def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U]
  def filter(func: T => Boolean): Dataset[T]
  def reduce(func: (T, T) => T): T
}

Usage Examples:

// Type-safe operations (require an Encoder for the result type)
case class Person(name: String, age: Int)
import spark.implicits._                                      // supplies Encoders for case classes
val peopleSeq = Seq(Person("Alice", 29), Person("Bob", 17))   // sample data
val people: Dataset[Person] = peopleSeq.toDS()

// Map transformation
val names = people.map(_.name)
val ages = people.map(_.age)

// Filter with function
val adults = people.filter(_.age >= 18)

// FlatMap
val words = people.flatMap(_.name.split(" "))

// Reduce
val totalAge = people.map(_.age).reduce(_ + _)

// Side effects
people.foreach(person => println(s"Person: ${person.name}"))
people.foreachPartition { iter =>
  // Process partition
  iter.foreach(println)
}

Type Conversion

class Dataset[T] {
  def as[U : Encoder]: Dataset[U]
  def toDF(): DataFrame  
  def toDF(colNames: String*): DataFrame
  def rdd: RDD[T]
  def javaRDD: JavaRDD[T]
  def toJavaRDD: JavaRDD[T]
}

Usage Examples:

// Convert to typed Dataset
case class User(id: Long, name: String, age: Int)
val typedUsers = df.as[User]

// Convert to DataFrame
val dataFrame = dataset.toDF()
val renamedDF = dataset.toDF("col1", "col2", "col3")

// Convert to RDD
val rdd = df.rdd
val userRDD = typedUsers.rdd

// Java interop
val javaRDD = df.toJavaRDD

Partitioning Operations

class Dataset[T] {
  def repartition(numPartitions: Int): Dataset[T]
  def repartition(partitionExprs: Column*): Dataset[T]
  def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
  def coalesce(numPartitions: Int): Dataset[T]
  def repartitionByRange(partitionExprs: Column*): Dataset[T]
  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
}

Usage Examples:

// Change number of partitions
val repartitioned = df.repartition(200)
val coalesced = df.coalesce(10)  // Reduce partitions without shuffle

// Partition by column values
val byDept = df.repartition(col("department"))
val byMultiple = df.repartition(col("year"), col("month"))

// Range partitioning (for ordered data)
val rangePartitioned = df.repartitionByRange(col("timestamp"))
val rangeWithCount = df.repartitionByRange(100, col("id"))