Apache Spark - Unified analytics engine for large-scale data processing
—
High-level APIs for working with structured data using DataFrames and Datasets. Built on Spark SQL with Catalyst optimizer for query optimization and code generation. Provides seamless integration with various data sources and formats.
Entry point for DataFrame and Dataset APIs. The modern way to work with Spark SQL functionality.
/**
* Entry point for DataFrame and Dataset APIs
*/
class SparkSession {
/** Get DataFrameReader for loading data */
def read: DataFrameReader
/** Get DataStreamReader for streaming data */
def readStream: DataStreamReader
/** Execute SQL query */
def sql(sqlText: String): DataFrame
/** Access table as DataFrame */
def table(tableName: String): DataFrame
/** Create Dataset of numbers */
def range(end: Long): Dataset[Long]
def range(start: Long, end: Long): Dataset[Long]
/** Create DataFrame from sequence */
def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
/** Create Dataset from sequence */
def createDataset[T : Encoder](data: Seq[T]): Dataset[T]
/** Create empty DataFrame */
def emptyDataFrame: DataFrame
/** Create empty Dataset */
def emptyDataset[T: Encoder]: Dataset[T]
/** Access catalog functions */
def catalog: Catalog
/** Access runtime configuration */
def conf: RuntimeConfig
/** Register user-defined functions */
def udf: UDFRegistration
/** Manage streaming queries */
def streams: StreamingQueryManager
/** Stop SparkSession */
def stop(): Unit
}
/**
* Builder for SparkSession
*/
object SparkSession {
def builder(): Builder
class Builder {
def master(master: String): Builder
def appName(name: String): Builder
def config(key: String, value: String): Builder
def config(conf: SparkConf): Builder
def enableHiveSupport(): Builder
def getOrCreate(): SparkSession
}
}

Usage Examples:
import org.apache.spark.sql.SparkSession
// Create SparkSession
val spark = SparkSession.builder()
.appName("MyApp")
.master("local[*]")
.config("spark.sql.adaptive.enabled", "true")
.getOrCreate()
// Load data
val df = spark.read
.option("header", "true")
.csv("people.csv")
// SQL queries
spark.sql("SELECT name, age FROM people WHERE age > 21").show()
// Create from data
val data = Seq(("Alice", 25), ("Bob", 30))
val df2 = spark.createDataFrame(data).toDF("name", "age")
spark.stop()

Python SparkSession API:
class SparkSession:
"""
Entry point for DataFrame and SQL APIs in Python
"""
@property
def read(self) -> DataFrameReader
@property
def readStream(self) -> DataStreamReader
def sql(self, sqlQuery: str) -> DataFrame
def table(self, tableName: str) -> DataFrame
def range(self, start: int, end: int = None, step: int = 1, numPartitions: int = None) -> DataFrame
def createDataFrame(self, data: List, schema: Optional[Union[List[str], StructType]] = None) -> DataFrame
@property
def catalog(self) -> Catalog
@property
def conf(self) -> RuntimeConfig
@property
def udf(self) -> UDFRegistration
@property
def streams(self) -> StreamingQueryManager
def stop(self) -> None
class SparkSession:
@staticmethod
def builder() -> Builder
class Builder:
def master(self, master: str) -> Builder
def appName(self, name: str) -> Builder
def config(self, key: str, value: str) -> Builder
def enableHiveSupport(self) -> Builder
def getOrCreate(self) -> SparkSession

Python Usage Examples:
from pyspark.sql import SparkSession
# Create SparkSession
spark = SparkSession.builder \
.appName("MyApp") \
.master("local[*]") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# Load data
df = spark.read \
.option("header", "true") \
.csv("people.csv")
# SQL queries
spark.sql("SELECT name, age FROM people WHERE age > 21").show()
# Create from data
data = [("Alice", 25), ("Bob", 30)]
df2 = spark.createDataFrame(data, ["name", "age"])
spark.stop()

Dataset is a distributed collection of data with compile-time type safety. DataFrame is a type alias for Dataset[Row].
/**
* Distributed collection of data with schema
*/
class Dataset[T] {
/** Display data in tabular format */
def show(numRows: Int = 20, truncate: Boolean = true): Unit
/** Print schema to console */
def printSchema(): Unit
/** Show query execution plan */
def explain(extended: Boolean = false): Unit
/** Select columns */
def select(cols: Column*): DataFrame
def select(col: String, cols: String*): DataFrame
/** Filter rows */
def filter(condition: Column): Dataset[T]
def where(condition: Column): Dataset[T]
/** Group by columns */
def groupBy(cols: Column*): RelationalGroupedDataset
def groupBy(col1: String, cols: String*): RelationalGroupedDataset
/** Aggregate expressions */
def agg(expr: Column, exprs: Column*): DataFrame
/** Join with another Dataset */
def join(right: Dataset[_]): DataFrame
def join(right: Dataset[_], joinExprs: Column): DataFrame
def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
/** Union with another Dataset */
def union(other: Dataset[T]): Dataset[T]
def unionAll(other: Dataset[T]): Dataset[T]
/** Intersection */
def intersect(other: Dataset[T]): Dataset[T]
/** Difference */
def except(other: Dataset[T]): Dataset[T]
/** Sort by columns */
def orderBy(sortExprs: Column*): Dataset[T]
def sort(sortExprs: Column*): Dataset[T]
/** Limit number of rows */
def limit(n: Int): Dataset[T]
/** Remove duplicates */
def distinct(): Dataset[T]
def dropDuplicates(): Dataset[T]
def dropDuplicates(colNames: Array[String]): Dataset[T]
/** Add or replace column */
def withColumn(colName: String, col: Column): DataFrame
/** Rename column */
def withColumnRenamed(existingName: String, newName: String): DataFrame
/** Drop column */
def drop(colName: String): DataFrame
def drop(col: Column): DataFrame
/** Collect all rows to driver */
def collect(): Array[T]
/** Collect as Java List */
def collectAsList(): java.util.List[T]
/** Count rows */
def count(): Long
/** Get first row */
def first(): T
def head(): T
/** Get first n rows */
def head(n: Int): Array[T]
def take(n: Int): Array[T]
/** Get writer for saving data */
def write: DataFrameWriter[T]
/** Get writer for streaming */
def writeStream: DataStreamWriter[T]
/** Cache Dataset */
def cache(): Dataset[T]
/** Persist with storage level */
def persist(newLevel: StorageLevel): Dataset[T]
/** Remove from cache */
def unpersist(blocking: Boolean = false): Dataset[T]
/** Convert to different type */
def as[U](implicit encoder: Encoder[U]): Dataset[U]
/** Map function with encoder */
def map[U](func: T => U)(implicit encoder: Encoder[U]): Dataset[U]
/** FlatMap with encoder */
def flatMap[U](func: T => TraversableOnce[U])(implicit encoder: Encoder[U]): Dataset[U]
/** Apply function to each element */
def foreach(f: T => Unit): Unit
/** Apply function to each partition */
def foreachPartition(f: Iterator[T] => Unit): Unit
}
/** DataFrame is Dataset[Row] */
type DataFrame = Dataset[Row]

Usage Examples:
import org.apache.spark.sql.functions._
// Load and explore data
val df = spark.read.json("people.json")
df.show()
df.printSchema()
df.explain()
// Transformations
val adults = df
.select("name", "age")
.filter($"age" > 18)
.orderBy($"age".desc)
// Aggregations
val avgAge = df
.groupBy("department")
.agg(avg("age").as("avg_age"), count("*").as("count"))
// Joins
val departments = spark.read.json("departments.json")
val joined = df.join(departments, "dept_id")
// Window functions
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy("department").orderBy("salary")
val ranked = df.withColumn("rank", row_number().over(windowSpec))
// Type-safe operations with case classes
case class Person(name: String, age: Int)
val people = df.as[Person]
val names = people.map(_.name.toUpperCase)

Column expressions for building queries and transformations.
/**
* Column expression for DataFrame operations
*/
class Column {
/** Create alias */
def alias(alias: String): Column
def as(alias: String): Column
/** Cast to different type */
def cast(to: DataType): Column
def cast(to: String): Column
/** Null checks */
def isNull: Column
def isNotNull: Column
def isNaN: Column
/** Logical operators */
def &&(other: Any): Column
def ||(other: Any): Column
def unary_! : Column
/** Comparison operators */
def ===(other: Any): Column
def =!=(other: Any): Column
def >(other: Any): Column
def <(other: Any): Column
def >=(other: Any): Column
def <=(other: Any): Column
/** Arithmetic operators */
def +(other: Any): Column
def -(other: Any): Column
def *(other: Any): Column
def /(other: Any): Column
def %(other: Any): Column
/** String operations */
def contains(other: Any): Column
def startsWith(other: Column): Column
def endsWith(other: Column): Column
def like(literal: String): Column
def rlike(literal: String): Column
/** Sorting */
def asc: Column
def desc: Column
def asc_nulls_first: Column
def desc_nulls_last: Column
/** Array operations */
def getItem(key: Any): Column
def getField(fieldName: String): Column
}

Comprehensive set of built-in functions for data manipulation.
/**
* Built-in functions for DataFrame operations
*/
object functions {
/** Column references */
def col(colName: String): Column
def column(colName: String): Column
def lit(literal: Any): Column
/** Conditional expressions */
def when(condition: Column, value: Any): Column
def coalesce(cols: Column*): Column
def isnull(col: Column): Column
def isnan(col: Column): Column
/** String functions */
def upper(e: Column): Column
def lower(e: Column): Column
def trim(e: Column): Column
def ltrim(e: Column): Column
def rtrim(e: Column): Column
def length(e: Column): Column
def substring(str: Column, pos: Int, len: Int): Column
def concat(exprs: Column*): Column
def concat_ws(sep: String, exprs: Column*): Column
def split(str: Column, pattern: String): Column
def regexp_replace(e: Column, pattern: String, replacement: String): Column
def regexp_extract(e: Column, exp: String, groupIdx: Int): Column
/** Math functions */
def abs(e: Column): Column
def sqrt(e: Column): Column
def pow(l: Column, r: Column): Column
def round(e: Column, scale: Int): Column
def ceil(e: Column): Column
def floor(e: Column): Column
def sin(e: Column): Column
def cos(e: Column): Column
def tan(e: Column): Column
def log(e: Column): Column
def exp(e: Column): Column
def greatest(exprs: Column*): Column
def least(exprs: Column*): Column
/** Aggregate functions */
def sum(e: Column): Column
def avg(e: Column): Column
def mean(e: Column): Column
def count(e: Column): Column
def countDistinct(expr: Column, exprs: Column*): Column
def min(e: Column): Column
def max(e: Column): Column
def first(e: Column): Column
def last(e: Column): Column
def stddev(e: Column): Column
def variance(e: Column): Column
def collect_list(e: Column): Column
def collect_set(e: Column): Column
/** Date/Time functions */
def current_date(): Column
def current_timestamp(): Column
def date_add(start: Column, days: Int): Column
def date_sub(start: Column, days: Int): Column
def date_format(dateExpr: Column, format: String): Column
def year(e: Column): Column
def month(e: Column): Column
def dayofmonth(e: Column): Column
def hour(e: Column): Column
def minute(e: Column): Column
def second(e: Column): Column
def unix_timestamp(): Column
def from_unixtime(ut: Column): Column
/** Array functions */
def array(cols: Column*): Column
def array_contains(column: Column, value: Any): Column
def explode(e: Column): Column
def posexplode(e: Column): Column
def size(e: Column): Column
def sort_array(e: Column): Column
def reverse(e: Column): Column
def array_distinct(e: Column): Column
/** Map functions */
def map(cols: Column*): Column
def map_keys(e: Column): Column
def map_values(e: Column): Column
/** Window functions */
def row_number(): Column
def rank(): Column
def dense_rank(): Column
def percent_rank(): Column
def ntile(n: Int): Column
def lag(e: Column, offset: Int): Column
def lead(e: Column, offset: Int): Column
def first_value(e: Column): Column
def last_value(e: Column): Column
}

Reading and writing data from various sources.
/**
* Interface for loading DataFrames from external storage
*/
class DataFrameReader {
/** Specify data source format */
def format(source: String): DataFrameReader
/** Add input option */
def option(key: String, value: String): DataFrameReader
def option(key: String, value: Boolean): DataFrameReader
def option(key: String, value: Long): DataFrameReader
def option(key: String, value: Double): DataFrameReader
/** Add multiple options */
def options(options: Map[String, String]): DataFrameReader
/** Set expected schema */
def schema(schema: StructType): DataFrameReader
def schema(schemaString: String): DataFrameReader
/** Load data */
def load(): DataFrame
def load(path: String): DataFrame
def load(paths: String*): DataFrame
/** Format-specific methods */
def json(path: String): DataFrame
def json(jsonRDD: RDD[String]): DataFrame
def json(jsonDataset: Dataset[String]): DataFrame
def parquet(paths: String*): DataFrame
def text(paths: String*): DataFrame
def textFile(paths: String*): Dataset[String]
def csv(paths: String*): DataFrame
def orc(paths: String*): DataFrame
def jdbc(url: String, table: String, properties: Properties): DataFrame
def table(tableName: String): DataFrame
}
/**
* Interface for saving DataFrames to external storage
*/
class DataFrameWriter[T] {
/** Set save mode */
def mode(saveMode: SaveMode): DataFrameWriter[T]
def mode(saveMode: String): DataFrameWriter[T]
/** Specify output format */
def format(source: String): DataFrameWriter[T]
/** Add output option */
def option(key: String, value: String): DataFrameWriter[T]
def option(key: String, value: Boolean): DataFrameWriter[T]
def option(key: String, value: Long): DataFrameWriter[T]
def option(key: String, value: Double): DataFrameWriter[T]
/** Add multiple options */
def options(options: Map[String, String]): DataFrameWriter[T]
/** Partition output by columns */
def partitionBy(colNames: String*): DataFrameWriter[T]
/** Bucket output by columns */
def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
/** Sort within buckets */
def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
/** Save data */
def save(): Unit
def save(path: String): Unit
/** Insert into existing table */
def insertInto(tableName: String): Unit
/** Save as table */
def saveAsTable(tableName: String): Unit
/** Format-specific methods */
def json(path: String): Unit
def parquet(path: String): Unit
def text(path: String): Unit
def csv(path: String): Unit
def orc(path: String): Unit
def jdbc(url: String, table: String, connectionProperties: Properties): Unit
}
/**
* Save modes for DataFrameWriter
*/
object SaveMode extends Enumeration {
val Append, Overwrite, ErrorIfExists, Ignore = Value
}

Usage Examples:
// Reading data
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("data.csv")
// Reading with schema
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true)
))
val df2 = spark.read.schema(schema).csv("data.csv")
// Writing data
df.write
.mode(SaveMode.Overwrite)
.option("header", "true")
.csv("output")
// Partitioned output
df.write
.partitionBy("year", "month")
.parquet("partitioned_output")
// Database operations
df.write
.format("jdbc")
.option("url", "jdbc:postgresql://localhost/test")
.option("dbtable", "people")
.option("user", "username")
.option("password", "password")
.save()

Schema definition and data type system.
/**
* Base class for data types
*/
abstract class DataType
/** Primitive types */
object StringType extends DataType
object IntegerType extends DataType
object LongType extends DataType
object DoubleType extends DataType
object FloatType extends DataType
object BooleanType extends DataType
object DateType extends DataType
object TimestampType extends DataType
object BinaryType extends DataType
/** Complex types */
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType
case class MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) extends DataType
case class StructType(fields: Array[StructField]) extends DataType
/**
* Field in a struct
*/
case class StructField(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata)
/**
* Row in a DataFrame
*/
trait Row {
def length: Int
def size: Int
def get(i: Int): Any
def getString(i: Int): String
def getBoolean(i: Int): Boolean
def getInt(i: Int): Int
def getLong(i: Int): Long
def getFloat(i: Int): Float
def getDouble(i: Int): Double
def getAs[T](i: Int): T
def getAs[T](fieldName: String): T
def isNullAt(i: Int): Boolean
def toSeq: Seq[Any]
}

Common SQL exceptions:
AnalysisException - SQL analysis errors (invalid columns, type mismatches)
ParseException - SQL parsing errors
StreamingQueryException - Streaming query failures
SparkSQLException - General SQL execution errors

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-parent-2-12