Apache Spark is a unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R.
Spark SQL provides structured data processing through DataFrames and Datasets, along with a SQL query engine. It integrates with a wide range of data sources and formats (CSV, JSON, Parquet, ORC, JDBC) and supports both batch and streaming processing.
SQL and DataFrame functionality is available through:
import org.apache.spark.sql.{SparkSession, DataFrame, Dataset, Row, Column}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
Usage example:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// Create Spark session
val spark = SparkSession.builder()
.appName("SQL Example")
.master("local[*]")
.getOrCreate()
// Read data
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("path/to/data.csv")
// DataFrame operations
val result = df
.select("name", "age", "salary")
.filter(col("age") > 25)
.groupBy("department")
.agg(avg("salary").alias("avg_salary"))
.orderBy(desc("avg_salary"))
result.show()
// SQL queries
df.createOrReplaceTempView("employees")
val sqlResult = spark.sql("""
SELECT department, AVG(salary) as avg_salary
FROM employees
WHERE age > 25
GROUP BY department
ORDER BY avg_salary DESC
""")
sqlResult.show()
spark.stop()
The unified entry point for Spark SQL functionality, replacing SQLContext and HiveContext.
class SparkSession private(sparkContext: SparkContext, existingSharedState: Option[SharedState]) {
// Data reading
def read: DataFrameReader
def readStream: DataStreamReader
// SQL execution
def sql(sqlText: String): DataFrame
def table(tableName: String): DataFrame
// DataFrame creation
def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
def emptyDataFrame: DataFrame
def range(end: Long): Dataset[java.lang.Long]
def range(start: Long, end: Long): Dataset[java.lang.Long]
def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
// Catalog and metadata
def catalog: Catalog
def conf: RuntimeConfig
def sessionState: SessionState
def sharedState: SharedState
// Streaming
def streams: StreamingQueryManager
// Resources and control
def sparkContext: SparkContext
def version: String
def stop(): Unit
def close(): Unit
def newSession(): SparkSession
}
object SparkSession {
def builder(): Builder
def active: SparkSession
def getActiveSession: Option[SparkSession]
def getDefaultSession: Option[SparkSession]
def setActiveSession(session: SparkSession): Unit
def clearActiveSession(): Unit
def setDefaultSession(session: SparkSession): Unit
def clearDefaultSession(): Unit
}
class Builder {
def appName(name: String): Builder
def master(master: String): Builder
def config(key: String, value: String): Builder
def config(key: String, value: Long): Builder
def config(key: String, value: Double): Builder
def config(key: String, value: Boolean): Builder
def config(conf: SparkConf): Builder
def enableHiveSupport(): Builder
def getOrCreate(): SparkSession
}
Usage example:
val spark = SparkSession.builder()
.appName("My Application")
.master("local[4]")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.enableHiveSupport()
.getOrCreate()
DataFrames are Datasets of Row objects, providing a programming abstraction and DSL for structured data manipulation.
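For example, a DataFrame can be built from a local collection and viewed as a typed Dataset (a minimal sketch; the Employee case class and values are illustrative assumptions):
import spark.implicits._
// Hypothetical case class used only for this illustration
// (in a compiled application, define it at the top level so an Encoder can be derived)
case class Employee(name: String, age: Int, salary: Double)
// A DataFrame is a Dataset[Row]
val employeesDF: DataFrame = Seq(
  Employee("Alice", 30, 85000.0),
  Employee("Bob", 24, 62000.0)
).toDF()
// The same data as a typed Dataset[Employee]
val employeesDS: Dataset[Employee] = employeesDF.as[Employee]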
abstract class Dataset[T] extends Serializable {
// Column selection and projection
def select(cols: Column*): DataFrame
def select(col: String, cols: String*): DataFrame
def selectExpr(exprs: String*): DataFrame
def drop(colName: String): DataFrame
def drop(colNames: String*): DataFrame
def drop(col: Column): DataFrame
def withColumn(colName: String, col: Column): DataFrame
def withColumnRenamed(existingName: String, newName: String): DataFrame
// Filtering and conditions
def filter(condition: Column): Dataset[T]
def filter(conditionExpr: String): Dataset[T]
def where(condition: Column): Dataset[T]
// Grouping and aggregation
def groupBy(cols: Column*): RelationalGroupedDataset
def groupBy(col1: String, cols: String*): RelationalGroupedDataset
def rollup(cols: Column*): RelationalGroupedDataset
def cube(cols: Column*): RelationalGroupedDataset
def agg(expr: Column, exprs: Column*): DataFrame
def agg(exprs: Map[String, String]): DataFrame
// Joins
def join(right: Dataset[_]): DataFrame
def join(right: Dataset[_], usingColumn: String): DataFrame
def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
def join(right: Dataset[_], joinExprs: Column): DataFrame
def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
def crossJoin(right: Dataset[_]): DataFrame
// Set operations
def union(other: Dataset[T]): Dataset[T]
def unionAll(other: Dataset[T]): Dataset[T]
def unionByName(other: Dataset[T]): Dataset[T]
def intersect(other: Dataset[T]): Dataset[T]
def intersectAll(other: Dataset[T]): Dataset[T]
def except(other: Dataset[T]): Dataset[T]
def exceptAll(other: Dataset[T]): Dataset[T]
// Sorting
def sort(sortCol: String, sortCols: String*): Dataset[T]
def sort(sortExprs: Column*): Dataset[T]
def orderBy(sortCol: String, sortCols: String*): Dataset[T]
def orderBy(sortExprs: Column*): Dataset[T]
// Sampling and limiting
def sample(fraction: Double): Dataset[T]
def sample(withReplacement: Boolean, fraction: Double): Dataset[T]
def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
def limit(n: Int): Dataset[T]
// Actions
def collect(): Array[T]
def collectAsList(): java.util.List[T]
def count(): Long
def describe(cols: String*): DataFrame
def first(): T
def head(): T
def head(n: Int): Array[T]
def take(n: Int): Array[T]
def takeAsList(n: Int): java.util.List[T]
def tail(n: Int): Array[T]
def foreach(f: T => Unit): Unit
def foreachPartition(f: Iterator[T] => Unit): Unit
// Display
def show(): Unit
def show(numRows: Int): Unit
def show(numRows: Int, truncate: Boolean): Unit
def show(numRows: Int, truncate: Int): Unit
def show(numRows: Int, truncate: Int, vertical: Boolean): Unit
// Schema and metadata
def schema: StructType
def printSchema(): Unit
def dtypes: Array[(String, String)]
def columns: Array[String]
// Persistence
def cache(): Dataset[T]
def persist(): Dataset[T]
def persist(newLevel: StorageLevel): Dataset[T]
def unpersist(): Dataset[T]
def unpersist(blocking: Boolean): Dataset[T]
// Type conversion
def as[U : Encoder]: Dataset[U]
def alias(alias: String): Dataset[T]
// I/O
def write: DataFrameWriter[T]
def writeStream: DataStreamWriter[T]
// Advanced transformations
def unpivot(ids: Array[Column], values: Array[Column], variableColumnName: String, valueColumnName: String): DataFrame
def melt(ids: Array[Column], values: Array[Column], variableColumnName: String, valueColumnName: String): DataFrame
def transpose(): DataFrame
def observe(name: String, expr: Column, exprs: Column*): Dataset[T]
def lateralJoin(tableFunctionCall: Column): DataFrame
// Utility operations
def exists(condition: Column): Boolean
def scalar(): Any
// SQL operations
def createOrReplaceTempView(viewName: String): Unit
def createGlobalTempView(viewName: String): Unit
def createTempView(viewName: String): Unit
}
type DataFrame = Dataset[Row]
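A brief sketch of common Dataset transformations (the employees and departments DataFrames and their column names are assumptions):
// Assumed inputs: employees(name, age, salary, dept_id), departments(dept_id, dept_name)
val enriched = employees
  .join(departments, Seq("dept_id"), "left")      // join on a shared column
  .withColumn("salary_k", col("salary") / 1000)   // derived column
  .filter(col("age") >= 25)
  .select("name", "dept_name", "salary_k")
  .orderBy(desc("salary_k"))
enriched.show(20, truncate = false)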
Represents a column in a DataFrame and provides methods for column expressions.
class Column(expr: Expression) {
// Comparison operators
def ===(other: Any): Column
def !==(other: Any): Column
def >(other: Any): Column
def >=(other: Any): Column
def <(other: Any): Column
def <=(other: Any): Column
def <=>(other: Any): Column
// Logical operators
def &&(other: Any): Column
def ||(other: Any): Column
def unary_! : Column
// Arithmetic operators
def +(other: Any): Column
def -(other: Any): Column
def *(other: Any): Column
def /(other: Any): Column
def %(other: Any): Column
// Null handling
def isNull: Column
def isNotNull: Column
def isNaN: Column
// String operations
def contains(other: Any): Column
def startsWith(other: Column): Column
def startsWith(literal: String): Column
def endsWith(other: Column): Column
def endsWith(literal: String): Column
def substr(startPos: Column, len: Column): Column
def substr(startPos: Int, len: Int): Column
def like(literal: String): Column
def rlike(literal: String): Column
// Array operations
def getItem(key: Any): Column
def getField(fieldName: String): Column
// Type conversion
def cast(to: DataType): Column
def cast(to: String): Column
// Naming
def alias(alias: String): Column
def as(alias: String): Column
def as(alias: String, metadata: Metadata): Column
def name(alias: String): Column
// Sorting
def asc: Column
def asc_nulls_first: Column
def asc_nulls_last: Column
def desc: Column
def desc_nulls_first: Column
def desc_nulls_last: Column
// Window functions
def over(): Column
def over(window: WindowSpec): Column
}
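Column expressions compose into predicates and derived columns; a small sketch (the df variable and its columns are assumptions):
// Composite predicate built from Column operators
val adults = df.filter(col("age") >= 18 && col("name").isNotNull)
// Conditional expression and cast
val labeled = df
  .withColumn("band", when(col("salary") >= 100000, "senior").otherwise("standard"))
  .withColumn("age_str", col("age").cast("string"))
// Sorting with explicit null ordering
val sorted = df.orderBy(col("salary").desc_nulls_last)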
The functions object provides a rich set of built-in functions for DataFrame operations.
object functions {
// Column creation
def col(colName: String): Column
def column(colName: String): Column
def lit(literal: Any): Column
def typedLit[T : TypeTag](literal: T): Column
// Conditional expressions
def when(condition: Column, value: Any): Column
def coalesce(cols: Column*): Column
def isnull(col: Column): Column
def nanvl(col1: Column, col2: Column): Column
// Aggregate functions
def count(e: Column): Column
def count(columnName: String): TypedColumn[Any, Long]
def countDistinct(expr: Column, exprs: Column*): Column
def approx_count_distinct(e: Column): Column
def sum(e: Column): Column
def sum(columnName: String): Column
def avg(e: Column): Column
def avg(columnName: String): Column
def mean(e: Column): Column
def min(e: Column): Column
def max(e: Column): Column
def first(e: Column): Column
def first(e: Column, ignoreNulls: Boolean): Column
def last(e: Column): Column
def last(e: Column, ignoreNulls: Boolean): Column
def collect_list(e: Column): Column
def collect_set(e: Column): Column
// String functions
def ascii(e: Column): Column
def base64(e: Column): Column
def concat(exprs: Column*): Column
def concat_ws(sep: String, exprs: Column*): Column
def length(e: Column): Column
def lower(e: Column): Column
def upper(e: Column): Column
def ltrim(e: Column): Column
def rtrim(e: Column): Column
def trim(e: Column): Column
def regexp_extract(e: Column, pattern: String, idx: Int): Column
def regexp_replace(e: Column, pattern: String, replacement: String): Column
def split(str: Column, pattern: String): Column
def substring(str: Column, pos: Int, len: Int): Column
// Math functions
def abs(e: Column): Column
def acos(e: Column): Column
def asin(e: Column): Column
def atan(e: Column): Column
def atan2(y: Column, x: Column): Column
def ceil(e: Column): Column
def cos(e: Column): Column
def exp(e: Column): Column
def floor(e: Column): Column
def log(e: Column): Column
def log10(e: Column): Column
def pow(l: Column, r: Column): Column
def round(e: Column): Column
def round(e: Column, scale: Int): Column
def sin(e: Column): Column
def sqrt(e: Column): Column
def tan(e: Column): Column
// Date and time functions
def current_date(): Column
def current_timestamp(): Column
def date_add(start: Column, days: Int): Column
def date_sub(start: Column, days: Int): Column
def datediff(end: Column, start: Column): Column
def date_format(dateExpr: Column, format: String): Column
def dayofmonth(e: Column): Column
def dayofweek(e: Column): Column
def dayofyear(e: Column): Column
def hour(e: Column): Column
def minute(e: Column): Column
def month(e: Column): Column
def quarter(e: Column): Column
def second(e: Column): Column
def to_date(e: Column): Column
def to_date(e: Column, fmt: String): Column
def to_timestamp(s: Column): Column
def to_timestamp(s: Column, fmt: String): Column
def unix_timestamp(): Column
def unix_timestamp(s: Column): Column
def unix_timestamp(s: Column, p: String): Column
def year(e: Column): Column
// Array functions
def array(cols: Column*): Column
def array_contains(column: Column, value: Any): Column
def array_distinct(e: Column): Column
def array_max(e: Column): Column
def array_min(e: Column): Column
def array_position(column: Column, value: Any): Column
def array_remove(column: Column, element: Any): Column
def array_sort(e: Column): Column
def arrays_overlap(a1: Column, a2: Column): Column
def explode(e: Column): Column
def explode_outer(e: Column): Column
def posexplode(e: Column): Column
def posexplode_outer(e: Column): Column
def size(e: Column): Column
def slice(x: Column, start: Int, length: Int): Column
def sort_array(e: Column): Column
def sort_array(e: Column, asc: Boolean): Column
// Map functions
def map(cols: Column*): Column
def map_keys(e: Column): Column
def map_values(e: Column): Column
// Struct functions
def struct(cols: Column*): Column
// Window functions
def row_number(): Column
def rank(): Column
def dense_rank(): Column
def percent_rank(): Column
def ntile(n: Int): Column
def cume_dist(): Column
def lag(e: Column, offset: Int): Column
def lag(e: Column, offset: Int, defaultValue: Any): Column
def lead(e: Column, offset: Int): Column
def lead(e: Column, offset: Int, defaultValue: Any): Column
// UDF creation
def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction
def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction
}
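A short sketch combining built-in functions, a window function, and a UDF (the df variable and column names are assumptions):
import org.apache.spark.sql.expressions.Window
// Rank rows by salary within each department
val bySalary = Window.partitionBy("department").orderBy(desc("salary"))
val ranked = df.withColumn("salary_rank", rank().over(bySalary))
// String and date helpers
val cleaned = df
  .withColumn("name_clean", upper(trim(col("name"))))
  .withColumn("hire_year", year(to_date(col("hire_date"), "yyyy-MM-dd")))
// A simple UDF (null handling omitted for brevity)
val initials = udf((name: String) => name.split(" ").map(_.take(1)).mkString("."))
val withInitials = df.withColumn("initials", initials(col("name")))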
Spark SQL data type system for schema definition.
abstract class DataType extends AbstractDataType {
def json: String
def prettyJson: String
def simpleString: String
def catalogString: String
def sql: String
}
object DataTypes {
def createArrayType(elementType: DataType): ArrayType
def createMapType(keyType: DataType, valueType: DataType): MapType
def createStructField(name: String, dataType: DataType, nullable: Boolean): StructField
def createStructType(fields: Array[StructField]): StructType
val StringType: StringType
val BinaryType: BinaryType
val BooleanType: BooleanType
val DateType: DateType
val TimestampType: TimestampType
val CalendarIntervalType: CalendarIntervalType
val DoubleType: DoubleType
val FloatType: FloatType
val ByteType: ByteType
val IntegerType: IntegerType
val LongType: LongType
val ShortType: ShortType
val NullType: NullType
}
case class StructType(fields: Array[StructField]) extends DataType {
def add(field: StructField): StructType
def add(name: String, dataType: DataType): StructType
def add(name: String, dataType: DataType, nullable: Boolean): StructType
def add(name: String, dataType: DataType, nullable: Boolean, metadata: Metadata): StructType
def apply(name: String): StructField
def apply(index: Int): StructField
def fieldNames: Array[String]
def length: Int
def size: Int
}
case class StructField(
name: String,
dataType: DataType,
nullable: Boolean = true,
metadata: Metadata = Metadata.empty
) {
def getComment(): Option[String]
}
case class ArrayType(elementType: DataType, containsNull: Boolean = true) extends DataType
case class MapType(
keyType: DataType,
valueType: DataType,
valueContainsNull: Boolean = true
) extends DataType
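Schemas are typically declared with StructType and StructField and supplied to the reader instead of relying on schema inference (a minimal sketch; the file path is a placeholder):
import org.apache.spark.sql.types._
// Explicit schema applied when reading CSV
val employeeSchema = StructType(Array(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = true),
  StructField("salary", DoubleType, nullable = true),
  StructField("tags", ArrayType(StringType), nullable = true)
))
val typedDf = spark.read
  .option("header", "true")
  .schema(employeeSchema)
  .csv("path/to/data.csv")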
Represents a row in a DataFrame.
trait Row extends Serializable {
def size: Int
def length: Int
def schema: StructType
def apply(i: Int): Any
def get(i: Int): Any
def getAs[T](i: Int): T
def getAs[T](fieldName: String): T
def fieldIndex(name: String): Int
// Type-specific getters
def isNullAt(i: Int): Boolean
def getBoolean(i: Int): Boolean
def getByte(i: Int): Byte
def getShort(i: Int): Short
def getInt(i: Int): Int
def getLong(i: Int): Long
def getFloat(i: Int): Float
def getDouble(i: Int): Double
def getString(i: Int): String
def getDecimal(i: Int): java.math.BigDecimal
def getDate(i: Int): java.sql.Date
def getTimestamp(i: Int): java.sql.Timestamp
def getSeq[T](i: Int): Seq[T]
def getList[T](i: Int): java.util.List[T]
def getMap[K, V](i: Int): scala.collection.Map[K, V]
def getJavaMap[K, V](i: Int): java.util.Map[K, V]
def getStruct(i: Int): Row
// Array conversion
def toSeq: Seq[Any]
def copy(): Row
}
object Row {
def empty: Row
def apply(values: Any*): Row
def fromSeq(values: Seq[Any]): Row
def fromTuple(tuple: Product): Row
def merge(rows: Row*): Row
def unapplySeq(row: Row): Some[Seq[Any]]
}
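Rows can be constructed directly and fields read back by position or by name (a small sketch; the df variable is an assumption):
// Building a Row and reading fields by position
val row = Row("Alice", 30, 85000.0)
val name = row.getString(0)
val age = row.getInt(1)
// Reading a collected Row by field name (the schema comes from the DataFrame)
val first = df.select("name", "age").first()
val firstName = first.getAs[String]("name")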
Interface for loading DataFrames from external storage systems.
class DataFrameReader {
// Configuration
def format(source: String): DataFrameReader
def schema(schema: StructType): DataFrameReader
def schema(schemaString: String): DataFrameReader
def option(key: String, value: String): DataFrameReader
def option(key: String, value: Boolean): DataFrameReader
def option(key: String, value: Long): DataFrameReader
def option(key: String, value: Double): DataFrameReader
def options(options: scala.collection.Map[String, String]): DataFrameReader
def options(options: java.util.Map[String, String]): DataFrameReader
// Data sources
def csv(path: String): DataFrame
def csv(paths: String*): DataFrame
def json(path: String): DataFrame
def json(paths: String*): DataFrame
def parquet(path: String): DataFrame
def parquet(paths: String*): DataFrame
def orc(path: String): DataFrame
def orc(paths: String*): DataFrame
def text(path: String): DataFrame
def text(paths: String*): DataFrame
def textFile(path: String): Dataset[String]
def textFile(paths: String*): Dataset[String]
def table(tableName: String): DataFrame
def jdbc(url: String, table: String, properties: java.util.Properties): DataFrame
// Generic load
def load(): DataFrame
def load(path: String): DataFrame
def load(paths: String*): DataFrame
}
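The format-specific shortcuts and the generic format/load path are interchangeable (a sketch; paths are placeholders):
// Format-specific shortcut
val events = spark.read.json("path/to/events.json")
// Equivalent generic form
val events2 = spark.read.format("json").load("path/to/events.json")
// Plain text, one line per element, as Dataset[String]
val lines = spark.read.textFile("path/to/logs.txt")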
Interface for saving DataFrames to external storage systems.
class DataFrameWriter[T] {
// Configuration
def mode(saveMode: SaveMode): DataFrameWriter[T]
def mode(saveMode: String): DataFrameWriter[T]
def format(source: String): DataFrameWriter[T]
def option(key: String, value: String): DataFrameWriter[T]
def option(key: String, value: Boolean): DataFrameWriter[T]
def option(key: String, value: Long): DataFrameWriter[T]
def option(key: String, value: Double): DataFrameWriter[T]
def options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
def options(options: java.util.Map[String, String]): DataFrameWriter[T]
def partitionBy(colNames: String*): DataFrameWriter[T]
def bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
def sortBy(colName: String, colNames: String*): DataFrameWriter[T]
// Data sources
def csv(path: String): Unit
def json(path: String): Unit
def parquet(path: String): Unit
def orc(path: String): Unit
def text(path: String): Unit
def jdbc(url: String, table: String, connectionProperties: java.util.Properties): Unit
def insertInto(tableName: String): Unit
def saveAsTable(tableName: String): Unit
// Generic save
def save(): Unit
def save(path: String): Unit
}
// SaveMode is a Java enum (org.apache.spark.sql.SaveMode) with values:
// Append, Overwrite, ErrorIfExists, Ignore
Usage example:
// Reading data
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("path/to/file.csv")
// Writing data
df.write
.format("parquet")
.mode("overwrite")
.option("compression", "snappy")
.partitionBy("year", "month")
.save("path/to/output")class RelationalGroupedDataset protected(
A set of methods for aggregations on a DataFrame, created by groupBy, rollup, cube, or pivot.
class RelationalGroupedDataset protected(
df: DataFrame,
groupingExprs: Seq[Expression],
groupType: RelationalGroupedDataset.GroupType
) {
// Aggregation functions
def agg(expr: Column, exprs: Column*): DataFrame
def agg(exprs: Map[String, String]): DataFrame
def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
def count(): DataFrame
def mean(colNames: String*): DataFrame
def max(colNames: String*): DataFrame
def min(colNames: String*): DataFrame
def sum(colNames: String*): DataFrame
def avg(colNames: String*): DataFrame
// Pivot operations
def pivot(pivotColumn: String): RelationalGroupedDataset
def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset
}
Usage example:
// Group by operations
val grouped = df.groupBy("department", "level")
.agg(
count("*").alias("employee_count"),
avg("salary").alias("avg_salary"),
max("salary").alias("max_salary")
)
// Pivot operations
val pivoted = df.groupBy("department")
.pivot("level")
.agg(avg("salary"))Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-parent-2-13