Apache Spark's distributed SQL engine and structured data processing framework, providing SQL queries and DataFrame/Dataset APIs for manipulating structured data
npx @tessl/cli install tessl/maven-org-apache-spark--spark-sql_2-12@2.4.00
# Apache Spark SQL
Apache Spark SQL is a distributed SQL engine and structured data processing framework that provides a programming interface for manipulating structured data using SQL queries and DataFrame/Dataset APIs. It serves as the foundation for relational query processing in the Apache Spark ecosystem, offering a unified engine that can execute SQL queries, work with various data sources, and integrate seamlessly with Spark's distributed computing capabilities.
## Package Information

- **Package Name**: org.apache.spark:spark-sql_2.12
- **Package Type**: maven
- **Language**: Scala (with Java interoperability)
- **Installation**: `libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.8"`

## Core Imports

```scala
import org.apache.spark.sql.{SparkSession, DataFrame, Dataset, Column, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
```

For Java:

```java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;
```

## Basic Usage

```scala
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._

// Create SparkSession (entry point)
val spark = SparkSession.builder()
  .appName("Spark SQL Example")
  .master("local[*]")
  .getOrCreate()

// Create DataFrame from data
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("path/to/data.csv")

// DataFrame operations (department must be selected to group by it)
val result = df
  .select("name", "age", "department")
  .filter(col("age") > 18)
  .groupBy("department")
  .agg(avg("age").alias("avg_age"))

// Show results
result.show()

// Execute SQL queries against a registered temporary view
df.createOrReplaceTempView("employees")
val sqlResult = spark.sql("""
  SELECT department, AVG(age) AS avg_age
  FROM employees
  WHERE age > 18
  GROUP BY department
""")
sqlResult.show()

spark.stop()
```

## Architecture

Apache Spark SQL is built around several key components:

- **SparkSession**: Main entry point providing access to all SQL functionality and configuration
- **Dataset/DataFrame API**: Strongly-typed (Dataset) and untyped (DataFrame) abstractions for structured data
- **Catalyst Optimizer**: Query planning and optimization engine for efficient execution
- **SQL Parser**: Parses SQL queries (largely ANSI-compatible), with extensions for distributed computing
- **Data Sources**: Unified interface for reading/writing various formats (Parquet, JSON, CSV, JDBC, etc.)
- **Streaming Engine**: Real-time data processing with a micro-batch execution model
- **Type System**: Rich type definitions with encoders for JVM object serialization

## Capabilities

### Session Management

Core session management and configuration for Spark SQL applications. SparkSession serves as the main entry point, providing access to all functionality.

```scala { .api }
object SparkSession {
  def builder(): Builder
  def active: SparkSession
}

class SparkSession {
  def conf: RuntimeConfig
  def catalog: Catalog
  def udf: UDFRegistration
  def sql(sqlText: String): DataFrame
  def read: DataFrameReader
  def readStream: DataStreamReader
}
```

[Session Management](./session-management.md)
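
A minimal sketch of the session lifecycle (the app name is illustrative): the builder creates or reuses a session, and `spark.conf` exposes runtime configuration.

```scala
import org.apache.spark.sql.SparkSession

// Create (or reuse) a local session via the builder
val spark = SparkSession.builder()
  .appName("session-demo") // illustrative app name
  .master("local[*]")
  .getOrCreate()

// Runtime configuration is read and modified through spark.conf
spark.conf.set("spark.sql.shuffle.partitions", "8")
val partitions = spark.conf.get("spark.sql.shuffle.partitions")

spark.stop()
```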

### Dataset and DataFrame Operations

Core data structures and operations for structured data manipulation. Dataset provides type-safe operations, while DataFrame offers untyped flexibility.

```scala { .api }
class Dataset[T] {
  def select(cols: Column*): DataFrame
  def filter(condition: Column): Dataset[T]
  def groupBy(cols: Column*): RelationalGroupedDataset
  def join(right: Dataset[_]): DataFrame
  def agg(expr: Column, exprs: Column*): DataFrame
}

type DataFrame = Dataset[Row]
```

[Dataset and DataFrame Operations](./dataset-dataframe.md)
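
A short sketch of typed versus untyped access (the `Person` record is hypothetical): typed Dataset operations compile against the case class's fields, while a DataFrame view of the same data is addressed by column name.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical record type for the typed Dataset API
case class Person(name: String, age: Int)

val spark = SparkSession.builder().appName("dataset-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Typed: the lambda is checked against Person's fields at compile time
val people = Seq(Person("Alice", 29), Person("Bob", 17)).toDS()
val adults = people.filter(_.age >= 18)

// Untyped: the same data viewed as a DataFrame (Dataset[Row])
val names = adults.toDF().select("name").collect().map(_.getString(0)).toSeq

spark.stop()
```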

### Column Operations and SQL Functions

Column expressions and built-in SQL functions for data transformation. Provides both operator overloading and function-based APIs.

```scala { .api }
class Column {
  def ===(other: Any): Column
  def >(other: Any): Column
  def &&(other: Column): Column
  def as(alias: String): Column
  def cast(to: DataType): Column
}

object functions {
  def col(colName: String): Column
  def lit(literal: Any): Column
  def when(condition: Column, value: Any): Column
  def avg(e: Column): Column
  def sum(e: Column): Column
}
```

[Column Operations and Functions](./columns-functions.md)
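
As a sketch of how these compose (column names and labels are illustrative), operators like `>` and `&&` combine with functions like `when` and `lit` to build a single expression:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("column-demo").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("Alice", 29), ("Bob", 17)).toDF("name", "age")

// Combine operator-based comparisons with function-based conditionals
val labeled = df.select(
  col("name"),
  when(col("age") > 18 && col("age") < 65, lit("working-age"))
    .otherwise(lit("other"))
    .as("bracket")
)

val brackets = labeled.collect().map(r => (r.getString(0), r.getString(1))).toSeq

spark.stop()
```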

### Data I/O Operations

Reading and writing data from various sources and formats. Supports batch and streaming data with extensive configuration options.

```scala { .api }
class DataFrameReader {
  def format(source: String): DataFrameReader
  def option(key: String, value: String): DataFrameReader
  def schema(schema: StructType): DataFrameReader
  def load(): DataFrame
  def json(path: String): DataFrame
  def parquet(path: String): DataFrame
  def csv(path: String): DataFrame
}

class DataFrameWriter[T] {
  def mode(saveMode: SaveMode): DataFrameWriter[T]
  def format(source: String): DataFrameWriter[T]
  def save(path: String): Unit
  def saveAsTable(tableName: String): Unit
}
```

[Data I/O Operations](./data-io.md)
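
A write/read round trip, as a sketch (the output path is hypothetical): `SaveMode.Overwrite` replaces any previous output, and Parquet carries its own schema so no inference is needed on read.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("io-demo").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

// Write Parquet; Overwrite replaces any output from a previous run
val path = "/tmp/spark-sql-io-demo" // hypothetical output location
df.write.mode(SaveMode.Overwrite).parquet(path)

// Read it back; the schema is recovered from the Parquet footer
val roundTrip = spark.read.parquet(path)
val rows = roundTrip.count()

spark.stop()
```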

### Streaming Operations

Real-time data processing with structured streaming. Provides unified batch and streaming APIs, with end-to-end exactly-once guarantees for supported sources and sinks.

```scala { .api }
class DataStreamReader {
  def format(source: String): DataStreamReader
  def option(key: String, value: String): DataStreamReader
  def schema(schema: StructType): DataStreamReader
  def load(): DataFrame
  def json(path: String): DataFrame
  def csv(path: String): DataFrame
}

class DataStreamWriter[T] {
  def outputMode(outputMode: String): DataStreamWriter[T]
  def trigger(trigger: Trigger): DataStreamWriter[T]
  def start(): StreamingQuery
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
}
```

[Streaming Operations](./streaming.md)
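
A self-contained sketch using the built-in `rate` source and the `memory` sink (the query name is illustrative); sources such as Kafka or sockets are selected via `format("kafka")` / `format("socket")` rather than dedicated methods.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("stream-demo").master("local[*]").getOrCreate()

// The "rate" source generates rows continuously; useful for experiments
val stream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()

val streaming = stream.isStreaming // true for streaming DataFrames

// The "memory" sink collects output into an in-memory table for inspection
val query = stream.writeStream
  .format("memory")
  .queryName("rate_events") // illustrative table name
  .outputMode("append")
  .start()

query.processAllAvailable() // block until pending input is processed
query.stop()
spark.stop()
```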

### Type System and Encoders

Data type definitions and encoders for converting between JVM objects and Spark SQL internal representations.

```scala { .api }
object DataTypes {
  val StringType: DataType
  val IntegerType: DataType
  val DoubleType: DataType
  val BooleanType: DataType
  val DateType: DataType
  val TimestampType: DataType
}

object Encoders {
  def STRING: Encoder[String]
  def scalaInt: Encoder[Int]
  def product[T <: Product]: Encoder[T]
  def bean[T](beanClass: Class[T]): Encoder[T]
}

trait Row {
  def getString(i: Int): String
  def getInt(i: Int): Int
  def getDouble(i: Int): Double
  def isNullAt(i: Int): Boolean
}
```

[Type System and Encoders](./types-encoders.md)
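
A sketch of declaring an explicit schema and reading generic `Row` values back (column names are illustrative); an explicit `StructType` avoids schema inference entirely.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("types-demo").master("local[*]").getOrCreate()

// Declare an explicit schema rather than relying on inference
val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = true)
))

val data = Seq(Row("Alice", 29), Row("Bob", null))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

// Generic Row access: positional getters plus null checks
val bobAgeMissing = df.filter(df("name") === "Bob").first().isNullAt(1)

spark.stop()
```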

### User-Defined Functions

Registration and usage of custom user-defined functions (UDFs) and user-defined aggregate functions (UDAFs).

```scala { .api }
class UDFRegistration {
  def register[RT](name: String, func: () => RT): UserDefinedFunction
  def register[RT, A1](name: String, func: A1 => RT): UserDefinedFunction
  def register[RT, A1, A2](name: String, func: (A1, A2) => RT): UserDefinedFunction
}

class UserDefinedFunction {
  def apply(exprs: Column*): Column
}
```

[User-Defined Functions](./udfs.md)
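
The two registration paths, as a sketch (function and view names are illustrative): `spark.udf.register` makes a function callable from SQL, while `functions.udf` wraps it for the DataFrame API.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().appName("udf-demo").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq("alice", "bob").toDF("name")

// Register for SQL use, and build a Column-returning UDF for the DataFrame API
spark.udf.register("initialCap", (s: String) => s.capitalize)
val initialCap = udf((s: String) => s.capitalize)

df.createOrReplaceTempView("people")
val viaSql = spark.sql("SELECT initialCap(name) AS name FROM people")
val viaApi = df.select(initialCap(col("name")).as("name"))

val names = viaApi.collect().map(_.getString(0)).toSeq

spark.stop()
```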

### Metadata and Catalog Operations

Database, table, and metadata management through the catalog interface. Provides programmatic access to metastore operations.

```scala { .api }
abstract class Catalog {
  def currentDatabase: String
  def setCurrentDatabase(dbName: String): Unit
  def listDatabases(): Dataset[Database]
  def listTables(): Dataset[Table]
  def listColumns(tableName: String): Dataset[Column]
  def tableExists(tableName: String): Boolean
  def cacheTable(tableName: String): Unit
}
```

[Metadata and Catalog](./catalog.md)
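
A brief sketch of catalog introspection (the view name is illustrative): temporary views appear in the catalog alongside persistent tables, and caching is managed by name.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("catalog-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Temporary views show up in the catalog next to persistent tables
Seq((1, "a")).toDF("id", "value").createOrReplaceTempView("demo_view") // illustrative name

val db = spark.catalog.currentDatabase // "default" in a fresh session
val exists = spark.catalog.tableExists("demo_view")

spark.catalog.cacheTable("demo_view")
val cached = spark.catalog.isCached("demo_view")

spark.stop()
```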