Apache Spark's distributed SQL engine and structured data processing module, exposing SQL queries and the DataFrame/Dataset APIs
npx @tessl/cli install tessl/maven-org-apache-spark--spark-sql_2-12@2.4.0

Apache Spark SQL is a distributed SQL engine and structured data processing framework that provides a programming interface for manipulating structured data using SQL queries and DataFrame/Dataset APIs. It serves as the foundation for relational query processing in the Apache Spark ecosystem, offering a unified engine that can execute SQL queries, work with various data sources, and integrate seamlessly with Spark's distributed computing capabilities.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.8"

For Scala:

import org.apache.spark.sql.{SparkSession, DataFrame, Dataset, Column, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

For Java:

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;

Basic usage in Scala:

import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
// Create SparkSession (entry point)
val spark = SparkSession.builder()
  .appName("Spark SQL Example")
  .master("local[*]")
  .getOrCreate()

// Create DataFrame from a CSV file
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("path/to/data.csv")

// DataFrame API operations (keep "department" in the projection so it is still available for the groupBy)
val result = df
  .select("name", "age", "department")
  .filter(col("age") > 18)
  .groupBy("department")
  .agg(avg("age").alias("avg_age"))

// Show results
result.show()

// Register the DataFrame as a temporary view, then query it with SQL
df.createOrReplaceTempView("employees")
val sqlResult = spark.sql("""
  SELECT department, AVG(age) AS avg_age
  FROM employees
  WHERE age > 18
  GROUP BY department
""")
sqlResult.show()

spark.stop()

Apache Spark SQL is built around several key components:

Core session management and configuration for Spark SQL applications. SparkSession serves as the main entry point providing access to all functionality.
object SparkSession {
def builder(): Builder
def active: SparkSession
}
class SparkSession {
def conf: RuntimeConfig
def catalog: Catalog
def udf: UDFRegistration
def sql(sqlText: String): DataFrame
def read: DataFrameReader
def readStream: DataStreamReader
}
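
A minimal sketch of using the session as the entry point, touching runtime configuration, the catalog, and SQL execution; the app name and configuration value are illustrative.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Session Example")
  .master("local[*]")
  .getOrCreate()

// Read and adjust runtime configuration
spark.conf.set("spark.sql.shuffle.partitions", "8")
println(spark.conf.get("spark.sql.shuffle.partitions"))

// Inspect the catalog and run SQL through the session
spark.catalog.listTables().show()
spark.sql("SELECT 1 AS one").show()

spark.stop()
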
Dataset and DataFrame Operations

Core data structures and operations for structured data manipulation. Dataset provides type-safe operations while DataFrame offers untyped flexibility.

class Dataset[T] {
def select(cols: Column*): DataFrame
def filter(condition: Column): Dataset[T]
def groupBy(cols: Column*): RelationalGroupedDataset
def join(right: Dataset[_]): DataFrame
def agg(expr: Column, exprs: Column*): DataFrame
}
type DataFrame = Dataset[Row]
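
A brief sketch contrasting the typed Dataset API with the untyped DataFrame API; the Person case class and sample rows are illustrative, and an existing SparkSession named spark is assumed.

import org.apache.spark.sql.functions._
import spark.implicits._

case class Person(name: String, age: Int, department: String)

// Typed Dataset: fields are checked at compile time
val people = Seq(Person("Ada", 36, "eng"), Person("Grace", 45, "eng")).toDS()
val adults = people.filter(_.age > 18)

// Untyped DataFrame (Dataset[Row]): columns are resolved at runtime
val byDept = adults.groupBy("department").agg(avg("age").alias("avg_age"))
byDept.show()
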
Column Operations and Functions

Column expressions and built-in SQL functions for data transformation. Provides both operator overloading and function-based APIs.

class Column {
def ===(other: Any): Column
def >(other: Any): Column
def &&(other: Column): Column
def as(alias: String): Column
def cast(to: DataType): Column
}
object functions {
def col(colName: String): Column
def lit(literal: Any): Column
def when(condition: Column, value: Any): Column
def avg(e: Column): Column
def sum(e: Column): Column
}
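
A short sketch of composing column expressions with operators and built-in functions; it assumes a DataFrame df with name, age, and salary columns, which are illustrative.

import org.apache.spark.sql.functions._

val highEarners = df
  .filter(col("age") > 30 && col("salary") >= 100000)
  .select(
    col("name").as("employee"),
    col("salary").cast("double").as("salary_d"),
    when(col("age") > 60, lit("senior")).otherwise(lit("regular")).as("band")
  )
highEarners.show()
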
Reading and writing data from various sources and formats. Supports batch and streaming data with extensive configuration options.
class DataFrameReader {
def format(source: String): DataFrameReader
def option(key: String, value: String): DataFrameReader
def schema(schema: StructType): DataFrameReader
def load(): DataFrame
def json(path: String): DataFrame
def parquet(path: String): DataFrame
def csv(path: String): DataFrame
}
class DataFrameWriter[T] {
def mode(saveMode: SaveMode): DataFrameWriter[T]
def format(source: String): DataFrameWriter[T]
def save(path: String): Unit
def saveAsTable(tableName: String): Unit
}
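
A sketch of a batch read followed by two writes, assuming an existing SparkSession named spark; the paths, options, partition column, and table name are placeholders.

import org.apache.spark.sql.SaveMode

// Read JSON via the generic format/option/load chain
val events = spark.read
  .format("json")
  .option("multiLine", "true")
  .load("path/to/events.json")

// Write out as partitioned Parquet, overwriting any existing output
events.write
  .mode(SaveMode.Overwrite)
  .format("parquet")
  .partitionBy("event_date")
  .save("path/to/output")

// Or persist as a table registered in the catalog
events.write.mode(SaveMode.Append).saveAsTable("events")
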
Real-time data processing with structured streaming. Provides unified batch and streaming APIs with exactly-once processing guarantees.

class DataStreamReader {
def format(source: String): DataStreamReader
def option(key: String, value: String): DataStreamReader
def load(): DataFrame
// Kafka and socket sources are selected via format("kafka") / format("socket")
def json(path: String): DataFrame
def text(path: String): DataFrame
}
class DataStreamWriter[T] {
def outputMode(outputMode: String): DataStreamWriter[T]
def trigger(trigger: Trigger): DataStreamWriter[T]
def start(): StreamingQuery
def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
}
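
A sketch of a streaming query built on the built-in rate source (a test source that generates rows) and the console sink, assuming an existing SparkSession named spark; the trigger interval and expressions are illustrative.

import org.apache.spark.sql.streaming.Trigger

val stream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()

val query = stream
  .selectExpr("value % 10 AS bucket")
  .groupBy("bucket")
  .count()
  .writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()
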
Data type definitions and encoders for converting between JVM objects and Spark SQL internal representations.

object DataTypes {
val StringType: DataType
val IntegerType: DataType
val DoubleType: DataType
val BooleanType: DataType
val DateType: DataType
val TimestampType: DataType
}
object Encoders {
def STRING: Encoder[String]
def INT: Encoder[java.lang.Integer]
def scalaInt: Encoder[Int]
def product[T <: Product : TypeTag]: Encoder[T]
def bean[T](beanClass: Class[T]): Encoder[T]
}
trait Row {
def getString(i: Int): String
def getInt(i: Int): Int
def getDouble(i: Int): Double
def isNullAt(i: Int): Boolean
}
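
A sketch of declaring an explicit schema, reading with it, obtaining a typed column through an encoder, and extracting values from Row objects; the path and field names are placeholders and an existing SparkSession named spark is assumed.

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

val people = spark.read.schema(schema).csv("path/to/people.csv")

// An explicit encoder turns an untyped column into a typed Dataset
val names = people.select("name").as[String](Encoders.STRING)

// Pull typed values back out of each Row, guarding against nulls
people.collect().foreach { row =>
  val name = if (row.isNullAt(0)) "unknown" else row.getString(0)
  val age = if (row.isNullAt(1)) -1 else row.getInt(1)
  println(s"$name is $age")
}
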
Registration and usage of custom user-defined functions (UDFs) and user-defined aggregate functions (UDAFs).

class UDFRegistration {
def register[RT](name: String, func: () => RT): UserDefinedFunction
def register[RT, A1](name: String, func: A1 => RT): UserDefinedFunction
def register[RT, A1, A2](name: String, func: (A1, A2) => RT): UserDefinedFunction
}
class UserDefinedFunction {
def apply(exprs: Column*): Column
}
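
A sketch of registering a Scala function as a UDF and using it from both SQL and the DataFrame API; it assumes the employees temporary view and DataFrame df from the quick-start example, and the function itself is illustrative.

import org.apache.spark.sql.functions.{col, udf}

// Register by name for use in SQL text
spark.udf.register("initials", (name: String) => name.split(" ").map(_.head).mkString("."))
spark.sql("SELECT name, initials(name) AS initials FROM employees").show()

// Or build an unregistered UDF for the DataFrame API
val initialsUdf = udf((name: String) => name.split(" ").map(_.head).mkString("."))
df.select(col("name"), initialsUdf(col("name")).alias("initials")).show()
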
Database, table, and metadata management through the catalog interface. Provides programmatic access to metastore operations.

trait Catalog {
def currentDatabase(): String
def setCurrentDatabase(dbName: String): Unit
def listDatabases(): Dataset[Database]
def listTables(): Dataset[Table]
def listColumns(tableName: String): Dataset[Column]
def tableExists(tableName: String): Boolean
def cacheTable(tableName: String): Unit
}
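
A sketch of common catalog operations, assuming an existing SparkSession named spark; the database and table names are placeholders.

// Inspect and switch the current database
println(spark.catalog.currentDatabase)
spark.catalog.listDatabases().show()
spark.catalog.setCurrentDatabase("default")

// Inspect tables and columns, and manage in-memory caching
spark.catalog.listTables().show()
if (spark.catalog.tableExists("employees")) {
  spark.catalog.listColumns("employees").show()
  spark.catalog.cacheTable("employees")
}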