Apache Spark SQL is a distributed SQL query engine that provides DataFrame and Dataset APIs for working with structured data.
It enables users to run SQL queries against structured data sources such as JSON, Parquet, and JDBC databases, while integrating seamlessly with Spark's core RDD API. The module includes the Catalyst optimizer framework for logical query planning and optimization, support for both the SQL and DataFrame/Dataset APIs, integration with Hive for metadata and SerDes, and the ability to cache data in memory for faster query performance.

Install the package specification with the Tessl CLI:

```bash
npx @tessl/cli install tessl/maven-org-apache-spark--spark-sql_2-11@2.4.0
```
Maven dependency:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.4.8</version>
</dependency>
```

Core imports for Scala:

```scala
import org.apache.spark.sql.{SparkSession, DataFrame, Dataset, Column, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
```

For Java:

```java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;
```

A complete example that creates a session, loads JSON data, transforms it, and runs the same aggregation as a SQL query:

```scala
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._

// Create SparkSession
val spark = SparkSession.builder()
  .appName("Spark SQL Example")
  .master("local[*]")
  .getOrCreate()

// Load data
val df = spark.read
  .format("json")
  .option("multiline", "true")
  .load("path/to/data.json")

// Transform data
val result = df
  .select(col("name"), col("age").cast("int"))
  .filter(col("age") > 18)
  .groupBy(col("name"))
  .agg(avg(col("age")).alias("avg_age"))

// Execute and show results
result.show()

// SQL queries
df.createOrReplaceTempView("people")
val sqlResult = spark.sql("SELECT name, AVG(age) as avg_age FROM people WHERE age > 18 GROUP BY name")
sqlResult.show()

spark.stop()
```

Apache Spark SQL is built around several key components:
SparkSession serves as the unified entry point for all Spark SQL operations, providing access to DataFrames, SQL execution, and configuration management.
```scala
object SparkSession {
  def builder(): SparkSession.Builder
}

class SparkSession {
  def sql(sqlText: String): DataFrame
  def read: DataFrameReader
  def readStream: DataStreamReader
  def catalog: Catalog
  def conf: RuntimeConfig
  def table(tableName: String): DataFrame
  def stop(): Unit
}
```
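A minimal sketch of the entry-point methods above; the configuration key shown is a real Spark SQL setting, while the commented-out table lookup assumes a table or view named `people` already exists:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Entry point sketch")
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "8")   // configuration set at build time
  .getOrCreate()

// Runtime configuration can also be changed on a live session
spark.conf.set("spark.sql.shuffle.partitions", "4")
println(spark.conf.get("spark.sql.shuffle.partitions"))

// Look up a registered table (assumes a view/table named "people" exists)
// val people = spark.table("people")

spark.stop()
```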
### DataFrame and Dataset Operations

Core distributed data structures with schema information and type safety, supporting both typed (`Dataset[T]`) and untyped (`DataFrame`) operations.

```scala
class Dataset[T] {
  def select(cols: Column*): DataFrame
  def filter(condition: Column): Dataset[T]
  def groupBy(cols: Column*): RelationalGroupedDataset
  def join(right: Dataset[_]): DataFrame
  def union(other: Dataset[T]): Dataset[T]
  def count(): Long
  def collect(): Array[T]
  def show(): Unit
}

type DataFrame = Dataset[Row]
```
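A short sketch contrasting typed and untyped operations; the `Person` case class and the inline data are assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Long)

val spark = SparkSession.builder().appName("Dataset sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Typed Dataset[Person]: lambdas are checked at compile time
val people = Seq(Person("Ana", 34), Person("Bo", 17)).toDS()
val adults = people.filter(_.age >= 18)

// Untyped DataFrame (Dataset[Row]): Column expressions resolved at runtime
val df = people.toDF()
val adultsDf = df.filter($"age" >= 18)

adults.show()
adultsDf.select($"name").show()
spark.stop()
```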
Comprehensive I/O capabilities for reading from and writing to various data sources including files, databases, and streaming sources.
```scala
class DataFrameReader {
  def format(source: String): DataFrameReader
  def schema(schema: StructType): DataFrameReader
  def option(key: String, value: String): DataFrameReader
  def load(): DataFrame
  def json(path: String): DataFrame
  def parquet(path: String): DataFrame
  def jdbc(url: String, table: String, properties: Properties): DataFrame
}

class DataFrameWriter[T] {
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def format(source: String): DataFrameWriter[T]
  def option(key: String, value: String): DataFrameWriter[T]
  def save(): Unit
  def saveAsTable(tableName: String): Unit
}
```
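A brief read/write sketch; the paths are placeholders and the input file is assumed to exist:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("IO sketch").master("local[*]").getOrCreate()

// Read: generic format/option/load, or shortcuts such as parquet(...) and json(...)
val events = spark.read
  .format("parquet")
  .load("path/to/events.parquet")     // placeholder input path

// Write: pick a SaveMode before saving
events.write
  .mode(SaveMode.Overwrite)
  .format("parquet")
  .save("path/to/output.parquet")     // placeholder output path

spark.stop()
```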
Extensive library of built-in SQL functions for data manipulation, aggregation, string processing, date/time operations, and mathematical calculations.

```scala
// Column operations
class Column {
  def +(other: Any): Column
  def ===(other: Any): Column
  def isNull: Column
  def cast(to: DataType): Column
  def alias(alias: String): Column
}

// Built-in functions (from the functions object)
def col(colName: String): Column
def lit(literal: Any): Column
def when(condition: Column, value: Any): Column
def sum(e: Column): Column
def avg(e: Column): Column
def count(e: Column): Column
def max(e: Column): Column
def min(e: Column): Column
```
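A small sketch of Column expressions and built-in functions; the inline data is an assumption for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

val spark = SparkSession.builder().appName("Functions sketch").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("Ana", "34"), ("Bo", null)).toDF("name", "age")

df.select(
    col("name"),
    col("age").cast(IntegerType).alias("age_int"),
    when(col("age").isNull, lit("unknown")).otherwise(col("age")).alias("age_or_default")
  )
  .filter(col("name") === "Ana")
  .show()

spark.stop()
```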
Powerful aggregation capabilities with both untyped DataFrame aggregations and type-safe Dataset aggregations for complex analytical operations.

```scala
class RelationalGroupedDataset {
  def agg(expr: Column, exprs: Column*): DataFrame
  def count(): DataFrame
  def sum(colNames: String*): DataFrame
  def avg(colNames: String*): DataFrame
  def pivot(pivotColumn: String): RelationalGroupedDataset
}

class KeyValueGroupedDataset[K, V] {
  def agg[U1](column: TypedColumn[V, U1]): Dataset[(K, U1)]
  def count(): Dataset[(K, Long)]
  def mapGroups[U](f: (K, Iterator[V]) => U): Dataset[U]
  def reduceGroups(f: (V, V) => V): Dataset[(K, V)]
}
```
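A sketch of untyped and typed aggregation; the data, column names, and grouping key are assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("Aggregation sketch").master("local[*]").getOrCreate()
import spark.implicits._

val sales = Seq(("books", 10.0), ("books", 5.0), ("games", 20.0)).toDF("category", "amount")

// Untyped: groupBy returns a RelationalGroupedDataset
sales.groupBy($"category")
  .agg(sum($"amount").alias("total"), avg($"amount").alias("average"))
  .show()

// Typed: groupByKey returns a KeyValueGroupedDataset
sales.as[(String, Double)]
  .groupByKey { case (category, _) => category }
  .mapGroups { case (category, rows) => (category, rows.map(_._2).sum) }
  .show()

spark.stop()
```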
Structured streaming capabilities for processing continuous data streams with the same DataFrame/Dataset APIs used for batch processing.

```scala
class DataStreamWriter[T] {
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  def trigger(trigger: Trigger): DataStreamWriter[T]
  def start(): StreamingQuery
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
}

class StreamingQuery {
  def isActive: Boolean
  def awaitTermination(): Unit
  def stop(): Unit
  def status: StreamingQueryStatus
}
```
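A minimal structured streaming sketch using the built-in `rate` test source so it runs without external systems; the trigger interval and timeout are illustrative choices:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("Streaming sketch").master("local[*]").getOrCreate()

// "rate" is a built-in source that emits (timestamp, value) rows for testing
val stream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()

val query = stream.writeStream
  .format("console")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination(20000)   // run for roughly 20 seconds
query.stop()
spark.stop()
```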
### Catalog and Metadata Management

Comprehensive metadata management for databases, tables, functions, and cached data with full programmatic access to the Spark catalog.

```scala
class Catalog {
  def currentDatabase: String
  def listDatabases(): Dataset[Database]
  def listTables(): Dataset[Table]
  def listColumns(tableName: String): Dataset[Column]
  def listFunctions(): Dataset[Function]
  def cacheTable(tableName: String): Unit
  def isCached(tableName: String): Boolean
}
```
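A short catalog sketch; the temporary view name and data are assumptions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Catalog sketch").master("local[*]").getOrCreate()
import spark.implicits._

Seq(("Ana", 34)).toDF("name", "age").createOrReplaceTempView("people")

println(spark.catalog.currentDatabase)
spark.catalog.listTables().show()
spark.catalog.listColumns("people").show()

// Cache the view's data in memory, check the flag, then release it
spark.catalog.cacheTable("people")
println(spark.catalog.isCached("people"))
spark.catalog.uncacheTable("people")

spark.stop()
```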
Supporting types for schemas, row access, and save modes:

```scala
// Core data types
abstract class DataType
case object StringType extends DataType
case object IntegerType extends DataType
case object LongType extends DataType
case object DoubleType extends DataType
case object BooleanType extends DataType
case object DateType extends DataType
case object TimestampType extends DataType

// Complex types
case class StructType(fields: Array[StructField]) extends DataType
case class StructField(name: String, dataType: DataType, nullable: Boolean)
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType
case class MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) extends DataType

// Row representation
trait Row {
  def get(i: Int): Any
  def getString(i: Int): String
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def getDouble(i: Int): Double
  def getBoolean(i: Int): Boolean
}

// Save modes (org.apache.spark.sql.SaveMode is a Java enum)
// Values: Append, Overwrite, ErrorIfExists, Ignore
```
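A sketch that builds an explicit schema, creates rows, and writes with a save mode; the output path is a placeholder:

```scala
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("Schema sketch").master("local[*]").getOrCreate()

// Explicit schema built from StructType/StructField
val schema = StructType(Array(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = true)
))

// Create a DataFrame from Row objects and the schema
val rows = java.util.Arrays.asList(Row("Ana", 34), Row("Bo", null))
val people = spark.createDataFrame(rows, schema)

// Typed accessors on Row
people.collect().foreach(r => println(s"${r.getString(0)} -> ${r.get(1)}"))

// SaveMode controls behaviour when the target already exists
people.write.mode(SaveMode.Overwrite).parquet("path/to/people.parquet")   // placeholder path

spark.stop()
```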